上传底层广播消息奔溃处理

This commit is contained in:
liwei1dao 2023-09-11 16:18:47 +08:00
parent 5b41afb95c
commit 9bfdbfa20c
8 changed files with 131 additions and 40 deletions

26
bin/robot002.txt Normal file
View File

@ -0,0 +1,26 @@
机器人总数: 1000
成功数量: 1000
失败数量: 0
消息总吞吐量: 24187
---消息压测详情----------------------------------------------------------------------------------------------------
消息名:gm.cmd 请求次数:13706 耗时最小:0 ms 耗时最大:57866ms 平均耗时:160.28ms 中位耗时:18.00ms
消息名:equipment.upgrade 请求次数:142 耗时最小:15 ms 耗时最大:16214ms 平均耗时:455.92ms 中位耗时:20.00ms
消息名:wtask.accept 请求次数:1022 耗时最小:14 ms 耗时最大:47111ms 平均耗时:77.09ms 中位耗时:18.00ms
消息名:hero.list 请求次数:979 耗时最小:14 ms 耗时最大:56824ms 平均耗时:441.82ms 中位耗时:19.00ms
消息名:equipment.getlist 请求次数:979 耗时最小:12 ms 耗时最大:57799ms 平均耗时:294.41ms 中位耗时:18.00ms
消息名:items.getlist 请求次数:979 耗时最小:13 ms 耗时最大:59418ms 平均耗时:290.49ms 中位耗时:18.00ms
消息名:wtask.info 请求次数:979 耗时最小:14 ms 耗时最大:13520ms 平均耗时:125.14ms 中位耗时:19.00ms
消息名:pagoda.getlist 请求次数:143 耗时最小:14 ms 耗时最大:56245ms 平均耗时:3448.13ms 中位耗时:20.00ms
消息名:arena.matche 请求次数:1 耗时最小:18 ms 耗时最大:18 ms 平均耗时:18.00ms 中位耗时:18.00ms
消息名:user.create 请求次数:1000 耗时最小:14 ms 耗时最大:13918ms 平均耗时:117.23ms 中位耗时:23.00ms
消息名:shop.getlist 请求次数:143 耗时最小:14 ms 耗时最大:56492ms 平均耗时:4076.59ms 中位耗时:26.00ms
消息名:hero.drawcard 请求次数:142 耗时最小:52 ms 耗时最大:60895ms 平均耗时:3408.51ms 中位耗时:572.00ms
消息名:wtask.completecondi 请求次数:284 耗时最小:14 ms 耗时最大:3142 ms 平均耗时:31.56ms 中位耗时:19.00ms
消息名:wtask.finish 请求次数:711 耗时最小:15 ms 耗时最大:1667 ms 平均耗时:39.04ms 中位耗时:20.00ms
消息名:wtask.battlestart 请求次数:284 耗时最小:14 ms 耗时最大:1678 ms 平均耗时:112.29ms 中位耗时:19.00ms
消息名:wtask.battlefinish 请求次数:284 耗时最小:22 ms 耗时最大:3460 ms 平均耗时:127.08ms 中位耗时:43.00ms
消息名:user.login 请求次数:1000 耗时最小:18 ms 耗时最大:1645 ms 平均耗时:520.85ms 中位耗时:516.50ms
消息名:hero.talentlist 请求次数:979 耗时最小:13 ms 耗时最大:56305ms 平均耗时:391.68ms 中位耗时:18.00ms
消息名:arena.info 请求次数:146 耗时最小:13 ms 耗时最大:25784ms 平均耗时:695.86ms 中位耗时:19.00ms
消息名:horoscope.info 请求次数:142 耗时最小:13 ms 耗时最大:59744ms 平均耗时:2767.27ms 中位耗时:19.00ms
消息名:sys.funcgetlist 请求次数:142 耗时最小:14 ms 耗时最大:59843ms 平均耗时:3031.15ms 中位耗时:74.00ms

View File

