go_dreamfactory/comm/usersession.go
2024-01-25 10:49:18 +08:00

226 lines
5.9 KiB
Go

package comm
import (
"context"
"fmt"
"go_dreamfactory/pb"
"sync"
"go_dreamfactory/lego/sys/log"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
/*
User session object: the handle used to operate on a user across servers.
*/

// NewUserSessionByPools builds an empty UserSession attached to service.
// The returned session starts with an empty message queue and an empty
// metadata map; all identity fields are left at their zero values.
func NewUserSessionByPools(service IService) IUserSession {
	session := new(UserSession)
	session.service = service
	session.mate = map[string]interface{}{}
	session.msgqueue = []*pb.UserMessage{}
	return session
}
// UserSession holds the identity and routing fields of one user session,
// together with a queue of outgoing messages and a metadata map.
type UserSession struct {
IP string // remote IP address of the user
SessionId string // session id of the user
ServiceTag string // cluster the session belongs to
GatewayServiceId string // gateway service the user is connected to
UserId string // user id; empty while no user is bound (see IsLogin)
Group int32 // user group
Online bool // set to true by SetSession for a non-empty session id; cleared by Reset
service IService // service used to issue the RPC calls
msgqueue []*pb.UserMessage // messages appended by SendMsg, emptied by Push/SyncPush/Polls
lock sync.RWMutex // taken by SetMate, GetMate and Clone around accesses to mate
mate map[string]interface{} // metadata key/value pairs
}
// SetSession re-initialises the session with the given connection details.
//
// ip is the user's remote address, sessionId the session id, stag the service
// tag, sid the gateway service id, uid the user id and group the user group.
// Queued messages and all metadata are discarded.
//
// Online now always mirrors whether sessionId is non-empty. Previously it was
// only ever switched on, so calling SetSession with an empty sessionId on an
// object that had been online left a stale true behind.
func (this *UserSession) SetSession(ip, sessionId, stag, sid, uid string, group int32) {
	this.IP = ip
	this.SessionId = sessionId
	this.ServiceTag = stag
	this.GatewayServiceId = sid
	this.UserId = uid
	this.Group = group
	this.Online = sessionId != ""
	this.msgqueue = this.msgqueue[:0]

	// SetMate, GetMate and Clone access mate under lock, so the map is
	// swapped under the same lock to avoid racing with them.
	this.lock.Lock()
	this.mate = make(map[string]interface{})
	this.lock.Unlock()
}
// Reset returns the session to its empty state: every identity and routing
// field is cleared, Online is switched off, queued messages are dropped and
// the metadata map is replaced with an empty one.
//
// ServiceTag and Group are cleared as well. They are part of what SetSession
// populates, and leaving them behind meant a reset object still carried the
// previous user's cluster tag and group.
func (this *UserSession) Reset() {
	this.IP = ""
	this.SessionId = ""
	this.ServiceTag = ""
	this.GatewayServiceId = ""
	this.UserId = ""
	this.Group = 0
	this.Online = false
	this.msgqueue = this.msgqueue[:0]

	// SetMate, GetMate and Clone access mate under lock, so the map is
	// swapped under the same lock to avoid racing with them.
	this.lock.Lock()
	this.mate = make(map[string]interface{})
	this.lock.Unlock()
}
// GetSessionId returns the user's session id.
func (this *UserSession) GetSessionId() string {
	sessionId := this.SessionId
	return sessionId
}
// GetUserId returns the user's uid.
func (this *UserSession) GetUserId() string {
	uid := this.UserId
	return uid
}
// GetGroup returns the user's group.
func (this *UserSession) GetGroup() int32 {
	group := this.Group
	return group
}
// GetIP returns the user's remote IP address.
func (this *UserSession) GetIP() string {
	ip := this.IP
	return ip
}
// GetServiecTag returns the tag of the cluster the session belongs to.
func (this *UserSession) GetServiecTag() string {
	tag := this.ServiceTag
	return tag
}
// GetGatewayServiceId returns the id of the gateway service the user is
// currently connected to.
func (this *UserSession) GetGatewayServiceId() string {
	gatewayId := this.GatewayServiceId
	return gatewayId
}
// IsLogin reports whether a user is bound to the session, i.e. whether
// UserId is non-empty.
func (this *UserSession) IsLogin() bool {
	return len(this.UserId) > 0
}
// IsOnline returns the session's Online flag.
func (this *UserSession) IsOnline() bool {
	online := this.Online
	return online
}
// Bind binds a uid to the session; it is the post-login step.
//
// uid is the user id and wokerId the id of the worker service the user is
// bound to. The method issues the Rpc_GatewayAgentBind call against the
// gateway addressed by GatewayServiceId. UserId is updated only when that
// call succeeds; on failure the error is logged and returned and UserId is
// left untouched.
func (this *UserSession) Bind(uid string, wokerId string) (err error) {
	gateway := fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId)
	req := &pb.AgentBuildReq{
		UserSessionId: this.SessionId,
		UserId:        uid,
		WorkerId:      wokerId,
	}
	reply := &pb.RPCMessageReply{}
	if err = this.service.RpcCall(context.Background(), gateway, string(Rpc_GatewayAgentBind), req, reply); err != nil {
		// Log the uid that failed to bind: this.UserId still holds the
		// previous value at this point, which is not the one being bound.
		log.Errorf("Bind UserSession:%s UserId:%s err:%v", this.SessionId, uid, err)
		return
	}
	this.UserId = uid
	return
}
// UnBind removes the uid binding; used on logout and when switching accounts.
//
// It issues the Rpc_GatewayAgentUnBind call against the gateway addressed by
// GatewayServiceId. UserId is cleared only when the call succeeds; otherwise
// the error is logged and returned.
func (this *UserSession) UnBind() (err error) {
	gateway := fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId)
	req := &pb.AgentUnBuildReq{UserSessionId: this.SessionId}
	err = this.service.RpcCall(context.Background(), gateway, string(Rpc_GatewayAgentUnBind), req, &pb.RPCMessageReply{})
	if err == nil {
		this.UserId = ""
		return
	}
	log.Errorf("UnBuild UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err)
	return
}
// SetMate stores value in the session metadata under name, holding the
// write lock for the duration of the update.
func (this *UserSession) SetMate(name string, value interface{}) {
	this.lock.Lock()
	defer this.lock.Unlock()
	this.mate[name] = value
}
// GetMate reads the metadata entry stored under name, holding the read lock
// during the lookup. ok is false when no entry exists, in which case value
// is nil.
func (this *UserSession) GetMate(name string) (ok bool, value interface{}) {
	this.lock.RLock()
	defer this.lock.RUnlock()
	value, ok = this.mate[name]
	return
}
// SendMsg queues a message for the user. The message is only appended to the
// session's queue here; Push and SyncPush are what transmit the queue.
//
// mainType and subType identify the message and msg is its payload, which is
// packed into an anypb.Any. If packing fails (anypb.New rejects a nil msg,
// for instance) the wrapped error is returned and nothing is queued. The
// packing error used to be discarded, which queued a message with nil Data
// and reported success.
func (this *UserSession) SendMsg(mainType, subType string, msg proto.Message) (err error) {
	var data *anypb.Any
	if data, err = anypb.New(msg); err != nil {
		err = fmt.Errorf("SendMsg %s.%s: %w", mainType, subType, err)
		return
	}
	this.msgqueue = append(this.msgqueue, &pb.UserMessage{
		MainType: mainType,
		SubType:  subType,
		Data:     data,
	})
	return
}
// Close asks the gateway to close the user's connection by issuing the
// Rpc_GatewayAgentClose call against the gateway addressed by
// GatewayServiceId. A failure is logged and returned to the caller.
func (this *UserSession) Close() (err error) {
	gateway := fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId)
	req := &pb.AgentCloseeReq{UserSessionId: this.SessionId}
	err = this.service.RpcCall(context.Background(), gateway, string(Rpc_GatewayAgentClose), req, &pb.RPCMessageReply{})
	if err != nil {
		log.Errorf("Close UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err)
	}
	return
}
// Polls empties the message queue and returns the messages that were in it.
//
// The returned slice is handed over to the caller together with its backing
// array, and the session continues with a fresh one. Re-slicing the queue to
// [:0] instead would keep both slices on the same array, so the next SendMsg
// would overwrite the messages the caller just received.
func (this *UserSession) Polls() []*pb.UserMessage {
	msgs := this.msgqueue
	// Size the new queue like the drained one so steady-state traffic does
	// not have to regrow it from scratch.
	this.msgqueue = make([]*pb.UserMessage, 0, len(msgs))
	return msgs
}
// Push sends the queued messages to the user through AcrossClusterRpcGo,
// addressed to the gateway identified by ServiceTag and GatewayServiceId.
// Nothing is sent when the queue is empty. The queue is emptied afterwards
// whether or not the call succeeded; a failure is logged and returned.
//
// NOTE(review): the request references the queue's backing array, which is
// reused by later SendMsg calls. If AcrossClusterRpcGo still reads the
// request after it returns, those calls could overwrite it — confirm.
func (this *UserSession) Push() (err error) {
	if len(this.msgqueue) == 0 {
		return
	}
	gateway := fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId)
	req := &pb.AgentSendMessageReq{
		UserSessionId: this.SessionId,
		Reply:         this.msgqueue,
	}
	_, err = this.service.AcrossClusterRpcGo(context.Background(), this.ServiceTag, gateway, string(Rpc_GatewayAgentSendMsg), req, nil)
	if err != nil {
		log.Errorf("Push:%v err:%s", this, err.Error())
	}
	this.msgqueue = this.msgqueue[:0]
	return
}
// SyncPush sends the queued messages to the user through
// AcrossClusterRpcCall, addressed to the gateway identified by ServiceTag and
// GatewayServiceId. Nothing is sent when the queue is empty. The queue is
// emptied afterwards whether or not the call succeeded; a failure is logged
// and returned.
func (this *UserSession) SyncPush() (err error) {
	if len(this.msgqueue) == 0 {
		return
	}
	gateway := fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId)
	req := &pb.AgentSendMessageReq{
		UserSessionId: this.SessionId,
		Reply:         this.msgqueue,
	}
	err = this.service.AcrossClusterRpcCall(context.Background(), this.ServiceTag, gateway, string(Rpc_GatewayAgentSendMsg), req, &pb.RPCMessageReply{})
	if err != nil {
		log.Errorf("SendMsgToUsers:%v err:%v", this, err)
	}
	this.msgqueue = this.msgqueue[:0]
	return
}
// Clone obtains a session from the service via GetUserSession, initialises it
// with this session's identity and routing fields through SetSession, and
// copies every metadata entry into it. Queued messages are not copied.
func (this *UserSession) Clone() (session IUserSession) {
	session = this.service.GetUserSession()
	session.SetSession(this.IP, this.SessionId, this.ServiceTag, this.GatewayServiceId, this.UserId, this.Group)

	// Hold the read lock while iterating so the map is not written to
	// through SetMate during the copy.
	this.lock.RLock()
	defer this.lock.RUnlock()
	for key, val := range this.mate {
		session.SetMate(key, val)
	}
	return
}
// ToString renders the session id, user id and gateway service id as a
// single line; intended for log output.
func (this *UserSession) ToString() string {
	const layout = "SessionId:%s UserId:%s GatewayServiceId:%s"
	return fmt.Sprintf(layout, this.SessionId, this.UserId, this.GatewayServiceId)
}