上传登录排队逻辑

This commit is contained in:
liwei1dao 2023-09-04 15:52:47 +08:00
parent f0be3be2bf
commit bc0bed7e94
10 changed files with 953 additions and 620 deletions

View File

@ -1 +1 @@
86400.000000 0.000000

View File

@ -37,7 +37,6 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
writeChan: make(chan [][]byte, 2), writeChan: make(chan [][]byte, 2),
closeSignal: make(chan bool), closeSignal: make(chan bool),
state: 1, state: 1,
protoMsg: make(map[string]int64, 0),
} }
agent.wg.Add(2) agent.wg.Add(2)
go agent.readLoop() go agent.readLoop()
@ -47,24 +46,26 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
// 用户代理 // 用户代理
type Agent struct { type Agent struct {
gateway IGateway gateway IGateway
wsConn *websocket.Conn wsConn *websocket.Conn
sessionId string sessionId string
uId string uId string
wId string wId string
writeChan chan [][]byte writeChan chan [][]byte
closeSignal chan bool closeSignal chan bool
state int32 //状态 0 关闭 1 运行 2 关闭中 state int32 //状态 0 关闭 1 运行 2 关闭中
wg sync.WaitGroup wg sync.WaitGroup
protoMsg map[string]int64 queueIndex int32 //排队编号
lastpushtime time.Time //上次推送时间
} }
func (this *Agent) readLoop() { func (this *Agent) readLoop() {
defer this.wg.Done() defer this.wg.Done()
var ( var (
data []byte data []byte
msg *pb.UserMessage = &pb.UserMessage{} msg *pb.UserMessage = &pb.UserMessage{}
err error errdata *pb.ErrorData
err error
) )
locp: locp:
for { for {
@ -93,10 +94,45 @@ locp:
}) })
continue continue
} }
var errdata *pb.ErrorData
errdata = this.secAuth(msg) errdata = this.secAuth(msg)
if errdata == nil { if errdata == nil {
// this.gateway.Debugf("----------2 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType) // this.gateway.Debugf("----------2 agent:%s uId:%s MainType:%s SubType:%s ", this.sessionId, this.uId, msg.MainType, msg.SubType)
if msg.MainType == string(comm.ModuleUser) && msg.SubType == "login" { //登录排队
if this.queueIndex, err = this.gateway.InLoginQueue(this.sessionId, msg); err != nil {
this.gateway.Errorf("messageDistribution err:%v", err)
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
MsgId: msg.MsgId,
ReqMainType: msg.MainType,
ReqSubType: msg.SubType,
Arg: msg.Data,
Code: pb.ErrorCode_GatewayException,
Err: &pb.ErrorData{Title: "用户消息处理失败!", Datastring: err.Error()},
})
err = this.WriteMsg(&pb.UserMessage{
MsgId: msg.MsgId,
MainType: comm.MainTypeNotify,
SubType: comm.SubTypeErrorNotify,
Data: data,
})
go this.Close()
break locp
} else {
if this.queueIndex > 0 {
this.lastpushtime = time.Now()
data, _ := anypb.New(&pb.UserLoginQueueChangePush{
Index: this.queueIndex,
})
err = this.WriteMsg(&pb.UserMessage{
MsgId: msg.MsgId,
MainType: string(comm.ModuleUser),
SubType: "loginqueuechange",
Data: data,
})
}
}
continue
}
if err = this.messageDistribution(msg); err != nil { if err = this.messageDistribution(msg); err != nil {
this.gateway.Errorf("messageDistribution err:%v", err) this.gateway.Errorf("messageDistribution err:%v", err)
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
@ -116,7 +152,6 @@ locp:
go this.Close() go this.Close()
break locp break locp
} }
} else { } else {
this.gateway.Errorf("agent:%s uId:%s 密钥无效 err:%v", this.sessionId, this.uId, err) this.gateway.Errorf("agent:%s uId:%s 密钥无效 err:%v", this.sessionId, this.uId, err)
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
@ -136,7 +171,6 @@ locp:
break locp break locp
} }
} }
} }
} }
this.gateway.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId) this.gateway.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId)
@ -325,6 +359,28 @@ func (this *Agent) Close() {
this.gateway.DisConnect(this) this.gateway.DisConnect(this)
} }
//处理用户消息 提供给外部使用 比如 登录等待逻辑
func (this *Agent) HandleMessage(msg *pb.UserMessage) (err error) {
if err = this.messageDistribution(msg); err != nil {
this.gateway.Errorf("messageDistribution err:%v", err)
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
MsgId: msg.MsgId,
ReqMainType: msg.MainType,
ReqSubType: msg.SubType,
Arg: msg.Data,
Code: pb.ErrorCode_GatewayException,
Err: &pb.ErrorData{Title: "用户消息处理失败!", Datastring: err.Error()},
})
err = this.WriteMsg(&pb.UserMessage{
MsgId: msg.MsgId,
MainType: comm.MainTypeNotify,
SubType: comm.SubTypeErrorNotify,
Data: data,
})
}
return
}
// 分发用户消息 // 分发用户消息
func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
var ( var (
@ -440,3 +496,23 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
} }
return nil return nil
} }
//推送排队变化消息
func (this *Agent) PushQueueChange() {
this.queueIndex--
if time.Now().Sub(this.lastpushtime).Seconds() < 1 { //间隔少于1秒 不发送 避免io爆炸
return
}
data, _ := anypb.New(&pb.UserLoginQueueChangePush{
Index: this.queueIndex,
})
if err := this.WriteMsg(&pb.UserMessage{
MainType: string(comm.ModuleUser),
SubType: "loginqueuechange",
Data: data,
}); err != nil {
this.gateway.Errorf("pushQueueChange err:%v", err)
}
this.lastpushtime = time.Now()
return
}

