go_dreamfactory/modules/gateway/agentmgr_comp.go
2022-06-07 20:18:22 +08:00

106 lines
2.9 KiB
Go

package gateway
import (
"context"
"go_dreamfactory/pb"
"sync"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
)
// AgentMgr_Comp is the gateway module component that keeps track of the
// currently connected agents and exposes handlers that act on them by
// session id.
type AgentMgr_Comp struct {
// Embedded base component; its Init is invoked from AgentMgr_Comp.Init.
cbase.ModuleCompBase
// agents maps an agent's SessionId() to the IAgent value itself.
// It is allocated in Init and must not be used before Init has run.
agents *sync.Map
}
// Init initialises the embedded ModuleCompBase and then allocates the
// session-id -> agent registry.
//
// The registry is allocated even when the base initialisation reports an
// error; that error is returned to the caller unchanged.
func (this *AgentMgr_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	err = this.ModuleCompBase.Init(service, module, comp, options)
	this.agents = &sync.Map{}
	return err
}
// Connect registers the agent in the registry under its session id,
// replacing any entry previously stored under the same id.
func (this *AgentMgr_Comp) Connect(a IAgent) {
	sessionID := a.SessionId()
	this.agents.Store(sessionID, a)
}
// DisConnect removes the registry entry stored under the agent's session id.
func (this *AgentMgr_Comp) DisConnect(a IAgent) {
	sessionID := a.SessionId()
	this.agents.Delete(sessionID)
}
// Build binds a user id to a session on login.
//
// It looks up the agent registered under args.UserSessionId and calls its
// Build method with args.UserId. When no agent is registered under that
// session id, reply.Code and reply.Msg are set to the
// ErrorCode_UserSessionNobeing code and its message. The returned error is
// always nil; failures are reported through reply only.
func (this *AgentMgr_Comp) Build(ctx context.Context, args *pb.AgentBuildReq, reply *pb.RPCMessageReply) error {
	agent, found := this.agents.Load(args.UserSessionId)
	if !found {
		reply.Code = pb.ErrorCode_UserSessionNobeing
		reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_UserSessionNobeing)
		return nil
	}
	agent.(IAgent).Build(args.UserId)
	return nil
}
// UnBuild unbinds the user id from a session.
//
// It looks up the agent registered under args.UserSessionId and calls its
// UnBuild method. When no agent is registered under that session id,
// reply.Code and reply.Msg are set to the ErrorCode_UserSessionNobeing code
// and its message. The returned error is always nil.
func (this *AgentMgr_Comp) UnBuild(ctx context.Context, args *pb.AgentUnBuildReq, reply *pb.RPCMessageReply) error {
	agent, found := this.agents.Load(args.UserSessionId)
	if !found {
		reply.Code = pb.ErrorCode_UserSessionNobeing
		reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_UserSessionNobeing)
		return nil
	}
	agent.(IAgent).UnBuild()
	return nil
}
// SendMsgToAgent sends a message to a single user.
//
// It looks up the agent registered under args.UserSessionId and writes a
// pb.UserMessage built from args.MainType, args.SubType, args.Code and
// args.Data to it. When no agent is registered under that session id,
// reply.Code and reply.Msg are set to the ErrorCode_UserSessionNobeing code
// and its message. The returned error is always nil.
func (this *AgentMgr_Comp) SendMsgToAgent(ctx context.Context, args *pb.AgentSendMessageReq, reply *pb.RPCMessageReply) error {
	agent, found := this.agents.Load(args.UserSessionId)
	if !found {
		reply.Code = pb.ErrorCode_UserSessionNobeing
		reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_UserSessionNobeing)
		return nil
	}
	// The message is only built once the target agent is known to exist.
	msg := &pb.UserMessage{
		MainType: args.MainType,
		SubType:  args.SubType,
		Code:     args.Code,
		Data:     args.Data,
	}
	agent.(IAgent).WriteMsg(msg)
	return nil
}
// SendMsgToAgents sends one message to several users.
//
// A single pb.UserMessage is built from args.MainType, args.SubType and
// args.Data, and the same message pointer is written to every agent
// registered under one of args.UserSessionIds. Session ids with no
// registered agent are skipped silently; reply is not modified. The returned
// error is always nil.
func (this *AgentMgr_Comp) SendMsgToAgents(ctx context.Context, args *pb.BatchMessageReq, reply *pb.RPCMessageReply) error {
	msg := &pb.UserMessage{
		MainType: args.MainType,
		SubType:  args.SubType,
		Data:     args.Data,
	}
	for _, sessionID := range args.UserSessionIds {
		agent, found := this.agents.Load(sessionID)
		if !found {
			continue
		}
		agent.(IAgent).WriteMsg(msg)
	}
	return nil
}
// SendMsgToAllAgent broadcasts one message to all users.
//
// A single pb.UserMessage is built from args.MainType, args.SubType and
// args.Data, and the same message pointer is written to every agent visited
// while ranging over the registry. reply is not modified. The returned error
// is always nil.
func (this *AgentMgr_Comp) SendMsgToAllAgent(ctx context.Context, args *pb.BroadCastMessageReq, reply *pb.RPCMessageReply) error {
	msg := &pb.UserMessage{
		MainType: args.MainType,
		SubType:  args.SubType,
		Data:     args.Data,
	}
	deliver := func(_, agent any) bool {
		agent.(IAgent).WriteMsg(msg)
		// Returning true keeps Range going so every entry is visited.
		return true
	}
	this.agents.Range(deliver)
	return nil
}
// CloseAgent closes an agent.
//
// It looks up the agent registered under args.UserSessionId and calls its
// Close method. When no agent is registered under that session id,
// reply.Code and reply.Msg are set to the ErrorCode_UserSessionNobeing code
// and its message. The returned error is always nil.
func (this *AgentMgr_Comp) CloseAgent(ctx context.Context, args *pb.AgentCloseeReq, reply *pb.RPCMessageReply) error {
	agent, found := this.agents.Load(args.UserSessionId)
	if !found {
		reply.Code = pb.ErrorCode_UserSessionNobeing
		reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_UserSessionNobeing)
		return nil
	}
	agent.(IAgent).Close()
	return nil
}