package gateway import ( "context" "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" "go_dreamfactory/sys/db" "sync" "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/log" "google.golang.org/protobuf/proto" ) /* 用户代理对象管理组件 */ type AgentMgrComp struct { cbase.ModuleCompBase options *Options service base.IRPCXService module *Gateway agents *sync.Map users *sync.Map } func (this *AgentMgrComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { err = this.ModuleCompBase.Init(service, module, comp, options) this.options = options.(*Options) this.service = service.(base.IRPCXService) this.module = module.(*Gateway) this.agents = new(sync.Map) this.users = new(sync.Map) return } func (this *AgentMgrComp) Start() (err error) { err = this.ModuleCompBase.Start() return } func (this *AgentMgrComp) getAgent(sid string) (agent IAgent) { var ( a any ok bool ) if a, ok = this.agents.Load(sid); !ok { return } agent = a.(IAgent) return } // Connect 加入新的用户 func (this *AgentMgrComp) Connect(a IAgent) { this.agents.Store(a.SessionId(), a) } // DisConnect 移除断开的用户 func (this *AgentMgrComp) DisConnect(a IAgent) { if a.(IAgent).UserId() != "" { this.users.Delete(a.(IAgent).UserId()) } this.agents.Delete(a.SessionId()) if a.UserId() != "" { //登录用户 通知业务服务处理玩家离线相关 if _, err := this.service.RpcGo(context.Background(), fmt.Sprintf("%s/%s", comm.Service_Worker, a.WorkerId()), string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{ Ip: a.IP(), ServiceTag: this.service.GetTag(), GatewayServiceId: this.service.GetId(), UserSessionId: a.SessionId(), UserId: a.UserId(), }, nil); err != nil { log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) } //通知运维服务器 处理用户离线数据同步 if _, err := this.service.RpcGo(context.Background(), comm.Service_Mainte, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{ Ip: a.IP(), ServiceTag: this.service.GetTag(), GatewayServiceId: this.service.GetId(), UserSessionId: a.SessionId(), UserId: a.UserId(), }, nil); err != nil { log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) } // if this.options.SpanServiceTag != "" { //推送跨服集群处理 if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), fmt.Sprintf("%s/%s", comm.Service_Worker, a.CrosssWorkerId()), string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{ Ip: a.IP(), ServiceTag: this.service.GetTag(), GatewayServiceId: this.service.GetId(), UserSessionId: a.SessionId(), UserId: a.UserId(), }, nil); err != nil { log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) } if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Mainte, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{ Ip: a.IP(), ServiceTag: this.service.GetTag(), GatewayServiceId: this.service.GetId(), UserSessionId: a.SessionId(), UserId: a.UserId(), }, nil); err != nil { log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err) } // } } } func (this *AgentMgrComp) Logined(a IAgent) { this.users.Store(a.UserId(), a) } // UnBind 用户解绑Id func (this *AgentMgrComp) UnBind(ctx context.Context, args *pb.AgentUnBuildReq, reply *pb.RPCMessageReply) error { if a, ok := this.agents.Load(args.UserSessionId); ok { this.users.Delete(a.(IAgent).UserId()) a.(IAgent).UnBind() } else { reply.ErrorData = &pb.ErrorData{ Code: pb.ErrorCode_UserSessionNobeing, Title: pb.ErrorCode_UserSessionNobeing.ToString(), Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId), } } return nil } // SendMsgToAgent 向用户发送消息 func (this *AgentMgrComp) SendMsgToAgent(ctx context.Context, args *pb.AgentSendMessageReq, reply *pb.RPCMessageReply) error { this.module.Debugf("SendMsgToAgent: agent:%s msg:%v", args.UserSessionId, args.Reply) if a, ok := this.agents.Load(args.UserSessionId); ok { for _, v := range args.Reply { a.(IAgent).WriteMsg(v) } } else { reply.ErrorData = &pb.ErrorData{ Code: pb.ErrorCode_UserSessionNobeing, Title: pb.ErrorCode_UserSessionNobeing.ToString(), Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId), } } return nil } // SendMsgToAgents 向多个户发送消息 func (this *AgentMgrComp) SendMsgToAgents(ctx context.Context, args *pb.BatchMessageReq, reply *pb.RPCMessageReply) (err error) { var ( data []byte ) msg := &pb.UserMessage{ MainType: args.MainType, SubType: args.SubType, Data: args.Data, } this.module.Debugf("SendMsgToAgents: agents:%v msg:%v", args.UserSessionIds, msg) if data, err = proto.Marshal(msg); err != nil { return } for _, v := range args.UserSessionIds { if a, ok := this.agents.Load(v); ok { agent := a.(IAgent) if agent.UserId() != "" { //自发送登录用户 if err = agent.WriteBytes(data); err != nil { this.module.Errorln(err) } } } } return nil } // SendMsgToAllAgent 向所有户发送消息 func (this *AgentMgrComp) SendMsgToAllAgent(ctx context.Context, args *pb.BroadCastMessageReq, reply *pb.RPCMessageReply) (err error) { var ( data []byte ) msg := &pb.UserMessage{ MainType: args.MainType, SubType: args.SubType, Data: args.Data, } this.module.Debugf("SendMsgToAllAgent: msg:%v", msg) if data, err = proto.Marshal(msg); err != nil { return } this.agents.Range(func(key, value any) bool { agent := value.(IAgent) if agent.UserId() != "" { //只发送登录用户 agent.WriteBytes(data) } return true }) return } // SendMsgToAllAgent 向所有户发送消息 func (this *AgentMgrComp) SendMsgToUsers(ctx context.Context, args *pb.BatchUsersMessageReq, reply *pb.RPCMessageReply) (err error) { var ( data []byte ) msg := &pb.UserMessage{ MainType: args.MainType, SubType: args.SubType, Data: args.Data, } this.module.Debugf("SendMsgToAgents: agents:%v msg:%v", args.Uids, msg) if data, err = proto.Marshal(msg); err != nil { return } for _, v := range args.Uids { if a, ok := this.users.Load(v); ok { agent := a.(IAgent) if err = agent.WriteBytes(data); err != nil { this.module.Errorln(err) } } } return nil } // CloseAgent 关闭某个用户 func (this *AgentMgrComp) CloseAgent(ctx context.Context, args *pb.AgentCloseeReq, reply *pb.RPCMessageReply) error { if a, ok := this.agents.Load(args.UserSessionId); ok { if a.(IAgent).UserId() != "" { this.users.Delete(a.(IAgent).UserId()) } a.(IAgent).Close() this.agents.Delete(args.UserSessionId) } else { reply.ErrorData = &pb.ErrorData{ Code: pb.ErrorCode_UserSessionNobeing, Title: pb.ErrorCode_UserSessionNobeing.ToString(), Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId), } } return nil } func (this *AgentMgrComp) QueueChange(sessionId []string) { for _, v := range sessionId { if a, ok := this.agents.Load(v); ok { agent := a.(IAgent) agent.PushQueueChange() } } }