@ -500,7 +500,9 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
if l > 0 {
done := make(chan error, l)
for v, _ := range addrs {
go func(addr string) {
seq := new(uint64)
tempctx := context.WithValue(ctx, seqKey{}, seq)
go func(_ctx context.Context, addr string) {
this.clientmutex.RLock()
conn, ok := this.clients[addr]
if !ok {
@ -515,22 +517,23 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
_call.Args = args
_call.Reply = reply
_call.Done = make(chan *client.Call, 10)
this.send(ctx, conn, spath[0], serviceMethod, metadata, _call)
this.send(_ctx, conn, spath[0], serviceMethod, metadata, _call)
seq, _ := ctx.Value(seqKey{}).(*uint64)
select {
case <-ctx.Done(): // cancel by context
case <-_ctx.Done(): // cancel by context
this.mutex.Lock()
call := this.pending[*seq]
delete(this.pending, *seq)
this.mutex.Unlock()
if call != nil {
call.Error = ctx.Err()
call.Error = _ctx.Err()
call.Done <- call
}
done <- ctx.Err()
done <- _ctx.Err()
case call := <-_call.Done:
err = call.Error
meta := ctx.Value(share.ResMetaDataKey)
meta := _ctx.Value(share.ResMetaDataKey)
if meta != nil && len(call.ResMetadata) > 0 {
resMeta := meta.(map[string]string)
for k, v := range call.ResMetadata {
@ -540,7 +543,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
}
done <- nil
}
}(v)
}(tempctx, v)
}
timeout := time.NewTimer(time.Minute)
check:

View File

@ -35,7 +35,10 @@ func (this *ModuleRobot_GM) OncePipeline(robot IRobot) (err error) {
err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message))
return
}
if _, errdata = robot.SendMessage("gm", "cmd", &pb.GMCmdReq{Cmod: "bingo:attr,starcoin,1000000"}); errdata != nil {
err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message))
return
}
//英雄
if _, errdata = robot.SendMessage("gm", "cmd", &pb.GMCmdReq{Cmod: "bingo:hero,13001,1"}); errdata != nil {
err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message))

View File

@ -45,6 +45,8 @@ func (this *ModuleRobot_User) OncePipeline(robot IRobot) (err error) {
err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message))
return
}
if !this.user.Created {
//创角
if _, errdata = robot.SendMessage("user", "create", &pb.UserCreateReq{
NickName: robot.Account(),
@ -56,9 +58,11 @@ func (this *ModuleRobot_User) OncePipeline(robot IRobot) (err error) {
} else {
err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message))
}
return
}
}
return
}

View File

