From 497282c53bf612e43d3a64c6fa46f9b88734d0df Mon Sep 17 00:00:00 2001 From: zhaocy Date: Tue, 13 Dec 2022 08:13:44 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=8A=A5=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/ai.go | 35 +++++++------ lib/base.go | 19 ++++--- lib/param.go | 20 +++++-- lib/robot.go | 126 ++++++++++++++++++++++++++++++++++++++++----- storage/config.go | 11 ++-- test/robot_test.go | 4 +- ui/mainwindow.go | 12 ++--- 7 files changed, 173 insertions(+), 54 deletions(-) diff --git a/lib/ai.go b/lib/ai.go index ea5c7ef..b36815d 100644 --- a/lib/ai.go +++ b/lib/ai.go @@ -5,33 +5,31 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/sirupsen/logrus" "legu.airobot/storage" ) type myAI struct { - robots []*Robot - scenes []*scene - iscenes []IScene - tickets Tickets //票池 - countTotal uint32 //机器人总数 - useCount uint32 //使用数量 - lock sync.Mutex // - config *storage.Config //配置 + robots []*Robot + scenes []*scene + iscenes []IScene + tickets Tickets //票池 + useCount uint32 //计数(压入的用户数) + lock sync.Mutex // + config *storage.Config //配置 } func NewAI(aip AIParam) (*myAI, error) { - logrus.Debug("创建AI") if err := aip.Check(); err != nil { return nil, err } ai := &myAI{ - scenes: make([]*scene, 0), - countTotal: aip.RobotCount, - config: aip.Config, - iscenes: aip.Scenes, + scenes: make([]*scene, 0), + config: aip.Config, + iscenes: aip.Scenes, } if err := ai.init(); err != nil { @@ -45,14 +43,15 @@ func (m *myAI) init() error { var buf bytes.Buffer buf.WriteString("初始化AI") - tickets, err := NewTickets(m.countTotal) + uct := m.config.Global.UserCountTotal + tickets, err := NewTickets(uct) if err != nil { return err } m.tickets = tickets - buf.WriteString(fmt.Sprintf("完成 机器人数量:%d", m.countTotal)) + buf.WriteString(fmt.Sprintf("完成 用户数量:%d", uct)) logrus.Debug(buf.String()) return nil } @@ -72,8 +71,12 @@ func (m *myAI) Start() bool { go func() { for { m.tickets.Take() - atomic.AddUint32(&m.useCount, 1) + if m.useCount >= uint32(m.config.Global.UserCount) { + atomic.StoreUint32(&m.useCount, 0) + time.Sleep(time.Duration(m.config.Global.IntervalS) * time.Second) + } go func() { + atomic.AddUint32(&m.useCount, 1) robot := NewRobot(m.config) robot.SetScenes(m.iscenes) m.appendRobot(robot) diff --git a/lib/base.go b/lib/base.go index e8e8215..2dd6eb5 100644 --- a/lib/base.go +++ b/lib/base.go @@ -26,12 +26,19 @@ type SceneInfo struct { } type CallResult struct { - ID int64 // ID。 - Req RawReq // 原生请求。 - Resp RawResp // 原生响应。 - Code RetCode // 响应代码。 - Msg string // 结果成因的简述。 - Elapse time.Duration // 耗时。 + ID int64 // ID。 + MainType string + SubType string + Elapse time.Duration // 耗时。 +} + +type Statistics struct { + ElapseTotal time.Duration //总耗时 + MaxElapse time.Duration //最大耗时 + MinElapse time.Duration //最小耗时 + AvgElapse time.Duration //平均耗时 + CallCount int64 //调用次数 + Route string //协议名称 } const ( diff --git a/lib/param.go b/lib/param.go index 81b38c2..f8f6001 100644 --- a/lib/param.go +++ b/lib/param.go @@ -13,8 +13,6 @@ import ( type AIParam struct { Config *storage.Config Scenes []IScene - // 机器人数量 - RobotCount uint32 } type SceneParam struct { Name string @@ -23,21 +21,33 @@ type SceneParam struct { func (a *AIParam) Check() error { var errMsgs []string - if a.RobotCount == 0 { + if a.Config.Global.UserCountTotal == 0 { errMsgs = append(errMsgs, "机器人数量至少1个") } + if a.Config.Global.UserCount == 0 { + errMsgs = append(errMsgs, "每次压入的用户数至少是1") + } + + if a.Config.Global.SId == "" { + errMsgs = append(errMsgs, "缺少区服ID") + } + + if a.Config.Global.IntervalS < 0 { + errMsgs = append(errMsgs, "压入用户的间隔时间应该是0+") + } + var buf bytes.Buffer buf.WriteString("AI 参数校验") if errMsgs != nil { errMsg := strings.Join(errMsgs, " ") buf.WriteString(fmt.Sprintf("未通过 (%s)", errMsg)) - logrus.Debug(buf.String()) + logrus.Error(buf.String()) return errors.New(errMsg) } buf.WriteString( - fmt.Sprintf("通过 机器人数量:%v", a.RobotCount)) + fmt.Sprintf("通过 机器人数量:%v", a.Config.Global.UserCountTotal)) logrus.Debug(buf.String()) return nil } diff --git a/lib/robot.go b/lib/robot.go index 1ff1087..804f13e 100644 --- a/lib/robot.go +++ b/lib/robot.go @@ -1,6 +1,7 @@ package lib import ( + "fmt" "sort" "strings" "sync" @@ -8,6 +9,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/nacos-group/nacos-sdk-go/common/logger" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "legu.airobot/pb" @@ -27,23 +29,25 @@ type IRobot interface { type Robot struct { IStore - account string - sid string - conn *websocket.Conn - data map[string]interface{} //机器人缓存数据 - status uint32 //状态 - lock sync.Mutex // - sceneQueue *Queue[IScene] //场景队列 - config *storage.Config //配置 - resultCh chan *CallResult + account string + sid string + conn *websocket.Conn + data map[string]interface{} //机器人缓存数据 + status uint32 //状态 + lock sync.Mutex // + sceneQueue *Queue[IScene] //场景队列 + config *storage.Config //配置 + resultCh chan *CallResult //请求结果通道 + sceneResultCh chan *CallResult //场景结果通道 } func NewRobot(config *storage.Config) *Robot { robot := &Robot{ - data: make(map[string]interface{}), - sceneQueue: NewQueue[IScene](), - config: config, - resultCh: make(chan *CallResult, 50), + data: make(map[string]interface{}), + sceneQueue: NewQueue[IScene](), + config: config, + resultCh: make(chan *CallResult, 100), + sceneResultCh: make(chan *CallResult, 50), } robot.Store("sid", config.Global.SId) return robot @@ -69,6 +73,12 @@ func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.M defer func() { t := time.Since(start) logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "rsp": rsp, "since": t.String()}).Debug("接收消息") + // 发送请求结果 + r.SendResult(&CallResult{ + MainType: mainType, + SubType: subType, + Elapse: t, + }) }() head := &pb.UserMessage{MainType: mainType, SubType: subType} @@ -186,9 +196,99 @@ func (m *Robot) syncCall() { elapsedTime := time.Since(start) info := scene.Info() logrus.WithField("t", elapsedTime.String()).Debug("场景【" + info.Name + "】执行完毕耗时统计") + //结束 + m.prepareToStop() + //显示场景结果 + } } +func (m *Robot) prepareToStop() { + atomic.CompareAndSwapUint32(&m.status, STATUS_STARTED, STATUS_STOPPING) + logger.Infof("Closing result channel...") + close(m.resultCh) + atomic.StoreUint32(&m.status, STATUS_STOPPED) +} + +func (m *Robot) SendResult(result *CallResult) bool { + if atomic.LoadUint32(&m.status) != STATUS_STARTED { + m.printIgnoredResult(result, "stopped load generator") + return false + } + select { + case m.resultCh <- result: + fmt.Printf("准备发送结果: %v\n", result) + return true + default: + m.printIgnoredResult(result, "full result channel") + return false + } +} + +func (m *Robot) ShowResult() { + // statistics := &Statistics{} + // max := statistics.MaxElapse + // min := statistics.MinElapse + routes := make(map[string]*Statistics) + for r := range m.resultCh { + head := fmt.Sprintf("%s.%s", r.MainType, r.SubType) + if len(routes) == 0 { + statis := &Statistics{ + Route: head, + ElapseTotal: r.Elapse, + MaxElapse: r.Elapse, + MinElapse: r.Elapse, + CallCount: 1, + } + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg + routes[head] = statis + } else { + if route, ok := routes[head]; ok { + max := route.MaxElapse + min := route.MinElapse + if r.Elapse > max { + max = r.Elapse + } + + if r.Elapse < min { + min = r.Elapse + } + statis := &Statistics{ + Route: head, + ElapseTotal: route.ElapseTotal + r.Elapse, + MaxElapse: max, + MinElapse: min, + CallCount: route.CallCount + 1, + } + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg + routes[head] = statis + } else { + statis := &Statistics{ + Route: head, + ElapseTotal: r.Elapse, + MaxElapse: r.Elapse, + MinElapse: r.Elapse, + CallCount: 1, + } + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg + routes[head] = statis + } + } + } + + //TODO 将统计结果写入文件 +} + +func (m *Robot) printIgnoredResult(result *CallResult, cause string) { + resultMsg := fmt.Sprintf( + "MainType=%s, SubType=%s, Elapse=%v", + result.MainType, result.SubType, result.Elapse) + logrus.Warnf("Ignored result: %s. (cause: %s)\n", resultMsg, cause) +} + // Deprecated func (m *Robot) checkResp(data []byte, rsp proto.Message) bool { msg := &pb.UserMessage{} diff --git a/storage/config.go b/storage/config.go index 086056d..838b355 100644 --- a/storage/config.go +++ b/storage/config.go @@ -11,11 +11,12 @@ type Config struct { } type Global struct { - UserCount int32 `json:"UserCount,omitempty"` //用户数 - SId string `json:"sid,omitempty"` //区服ID - WsAddr string `json:"wsAddr,omitempty"` //websocket addr - IntervalS int32 `json:"intervalS,omitempty"` //间隔时间s - TimeoutMs int32 `json:"timeoutMs,omitempty"` //超时时间 + UserCount uint32 `json:"UserCount,omitempty"` //用户数(每次压入的数量) + UserCountTotal uint32 `json:"UserCountTotal,omitempty"` //用户总数(压入的总数) + SId string `json:"sid,omitempty"` //区服ID + WsAddr string `json:"wsAddr,omitempty"` //websocket addr + IntervalS int32 `json:"intervalS,omitempty"` //间隔时间s(每次压入用户的间隔时间) + TimeoutMs int32 `json:"timeoutMs,omitempty"` //超时时间 (请求超时时间) } type Scene struct { diff --git a/test/robot_test.go b/test/robot_test.go index 4388eb8..b439092 100644 --- a/test/robot_test.go +++ b/test/robot_test.go @@ -9,9 +9,7 @@ import ( ) func TestAction(t *testing.T) { - aip := lib.AIParam{ - RobotCount: 1, - } + aip := lib.AIParam{} ai, _ := lib.NewAI(aip) // 创建场景 diff --git a/ui/mainwindow.go b/ui/mainwindow.go index 4cb963e..bc23a0e 100644 --- a/ui/mainwindow.go +++ b/ui/mainwindow.go @@ -102,9 +102,8 @@ func (mw *MainWindow) startContainer() { startBtn.Enable() startBtn.OnTapped = func() { param := lib.AIParam{ - Scenes: mw.UIImpl.scenes, - Config: mw.UIImpl.config, - RobotCount: uint32(config.Global.UserCount), + Scenes: mw.UIImpl.scenes, + Config: mw.UIImpl.config, } ai, err := lib.NewAI(param) if err != nil { @@ -124,10 +123,11 @@ func (mw *MainWindow) configContainer() { wsAddrEntry := widget.NewEntry() sidEntry := widget.NewEntry() userCountEntry := widget.NewEntry() + userCountEntry.PlaceHolder = "每次压入的用户数" userTotalEntry := widget.NewEntry() - userTotalEntry.PlaceHolder = "用户数上限" + userTotalEntry.PlaceHolder = "总共需要压入的最大用户数" timeoutEntry := widget.NewEntry() - timeoutEntry.PlaceHolder = "请求超时" + timeoutEntry.PlaceHolder = "请求超时时间" intervalEntry := widget.NewEntry() intervalEntry.PlaceHolder = "每次压入用户间隔时间" @@ -157,7 +157,7 @@ func (mw *MainWindow) configContainer() { global := &storage.Global{ WsAddr: wsAddrEntry.Text, SId: sidEntry.Text, - UserCount: cast.ToInt32(userCountEntry.Text), + UserCount: cast.ToUint32(userCountEntry.Text), TimeoutMs: cast.ToInt32(timeoutEntry.Text), IntervalS: cast.ToInt32(intervalEntry.Text), }