287 lines
8.1 KiB
Go
287 lines
8.1 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/pb"
|
|
"go_dreamfactory/sys/db"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"go_dreamfactory/lego/base"
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/core/cbase"
|
|
"go_dreamfactory/lego/sys/log"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
/*
|
|
用户代理对象管理组件
|
|
*/
|
|
|
|
type AgentMgrComp struct {
|
|
cbase.ModuleCompBase
|
|
options *Options
|
|
service base.IRPCXService
|
|
module *Gateway
|
|
agents *sync.Map
|
|
users *sync.Map
|
|
onlineuser int32
|
|
}
|
|
|
|
func (this *AgentMgrComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
|
|
err = this.ModuleCompBase.Init(service, module, comp, options)
|
|
this.options = options.(*Options)
|
|
this.service = service.(base.IRPCXService)
|
|
this.module = module.(*Gateway)
|
|
this.agents = new(sync.Map)
|
|
this.users = new(sync.Map)
|
|
return
|
|
}
|
|
|
|
func (this *AgentMgrComp) Start() (err error) {
|
|
err = this.ModuleCompBase.Start()
|
|
return
|
|
}
|
|
|
|
func (this *AgentMgrComp) getAgent(sid string) (agent IAgent) {
|
|
var (
|
|
a any
|
|
ok bool
|
|
)
|
|
if a, ok = this.agents.Load(sid); !ok {
|
|
return
|
|
}
|
|
agent = a.(IAgent)
|
|
return
|
|
}
|
|
|
|
func (this *AgentMgrComp) getonlineNum() int32 {
|
|
return atomic.LoadInt32(&this.onlineuser)
|
|
}
|
|
|
|
// Connect 加入新的用户
|
|
func (this *AgentMgrComp) Connect(a IAgent) {
|
|
this.agents.Store(a.SessionId(), a)
|
|
}
|
|
|
|
// DisConnect 移除断开的用户
|
|
func (this *AgentMgrComp) DisConnect(a IAgent) {
|
|
if a.(IAgent).UserId() != "" {
|
|
this.users.Delete(a.(IAgent).UserId())
|
|
}
|
|
|
|
this.agents.Delete(a.SessionId())
|
|
if a.UserId() != "" { //登录用户 通知业务服务处理玩家离线相关
|
|
atomic.AddInt32(&this.onlineuser, -1)
|
|
if _, err := this.service.RpcGo(context.Background(), fmt.Sprintf("%s/%s", comm.Service_Worker, a.WorkerId()), string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
|
|
Ip: a.IP(),
|
|
ServiceTag: this.service.GetTag(),
|
|
GatewayServiceId: this.service.GetId(),
|
|
UserSessionId: a.SessionId(),
|
|
UserId: a.UserId(),
|
|
}, nil); err != nil {
|
|
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
|
|
}
|
|
|
|
//通知运维服务器 处理用户离线数据同步
|
|
if _, err := this.service.RpcGo(context.Background(), comm.Service_Mainte, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
|
|
Ip: a.IP(),
|
|
ServiceTag: this.service.GetTag(),
|
|
GatewayServiceId: this.service.GetId(),
|
|
UserSessionId: a.SessionId(),
|
|
UserId: a.UserId(),
|
|
}, nil); err != nil {
|
|
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
|
|
}
|
|
// if this.options.SpanServiceTag != "" {
|
|
//推送跨服集群处理
|
|
if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), fmt.Sprintf("%s/%s", comm.Service_Worker, a.CrosssWorkerId()), string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
|
|
Ip: a.IP(),
|
|
ServiceTag: this.service.GetTag(),
|
|
GatewayServiceId: this.service.GetId(),
|
|
UserSessionId: a.SessionId(),
|
|
UserId: a.UserId(),
|
|
}, nil); err != nil {
|
|
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
|
|
}
|
|
|
|
if _, err := this.service.AcrossClusterRpcGo(context.Background(), db.CrossTag(), comm.Service_Mainte, string(comm.Rpc_GatewayNoticeUserClose), &pb.NoticeUserCloseReq{
|
|
Ip: a.IP(),
|
|
ServiceTag: this.service.GetTag(),
|
|
GatewayServiceId: this.service.GetId(),
|
|
UserSessionId: a.SessionId(),
|
|
UserId: a.UserId(),
|
|
}, nil); err != nil {
|
|
log.Errorf("uId:%s Rpc_NoticeUserClose err:%v", a.UserId(), err)
|
|
}
|
|
// }
|
|
}
|
|
}
|
|
|
|
func (this *AgentMgrComp) Logined(a IAgent) {
|
|
this.users.Store(a.UserId(), a)
|
|
atomic.AddInt32(&this.onlineuser, 1)
|
|
}
|
|
|
|
// UnBind 用户解绑Id
|
|
func (this *AgentMgrComp) UnBind(ctx context.Context, args *pb.AgentUnBuildReq, reply *pb.RPCMessageReply) error {
|
|
if a, ok := this.agents.Load(args.UserSessionId); ok {
|
|
this.users.Delete(a.(IAgent).UserId())
|
|
a.(IAgent).UnBind()
|
|
} else {
|
|
reply.ErrorData = &pb.ErrorData{
|
|
Code: pb.ErrorCode_UserSessionNobeing,
|
|
Title: pb.ErrorCode_UserSessionNobeing.ToString(),
|
|
Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId),
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SendMsgToAgent 向用户发送消息
|
|
func (this *AgentMgrComp) SendMsgToAgent(ctx context.Context, args *pb.AgentSendMessageReq, reply *pb.RPCMessageReply) error {
|
|
this.module.Debugf("SendMsgToAgent: agent:%s msg:%v", args.UserSessionId, args.Reply)
|
|
if a, ok := this.agents.Load(args.UserSessionId); ok {
|
|
for _, v := range args.Reply {
|
|
a.(IAgent).WriteMsg(v)
|
|
}
|
|
} else {
|
|
reply.ErrorData = &pb.ErrorData{
|
|
Code: pb.ErrorCode_UserSessionNobeing,
|
|
Title: pb.ErrorCode_UserSessionNobeing.ToString(),
|
|
Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId),
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SendMsgToAgents 向多个户发送消息
|
|
func (this *AgentMgrComp) SendMsgToAgents(ctx context.Context, args *pb.BatchMessageReq, reply *pb.RPCMessageReply) (err error) {
|
|
var (
|
|
data []byte
|
|
)
|
|
msg := &pb.UserMessage{
|
|
MainType: args.MainType,
|
|
SubType: args.SubType,
|
|
Data: args.Data,
|
|
}
|
|
this.module.Debugf("SendMsgToAgents: agents:%v msg:%v", args.UserSessionIds, msg)
|
|
if data, err = proto.Marshal(msg); err != nil {
|
|
return
|
|
}
|
|
for _, v := range args.UserSessionIds {
|
|
if a, ok := this.agents.Load(v); ok {
|
|
agent := a.(IAgent)
|
|
if agent.UserId() != "" { //自发送登录用户
|
|
if err = agent.WriteBytes(data); err != nil {
|
|
this.module.Errorln(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SendMsgToAllAgent 向所有户发送消息
|
|
func (this *AgentMgrComp) SendMsgToAllAgent(ctx context.Context, args *pb.BroadCastMessageReq, reply *pb.RPCMessageReply) (err error) {
|
|
var (
|
|
data []byte
|
|
)
|
|
msg := &pb.UserMessage{
|
|
MainType: args.MainType,
|
|
SubType: args.SubType,
|
|
Data: args.Data,
|
|
}
|
|
this.module.Debugf("SendMsgToAllAgent: msg:%v", msg)
|
|
if data, err = proto.Marshal(msg); err != nil {
|
|
return
|
|
}
|
|
this.agents.Range(func(key, value any) bool {
|
|
agent := value.(IAgent)
|
|
if agent.UserId() != "" { //只发送登录用户
|
|
agent.WriteBytes(data)
|
|
}
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
// SendMsgToAllAgent 向所有户发送消息
|
|
func (this *AgentMgrComp) SendMsgToGroupAgent(ctx context.Context, args *pb.BroadCastMessageReq, reply *pb.RPCMessageReply) (err error) {
|
|
var (
|
|
data []byte
|
|
)
|
|
msg := &pb.UserMessage{
|
|
MainType: args.MainType,
|
|
SubType: args.SubType,
|
|
Data: args.Data,
|
|
}
|
|
this.module.Debugf("SendMsgToAllAgent: msg:%v", msg)
|
|
if data, err = proto.Marshal(msg); err != nil {
|
|
return
|
|
}
|
|
this.agents.Range(func(key, value any) bool {
|
|
agent := value.(IAgent)
|
|
if agent.UserId() != "" && agent.Group() == args.Group { //只发送登录用户
|
|
agent.WriteBytes(data)
|
|
}
|
|
return true
|
|
})
|
|
return
|
|
}
|
|
|
|
// SendMsgToAllAgent 向所有户发送消息
|
|
func (this *AgentMgrComp) SendMsgToUsers(ctx context.Context, args *pb.BatchUsersMessageReq, reply *pb.RPCMessageReply) (err error) {
|
|
var (
|
|
data []byte
|
|
)
|
|
msg := &pb.UserMessage{
|
|
MainType: args.MainType,
|
|
SubType: args.SubType,
|
|
Data: args.Data,
|
|
}
|
|
this.module.Debugf("SendMsgToAgents: agents:%v msg:%v", args.Uids, msg)
|
|
if data, err = proto.Marshal(msg); err != nil {
|
|
return
|
|
}
|
|
for _, v := range args.Uids {
|
|
if a, ok := this.users.Load(v); ok {
|
|
agent := a.(IAgent)
|
|
if err = agent.WriteBytes(data); err != nil {
|
|
this.module.Errorln(err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CloseAgent 关闭某个用户
|
|
func (this *AgentMgrComp) CloseAgent(ctx context.Context, args *pb.AgentCloseeReq, reply *pb.RPCMessageReply) error {
|
|
if a, ok := this.agents.Load(args.UserSessionId); ok {
|
|
if a.(IAgent).UserId() != "" {
|
|
this.users.Delete(a.(IAgent).UserId())
|
|
}
|
|
a.(IAgent).Close()
|
|
this.agents.Delete(args.UserSessionId)
|
|
} else {
|
|
reply.ErrorData = &pb.ErrorData{
|
|
Code: pb.ErrorCode_UserSessionNobeing,
|
|
Title: pb.ErrorCode_UserSessionNobeing.ToString(),
|
|
Message: fmt.Sprintf("解绑SessionId:%s失败!", args.UserSessionId),
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (this *AgentMgrComp) QueueChange(sessionId []string) {
|
|
for _, v := range sessionId {
|
|
if a, ok := this.agents.Load(v); ok {
|
|
agent := a.(IAgent)
|
|
agent.PushQueueChange()
|
|
}
|
|
}
|
|
}
|