From 5b419e3ef60871b193cb11e06f80ccad7e506ec3 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Mon, 13 Jun 2022 10:09:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0rpcx=E5=BA=95=E5=B1=82?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=8F=91=E7=8E=B0=E6=9C=BA=E5=88=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96=EF=BC=8C=E4=BB=A5=E5=8F=8A=E8=87=AA=E5=AE=9A=E4=B9=89?= =?UTF-8?q?=E9=80=89=E6=8B=A9=E5=99=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comm/core.go | 4 ++-- comm/usersession.go | 20 +++++++++++--------- lego/sys/rpcx/client.go | 4 ++-- lego/sys/rpcx/core.go | 12 ++++++++++-- lego/sys/rpcx/options.go | 15 +++++++++++---- modules/gate_comp.go | 16 ++++++++-------- modules/user/login_comp.go | 4 ++-- 7 files changed, 46 insertions(+), 29 deletions(-) diff --git a/comm/core.go b/comm/core.go index f04f79f4d..6bc46b22a 100644 --- a/comm/core.go +++ b/comm/core.go @@ -60,8 +60,8 @@ type IUserSession interface { GetIP() string GetGatewayServiceId() string IsLogin() bool - Build(uid string) (err error) - UnBuild(ServiceMethod string, msg proto.Message) (err error) + Bind(uid string, wokerId string) (err error) + UnBind() (err error) SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error) Close() (err error) ToString() string diff --git a/comm/usersession.go b/comm/usersession.go index da6f9dd95..02a70e262 100644 --- a/comm/usersession.go +++ b/comm/usersession.go @@ -58,25 +58,27 @@ func (this *UserSession) IsLogin() bool { return this.UserId != "" } -//绑定uid 登录后操作 -func (this *UserSession) Build(uid string) (err error) { +///绑定uid 登录后操作 +///uid 用户id +///wokerId 用户绑定worker服务id +func (this *UserSession) Bind(uid string, wokerId string) (err error) { reply := &pb.RPCMessageReply{} - if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentBuild), context.Background(), &pb.AgentBuildReq{ + if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentBuild), &pb.AgentBuildReq{ UserSessionId: this.SessionId, UserId: uid, }, reply); err != nil { - log.Errorf("UserSession:%s UserId:%s Build:%s err:%v", this.SessionId, this.UserId, err) + log.Errorf("Bind UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err) } return } //解绑uid 注销和切换账号是处理 -func (this *UserSession) UnBuild(ServiceMethod string, msg proto.Message) (err error) { +func (this *UserSession) UnBind() (err error) { reply := &pb.RPCMessageReply{} - if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentUnBuild), context.Background(), &pb.AgentUnBuildReq{ + if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentUnBuild), &pb.AgentUnBuildReq{ UserSessionId: this.SessionId, }, reply); err != nil { - log.Errorf("UserSession:%s UserId:%s UnBuild err:%v", this.SessionId, this.UserId, err) + log.Errorf("UnBuild UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err) } return } @@ -93,7 +95,7 @@ func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, ms Code: code, Data: data, }, reply); err != nil { - log.Errorf("UserSession:%s UserId:%s SendMsg:%s err:%v", this.SessionId, this.UserId, mainType, err) + log.Errorf("SendMsg:%s UserSession:%s UserId:%s err:%v", mainType, this.SessionId, this.UserId, err) } return } @@ -104,7 +106,7 @@ func (this *UserSession) Close() (err error) { if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentCloseeReq{ UserSessionId: this.SessionId, }, reply); err != nil { - log.Errorf("UserSession:%s UserId:%d Close:%s err:%v", this.SessionId, this.UserId, err) + log.Errorf("Close UserSession:%s UserId:%d err:%v", this.SessionId, this.UserId, err) } return } diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index cd0546fcc..440a2ea67 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -56,7 +56,7 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod ) spath = strings.Split(servicePath, "/") if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { + if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil { return } c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) @@ -87,7 +87,7 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st ) spath = strings.Split(servicePath, "/") if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { + if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil { return } c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index 3b2b812f1..cfd0a0d1c 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -41,12 +41,20 @@ var ( ) func OnInit(config map[string]interface{}, option ...Option) (err error) { - defsys, err = newSys(newOptions(config, option...)) + var options Options + if options, err = newOptions(config, option...); err != nil { + return + } + defsys, err = newSys(options) return } func NewSys(option ...Option) (sys ISys, err error) { - sys, err = newSys(newOptionsByOption(option...)) + var options Options + if options, err = newOptionsByOption(option...); err != nil { + return + } + sys, err = newSys(options) return } diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index cd2bd1ce5..675fdced9 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -1,6 +1,7 @@ package rpcx import ( + "errors" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/utils/mapstructure" ) @@ -73,7 +74,7 @@ func SetLog(v log.ILog) Option { } } -func newOptions(config map[string]interface{}, opts ...Option) Options { +func newOptions(config map[string]interface{}, opts ...Option) (Options, error) { options := Options{ Debug: true, Log: log.Clone(log.SetLoglayer(2)), @@ -84,10 +85,13 @@ func newOptions(config map[string]interface{}, opts ...Option) Options { for _, o := range opts { o(&options) } - return options + if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 { + return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常") + } + return options, nil } -func newOptionsByOption(opts ...Option) Options { +func newOptionsByOption(opts ...Option) (Options, error) { options := Options{ Debug: true, Log: log.Clone(log.SetLoglayer(2)), @@ -95,5 +99,8 @@ func newOptionsByOption(opts ...Option) Options { for _, o := range opts { o(&options) } - return options + if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 { + return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常") + } + return options, nil } diff --git a/modules/gate_comp.go b/modules/gate_comp.go index a00e645cc..c2cc0c4c7 100644 --- a/modules/gate_comp.go +++ b/modules/gate_comp.go @@ -27,17 +27,17 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem() */ type MComp_GateComp struct { cbase.ModuleCompBase - service base.IRPCXService //rpc服务对象 - module core.IModule //当前业务模块 - comp core.IModuleComp //网关组件自己 - scomp comm.ISC_GateRouteComp + S base.IRPCXService //rpc服务对象 + M core.IModule //当前业务模块 + comp core.IModuleComp //网关组件自己 + scomp comm.ISC_GateRouteComp } //组件初始化接口 func (this *MComp_GateComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.ModuleCompBase.Init(service, module, comp, options) - this.service = service.(base.IRPCXService) - this.module = module + this.S = service.(base.IRPCXService) + this.M = module this.comp = comp return } @@ -49,7 +49,7 @@ func (this *MComp_GateComp) Start() (err error) { } var comp core.IServiceComp //注册远程路由 - if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil { + if comp, err = this.S.GetComp(comm.SC_ServiceGateRouteComp); err != nil { return } this.scomp = comp.(comm.ISC_GateRouteComp) @@ -96,7 +96,7 @@ func (this *MComp_GateComp) reflectionRouteHandle(method reflect.Method) bool { if returnType := mtype.Out(0); returnType != typeOfError { return false } - this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) + this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.M.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) return true } diff --git a/modules/user/login_comp.go b/modules/user/login_comp.go index 66f82e951..9a382e2c7 100644 --- a/modules/user/login_comp.go +++ b/modules/user/login_comp.go @@ -68,9 +68,9 @@ func (this *LoginComp) Login(ctx context.Context, session comm.IUserSession, req log.Errorf("User_CreateUser err %v", err) return err } - session.Build(user.UserId) + session.Bind(user.UserId, this.S.GetId()) } else { - session.Build(db_user.UserId) + session.Bind(db_user.UserId, this.S.GetId()) } cache_user := &pb.Cache_UserData{