diff --git a/comm/core.go b/comm/core.go index 1888e5496..f04f79f4d 100644 --- a/comm/core.go +++ b/comm/core.go @@ -14,6 +14,11 @@ const ( SC_ServiceGateRouteComp core.S_Comps = "SC_GateRouteComp" //s_comps.ISC_GateRouteComp ) +const ( + Service_Gateway = "gateway" + Service_Worker = "worker" +) + //模块名定义处 const ( SM_GateModule core.M_Modules = "gateway" //gate模块 网关服务模块 diff --git a/comm/usersession.go b/comm/usersession.go index 58c382607..da6f9dd95 100644 --- a/comm/usersession.go +++ b/comm/usersession.go @@ -61,7 +61,7 @@ func (this *UserSession) IsLogin() bool { //绑定uid 登录后操作 func (this *UserSession) Build(uid string) (err error) { reply := &pb.RPCMessageReply{} - if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentBuild), context.Background(), &pb.AgentBuildReq{ + if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentBuild), context.Background(), &pb.AgentBuildReq{ UserSessionId: this.SessionId, UserId: uid, }, reply); err != nil { @@ -73,7 +73,7 @@ func (this *UserSession) Build(uid string) (err error) { //解绑uid 注销和切换账号是处理 func (this *UserSession) UnBuild(ServiceMethod string, msg proto.Message) (err error) { reply := &pb.RPCMessageReply{} - if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentUnBuild), context.Background(), &pb.AgentUnBuildReq{ + if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentUnBuild), context.Background(), &pb.AgentUnBuildReq{ UserSessionId: this.SessionId, }, reply); err != nil { log.Errorf("UserSession:%s UserId:%s UnBuild err:%v", this.SessionId, this.UserId, err) @@ -86,7 +86,7 @@ func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, ms reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) log.Debugf("SendMsg to SessionId:[%s] UserId:[%s] Code:%d Data: %v", this.UserId, code, msg) - if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentSendMessageReq{ UserSessionId: this.SessionId, MainType: mainType, SubType: subType, @@ -101,7 +101,7 @@ func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, ms //关闭用户连接对象 func (this *UserSession) Close() (err error) { reply := &pb.RPCMessageReply{} - if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentCloseeReq{ + if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentCloseeReq{ UserSessionId: this.SessionId, }, reply); err != nil { log.Errorf("UserSession:%s UserId:%d Close:%s err:%v", this.SessionId, this.UserId, err) diff --git a/lego/base/core.go b/lego/base/core.go index efc0baff7..372f62a16 100644 --- a/lego/base/core.go +++ b/lego/base/core.go @@ -74,12 +74,9 @@ type IRPCXServiceSession interface { type IRPCXService interface { IClusterServiceBase - DefauleRpcRouteRules(stype string, sip string) (ss IRPCXServiceSession, err error) //默认rpc路由规则 Register(rcvr interface{}) (err error) RegisterFunction(fn interface{}) (err error) RegisterFunctionName(name string, fn interface{}) (err error) - RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) - RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) - RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) - RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) + RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) + RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) } diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index f970d7ec0..ab2f39e10 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -143,7 +143,7 @@ func (this *Agent) Close() { func (this *Agent) messageDistribution(msg *pb.UserMessage) { reply := &pb.RPCMessageReply{} log.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType) - if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.AgentMessage{ + if err := this.gateway.Service().RpcCall(context.Background(), comm.Service_Worker, string(comm.Rpc_GatewayRoute), &pb.AgentMessage{ Ip: this.IP(), UserSessionId: this.sessionId, UserId: this.uId, diff --git a/modules/modulebase.go b/modules/modulebase.go index 5d5a48e39..f26ea5b53 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -2,6 +2,7 @@ package modules import ( "context" + "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" @@ -32,7 +33,7 @@ func (this *ModuleBase) Init(service core.IService, module core.IModule, options func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error) { reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) - if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ + if _, err = this.service.RpcGo(context.Background(), fmt.Sprintf("%s/%s", comm.Service_Gateway, user.GatewayServiceId), string(comm.Rpc_GatewayAgentSendMsg), &pb.AgentSendMessageReq{ UserSessionId: user.SessionId, MainType: mainType, SubType: subType, @@ -60,7 +61,7 @@ func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Messa reply := &pb.RPCMessageReply{} data, _ := proto.Marshal(msg) for k, v := range gateways { - if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{ + if _, err = this.service.RpcGo(context.Background(), fmt.Sprintf("%s/%s", comm.Service_Gateway, k), string(comm.Rpc_GatewayAgentSendMsg), &pb.BatchMessageReq{ UserSessionIds: v, MainType: mainType, SubType: subType,