diff --git a/modules/arena/api_challengereward.go b/modules/arena/api_challengereward.go index 5fd46ab2f..fabc3f7f5 100644 --- a/modules/arena/api_challengereward.go +++ b/modules/arena/api_challengereward.go @@ -76,8 +76,7 @@ func (this *apiComp) ChallengeReward(session comm.IUserSession, req *pb.ArenaCha redstate = pb.BattleRecordState_AttackWin bulestate = pb.BattleRecordState_DefendLost } - // this.module.ModuleRtask.SendToRtask(session, comm.Rtype131, 1) - go this.module.ModuleBuried.TriggerBuried(session.Clone(), comm.GetBuriedParam(comm.Rtype131, 1)) + tasks = append(tasks, comm.GetBuriedParam(comm.Rtype131, 1)) } else { info.Streak = 0 if req.Revengeid != "" { diff --git a/modules/caravan/api_buyorsell.go b/modules/caravan/api_buyorsell.go index 4dbf5ecb5..b0eeb4888 100644 --- a/modules/caravan/api_buyorsell.go +++ b/modules/caravan/api_buyorsell.go @@ -251,8 +251,11 @@ func (this *apiComp) BuyOrSell(session comm.IUserSession, req *pb.CaravanBuyOrSe // Rtype211 TaskType = 211 // 向指定X城市,贩卖价值X虚拟币以上的对应城市急需货物 tasks = append(tasks, comm.GetBuriedParam(comm.Rtype211, req.City, sellSpValue)) } - go this.module.AsynHandleSession(session.Clone(), func(session comm.IUserSession) { - this.module.ModuleBuried.TriggerBuried(session, tasks...) - }) + if len(tasks) > 0 { + go this.module.AsynHandleSession(session.Clone(), func(session comm.IUserSession) { + this.module.ModuleBuried.TriggerBuried(session, tasks...) + }) + } + return } diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index 19cb5b2b2..0ccebd631 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -98,15 +98,46 @@ locp: if errdata == nil { // this.gateway.Debugf("----------2 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType) if msg.MainType == string(comm.ModuleUser) && msg.SubType == "login" { //登录排队 - if this.queueIndex, err = this.gateway.InLoginQueue(this.sessionId, msg); err != nil { - this.gateway.Errorf("messageDistribution err:%v", err) + if this.uId == "" { + if this.queueIndex, err = this.gateway.InLoginQueue(this.sessionId, msg); err != nil { + this.gateway.Errorf("messageDistribution err:%v", err) + data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ + MsgId: msg.MsgId, + ReqMainType: msg.MainType, + ReqSubType: msg.SubType, + Arg: msg.Data, + Code: pb.ErrorCode_GatewayException, + Err: &pb.ErrorData{Code: pb.ErrorCode_GatewayException, Title: pb.ErrorCode_GatewayException.String(), Datastring: err.Error()}, + }) + err = this.WriteMsg(&pb.UserMessage{ + MsgId: msg.MsgId, + MainType: comm.MainTypeNotify, + SubType: comm.SubTypeErrorNotify, + Data: data, + }) + go this.Close() + break locp + } else { + if this.queueIndex > 0 { + this.lastpushtime = time.Now() + data, _ := anypb.New(&pb.UserLoginQueueChangePush{ + Index: this.queueIndex, + }) + err = this.WriteMsg(&pb.UserMessage{ + MainType: string(comm.ModuleUser), + SubType: "loginqueuechange", + Data: data, + }) + } + } + } else { data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ MsgId: msg.MsgId, ReqMainType: msg.MainType, ReqSubType: msg.SubType, Arg: msg.Data, - Code: pb.ErrorCode_GatewayException, - Err: &pb.ErrorData{Title: "用户消息处理失败!", Datastring: err.Error()}, + Code: pb.ErrorCode_ReqParameterError, + Err: &pb.ErrorData{Code: pb.ErrorCode_ReqParameterError, Title: "Repeat login!", Datastring: err.Error()}, }) err = this.WriteMsg(&pb.UserMessage{ MsgId: msg.MsgId, @@ -114,20 +145,6 @@ locp: SubType: comm.SubTypeErrorNotify, Data: data, }) - go this.Close() - break locp - } else { - if this.queueIndex > 0 { - this.lastpushtime = time.Now() - data, _ := anypb.New(&pb.UserLoginQueueChangePush{ - Index: this.queueIndex, - }) - err = this.WriteMsg(&pb.UserMessage{ - MainType: string(comm.ModuleUser), - SubType: "loginqueuechange", - Data: data, - }) - } } continue } @@ -487,6 +504,18 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { for _, v := range reply.Reply { if v.MainType == msg.MainType && v.SubType == msg.SubType { v.MsgId = msg.MsgId + if msg.MainType == string(comm.ModuleUser) && msg.SubType == "login" { //登录回包 + var ( + resp proto.Message + loginresp *pb.UserLoginResp + ) + if resp, err = v.Data.UnmarshalNew(); err != nil { + return + } + loginresp = resp.(*pb.UserLoginResp) + this.uId = loginresp.Data.Uid + this.gateway.LoginNotice(this) + } } } if err = this.WriteMsgs(reply.Reply); err != nil { diff --git a/modules/gateway/agentmgr_comp.go b/modules/gateway/agentmgr_comp.go index c605ab620..58717a9a0 100644 --- a/modules/gateway/agentmgr_comp.go +++ b/modules/gateway/agentmgr_comp.go @@ -99,31 +99,31 @@ func (this *AgentMgrComp) DisConnect(a IAgent) { } // Bind 用户绑定Id -func (this *AgentMgrComp) Bind(ctx context.Context, args *pb.AgentBuildReq, reply *pb.RPCMessageReply) error { - if a, ok := this.agents.Load(args.UserSessionId); ok { - agent := a.(IAgent) - agent.Bind(args.UserId, args.WorkerId) - // if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去 - //推送跨服集群处理 - if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ - Ip: agent.IP(), - ServiceTag: this.service.GetTag(), - GatewayServiceId: this.service.GetId(), - UserSessionId: agent.SessionId(), - UserId: agent.UserId(), - }, nil); err != nil { - log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), db.CrossTag(), err) - } - // } - } else { - reply.ErrorData = &pb.ErrorData{ - Code: pb.ErrorCode_UserSessionNobeing, - Title: pb.ErrorCode_UserSessionNobeing.ToString(), - Message: fmt.Sprintf("绑定uid:%s失败!", args.WorkerId), - } - } - return nil -} +// func (this *AgentMgrComp) Bind(ctx context.Context, args *pb.AgentBuildReq, reply *pb.RPCMessageReply) error { +// if a, ok := this.agents.Load(args.UserSessionId); ok { +// agent := a.(IAgent) +// agent.Bind(args.UserId, args.WorkerId) +// // if this.options.SpanServiceTag != "" { //跨服集群配置存在 推送通知过去 +// //推送跨服集群处理 +// if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ +// Ip: agent.IP(), +// ServiceTag: this.service.GetTag(), +// GatewayServiceId: this.service.GetId(), +// UserSessionId: agent.SessionId(), +// UserId: agent.UserId(), +// }, nil); err != nil { +// log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", agent.UserId(), db.CrossTag(), err) +// } +// // } +// } else { +// reply.ErrorData = &pb.ErrorData{ +// Code: pb.ErrorCode_UserSessionNobeing, +// Title: pb.ErrorCode_UserSessionNobeing.ToString(), +// Message: fmt.Sprintf("绑定uid:%s失败!", args.WorkerId), +// } +// } +// return nil +// } // UnBind 用户解绑Id func (this *AgentMgrComp) UnBind(ctx context.Context, args *pb.AgentUnBuildReq, reply *pb.RPCMessageReply) error { diff --git a/modules/gateway/core.go b/modules/gateway/core.go index 2848e4877..91282626f 100644 --- a/modules/gateway/core.go +++ b/modules/gateway/core.go @@ -31,6 +31,7 @@ type ( Service() base.IRPCXService Connect(a IAgent) DisConnect(a IAgent) + LoginNotice(a IAgent) GetMsgDistribute(msgmid, msguid string) (rule string, ok bool) InLoginQueue(sessionId string, login *pb.UserMessage) (index int32, err error) } diff --git a/modules/gateway/module.go b/modules/gateway/module.go index 3db824fa7..5e2b9b7a8 100644 --- a/modules/gateway/module.go +++ b/modules/gateway/module.go @@ -5,6 +5,7 @@ import ( "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" + "go_dreamfactory/sys/db" "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" @@ -62,7 +63,7 @@ func (this *Gateway) Start() (err error) { _name2Func := map[string]any{ // 注册用户绑定uid接口 登录成功后触发 - string(comm.Rpc_GatewayAgentBind): this.agentMgr.Bind, + // string(comm.Rpc_GatewayAgentBind): this.agentMgr.Bind, // 注册用户解绑uid接口 登出或则切换账号是触发 string(comm.Rpc_GatewayAgentUnBind): this.agentMgr.UnBind, // 向用户发送消息接口 @@ -109,6 +110,18 @@ func (this *Gateway) DisConnect(a IAgent) { this.agentMgr.DisConnect(a) } +func (this *Gateway) LoginNotice(a IAgent) { + if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Worker, string(comm.Rpc_GatewayNoticeUserLogin), &pb.NoticeUserLoginReq{ + Ip: a.IP(), + ServiceTag: this.service.GetTag(), + GatewayServiceId: this.service.GetId(), + UserSessionId: a.SessionId(), + UserId: a.UserId(), + }, nil); err != nil { + log.Errorf("uId:%s clusterTag:%s Rpc_NoticeUserLogin err:%v", a.UserId(), db.CrossTag(), err) + } +} + // GetMsgDistribute 读取消息分发规则 func (this *Gateway) GetMsgDistribute(msgmid, msguid string) (rule string, ok bool) { return this.configure.GetMsgDistribute(msgmid, msguid) diff --git a/modules/moonfantasy/modelDream.go b/modules/moonfantasy/modelDream.go index 8ef7db52b..d10e1265c 100644 --- a/modules/moonfantasy/modelDream.go +++ b/modules/moonfantasy/modelDream.go @@ -134,7 +134,6 @@ func (this *modelDreamComp) trigger(session comm.IUserSession) { } this.module.modelDream.noticeuserfriend(session, mdata.Id, chat) session.SendMsg(string(this.module.GetType()), "trigger", &pb.MoonfantasyTriggerPush{Issucc: true, Mid: mdata.Id, Monster: mdata.Monster}) - // this.module.ModuleRtask.SendToRtask(session, comm.Rtype87, 1) go this.module.AsynHandleSession(session.Clone(), func(session comm.IUserSession) { this.module.ModuleBuried.TriggerBuried(session, comm.GetBuriedParam(comm.Rtype87, 1)) }) diff --git a/modules/wtask/api_eventcomplete.go b/modules/wtask/api_eventcomplete.go index 85a0500d8..23aa96e29 100644 --- a/modules/wtask/api_eventcomplete.go +++ b/modules/wtask/api_eventcomplete.go @@ -73,14 +73,6 @@ func (this *apiComp) EventComplete(session comm.IUserSession, req *pb.WTaskEvent if errdata, award = this.module.DispenseAtno(session, conf.Getitem, true); err != nil { return } - // award = make([]*pb.UserAssets, 0) - // for _, v := range conf.Getitem { - // award = append(award, &pb.UserAssets{ - // A: v.A, - // T: v.T, - // N: v.N, - // }) - // } } delete(info.Events, req.Group) @@ -98,10 +90,10 @@ func (this *apiComp) EventComplete(session comm.IUserSession, req *pb.WTaskEvent tasks = append(tasks, comm.GetBuriedParam(comm.Rtype206, 1)) tasks = append(tasks, comm.GetBuriedParam(comm.Rtype207, 1, req.Group)) + session.SendMsg(string(this.module.GetType()), "eventcomplete", &pb.WTaskEventCompleteResp{Group: req.Group, Event: event, Award: award}) + go this.module.AsynHandleSession(session.Clone(), func(session comm.IUserSession) { this.module.ModuleBuried.TriggerBuried(session, tasks...) }) - - session.SendMsg(string(this.module.GetType()), "eventcomplete", &pb.WTaskEventCompleteResp{Group: req.Group, Event: event, Award: award}) return }