From dec719d2f29fbad4a961688c0a0298e7e6b30d31 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Tue, 12 Sep 2023 19:10:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=99=BB=E5=BD=95=E6=B5=81?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/gateway/agent.go | 65 +++++++++++++++++++++--------- modules/gateway/agentmgr_comp.go | 50 +++++++++++------------ modules/gateway/core.go | 1 + modules/gateway/module.go | 15 ++++++- modules/wtask/api_eventcomplete.go | 8 ---- 5 files changed, 87 insertions(+), 52 deletions(-) diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index 19cb5b2b2..0ccebd631 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -98,15 +98,46 @@ locp: if errdata == nil { // this.gateway.Debugf("----------2 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType) if msg.MainType == string(comm.ModuleUser) && msg.SubType == "login" { //登录排队 - if this.queueIndex, err = this.gateway.InLoginQueue(this.sessionId, msg); err != nil { - this.gateway.Errorf("messageDistribution err:%v", err) + if this.uId == "" { + if this.queueIndex, err = this.gateway.InLoginQueue(this.sessionId, msg); err != nil { + this.gateway.Errorf("messageDistribution err:%v", err) + data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ + MsgId: msg.MsgId, + ReqMainType: msg.MainType, + ReqSubType: msg.SubType, + Arg: msg.Data, + Code: pb.ErrorCode_GatewayException, + Err: &pb.ErrorData{Code: pb.ErrorCode_GatewayException, Title: pb.ErrorCode_GatewayException.String(), Datastring: err.Error()}, + }) + err = this.WriteMsg(&pb.UserMessage{ + MsgId: msg.MsgId, + MainType: comm.MainTypeNotify, + SubType: comm.SubTypeErrorNotify, + Data: data, + }) + go this.Close() + break locp + } else { + if this.queueIndex > 0 { + this.lastpushtime = time.Now() + data, _ := anypb.New(&pb.UserLoginQueueChangePush{ + Index: this.queueIndex, + }) + err = this.WriteMsg(&pb.UserMessage{ + MainType: string(comm.ModuleUser), + SubType: "loginqueuechange", + Data: data, + }) + } + } + } else { data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ MsgId: msg.MsgId, ReqMainType: msg.MainType, ReqSubType: msg.SubType, Arg: msg.Data, - Code: pb.ErrorCode_GatewayException, - Err: &pb.ErrorData{Title: "用户消息处理失败!", Datastring: err.Error()}, + Code: pb.ErrorCode_ReqParameterError, + Err: &pb.ErrorData{Code: pb.ErrorCode_ReqParameterError, Title: "Repeat login!", Datastring: err.Error()}, }) err = this.WriteMsg(&pb.UserMessage{ MsgId: msg.MsgId, @@ -114,20 +145,6 @@ locp: SubType: comm.SubTypeErrorNotify, Data: data, }) - go this.Close() - break locp - } else { - if this.queueIndex > 0 { - this.lastpushtime = time.Now() - data, _ := anypb.New(&pb.UserLoginQueueChangePush{ - Index: this.queueIndex, - }) - err = this.WriteMsg(&pb.UserMessage{ - MainType: string(comm.ModuleUser), - SubType: "loginqueuechange", - Data: data, - }) - } } continue } @@ -487,6 +504,18 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { for _, v := range reply.Reply { if v.MainType == msg.MainType && v.SubType == msg.SubType { v.MsgId = msg.MsgId + if msg.MainType == string(comm.ModuleUser) && msg.SubType == "login" { //登录回包 + var ( + resp proto.Message + loginresp *pb.UserLoginResp + ) + if resp, err = v.Data.UnmarshalNew(); err != nil { + return + } + loginresp = resp.(*pb.UserLoginResp) + this.uId = loginresp.Data.Uid + this.gateway.LoginNotice(this) + } } } if err = this.WriteMsgs(reply.Reply); err != nil { diff --git a/modules/gateway/agentmgr_comp.go b/modules/gateway/agentmgr_comp.go index c605ab620..58717a9a0 100644 --- a/modules/gateway/agentmgr_comp.go +++ b/modules/gateway/agentmgr_comp.go @@ -99,31 +99,31 @@ func (this *AgentMgrComp) DisConnect(a IAgent) { } // Bind 用户绑定Id -func (this *AgentMgrComp) Bind(ctx context.Context, args *pb.AgentBuildReq, reply *pb.RPCMessageReply) error { - if a, ok := this.agents.Load(args.UserSessionId); ok { - agent := a.(IAgent) - agent.Bind(args.UserId, args.WorkerId) - // if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去 - //推送跨服集群处理 - if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ - Ip: agent.IP(), - ServiceTag: this.service.GetTag(), - GatewayServiceId: this.service.GetId(), - UserSessionId: agent.SessionId(), - UserId: agent.UserId(), - }, nil); err != nil { - log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), db.CrossTag(), err) - } - // } - } else { - reply.ErrorData = &pb.ErrorData{ - Code: pb.ErrorCode_UserSessionNobeing, - Title: pb.ErrorCode_UserSessionNobeing.ToString(), - Message: fmt.Sprintf("绑定uid:%s失败!", args.WorkerId), - } - } - return nil -} +// func (this *AgentMgrComp) Bind(ctx context.Context, args *pb.AgentBuildReq, reply *pb.RPCMessageReply) error { +// if a, ok := this.agents.Load(args.UserSessionId); ok { +// agent := a.(IAgent) +// agent.Bind(args.UserId, args.WorkerId) +// // if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去 +// //推送跨服集群处理 +// if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ +// Ip: agent.IP(), +// ServiceTag: this.service.GetTag(), +// GatewayServiceId: this.service.GetId(), +// UserSessionId: agent.SessionId(), +// UserId: agent.UserId(), +// }, nil); err != nil { +// log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), db.CrossTag(), err) +// } +// // } +// } else { +// reply.ErrorData = &pb.ErrorData{ +// Code: pb.ErrorCode_UserSessionNobeing, +// Title: pb.ErrorCode_UserSessionNobeing.ToString(), +// Message: fmt.Sprintf("绑定uid:%s失败!", args.WorkerId), +// } +// } +// return nil +// } // UnBind 用户解绑Id func (this *AgentMgrComp) UnBind(ctx context.Context, args *pb.AgentUnBuildReq, reply *pb.RPCMessageReply) error { diff --git a/modules/gateway/core.go b/modules/gateway/core.go index 2848e4877..91282626f 100644 --- a/modules/gateway/core.go +++ b/modules/gateway/core.go @@ -31,6 +31,7 @@ type ( Service() base.IRPCXService Connect(a IAgent) DisConnect(a IAgent) + LoginNotice(a IAgent) GetMsgDistribute(msgmid, msguid string) (rule string, ok bool) InLoginQueue(sessionId string, login *pb.UserMessage) (index int32, err error) } diff --git a/modules/gateway/module.go b/modules/gateway/module.go index 3db824fa7..5e2b9b7a8 100644 --- a/modules/gateway/module.go +++ b/modules/gateway/module.go @@ -5,6 +5,7 @@ import ( "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" + "go_dreamfactory/sys/db" "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" @@ -62,7 +63,7 @@ func (this *Gateway) Start() (err error) { _name2Func := map[string]any{ // 注册用户绑定uid接口 登录成功后触发 - string(comm.Rpc_GatewayAgentBind): this.agentMgr.Bind, + // string(comm.Rpc_GatewayAgentBind): this.agentMgr.Bind, // 注册用户解绑uid接口 登出或则切换账号是触发 string(comm.Rpc_GatewayAgentUnBind): this.agentMgr.UnBind, // 向用户发送消息接口 @@ -109,6 +110,18 @@ func (this *Gateway) DisConnect(a IAgent) { this.agentMgr.DisConnect(a) } +func (this *Gateway) LoginNotice(a IAgent) { + if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ + Ip: a.IP(), + ServiceTag: this.service.GetTag(), + GatewayServiceId: this.service.GetId(), + UserSessionId: a.SessionId(), + UserId: a.UserId(), + }, nil); err != nil { + log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", a.UserId(), db.CrossTag(), err) + } +} + // GetMsgDistribute 读取消息分发规则 func (this *Gateway) GetMsgDistribute(msgmid, msguid string) (rule string, ok bool) { return this.configure.GetMsgDistribute(msgmid, msguid) diff --git a/modules/wtask/api_eventcomplete.go b/modules/wtask/api_eventcomplete.go index 5c9d4542a..23aa96e29 100644 --- a/modules/wtask/api_eventcomplete.go +++ b/modules/wtask/api_eventcomplete.go @@ -73,14 +73,6 @@ func (this *apiComp) EventComplete(session comm.IUserSession, req *pb.WTaskEvent if errdata, award = this.module.DispenseAtno(session, conf.Getitem, true); err != nil { return } - // award = make([]*pb.UserAssets, 0) - // for _, v := range conf.Getitem { - // award = append(award, &pb.UserAssets{ - // A: v.A, - // T: v.T, - // N: v.N, - // }) - // } } delete(info.Events, req.Group)