package lib import ( "bytes" "fmt" "io/fs" "io/ioutil" "path/filepath" "sort" "strings" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "legu.airobot/pb" "legu.airobot/storage" ) type IRobot interface { // 发送消息 SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode // 存储数据 Store(key string, data interface{}) // 获取数据 Get(key string) interface{} // 停止运行 Stop() bool } type Robot struct { IStore scene IScene //场景 account string sid string conn *websocket.Conn data map[string]interface{} //机器人缓存数据 status uint32 //状态 lock sync.Mutex //数据缓存锁 sceneQueue *Queue[IScene] //场景队列 config *storage.Config //配置 resultCh chan *CallResult //请求结果通道 sceneResultCh chan *CallResult //场景结果通道 ReportMap map[int]map[string]*Statistics //测试报告 key1:场景序号 key2:协议 elipseTotal time.Duration } func NewRobot(config *storage.Config) *Robot { robot := &Robot{ data: make(map[string]interface{}), sceneQueue: NewQueue[IScene](), config: config, resultCh: make(chan *CallResult, 100), sceneResultCh: make(chan *CallResult, 50), ReportMap: make(map[int]map[string]*Statistics), } robot.Store("sid", config.Global.SId) return robot } //存数据 func (a *Robot) Store(key string, data interface{}) { defer a.lock.Unlock() a.lock.Lock() a.data[key] = data } //取数据 func (a *Robot) Get(key string) interface{} { defer a.lock.Unlock() a.lock.Lock() return a.data[key] } // 发送消息 func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode { start := time.Now() defer func() { t := time.Since(start) logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "rsp": rsp, "since": t.String()}).Debug("接收消息") var name string if r.scene != nil { name = r.scene.Info().Name } // 发送请求结果 r.SendResult(&CallResult{ SceneName: name, MainType: mainType, SubType: subType, Elapse: t, Num: r.getSortNum(name), }) }() head := &pb.UserMessage{MainType: mainType, SubType: subType} if mainType == "user" && subType == "login" { loginReq := req.(*pb.UserLoginReq) head.Sec = BuildSecStr(loginReq.Sid, loginReq.Account) } else { head.Sec = BuildSecStr(r.sid, r.account) } if ProtoMarshal(req, head) { data, _ := proto.Marshal(head) if err := r.conn.WriteMessage(websocket.BinaryMessage, data); err != nil { logrus.WithField("err", err).Error("写数据异常") return pb.ErrorCode_SystemError } logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "req": req}).Debug("发送消息") for { _, data, err := r.conn.ReadMessage() if err != nil { logrus.WithField("err", err).Error("读数据异常") break } msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("pb解析失败") break } if msg.MainType == mainType && msg.SubType == subType { if !ProtoUnmarshal(msg, rsp) { break } return pb.ErrorCode_Success } else if msg.MainType == "notify" && msg.SubType == "errornotify" { rsp := &pb.NotifyErrorNotifyPush{} if !ProtoUnmarshal(msg, rsp) { break } logrus.Info(rsp.Code) return rsp.Code } } } return pb.ErrorCode_Success } // 设置场景队列 func (a *Robot) SetScenes(scenes []IScene) { scensConf := a.config.Scenes sort.SliceStable(scensConf, func(i, j int) bool { return scensConf[i].Num < scensConf[j].Num }) for _, conf := range scensConf { for _, v := range scenes { info := v.Info() if conf.Name == info.Name { if conf.Loop == 0 || conf.Loop == 1 { a.sceneQueue.Add(v) continue } else if conf.Loop > 1 { for i := int32(0); i < conf.Loop; i++ { a.sceneQueue.Add(v) } continue } } } } } // 根据场景名称获取顺序号 func (a *Robot) getSortNum(name string) int { var num int for _, v := range a.config.Scenes { if v.Name == name { return v.Num } } return num } func (m *Robot) Start() bool { if len(m.sceneQueue.List()) == 0 { logrus.Warn("没有设置场景队列") return false } // 创建链接 dialer := &websocket.Dialer{ HandshakeTimeout: 2 * time.Second, } conn, _, err := dialer.Dial(m.config.Global.WsAddr, nil) if err != nil { logrus.Error(err) return false } m.conn = conn //检查具备启动的状态 if !atomic.CompareAndSwapUint32( &m.status, STATUS_ORIGINAL, STATUS_STARTING) { if !atomic.CompareAndSwapUint32(&m.status, STATUS_STOPPED, STATUS_STARTING) { return false } } //设置状态为已启动 atomic.StoreUint32(&m.status, STATUS_STARTED) m.syncCall() return true } func (m *Robot) Stop() bool { return false } func (m *Robot) syncCall() { for { scene, err := m.sceneQueue.Pop() if err != nil { logrus.WithField("err", err).Debug("所有场景执行结束") m.prepareToStop() return } m.scene = scene info := m.scene.Info() start := time.Now() //这里执行会花很长时间 if err := scene.Run(m); err != nil { logrus.WithField("err", err).Error("执行业务时发生错误") continue } elapsedTime := time.Since(start) m.elipseTotal += elapsedTime logrus.WithField("t", elapsedTime.String()).Debug("场景【" + info.Name + "】执行完毕耗时统计") //显示场景结果 m.processResult() } } func (m *Robot) prepareToStop() { atomic.CompareAndSwapUint32(&m.status, STATUS_STARTED, STATUS_STOPPING) logrus.Debug("关闭结果通道...") close(m.resultCh) atomic.StoreUint32(&m.status, STATUS_STOPPED) } func (m *Robot) SendResult(result *CallResult) bool { if atomic.LoadUint32(&m.status) != STATUS_STARTED { m.printIgnoredResult(result, "停止发送") return false } select { case m.resultCh <- result: logrus.WithField("res", result).Debug("发送结果") return true default: m.printIgnoredResult(result, "通道满了") return false } } func (m *Robot) processResult() { go func() { defer m.lock.Unlock() m.lock.Lock() for r := range m.resultCh { head := fmt.Sprintf("%s.%s", r.MainType, r.SubType) if routes, ok := m.ReportMap[r.Num]; ok { if route, y := routes[head]; y { max := route.MaxElapse min := route.MinElapse if r.Elapse > max { max = r.Elapse } if r.Elapse < min { min = r.Elapse } statis := &Statistics{ Route: head, SceneName: r.SceneName, ElapseTotal: route.ElapseTotal + r.Elapse, MaxElapse: max, MinElapse: min, CallCount: route.CallCount + 1, } avg := (statis.MaxElapse + statis.MinElapse) / 2 statis.AvgElapse = avg routes[head] = statis } else { statis := &Statistics{ Route: head, SceneName: r.SceneName, ElapseTotal: r.Elapse, MaxElapse: r.Elapse, MinElapse: r.Elapse, CallCount: 1, } avg := (statis.MaxElapse + statis.MinElapse) / 2 statis.AvgElapse = avg routes[head] = statis } m.ReportMap[r.Num] = routes } else { route := make(map[string]*Statistics) statis := &Statistics{ Route: head, SceneName: r.SceneName, ElapseTotal: r.Elapse, MaxElapse: r.Elapse, MinElapse: r.Elapse, CallCount: 1, } avg := (statis.MaxElapse + statis.MinElapse) / 2 statis.AvgElapse = avg route[head] = statis m.ReportMap[r.Num] = route } } }() } // 将统计结果写入文件 // Deprecated func (m *Robot) genReport(e time.Duration) { var buf bytes.Buffer buf.WriteString("测试报告\n") buf.WriteString(fmt.Sprintf("用户总数:%d 场景总耗时:%s\n", m.config.Global.UserCountTotal, e.String())) var msgs []string for k, routes := range m.ReportMap { // buf.WriteString(fmt.Sprintf("【%s】\n", k)) for r, d := range routes { msgs = append(msgs, fmt.Sprintf("【%s】协议:%s 调用次数:%d 总耗时:%v 平均耗时:%v 最大耗时:%v 最小耗时:%v", k, r, d.CallCount, d.ElapseTotal.String(), d.AvgElapse.String(), d.MaxElapse.String(), d.MinElapse.String())) } } record := strings.Join(msgs, "\n") buf.WriteString(record) buf.WriteString("\n------------------------------------------------------------------------------\n") // logrus.WithField("res", buf.String()).Debug("报告内容") if err := ioutil.WriteFile(filepath.Join("./", "report.log"), buf.Bytes(), fs.ModePerm); err != nil { logrus.WithField("err", err).Error("测试报告") } } func (m *Robot) printIgnoredResult(result *CallResult, cause string) { resultMsg := fmt.Sprintf( "MainType=%s, SubType=%s, Elapse=%v", result.MainType, result.SubType, result.Elapse) logrus.Warnf("Ignored result: %s. (cause: %s)\n", resultMsg, cause) } // Deprecated func (m *Robot) checkResp(data []byte, rsp proto.Message) bool { msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("pb解析失败") return false } methodStr := msg.Data.TypeUrl methodName := SubStr(methodStr, 20, len(methodStr)) if strings.HasSuffix(methodName, "Push") { if methodName == "NotifyErrorNotifyPush" { push := &pb.NotifyErrorNotifyPush{} if !ProtoUnmarshal(msg, push) { logrus.Error("pb解析失败") return false } logrus.WithField("methodName", methodName).WithField("code", push.Code).Debug("收到错误推送") } return false } else { if !ProtoUnmarshal(msg, rsp) { return false } } return true }