修复在线玩家

This commit is contained in:
wh_zcy 2023-03-16 18:13:20 +08:00
parent 457ed31fff
commit d3f0f0aada
2 changed files with 42 additions and 28 deletions

View File

@ -31,7 +31,7 @@ func (this *ModelSession) Start() (err error) {
return return
} }
//获取用户 // 获取用户
func (this *ModelSession) getUserSession(uid string) (user *pb.CacheUser) { func (this *ModelSession) getUserSession(uid string) (user *pb.CacheUser) {
user = &pb.CacheUser{} user = &pb.CacheUser{}
if err := this.GetListObj(comm.RDS_EMPTY, uid, user); err != nil { if err := this.GetListObj(comm.RDS_EMPTY, uid, user); err != nil {
@ -43,14 +43,21 @@ func (this *ModelSession) getUserSession(uid string) (user *pb.CacheUser) {
return user return user
} }
//设置用户session // 设置用户session
func (this *ModelSession) addUserSession(uid string, session comm.IUserSession) (err error) { func (this *ModelSession) addUserSession(uid string, session comm.IUserSession) (err error) {
if err = this.AddList(comm.RDS_EMPTY, uid, map[string]interface{}{ // if err = this.AddList(comm.RDS_EMPTY, uid, map[string]interface{}{
"uid": uid, // "uid": uid,
"sessionId": session.GetSessionId(), // "sessionId": session.GetSessionId(),
"serviceTag": session.GetServiecTag(), // "serviceTag": session.GetServiecTag(),
"gatewayServiceId": session.GetGatewayServiceId(), // "gatewayServiceId": session.GetGatewayServiceId(),
"ip": session.GetIP(), // "ip": session.GetIP(),
// }, db.SetDBMgoLog(false)); err != nil {
if err = this.AddList(comm.RDS_EMPTY, uid, &pb.CacheUser{
Uid: uid,
SessionId: session.GetSessionId(),
ServiceTag: session.GetServiecTag(),
GatewayServiceId: session.GetGatewayServiceId(),
Ip: session.GetIP(),
}, db.SetDBMgoLog(false)); err != nil { }, db.SetDBMgoLog(false)); err != nil {
log.Debug("setUserSession err:%v", log.Field{Key: "uid", Value: uid}, log.Field{Key: "err", Value: err}) log.Debug("setUserSession err:%v", log.Field{Key: "uid", Value: uid}, log.Field{Key: "err", Value: err})
return return

View File

@ -22,14 +22,14 @@ import (
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
) )
//用户协议处理函数注册的反射对象 // 用户协议处理函数注册的反射对象
type msghandle struct { type msghandle struct {
rcvr reflect.Value rcvr reflect.Value
msgType reflect.Type //消息请求类型 msgType reflect.Type //消息请求类型
handle reflect.Method //处理函数 handle reflect.Method //处理函数
} }
//组件参数 // 组件参数
type CompOptions struct { type CompOptions struct {
} }
@ -49,7 +49,7 @@ func NewGateRouteComp() comm.ISC_GateRouteComp {
return comp return comp
} }
//服务网关组件 // 服务网关组件
type SCompGateRoute struct { type SCompGateRoute struct {
cbase.ServiceCompBase cbase.ServiceCompBase
options *CompOptions options *CompOptions
@ -59,7 +59,7 @@ type SCompGateRoute struct {
pools sync.Pool pools sync.Pool
} }
//设置服务组件名称 方便业务模块中获取此组件对象 // 设置服务组件名称 方便业务模块中获取此组件对象
func (this *SCompGateRoute) GetName() core.S_Comps { func (this *SCompGateRoute) GetName() core.S_Comps {
return comm.SC_ServiceGateRouteComp return comm.SC_ServiceGateRouteComp
} }
@ -68,7 +68,7 @@ func (this *SCompGateRoute) NewOptions() (options core.ICompOptions) {
return new(CompOptions) return new(CompOptions)
} }
//组件初始化函数 // 组件初始化函数
func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) { func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
err = this.ServiceCompBase.Init(service, comp, options) err = this.ServiceCompBase.Init(service, comp, options)
this.options = options.(*CompOptions) this.options = options.(*CompOptions)
@ -83,7 +83,7 @@ func (this *SCompGateRoute) Init(service core.IService, comp core.IServiceComp,
return err return err
} // } //
//组件启动时注册rpc服务监听 // 组件启动时注册rpc服务监听
func (this *SCompGateRoute) Start() (err error) { func (this *SCompGateRoute) Start() (err error) {
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口 this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserLogin), this.NoticeUserLogin) //注册用户登录通知 this.service.RegisterFunctionName(string(comm.Rpc_GatewayNoticeUserLogin), this.NoticeUserLogin) //注册用户登录通知
@ -96,7 +96,7 @@ func (this *SCompGateRoute) Start() (err error) {
return return
} }
//业务模块注册用户消息处理路由 // 业务模块注册用户消息处理路由
func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, handele reflect.Method) { func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, handele reflect.Method) {
//log.Debugf("注册用户路由【%s】", methodName) //log.Debugf("注册用户路由【%s】", methodName)
this.mrlock.RLock() this.mrlock.RLock()
@ -115,7 +115,7 @@ func (this *SCompGateRoute) RegisterRoute(methodName string, comp reflect.Value,
this.mrlock.Unlock() this.mrlock.Unlock()
} }
//Rpc_GatewayRoute服务接口的接收函数 // Rpc_GatewayRoute服务接口的接收函数
func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) (err error) { func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) (err error) {
method := fmt.Sprintf("%s.%s", args.MainType, args.SubType) method := fmt.Sprintf("%s.%s", args.MainType, args.SubType)
// defer func() { //程序异常 收集异常信息传递给前端显示 // defer func() { //程序异常 收集异常信息传递给前端显示
@ -188,7 +188,7 @@ func (this *SCompGateRoute) ReceiveMsg(ctx context.Context, args *pb.AgentMessag
return nil return nil
} }
//RPC_NoticeUserClose 接收用户登录通知 // RPC_NoticeUserClose 接收用户登录通知
func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.NoticeUserLoginReq, reply *pb.RPCMessageReply) error { func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.NoticeUserLoginReq, reply *pb.RPCMessageReply) error {
conn, err := db.Local() conn, err := db.Local()
if err != nil { if err != nil {
@ -196,12 +196,19 @@ func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.Notice
return err return err
} }
model := db.NewDBModel(comm.TableSession, 0, conn) model := db.NewDBModel(comm.TableSession, 0, conn)
model.AddList(comm.RDS_EMPTY, args.UserId, map[string]interface{}{ // model.AddList(comm.RDS_EMPTY, args.UserId, map[string]interface{}{
"uid": args.UserId, // "uid": args.UserId,
"sessionId": args.UserSessionId, // "sessionId": args.UserSessionId,
"serviceTag": args.ServiceTag, // "serviceTag": args.ServiceTag,
"gatewayServiceId": args.GatewayServiceId, // "gatewayServiceId": args.GatewayServiceId,
"ip": args.Ip, // "ip": args.Ip,
// }, db.SetDBMgoLog(false))
model.AddList(comm.RDS_EMPTY, args.UserId, &pb.CacheUser{
Uid: args.UserId,
SessionId: args.UserSessionId,
ServiceTag: args.ServiceTag,
GatewayServiceId: args.GatewayServiceId,
Ip: args.Ip,
}, db.SetDBMgoLog(false)) }, db.SetDBMgoLog(false))
session := this.pools.Get().(comm.IUserSession) session := this.pools.Get().(comm.IUserSession)
session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId) session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId)
@ -209,7 +216,7 @@ func (this *SCompGateRoute) NoticeUserLogin(ctx context.Context, args *pb.Notice
return nil return nil
} }
//RPC_NoticeUserClose 接收用户离线通知 // RPC_NoticeUserClose 接收用户离线通知
func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error { func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.NoticeUserCloseReq, reply *pb.RPCMessageReply) error {
session := this.pools.Get().(comm.IUserSession) session := this.pools.Get().(comm.IUserSession)
session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId) session.SetSession(args.Ip, args.UserSessionId, args.ServiceTag, args.GatewayServiceId, args.UserId)
@ -217,28 +224,28 @@ func (this *SCompGateRoute) NoticeUserClose(ctx context.Context, args *pb.Notice
return nil return nil
} }
//RPC_ConfigureUpDate 接收配置更新消息 // RPC_ConfigureUpDate 接收配置更新消息
func (this *SCompGateRoute) ConfigureUpDate(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) error { func (this *SCompGateRoute) ConfigureUpDate(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) error {
log.Debugln("RPC_ConfigureUpDate") log.Debugln("RPC_ConfigureUpDate")
configure.Update() configure.Update()
return nil return nil
} }
//RPC_DBSyncCross 接收配置更新消息 // RPC_DBSyncCross 接收配置更新消息
func (this *SCompGateRoute) DBSyncCross(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) (err error) { func (this *SCompGateRoute) DBSyncCross(ctx context.Context, args *pb.EmptyReq, reply *pb.EmptyResp) (err error) {
log.Debugln("RPC_DBSyncCross") log.Debugln("RPC_DBSyncCross")
err = db.SyncServiceList() err = db.SyncServiceList()
return return
} }
//获取用户的会话对象 // 获取用户的会话对象
func (this *SCompGateRoute) GetUserSession(udata *pb.CacheUser) (session comm.IUserSession) { func (this *SCompGateRoute) GetUserSession(udata *pb.CacheUser) (session comm.IUserSession) {
session = this.pools.Get().(comm.IUserSession) session = this.pools.Get().(comm.IUserSession)
session.SetSession(udata.Ip, udata.SessionId, udata.ServiceTag, udata.GatewayServiceId, udata.Uid) session.SetSession(udata.Ip, udata.SessionId, udata.ServiceTag, udata.GatewayServiceId, udata.Uid)
return return
} }
//获取用户的会话对象 // 获取用户的会话对象
func (this *SCompGateRoute) PutUserSession(session comm.IUserSession) { func (this *SCompGateRoute) PutUserSession(session comm.IUserSession) {
this.pools.Put(session) this.pools.Put(session)
return return