@ -9,6 +9,7 @@ import (
cfg "go_dreamfactory/sys/configure/structs"
"go_dreamfactory/utils"
"math/rand"
"sync/atomic"
"time"
jsoniter "github.com/json-iterator/go"
@ -36,6 +37,7 @@ type Robot struct {
pipeline []*Pipeline //执行流水线
statistics []*RobotStatistics
wtaskerror error
awaitState int32 //状态 1没有等待 2等待回应 3关闭
}
func (this *Robot) Account() string {
@ -52,7 +54,10 @@ func (this *Robot) GetModule(module core.M_Modules) IModuleRobot {
//初始化
func (this *Robot) Init(addr string, client IClient) (err error) {
this.Client.Init(addr, client)
if err = this.Client.Init(addr, client); err != nil {
return
}
this.awaitState = 1
this.await = make(chan *MessageResp)
this.modules = make(map[core.M_Modules]IModuleRobot)
this.statistics = make([]*RobotStatistics, 0, 100)
@ -101,11 +106,13 @@ func (this *Robot) Receive(msg *pb.UserMessage) (err error) {
resp := message.(*pb.NotifyErrorNotifyPush)
reqpath := fmt.Sprintf("%s.%s", resp.ReqMainType, resp.ReqSubType)
if reqpath == this.curreq { //收到回应
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断
this.await <- &MessageResp{
resp: nil,
errdata: resp.Err,
}
}
}
return
}
if module, ok = this.modules[core.M_Modules(msg.MainType)]; ok {
@ -116,11 +123,13 @@ func (this *Robot) Receive(msg *pb.UserMessage) (err error) {
}
if msgpath == this.curreq { //收到回应
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断
this.await <- &MessageResp{
resp: message,
errdata: nil,
}
}
}
return
}
@ -153,6 +162,14 @@ func (this *Robot) SendMessage(mtype, stype string, msg proto.Message) (resp pro
}
return
}
if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断
errdata = &pb.ErrorData{
Code: pb.ErrorCode_ClientError,
Title: pb.ErrorCode_ClientError.String(),
Message: err.Error(),
}
return
}
messageresp = <-this.await //等待回应
resp = messageresp.resp
errdata = messageresp.errdata
@ -204,6 +221,14 @@ func (this *Robot) SendTaskMessage(task, comdi int32, mtype, stype string, msg p
}
if mtype != "gateway" {
this.curreq = fmt.Sprintf("%s.%s", mtype, stype)
if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断
errdata = &pb.ErrorData{
Code: pb.ErrorCode_ClientError,
Title: pb.ErrorCode_ClientError.String(),
Message: err.Error(),
}
return
}
messageresp = <-this.await //等待回应
resp = messageresp.resp
errdata = messageresp.errdata
@ -241,6 +266,18 @@ func (this *Robot) Heartbeat() {
func (this *Robot) OnClose() {
log.Debug("[机器人 End]", log.Field{Key: "Account", Value: this.account})
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 3) { //等待中
this.await <- &MessageResp{
resp: nil,
errdata: &pb.ErrorData{
Code: pb.ErrorCode_ClientError,
Message: "client link closed",
},
}
} else {
atomic.StoreInt32(&this.awaitState, 3)
}
this.monitor.RobotFinishedTest(this)
}
@ -268,9 +305,10 @@ func (this *Robot) run() {
module = this.modules[core.M_Modules(v.Module)]
if err = module.OncePipeline(this); err != nil {
log.Error("[机器人 初始执行错误]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()})
break
this.Close()
return
}
time.Sleep(time.Millisecond * time.Duration(2000+rand.Int31n(2000)))
time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000)))
}
for this.cycle {
@ -279,12 +317,22 @@ func (this *Robot) run() {
if v.CurrNum < v.Exenum {
module = this.modules[core.M_Modules(v.Module)]
if err = module.DoPipeline(this); err != nil {
if this.debug {
log.Error("[机器人 执行异常]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()})
}
if atomic.LoadInt32(&this.awaitState) == 3 { //已经关闭 直接退出
if this.debug {
log.Infof("[机器人 执行异常] 链接已经关闭", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v})
}
return
}
if !v.ErrNotStop {
log.Debug("[机器人 退出循环]")
break
this.Close()
return
}
}
v.CurrNum++
@ -292,7 +340,7 @@ func (this *Robot) run() {
this.cycle = true
}
}
time.Sleep(time.Millisecond * time.Duration(2000+rand.Int31n(2000)))
time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000)))
}
}
this.Close()

View File

@ -42,22 +42,27 @@ locp:
for {
select {
case <-timer.C:
if this.currRobotNum < this.module.options.RobotTotalNum {
for i := 0; i < int(this.module.options.RobotSingleNum); i++ {
this.createRobot(this.currRobotNum)
rnum := 0
for this.currRobotNum < this.module.options.RobotTotalNum {
this.currRobotNum++
if err := this.createRobot(this.currRobotNum); err == nil {
rnum++
if rnum >= int(this.module.options.RobotSingleNum) {
break
}
} else {
continue
}
}
if this.currRobotNum >= this.module.options.RobotTotalNum {
break locp
}
}
}
}
func (this *robotmgrComp) createRobot(index int32) {
func (this *robotmgrComp) createRobot(index int32) (err error) {
var (
robot *Robot
err error
)
robot = &Robot{
debug: this.module.options.RobotLog,

View File

@ -96,6 +96,7 @@ func (this *apiComp) Login(session comm.IUserSession, req *pb.UserLoginReq) (err
errdata = &pb.ErrorData{
Code: pb.ErrorCode_UserSessionNobeing,
Title: pb.ErrorCode_UserSessionNobeing.ToString(),
Message: err.Error(),
}
return
}

View File

@ -302,7 +302,8 @@ func convertServiceSttings(config *comm.GameConfig, id int, stype string, ip str
"ListenPort": lport,
}
break
case comm.Service_Worker: //业务服务
//业务服务
case comm.Service_Worker:
sseting.Id = fmt.Sprintf("%s_%s%d", config.AreaId, comm.Service_Worker, id)
sseting.Type = comm.Service_Worker
sseting.Sys["rpcx"]["RpcxStartType"] = 2