From 9bfdbfa20ca82da4eba10a9daeb9c7476844a8e1 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Mon, 11 Sep 2023 16:18:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=BA=95=E5=B1=82=E5=B9=BF?= =?UTF-8?q?=E6=92=AD=E6=B6=88=E6=81=AF=E5=A5=94=E6=BA=83=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/robot002.txt | 26 ++++++++++++ lego/sys/rpcx/service.go | 17 ++++---- modules/robot/modulerobot_gm.go | 5 ++- modules/robot/modulerobot_user.go | 26 +++++++----- modules/robot/robot.go | 70 ++++++++++++++++++++++++++----- modules/robot/robotmgrcomp.go | 19 +++++---- modules/user/api_login.go | 5 ++- services/cmd/main.go | 3 +- 8 files changed, 131 insertions(+), 40 deletions(-) create mode 100644 bin/robot002.txt diff --git a/bin/robot002.txt b/bin/robot002.txt new file mode 100644 index 000000000..d516b472a --- /dev/null +++ b/bin/robot002.txt @@ -0,0 +1,26 @@ +机器人总数: 1000 +成功数量: 1000 +失败数量: 0 +消息总吞吐量: 24187 +---消息压测详情---------------------------------------------------------------------------------------------------- +消息名:gm.cmd 请求次数:13706 耗时最小:0 ms 耗时最大:57866ms 平均耗时:160.28ms 中位耗时:18.00ms +消息名:equipment.upgrade 请求次数:142 耗时最小:15 ms 耗时最大:16214ms 平均耗时:455.92ms 中位耗时:20.00ms +消息名:wtask.accept 请求次数:1022 耗时最小:14 ms 耗时最大:47111ms 平均耗时:77.09ms 中位耗时:18.00ms +消息名:hero.list 请求次数:979 耗时最小:14 ms 耗时最大:56824ms 平均耗时:441.82ms 中位耗时:19.00ms +消息名:equipment.getlist 请求次数:979 耗时最小:12 ms 耗时最大:57799ms 平均耗时:294.41ms 中位耗时:18.00ms +消息名:items.getlist 请求次数:979 耗时最小:13 ms 耗时最大:59418ms 平均耗时:290.49ms 中位耗时:18.00ms +消息名:wtask.info 请求次数:979 耗时最小:14 ms 耗时最大:13520ms 平均耗时:125.14ms 中位耗时:19.00ms +消息名:pagoda.getlist 请求次数:143 耗时最小:14 ms 耗时最大:56245ms 平均耗时:3448.13ms 中位耗时:20.00ms +消息名:arena.matche 请求次数:1 耗时最小:18 ms 耗时最大:18 ms 平均耗时:18.00ms 中位耗时:18.00ms +消息名:user.create 请求次数:1000 耗时最小:14 ms 耗时最大:13918ms 平均耗时:117.23ms 中位耗时:23.00ms +消息名:shop.getlist 请求次数:143 耗时最小:14 ms 耗时最大:56492ms 平均耗时:4076.59ms 中位耗时:26.00ms +消息名:hero.drawcard 请求次数:142 耗时最小:52 ms 耗时最大:60895ms 平均耗时:3408.51ms 中位耗时:572.00ms +消息名:wtask.completecondi 请求次数:284 耗时最小:14 ms 耗时最大:3142 ms 平均耗时:31.56ms 中位耗时:19.00ms +消息名:wtask.finish 请求次数:711 耗时最小:15 ms 耗时最大:1667 ms 平均耗时:39.04ms 中位耗时:20.00ms +消息名:wtask.battlestart 请求次数:284 耗时最小:14 ms 耗时最大:1678 ms 平均耗时:112.29ms 中位耗时:19.00ms +消息名:wtask.battlefinish 请求次数:284 耗时最小:22 ms 耗时最大:3460 ms 平均耗时:127.08ms 中位耗时:43.00ms +消息名:user.login 请求次数:1000 耗时最小:18 ms 耗时最大:1645 ms 平均耗时:520.85ms 中位耗时:516.50ms +消息名:hero.talentlist 请求次数:979 耗时最小:13 ms 耗时最大:56305ms 平均耗时:391.68ms 中位耗时:18.00ms +消息名:arena.info 请求次数:146 耗时最小:13 ms 耗时最大:25784ms 平均耗时:695.86ms 中位耗时:19.00ms +消息名:horoscope.info 请求次数:142 耗时最小:13 ms 耗时最大:59744ms 平均耗时:2767.27ms 中位耗时:19.00ms +消息名:sys.funcgetlist 请求次数:142 耗时最小:14 ms 耗时最大:59843ms 平均耗时:3031.15ms 中位耗时:74.00ms diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index dfeacc74f..71a8d78c7 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -500,7 +500,9 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s if l > 0 { done := make(chan error, l) for v, _ := range addrs { - go func(addr string) { + seq := new(uint64) + tempctx := context.WithValue(ctx, seqKey{}, seq) + go func(_ctx context.Context, addr string) { this.clientmutex.RLock() conn, ok := this.clients[addr] if !ok { @@ -515,22 +517,23 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s _call.Args = args _call.Reply = reply _call.Done = make(chan *client.Call, 10) - this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) + + this.send(_ctx, conn, spath[0], serviceMethod, metadata, _call) seq, _ := ctx.Value(seqKey{}).(*uint64) select { - case <-ctx.Done(): // cancel by context + case <-_ctx.Done(): // cancel by context this.mutex.Lock() call := this.pending[*seq] delete(this.pending, *seq) this.mutex.Unlock() if call != nil { - call.Error = ctx.Err() + call.Error = _ctx.Err() call.Done <- call } - done <- ctx.Err() + done <- _ctx.Err() case call := <-_call.Done: err = call.Error - meta := ctx.Value(share.ResMetaDataKey) + meta := _ctx.Value(share.ResMetaDataKey) if meta != nil && len(call.ResMetadata) > 0 { resMeta := meta.(map[string]string) for k, v := range call.ResMetadata { @@ -540,7 +543,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s } done <- nil } - }(v) + }(tempctx, v) } timeout := time.NewTimer(time.Minute) check: diff --git a/modules/robot/modulerobot_gm.go b/modules/robot/modulerobot_gm.go index 968fa5a91..066d78a29 100644 --- a/modules/robot/modulerobot_gm.go +++ b/modules/robot/modulerobot_gm.go @@ -35,7 +35,10 @@ func (this *ModuleRobot_GM) OncePipeline(robot IRobot) (err error) { err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) return } - + if _, errdata = robot.SendMessage("gm", "cmd", &pb.GMCmdReq{Cmod: "bingo:attr,starcoin,1000000"}); errdata != nil { + err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) + return + } //英雄 if _, errdata = robot.SendMessage("gm", "cmd", &pb.GMCmdReq{Cmod: "bingo:hero,13001,1"}); errdata != nil { err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) diff --git a/modules/robot/modulerobot_user.go b/modules/robot/modulerobot_user.go index 1ede310d5..063785208 100644 --- a/modules/robot/modulerobot_user.go +++ b/modules/robot/modulerobot_user.go @@ -45,20 +45,24 @@ func (this *ModuleRobot_User) OncePipeline(robot IRobot) (err error) { err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) return } - //创角 - if _, errdata = robot.SendMessage("user", "create", &pb.UserCreateReq{ - NickName: robot.Account(), - Figure: 100, - Gender: 1, - }); errdata != nil { - if errdata.Code == pb.ErrorCode_RoleCreated { //已创角 - err = nil - } else { - err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) + + if !this.user.Created { + //创角 + if _, errdata = robot.SendMessage("user", "create", &pb.UserCreateReq{ + NickName: robot.Account(), + Figure: 100, + Gender: 1, + }); errdata != nil { + if errdata.Code == pb.ErrorCode_RoleCreated { //已创角 + err = nil + } else { + err = errors.New(fmt.Sprintf("code:%d message:%s", errdata.Code, errdata.Message)) + } + return } - return } + return } diff --git a/modules/robot/robot.go b/modules/robot/robot.go index bca2116be..30445c332 100644 --- a/modules/robot/robot.go +++ b/modules/robot/robot.go @@ -9,6 +9,7 @@ import ( cfg "go_dreamfactory/sys/configure/structs" "go_dreamfactory/utils" "math/rand" + "sync/atomic" "time" jsoniter "github.com/json-iterator/go" @@ -36,6 +37,7 @@ type Robot struct { pipeline []*Pipeline //执行流水线 statistics []*RobotStatistics wtaskerror error + awaitState int32 //状态 1没有等待 2等待回应 3关闭 } func (this *Robot) Account() string { @@ -52,7 +54,10 @@ func (this *Robot) GetModule(module core.M_Modules) IModuleRobot { //初始化 func (this *Robot) Init(addr string, client IClient) (err error) { - this.Client.Init(addr, client) + if err = this.Client.Init(addr, client); err != nil { + return + } + this.awaitState = 1 this.await = make(chan *MessageResp) this.modules = make(map[core.M_Modules]IModuleRobot) this.statistics = make([]*RobotStatistics, 0, 100) @@ -101,9 +106,11 @@ func (this *Robot) Receive(msg *pb.UserMessage) (err error) { resp := message.(*pb.NotifyErrorNotifyPush) reqpath := fmt.Sprintf("%s.%s", resp.ReqMainType, resp.ReqSubType) if reqpath == this.curreq { //收到回应 - this.await <- &MessageResp{ - resp: nil, - errdata: resp.Err, + if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断 + this.await <- &MessageResp{ + resp: nil, + errdata: resp.Err, + } } } return @@ -116,9 +123,11 @@ func (this *Robot) Receive(msg *pb.UserMessage) (err error) { } if msgpath == this.curreq { //收到回应 - this.await <- &MessageResp{ - resp: message, - errdata: nil, + if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断 + this.await <- &MessageResp{ + resp: message, + errdata: nil, + } } } return @@ -153,6 +162,14 @@ func (this *Robot) SendMessage(mtype, stype string, msg proto.Message) (resp pro } return } + if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断 + errdata = &pb.ErrorData{ + Code: pb.ErrorCode_ClientError, + Title: pb.ErrorCode_ClientError.String(), + Message: err.Error(), + } + return + } messageresp = <-this.await //等待回应 resp = messageresp.resp errdata = messageresp.errdata @@ -204,6 +221,14 @@ func (this *Robot) SendTaskMessage(task, comdi int32, mtype, stype string, msg p } if mtype != "gateway" { this.curreq = fmt.Sprintf("%s.%s", mtype, stype) + if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断 + errdata = &pb.ErrorData{ + Code: pb.ErrorCode_ClientError, + Title: pb.ErrorCode_ClientError.String(), + Message: err.Error(), + } + return + } messageresp = <-this.await //等待回应 resp = messageresp.resp errdata = messageresp.errdata @@ -241,6 +266,18 @@ func (this *Robot) Heartbeat() { func (this *Robot) OnClose() { log.Debug("[机器人 End]", log.Field{Key: "Account", Value: this.account}) + if atomic.CompareAndSwapInt32(&this.awaitState, 2, 3) { //等待中 + this.await <- &MessageResp{ + resp: nil, + errdata: &pb.ErrorData{ + Code: pb.ErrorCode_ClientError, + Message: "client link closed", + }, + } + } else { + atomic.StoreInt32(&this.awaitState, 3) + } + this.monitor.RobotFinishedTest(this) } @@ -268,9 +305,10 @@ func (this *Robot) run() { module = this.modules[core.M_Modules(v.Module)] if err = module.OncePipeline(this); err != nil { log.Error("[机器人 初始执行错误]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()}) - break + this.Close() + return } - time.Sleep(time.Millisecond * time.Duration(2000+rand.Int31n(2000))) + time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000))) } for this.cycle { @@ -279,12 +317,22 @@ func (this *Robot) run() { if v.CurrNum < v.Exenum { module = this.modules[core.M_Modules(v.Module)] if err = module.DoPipeline(this); err != nil { + if this.debug { log.Error("[机器人 执行异常]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()}) } + + if atomic.LoadInt32(&this.awaitState) == 3 { //已经关闭 直接退出 + if this.debug { + log.Infof("[机器人 执行异常] 链接已经关闭", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}) + } + return + } + if !v.ErrNotStop { log.Debug("[机器人 退出循环]") - break + this.Close() + return } } v.CurrNum++ @@ -292,7 +340,7 @@ func (this *Robot) run() { this.cycle = true } } - time.Sleep(time.Millisecond * time.Duration(2000+rand.Int31n(2000))) + time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000))) } } this.Close() diff --git a/modules/robot/robotmgrcomp.go b/modules/robot/robotmgrcomp.go index 1a8d3ffaf..190c47f1f 100644 --- a/modules/robot/robotmgrcomp.go +++ b/modules/robot/robotmgrcomp.go @@ -42,22 +42,27 @@ locp: for { select { case <-timer.C: - if this.currRobotNum < this.module.options.RobotTotalNum { - for i := 0; i < int(this.module.options.RobotSingleNum); i++ { - this.createRobot(this.currRobotNum) - this.currRobotNum++ + rnum := 0 + for this.currRobotNum < this.module.options.RobotTotalNum { + this.currRobotNum++ + if err := this.createRobot(this.currRobotNum); err == nil { + rnum++ + if rnum >= int(this.module.options.RobotSingleNum) { + break + } + continue } - } else { + } + if this.currRobotNum >= this.module.options.RobotTotalNum { break locp } } } } -func (this *robotmgrComp) createRobot(index int32) { +func (this *robotmgrComp) createRobot(index int32) (err error) { var ( robot *Robot - err error ) robot = &Robot{ debug: this.module.options.RobotLog, diff --git a/modules/user/api_login.go b/modules/user/api_login.go index 9af69f361..f8b7e8282 100644 --- a/modules/user/api_login.go +++ b/modules/user/api_login.go @@ -94,8 +94,9 @@ func (this *apiComp) Login(session comm.IUserSession, req *pb.UserLoginReq) (err if err != nil { this.module.Errorf("set user session err:%v", err) errdata = &pb.ErrorData{ - Code: pb.ErrorCode_UserSessionNobeing, - Title: pb.ErrorCode_UserSessionNobeing.ToString(), + Code: pb.ErrorCode_UserSessionNobeing, + Title: pb.ErrorCode_UserSessionNobeing.ToString(), + Message: err.Error(), } return } diff --git a/services/cmd/main.go b/services/cmd/main.go index ef8d6d959..a3a774d31 100644 --- a/services/cmd/main.go +++ b/services/cmd/main.go @@ -302,7 +302,8 @@ func convertServiceSttings(config *comm.GameConfig, id int, stype string, ip str "ListenPort": lport, } break - case comm.Service_Worker: //业务服务 + //业务服务 + case comm.Service_Worker: sseting.Id = fmt.Sprintf("%s_%s%d", config.AreaId, comm.Service_Worker, id) sseting.Type = comm.Service_Worker sseting.Sys["rpcx"]["RpcxStartType"] = 2