package comm import ( "context" "fmt" "go_dreamfactory/pb" "sync" "go_dreamfactory/lego/sys/log" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) /* 用户会话对象 跨服操作用户的对象 */ func NewUserSessionByPools(service IService) IUserSession { return &UserSession{ msgqueue: make([]*pb.UserMessage, 0), mate: make(map[string]interface{}), service: service, } } type UserSession struct { IP string SessionId string ServiceTag string GatewayServiceId string //用户所在网关服务 UserId string Group int32 //用户分组 Online bool service IService msgqueue []*pb.UserMessage lock sync.RWMutex mate map[string]interface{} } // 重置 func (this *UserSession) SetSession(ip, sessionId, stag, sid, uid string, group int32) { this.IP = ip this.SessionId = sessionId this.ServiceTag = stag this.GatewayServiceId = sid this.UserId = uid this.Group = group this.msgqueue = this.msgqueue[:0] this.mate = make(map[string]interface{}) if sessionId != "" { this.Online = true } } // 重置 func (this *UserSession) Reset() { this.IP = "" this.SessionId = "" this.GatewayServiceId = "" this.UserId = "" this.Online = false this.msgqueue = this.msgqueue[:0] this.mate = make(map[string]interface{}) } // 获取用户的会话id func (this *UserSession) GetSessionId() string { return this.SessionId } // 获取用户的uid func (this *UserSession) GetUserId() string { return this.UserId } //用户分组 func (this *UserSession) GetGroup() int32 { return this.Group } // 获取用户的远程ip地址 func (this *UserSession) GetIP() string { return this.IP } // 会话所在集群 func (this *UserSession) GetServiecTag() string { return this.ServiceTag } // 用户当先所在网关服务 func (this *UserSession) GetGatewayServiceId() string { return this.GatewayServiceId } // 是否登录 func (this *UserSession) IsLogin() bool { return this.UserId != "" } func (this *UserSession) IsOnline() bool { return this.Online } // /绑定uid 登录后操作 // /uid 用户id // /wokerId 用户绑定worker服务id func (this *UserSession) Bind(uid string, wokerId string) (err error) { reply := &pb.RPCMessageReply{} if err = this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentBind), &pb.AgentBuildReq{ UserSessionId: this.SessionId, UserId: uid, WorkerId: wokerId, }, reply); err != nil { log.Errorf("Bind UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err) return } this.UserId = uid return } // 解绑uid 注销和切换账号是处理 func (this *UserSession) UnBind() (err error) { reply := &pb.RPCMessageReply{} if err = this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentUnBind), &pb.AgentUnBuildReq{ UserSessionId: this.SessionId, }, reply); err != nil { log.Errorf("UnBuild UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err) return } this.UserId = "" return } //写入元数据 func (this *UserSession) SetMate(name string, value interface{}) { this.lock.Lock() this.mate[name] = value this.lock.Unlock() } //写入元数据 func (this *UserSession) GetMate(name string) (ok bool, value interface{}) { this.lock.RLock() value, ok = this.mate[name] this.lock.RUnlock() return } // 向用户发送消息 func (this *UserSession) SendMsg(mainType, subType string, msg proto.Message) (err error) { // log.Debugf("SendMsg to UserId:[%s] Data: %v", this.UserId, msg) data, _ := anypb.New(msg) this.msgqueue = append(this.msgqueue, &pb.UserMessage{ MainType: mainType, SubType: subType, Data: data, }) return } // 关闭用户连接对象 func (this *UserSession) Close() (err error) { reply := &pb.RPCMessageReply{} if err = this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentClose), &pb.AgentCloseeReq{ UserSessionId: this.SessionId, }, reply); err != nil { log.Errorf("Close UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err) } return } // 清空消息队列 func (this *UserSession) Polls() []*pb.UserMessage { msgs := this.msgqueue this.msgqueue = this.msgqueue[:0] return msgs } // 推送消息到用户 func (this *UserSession) Push() (err error) { // reply := &pb.RPCMessageReply{} if len(this.msgqueue) > 0 { if _, err = this.service.AcrossClusterRpcGo(context.Background(), this.ServiceTag, fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentSendMessageReq{ UserSessionId: this.SessionId, Reply: this.msgqueue, }, nil); err != nil { log.Errorf("Push:%v err:%s", this, err.Error()) } } this.msgqueue = this.msgqueue[:0] return } func (this *UserSession) SyncPush() (err error) { // reply := &pb.RPCMessageReply{} if len(this.msgqueue) > 0 { if err = this.service.AcrossClusterRpcCall(context.Background(), this.ServiceTag, fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentSendMessageReq{ UserSessionId: this.SessionId, Reply: this.msgqueue, }, &pb.RPCMessageReply{}); err != nil { log.Errorf("SendMsgToUsers:%v err:%v", this, err) } } this.msgqueue = this.msgqueue[:0] return } // 克隆 func (this *UserSession) Clone() (session IUserSession) { session = this.service.GetUserSession() session.SetSession(this.IP, this.SessionId, this.ServiceTag, this.GatewayServiceId, this.UserId, this.Group) this.lock.RLock() for k, v := range this.mate { session.SetMate(k, v) } this.lock.RUnlock() return } // 打印日志需要 func (this *UserSession) ToString() string { return fmt.Sprintf("SessionId:%s UserId:%s GatewayServiceId:%s", this.SessionId, this.UserId, this.GatewayServiceId) }