From fa07f06abc3a17ecde7a08e945870128c765db7c Mon Sep 17 00:00:00 2001 From: wh_zcy Date: Fri, 16 Dec 2022 11:09:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B6=85=E6=97=B6=E7=BB=9F=E8=AE=A1=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- busi/login.go | 3 +- lib/ai.go | 43 +++++--------- lib/base.go | 20 ++++--- lib/event.go | 3 + lib/robot.go | 141 +++++++++++++++++++++++++++++++++------------ lib/scene.go | 4 +- test/robot_test.go | 23 +++++++- ui/mainwindow.go | 77 ++++++++++++++++++------- ui/ui.go | 2 + 9 files changed, 217 insertions(+), 99 deletions(-) diff --git a/busi/login.go b/busi/login.go index ad8d513..d38c6e7 100644 --- a/busi/login.go +++ b/busi/login.go @@ -30,7 +30,8 @@ func (l *LoginScene) Run(robot lib.IRobot) error { } rsp := &pb.UserLoginResp{} - if code := robot.SendMsg("user", "login", req, rsp); code == pb.ErrorCode_Success { + code := robot.SendMsg("user", "login", req, rsp) + if code == pb.ErrorCode_Success { robot.Store("user.login", rsp) } return nil diff --git a/lib/ai.go b/lib/ai.go index 7ab0ee5..b9135c2 100644 --- a/lib/ai.go +++ b/lib/ai.go @@ -15,7 +15,7 @@ import ( "legu.airobot/storage" ) -type myAI struct { +type MyAI struct { robots []*Robot scenes []*scene iscenes []IScene @@ -29,13 +29,12 @@ type myAI struct { ReportMap map[int]map[string]*Statistics //测试报告 key1:场景 key2:协议 } -func NewAI(aip AIParam) (*myAI, error) { +func NewAI(aip AIParam) (*MyAI, error) { if err := aip.Check(); err != nil { return nil, err } - ai := &myAI{ - Obs: NewObserver(), + ai := &MyAI{ scenes: make([]*scene, 0), config: aip.Config, iscenes: aip.Scenes, @@ -49,7 +48,7 @@ func NewAI(aip AIParam) (*myAI, error) { return ai, nil } -func (m *myAI) init() error { +func (m *MyAI) init() error { var buf bytes.Buffer buf.WriteString("初始化AI") @@ -66,13 +65,13 @@ func (m *myAI) init() error { return nil } -func (m *myAI) AppendRobot(robot *Robot) { +func (m *MyAI) AppendRobot(robot *Robot) { defer m.lock.Unlock() m.lock.Lock() m.robots = append(m.robots, robot) } -func (m *myAI) Start() bool { +func (m *MyAI) Start() bool { if len(m.config.Scenes) == 0 { logrus.Warn("还未设置场景") return false @@ -85,35 +84,20 @@ func (m *myAI) Start() bool { robot := NewRobot(m) robot.SetScenes() m.AppendRobot(robot) - robot.Start() atomic.AddUint32(&m.useCountTotal, 1) + m.Obs.Notify(EVENT_ROBOT, int32(1)) + robot.Start() }() } }() - - go func() { - start := time.Now() - for { - total := atomic.LoadUint32(&m.useCountTotal) - if total == m.config.Global.UserCountTotal { - elipse := time.Since(start) - logrus.Debug("开始生成测试报告") - m.MergeResult() - m.genReport(elipse) - m.Obs.Notify(EVENT_PROGRESS, int32(0)) - break - } - } - }() - return true } -func (m *myAI) Stop() { +func (m *MyAI) Stop() { } -func (m *myAI) MergeResult() { +func (m *MyAI) MergeResult() { m.lock.Lock() defer m.lock.Unlock() n := make(map[int]map[string]*Statistics) @@ -148,6 +132,7 @@ func (m *myAI) MergeResult() { statis.MinElapse = min statis.Route = a.Route statis.SceneName = a.SceneName + statis.TimeoutCount = (a.TimeoutCount + b.TimeoutCount) if n[i][k] == nil { n[i] = make(map[string]*Statistics) } @@ -179,7 +164,7 @@ func (m *myAI) MergeResult() { m.ReportMap = n } -func (m *myAI) genReport(elipse time.Duration) { +func (m *MyAI) GenReport(elipse time.Duration) { var msgs []string var i []int for key, _ := range m.ReportMap { @@ -188,8 +173,8 @@ func (m *myAI) genReport(elipse time.Duration) { sort.Ints(i) for _, routes := range i { for r, d := range m.ReportMap[routes] { - msgs = append(msgs, fmt.Sprintf("【%s】协议:%s 调用次数:%d 总耗时:%v 平均耗时:%v 最大耗时:%v 最小耗时:%v", - d.SceneName, r, d.CallCount, d.ElapseTotal.String(), d.AvgElapse.String(), d.MaxElapse.String(), d.MinElapse.String())) + msgs = append(msgs, fmt.Sprintf("【%s】协议:%s 调用次数:%d 总耗时:%v 平均耗时:%v 最大耗时:%v 最小耗时:%v 超时次数:%v", + d.SceneName, r, d.CallCount, d.ElapseTotal.String(), d.AvgElapse.String(), d.MaxElapse.String(), d.MinElapse.String(), d.TimeoutCount)) } } record := strings.Join(msgs, "\n") diff --git a/lib/base.go b/lib/base.go index b369dad..03d5dd0 100644 --- a/lib/base.go +++ b/lib/base.go @@ -1,6 +1,8 @@ package lib -import "time" +import ( + "time" +) type LoginParam struct { Account string `json:"account"` @@ -30,18 +32,20 @@ type CallResult struct { SceneName string MainType string SubType string + Code int32 Elapse time.Duration // 耗时 Num int } type Statistics struct { - ElapseTotal time.Duration //总耗时 - MaxElapse time.Duration //最大耗时 - MinElapse time.Duration //最小耗时 - AvgElapse time.Duration //平均耗时 - CallCount int64 //调用次数 - Route string //协议名称 - SceneName string //场景名称 + ElapseTotal time.Duration //总耗时 + MaxElapse time.Duration //最大耗时 + MinElapse time.Duration //最小耗时 + AvgElapse time.Duration //平均耗时 + CallCount int64 //调用次数 + Route string //协议名称 + SceneName string //场景名称 + TimeoutCount int32 //超时次数 } const ( diff --git a/lib/event.go b/lib/event.go index b34cd19..99a2870 100644 --- a/lib/event.go +++ b/lib/event.go @@ -3,4 +3,7 @@ package lib const ( EVENT_FLAG = "dispatch" EVENT_PROGRESS = "progress" + EVENT_REPORT = "report" + EVENT_ROBOT = "robot" + EVENT_CLOSECON = "closeCon" ) diff --git a/lib/robot.go b/lib/robot.go index fb348f4..f5ea557 100644 --- a/lib/robot.go +++ b/lib/robot.go @@ -27,14 +27,15 @@ type IRobot interface { type Robot struct { IStore - ai *myAI + ai *MyAI scene IScene //场景 account string //账号 sid string //区服编号 conn *websocket.Conn //连接对象 data map[string]interface{} //机器人缓存数据 status uint32 //状态 - cache sync.Mutex //缓存锁 + cacheLock sync.Mutex //缓存锁 + resLock sync.Mutex //结果处理锁 sceneQueue *Queue[IScene] //场景队列 config *storage.Config //配置 resultCh chan *CallResult //请求结果通道 @@ -42,7 +43,7 @@ type Robot struct { elipseTotal time.Duration //场景总耗时 } -func NewRobot(ai *myAI) *Robot { +func NewRobot(ai *MyAI) *Robot { robot := &Robot{ ai: ai, data: make(map[string]interface{}), @@ -52,25 +53,35 @@ func NewRobot(ai *myAI) *Robot { ReportMap: make(map[int]map[string]*Statistics), } robot.Store("sid", ai.config.Global.SId) + + ai.Obs.AddListener(EVENT_CLOSECON, Listener{ + OnNotify: func(data interface{}, args ...interface{}) { + d := data.(bool) + if d { + robot.prepareToStop() + robot.conn.Close() + } + }, + }) return robot } //存数据 func (a *Robot) Store(key string, data interface{}) { - defer a.cache.Unlock() - a.cache.Lock() + defer a.cacheLock.Unlock() + a.cacheLock.Lock() a.data[key] = data } //取数据 func (a *Robot) Get(key string) interface{} { - defer a.cache.Unlock() - a.cache.Lock() + defer a.cacheLock.Unlock() + a.cacheLock.Lock() return a.data[key] } // 发送消息 -func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode { +func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) (code pb.ErrorCode) { start := time.Now() defer func() { t := time.Since(start) @@ -86,7 +97,9 @@ func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.M SubType: subType, Elapse: t, Num: r.getSortNum(name), + Code: int32(code), }) + }() head := &pb.UserMessage{MainType: mainType, SubType: subType} @@ -106,12 +119,42 @@ func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.M // logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "req": req}).Debug("发送消息") for { + // 调用状态:0-未调用或调用中;1-调用完成;2-调用超时。 + var callStatus uint32 + timer := time.AfterFunc(time.Duration(r.config.Global.TimeoutMs*int32(time.Millisecond)), func() { + if !atomic.CompareAndSwapUint32(&callStatus, 0, 2) { + return + } + logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, + "大于": r.config.Global.TimeoutMs}).Error("超时了") + + var name string + if r.scene != nil { + name = r.scene.Info().Name + } + r.SendResult(&CallResult{ + SceneName: name, + MainType: mainType, + SubType: subType, + Elapse: time.Duration(r.config.Global.TimeoutMs), + Num: r.getSortNum(name), + Code: int32(pb.ErrorCode_TimestampTimeout), + }) + }) + _, data, err := r.conn.ReadMessage() if err != nil { logrus.WithField("err", err).Error("读数据异常") + r.ai.Obs.Notify(EVENT_REPORT, true) + r.conn.Close() break } + if !atomic.CompareAndSwapUint32(&callStatus, 0, 1) { + return + } + timer.Stop() + msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("pb解析失败") @@ -120,15 +163,20 @@ func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.M if msg.MainType == mainType && msg.SubType == subType { if !ProtoUnmarshal(msg, rsp) { + logrus.Error("pb解析失败") break } - return pb.ErrorCode_Success + return } else if msg.MainType == "notify" && msg.SubType == "errornotify" { rsp := &pb.NotifyErrorNotifyPush{} if !ProtoUnmarshal(msg, rsp) { + logrus.Error("pb解析失败") break } - return rsp.Code + code = rsp.Code + return + } else { + // logrus.Debugf("推送 %s.%s", msg.MainType, msg.SubType) } } } @@ -189,6 +237,10 @@ func (m *Robot) Start() bool { m.conn = conn + if err := m.conn.SetReadDeadline(time.Now().Add(20 * time.Second)); err != nil { + logrus.Error("SetReadDeadline %v", err) + } + //检查具备启动的状态 if !atomic.CompareAndSwapUint32( &m.status, STATUS_ORIGINAL, STATUS_STARTING) { @@ -207,7 +259,11 @@ func (m *Robot) Start() bool { } func (m *Robot) Stop() bool { - return false + for { + if _, err := m.sceneQueue.Pop(); err != nil { + return true + } + } } func (m *Robot) syncCall() { @@ -217,7 +273,7 @@ func (m *Robot) syncCall() { err, ok := interface{}(p).(error) var errMsg string if ok { - errMsg = fmt.Sprintf("调用时Panic! (error: %s)", err) + errMsg = fmt.Sprintf("业务模块异常! (error: %s)", err) } else { errMsg = fmt.Sprintf("调用时Panic! (clue: %#v)", p) } @@ -283,46 +339,59 @@ func (m *Robot) processResult() { min = r.Elapse } statis := &Statistics{ - Route: head, - SceneName: r.SceneName, - ElapseTotal: route.ElapseTotal + r.Elapse, - MaxElapse: max, - MinElapse: min, - CallCount: route.CallCount + 1, + Route: head, + SceneName: r.SceneName, + } + if r.Code == int32(pb.ErrorCode_TimestampTimeout) { + statis.TimeoutCount += route.TimeoutCount + } else { + statis.ElapseTotal = route.ElapseTotal + r.Elapse + statis.MaxElapse = max + statis.MinElapse = min + statis.CallCount += 1 + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg } - avg := (statis.MaxElapse + statis.MinElapse) / 2 - statis.AvgElapse = avg routes[head] = statis } else { statis := &Statistics{ - Route: head, - SceneName: r.SceneName, - ElapseTotal: r.Elapse, - MaxElapse: r.Elapse, - MinElapse: r.Elapse, - CallCount: 1, + Route: head, + SceneName: r.SceneName, + CallCount: 1, + } + if r.Code == int32(pb.ErrorCode_TimestampTimeout) { + statis.TimeoutCount = 1 + } else { + statis.ElapseTotal = r.Elapse + statis.MaxElapse = r.Elapse + statis.MinElapse = r.Elapse + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg } - avg := (statis.MaxElapse + statis.MinElapse) / 2 - statis.AvgElapse = avg routes[head] = statis } m.ReportMap[r.Num] = routes } else { route := make(map[string]*Statistics) statis := &Statistics{ - Route: head, - SceneName: r.SceneName, - ElapseTotal: r.Elapse, - MaxElapse: r.Elapse, - MinElapse: r.Elapse, - CallCount: 1, + Route: head, + SceneName: r.SceneName, + CallCount: 1, + } + if r.Code == int32(pb.ErrorCode_TimestampTimeout) { + statis.TimeoutCount = 1 + } else { + statis.ElapseTotal = r.Elapse + statis.MaxElapse = r.Elapse + statis.MinElapse = r.Elapse + avg := (statis.MaxElapse + statis.MinElapse) / 2 + statis.AvgElapse = avg } - avg := (statis.MaxElapse + statis.MinElapse) / 2 - statis.AvgElapse = avg route[head] = statis m.ReportMap[r.Num] = route } } + } func (m *Robot) printIgnoredResult(result *CallResult, cause string) { diff --git a/lib/scene.go b/lib/scene.go index a46cb7c..cffff78 100644 --- a/lib/scene.go +++ b/lib/scene.go @@ -10,7 +10,7 @@ type IScene interface { } type scene struct { - ai *myAI + ai *MyAI Name string Desc string callerQueue *Queue[ICaller] //确定运行的caller队列 @@ -20,7 +20,7 @@ type scene struct { } // 创建场景 -func NewScene(ai *myAI, param SceneParam) *scene { +func NewScene(ai *MyAI, param SceneParam) *scene { s := &scene{ ai: ai, Name: param.Name, diff --git a/test/robot_test.go b/test/robot_test.go index a16b49a..cbd4c4f 100644 --- a/test/robot_test.go +++ b/test/robot_test.go @@ -4,8 +4,8 @@ import ( "fmt" "math" "testing" + "time" - "github.com/Pallinder/go-randomdata" "legu.airobot/lib" "legu.airobot/storage" ) @@ -132,7 +132,24 @@ func divide(a int, b int) int { } -func TestDiv(t *testing.T) { +func TestStop(t *testing.T) { + timeout := func() { + fmt.Println("超时了") + } + read := func() { + time.Sleep(time.Second * 2) - fmt.Println(randomdata.Number(0, 5)) + } + + for { + timer := time.AfterFunc(time.Second*1, timeout) + + read() + fmt.Println("read") + timer.Stop() + read() + fmt.Println("process") + + break + } } diff --git a/ui/mainwindow.go b/ui/mainwindow.go index ceca87a..bd7f59d 100644 --- a/ui/mainwindow.go +++ b/ui/mainwindow.go @@ -8,6 +8,7 @@ import ( "path/filepath" "sort" "strings" + "sync/atomic" "time" "fyne.io/fyne/v2" @@ -29,18 +30,24 @@ var _ IWindow = (*MainWindow)(nil) type MainWindow struct { UIImpl + ai *lib.MyAI w fyne.Window statusbar *statusBar //状态栏 mainMenu *mainMenu //菜单 progress *widget.ProgressBar //进度条 - progressCounter int //进度条计数 - tipLabel *widget.Label //提示Label - startBtn *widget.Button //开始按钮 + progressCounter int32 //进度条计数 + endProgress chan interface{} + progressChan chan struct{} + tipLabel *widget.Label //提示Label + startBtn *widget.Button //开始按钮 + robotCount int32 } func NewMainWdindow(ui *UIImpl) IWindow { mw := &MainWindow{ - UIImpl: *ui, + UIImpl: *ui, + progressChan: make(chan struct{}), + endProgress: make(chan interface{}, 1), } globalWindow = mw return mw @@ -81,6 +88,7 @@ func (mw *MainWindow) changeContent(content fyne.CanvasObject) { func (mw *MainWindow) startContainer() { mw.startBtn = widget.NewButton("启动", nil) mw.startBtn.Disable() + config := mw.config // 检查全局配置 if config.Global == nil { @@ -105,10 +113,24 @@ func (mw *MainWindow) startContainer() { mw.progress = widget.NewProgressBar() mw.tipLabel = widget.NewLabel("") - mw.progress.SetValue(0) + + mw.UIImpl.obs.AddListener(lib.EVENT_PROGRESS, lib.Listener{ + OnNotify: mw.showProgress, + }) + + mw.UIImpl.obs.AddListener(lib.EVENT_ROBOT, lib.Listener{ + OnNotify: mw.listenRobot, + }) mw.startBtn.Enable() mw.startBtn.OnTapped = func() { + atomic.StoreInt32(&mw.robotCount, 0) + atomic.StoreInt32(&mw.progressCounter, 0) + mw.progress.SetValue(0) + + mw.tipLabel.SetText("进行中...") + mw.startBtn.Disable() + param := lib.AIParam{ Scenes: mw.UIImpl.scenes, Config: mw.UIImpl.config, @@ -118,12 +140,11 @@ func (mw *MainWindow) startContainer() { dialog.ShowError(err, mw.w) return } - mw.tipLabel.SetText("进行中...") - mw.startBtn.Disable() - ai.Obs.AddListener(lib.EVENT_PROGRESS, lib.Listener{ - OnNotify: mw.showProgress, - }) + ai.Obs = mw.UIImpl.obs + mw.ai = ai + ai.Start() + mw.showReport() } content := container.NewBorder(mw.progress, nil, nil, nil, @@ -131,19 +152,25 @@ func (mw *MainWindow) startContainer() { mw.changeContent(content) } +func (mw *MainWindow) showReport() { + start := time.Now() + <-mw.progressChan + logrus.Debug("开始生成测试报告") + elipse := time.Since(start) + mw.ai.MergeResult() + mw.ai.GenReport(elipse) + mw.startBtn.Enable() + atomic.StoreInt32(&mw.progressCounter, 0) + mw.tipLabel.SetText("已生成报表") +} + func (mw *MainWindow) showProgress(data interface{}, args ...interface{}) { count := data.(int32) - if count == 0 { //表示结束 - mw.startBtn.Enable() - mw.progressCounter = 0 - mw.tipLabel.SetText("已生成报表") - } - - max := int(mw.config.Global.UserCountTotal) * len(mw.config.Scenes) - + rc := atomic.LoadInt32(&mw.progressCounter) + atomic.StoreInt32(&mw.progressCounter, count+rc) //计算进度 - mw.progressCounter += int(count) - expr := fmt.Sprintf("%v/%v", mw.progressCounter, max) + max := int(mw.config.Global.UserCountTotal) * len(mw.config.Scenes) + expr := fmt.Sprintf("%v/%v", atomic.LoadInt32(&mw.progressCounter), max) logrus.Debug(expr) r, err := engine.ParseAndExec(expr) if err != nil { @@ -153,9 +180,19 @@ func (mw *MainWindow) showProgress(data interface{}, args ...interface{}) { mw.progress.SetValue(r) if r >= 1 { mw.tipLabel.SetText("正在处理结果...") + mw.progressChan <- struct{}{} + logrus.Debug("....") } } +func (mw *MainWindow) listenRobot(data interface{}, args ...interface{}) { + count := data.(int32) + rc := atomic.LoadInt32(&mw.robotCount) + atomic.StoreInt32(&mw.robotCount, count+rc) + + mw.tipLabel.SetText(fmt.Sprintf("投放用户数:%d/%d", atomic.LoadInt32(&mw.robotCount), mw.config.Global.UserCountTotal)) +} + // 全局配置 func (mw *MainWindow) configContainer() { config := mw.config.Global diff --git a/ui/ui.go b/ui/ui.go index 2e686e8..3c68ed9 100644 --- a/ui/ui.go +++ b/ui/ui.go @@ -16,6 +16,7 @@ type UIImpl struct { storage storage.Storage config *storage.Config scenes []lib.IScene + obs lib.Observer } func NewUI(app fyne.App, scenes []lib.IScene) (*UIImpl, error) { @@ -39,6 +40,7 @@ func NewUI(app fyne.App, scenes []lib.IScene) (*UIImpl, error) { storage: storage, config: config, scenes: scenes, + obs: lib.NewObserver(), }, nil }