View File

@ -42,6 +42,18 @@ func (this *AgentMgrComp) Start() (err error) {
return return
} }
func (this *AgentMgrComp) getAgent(sid string) (agent IAgent) {
var (
a any
ok bool
)
if a, ok = this.agents.Load(sid); !ok {
return
}
agent = a.(IAgent)
return
}
// Connect 加入新的用户 // Connect 加入新的用户
func (this *AgentMgrComp) Connect(a IAgent) { func (this *AgentMgrComp) Connect(a IAgent) {
this.agents.Store(a.SessionId(), a) this.agents.Store(a.SessionId(), a)
@ -210,17 +222,11 @@ func (this *AgentMgrComp) CloseAgent(ctx context.Context, args *pb.AgentCloseeRe
return nil return nil
} }
// 用户代理心跳 维护session表对象 func (this *AgentMgrComp) QueueChange(sessionId []string) {
func (this *AgentMgrComp) agentsHeartbeat() { for _, v := range sessionId {
// var ( if a, ok := this.agents.Load(v); ok {
// loadpipe *pipe.RedisPipe agent := a.(IAgent)
// loadpipe *pipe.RedisPipe agent.PushQueueChange()
// ) }
// this.agents.Range(func(key, value any) bool { }
// agent := value.(IAgent)
// if agent.UserId() != "" { //只发送登录用户
// }
// return true
// })
} }

View File

@ -20,6 +20,8 @@ type (
UnBind() UnBind()
WriteMsg(msg *pb.UserMessage) (err error) WriteMsg(msg *pb.UserMessage) (err error)
WriteBytes(data []byte) (err error) WriteBytes(data []byte) (err error)
HandleMessage(msg *pb.UserMessage) (err error)
PushQueueChange()
Close() //主动关闭接口 Close() //主动关闭接口
} }
// IGateway 网关模块 接口定义 // IGateway 网关模块 接口定义
@ -30,6 +32,7 @@ type (
Connect(a IAgent) Connect(a IAgent)
DisConnect(a IAgent) DisConnect(a IAgent)
GetMsgDistribute(msgmid, msguid string) (rule string, ok bool) GetMsgDistribute(msgmid, msguid string) (rule string, ok bool)
InLoginQueue(sessionId string, login *pb.UserMessage) (index int32, err error)
} }
) )

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"go_dreamfactory/comm" "go_dreamfactory/comm"
"go_dreamfactory/pb"
"go_dreamfactory/lego/base" "go_dreamfactory/lego/base"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
@ -29,6 +30,7 @@ type Gateway struct {
service base.IRPCXService // rpcx服务接口 主要client->server service base.IRPCXService // rpcx服务接口 主要client->server
wsService *WSServiceComp // websocket服务 监听websocket连接 wsService *WSServiceComp // websocket服务 监听websocket连接
agentMgr *AgentMgrComp // 客户端websocket连接管理 agentMgr *AgentMgrComp // 客户端websocket连接管理
queueComp *QueueComp // 等待队列组件
configure *configureComp configure *configureComp
} }
@ -90,6 +92,7 @@ func (this *Gateway) Start() (err error) {
func (this *Gateway) OnInstallComp() { func (this *Gateway) OnInstallComp() {
this.ModuleBase.OnInstallComp() this.ModuleBase.OnInstallComp()
this.agentMgr = this.RegisterComp(new(AgentMgrComp)).(*AgentMgrComp) this.agentMgr = this.RegisterComp(new(AgentMgrComp)).(*AgentMgrComp)
this.queueComp = this.RegisterComp(new(QueueComp)).(*QueueComp)
this.wsService = this.RegisterComp(new(WSServiceComp)).(*WSServiceComp) this.wsService = this.RegisterComp(new(WSServiceComp)).(*WSServiceComp)
this.configure = this.RegisterComp(new(configureComp)).(*configureComp) this.configure = this.RegisterComp(new(configureComp)).(*configureComp)
} }
@ -111,6 +114,11 @@ func (this *Gateway) GetMsgDistribute(msgmid, msguid string) (rule string, ok bo
return this.configure.GetMsgDistribute(msgmid, msguid) return this.configure.GetMsgDistribute(msgmid, msguid)
} }
// 加入登录等待队列
func (this *Gateway) InLoginQueue(sessionId string, login *pb.UserMessage) (index int32, err error) {
return this.queueComp.inLoginQueue(sessionId, login)
}
//日志 //日志
func (this *Gateway) Enabled(lvl log.Loglevel) bool { func (this *Gateway) Enabled(lvl log.Loglevel) bool {
return this.options.GetLog().Enabled(lvl) return this.options.GetLog().Enabled(lvl)

View File

@ -0,0 +1,108 @@
package gateway
import (
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/pb"
"sync"
"time"
"google.golang.org/protobuf/proto"
)
type UserLoginData struct {
sessionId string
Account string //账号
loginreq *pb.UserMessage //登录请求
}
/*
登录排队组件
*/
type QueueComp struct {
cbase.ModuleCompBase
options *Options
module *Gateway // 网关
lock sync.RWMutex
loginQueue []*UserLoginData //登录队列
}
func (this *QueueComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
err = this.ModuleCompBase.Init(service, module, comp, options)
this.options = options.(*Options)
this.module = module.(*Gateway)
this.loginQueue = make([]*UserLoginData, 0, 2000)
return
}
func (this *QueueComp) Start() (err error) {
err = this.ModuleCompBase.Start()
go this.run()
return
}
func (this *QueueComp) inLoginQueue(sessionId string, req *pb.UserMessage) (index int32, err error) {
var (
msg proto.Message
loginreq *pb.UserLoginReq
)
if msg, err = req.Data.UnmarshalNew(); err != nil {
log.Errorf("[Handle Api] UserMessage:user.login Unmarshal err:%v", err)
return
}
loginreq = msg.(*pb.UserLoginReq)
this.lock.Lock()
this.loginQueue = append(this.loginQueue, &UserLoginData{
sessionId: sessionId,
Account: loginreq.Account,
loginreq: &pb.UserMessage{
MsgId: req.MsgId,
MainType: req.MainType,
SubType: req.SubType,
Data: req.Data,
ServicePath: req.ServicePath,
},
})
index = int32(len(this.loginQueue))
this.lock.Unlock()
return
}
func (this *QueueComp) run() {
var (
loginreq *UserLoginData
agent IAgent
err error
num int
sessionIds []string = make([]string, 0)
)
for {
sessionIds = sessionIds[:0]
this.lock.Lock()
num = len(this.loginQueue)
if num > 0 {
loginreq = this.loginQueue[0]
this.loginQueue = this.loginQueue[1:]
for _, v := range this.loginQueue {
sessionIds = append(sessionIds, v.sessionId)
}
}
this.lock.Unlock()
if num == 0 {
time.Sleep(time.Second * 1)
continue
}
agent = this.module.agentMgr.getAgent(loginreq.sessionId)
if agent != nil && agent.UserId() == "" { //未登录
if err = agent.HandleMessage(loginreq.loginreq); err != nil { //登录失败处理
}
} else { //离线处理
}
if len(sessionIds) > 0 {
this.module.agentMgr.QueueChange(sessionIds)
}
}
}

View File

@ -16,14 +16,22 @@ import (
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
) )
//机器人统计数据
type RobotStatistics struct {
message string
time int64
}
type Robot struct { type Robot struct {
Client Client
robotmgrComp *robotmgrComp
index int32 //编号 index int32 //编号
account, serverId string account, serverId string
curreq string //当前请求 curreq string //当前请求
await chan *MessageResp await chan *MessageResp
modules map[core.M_Modules]IModuleRobot modules map[core.M_Modules]IModuleRobot
pipeline []string //执行流水线 pipeline []string //执行流水线
statistics []*RobotStatistics
} }
func (this *Robot) Account() string { func (this *Robot) Account() string {
@ -43,6 +51,7 @@ func (this *Robot) Init(addr string, client IClient) (err error) {
this.Client.Init(addr, client) this.Client.Init(addr, client)
this.await = make(chan *MessageResp) this.await = make(chan *MessageResp)
this.modules = make(map[core.M_Modules]IModuleRobot) this.modules = make(map[core.M_Modules]IModuleRobot)
this.statistics = make([]*RobotStatistics, 0, 100)
this.modules[comm.ModuleUser] = new(ModuleRobot_User) this.modules[comm.ModuleUser] = new(ModuleRobot_User)
this.modules[comm.ModuleSys] = new(ModuleRobot_Sys) this.modules[comm.ModuleSys] = new(ModuleRobot_Sys)
this.modules[comm.ModuleHero] = new(ModuleRobot_Hero) this.modules[comm.ModuleHero] = new(ModuleRobot_Hero)
@ -121,7 +130,7 @@ func (this *Robot) SendMessage(mtype, stype string, msg proto.Message) (resp pro
messageresp *MessageResp messageresp *MessageResp
err error err error
) )
// stime := time.Now() stime := time.Now()
data, _ := anypb.New(msg) data, _ := anypb.New(msg)
message := &pb.UserMessage{ message := &pb.UserMessage{
MainType: mtype, MainType: mtype,
@ -141,6 +150,10 @@ func (this *Robot) SendMessage(mtype, stype string, msg proto.Message) (resp pro
messageresp = <-this.await //等待回应 messageresp = <-this.await //等待回应
resp = messageresp.resp resp = messageresp.resp
errdata = messageresp.errdata errdata = messageresp.errdata
this.statistics = append(this.statistics, &RobotStatistics{
message: fmt.Sprintf("%s.%s", mtype, stype),
time: time.Since(stime).Milliseconds(),
})
// log.Debug("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "resp", Value: errdata == nil}) // log.Debug("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "resp", Value: errdata == nil})
} }
return return
@ -177,7 +190,10 @@ func (this *Robot) SendTaskMessage(task, comdi int32, mtype, stype string, msg p
} else { } else {
log.Error("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "Task", Value: fmt.Sprintf("[%d-%d]", task, comdi)}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata.String()}) log.Error("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "Task", Value: fmt.Sprintf("[%d-%d]", task, comdi)}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata.String()})
} }
this.statistics = append(this.statistics, &RobotStatistics{
message: fmt.Sprintf("%s.%s", mtype, stype),
time: time.Since(stime).Milliseconds(),
})
} }
return return
} }
@ -200,6 +216,7 @@ func (this *Robot) Heartbeat() {
func (this *Robot) OnClose() { func (this *Robot) OnClose() {
log.Debug("[机器人 End]", log.Field{Key: "Account", Value: this.account}) log.Debug("[机器人 End]", log.Field{Key: "Account", Value: this.account})
this.robotmgrComp.robotEnd(this)
} }
func (this *Robot) DoTask(taskconf *cfg.GameWorldTaskData, condconf *cfg.GameBuriedCondiData, moduleStr core.M_Modules) (err error) { func (this *Robot) DoTask(taskconf *cfg.GameWorldTaskData, condconf *cfg.GameBuriedCondiData, moduleStr core.M_Modules) (err error) {

View File

@ -5,6 +5,7 @@ import (
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"sync"
"time" "time"
) )
@ -16,6 +17,8 @@ type robotmgrComp struct {
service core.IService service core.IService
module *RobotModule module *RobotModule
currRobotNum int32 currRobotNum int32
lock sync.RWMutex
statistical map[string][]*RobotStatistics
} }
//组件初始化接口 //组件初始化接口
@ -23,6 +26,7 @@ func (this *robotmgrComp) Init(service core.IService, module core.IModule, comp
this.ModuleCompBase.Init(service, module, comp, options) this.ModuleCompBase.Init(service, module, comp, options)
this.module = module.(*RobotModule) this.module = module.(*RobotModule)
this.service = service this.service = service
this.statistical = make(map[string][]*RobotStatistics)
return return
} }
@ -57,10 +61,11 @@ func (this *robotmgrComp) createRobot(index int32) {
err error err error
) )
robot = &Robot{ robot = &Robot{
index: index, robotmgrComp: this,
account: fmt.Sprintf("it_%d", index), index: index,
serverId: this.module.options.ServerID, account: fmt.Sprintf("it_%d", index),
pipeline: this.module.options.Pipeline, serverId: this.module.options.ServerID,
pipeline: this.module.options.Pipeline,
} }
if err = robot.Init(this.module.options.ServerAddr, robot); err != nil { if err = robot.Init(this.module.options.ServerAddr, robot); err != nil {
log.Errorln(err) log.Errorln(err)
@ -68,3 +73,9 @@ func (this *robotmgrComp) createRobot(index int32) {
} }
return return
} }
func (this *robotmgrComp) robotEnd(robot *Robot) {
this.lock.Lock()
this.statistical[robot.account] = robot.statistics
this.lock.Unlock()
}

View File

@ -0,0 +1,39 @@
package robot
import (
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
)
/*
统计组件
*/
type statisticalComp struct {
cbase.ModuleCompBase
succclientNum int32 //链接成功客户端数
failclientNum int32 //链接失败客户端数
totalmessage int32 //总消息两
robotdata map[string][]*RobotStatistics //机器人统计数据
}
//组件初始化接口
func (this *statisticalComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options)
this.robotdata = make(map[string][]*RobotStatistics)
return
}
//添加成功客户端数
func (this *statisticalComp) AddSuccClient(robot *Robot) {
this.succclientNum++
}
//添加失败客户端数
func (this *statisticalComp) AddFailClient(robot *Robot, err error) {
this.succclientNum++
}
//机器人测试结束
func (this *statisticalComp) RobotFinishedTest(robot *Robot) {
}

File diff suppressed because it is too large Load Diff