package lib

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
	"github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"

	"legu.airobot/pb"
	"legu.airobot/storage"
)

// IRobot is the view of a robot that is handed to a scene's Run method.
type IRobot interface {
	// SendMsg sends req as the message identified by mainType/subType and
	// decodes the matching reply into rsp.
	SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode
	// Store saves data in the robot's cache under key.
	Store(key string, data interface{})
	// Get returns the cached value for key, or nil when nothing is stored.
	Get(key string) interface{}
	// Stop drops all scenes that are still queued.
	Stop() bool
}

// Robot is a single simulated client. It owns one websocket connection, runs
// its queued scenes one after another, and aggregates per-route call
// statistics into ReportMap.
type Robot struct {
	IStore
	ai          *MyAI
	scene       IScene                         // scene currently being run
	account     string                         // account name
	sid         string                         // server (zone) id
	conn        *websocket.Conn                // websocket connection
	data        map[string]interface{}         // robot cache
	status      uint32                         // lifecycle status, accessed atomically
	cacheLock   sync.Mutex                     // guards data
	resLock     sync.Mutex                     // result-processing lock
	sceneQueue  *Queue[IScene]                 // scenes waiting to run
	config      *storage.Config                // configuration
	resultCh    chan *CallResult               // call results waiting to be aggregated
	ReportMap   map[int]map[string]*Statistics // test report; key1: scene number, key2: route
	elipseTotal time.Duration                  // total time spent running scenes
}

// NewRobot creates a robot bound to ai, seeds its cache with the configured
// server id, and registers a listener that stops the robot and closes its
// connection when EVENT_CLOSECON is notified with true.
func NewRobot(ai *MyAI) *Robot {
	robot := &Robot{
		ai:         ai,
		data:       make(map[string]interface{}),
		sceneQueue: NewQueue[IScene](),
		config:     ai.config,
		resultCh:   make(chan *CallResult, 1000),
		ReportMap:  make(map[int]map[string]*Statistics),
	}
	robot.Store("sid", ai.config.Global.SId)

	ai.Obs.AddListener(EVENT_CLOSECON, Listener{
		OnNotify: func(data interface{}, args ...interface{}) {
			// Ignore payloads that are not the boolean "close" flag instead
			// of panicking inside the observer.
			if closeConn, ok := data.(bool); !ok || !closeConn {
				return
			}
			robot.prepareToStop()
			// conn is only set by Start; the event may arrive before that.
			if robot.conn != nil {
				robot.conn.Close()
			}
		},
	})
	return robot
}

// Store saves data in the robot's cache under key.
func (r *Robot) Store(key string, data interface{}) {
	r.cacheLock.Lock()
	defer r.cacheLock.Unlock()
	r.data[key] = data
}

// Get returns the cached value for key, or nil when nothing is stored.
func (r *Robot) Get(key string) interface{} {
	r.cacheLock.Lock()
	defer r.cacheLock.Unlock()
	return r.data[key]
}

// reportCall publishes one call result for the current scene (if any) to the
// result channel.
func (r *Robot) reportCall(mainType, subType string, elapse time.Duration, code pb.ErrorCode) {
	var name string
	if r.scene != nil {
		name = r.scene.Info().Name
	}
	r.SendResult(&CallResult{
		SceneName: name,
		MainType:  mainType,
		SubType:   subType,
		Elapse:    elapse,
		Num:       r.getSortNum(name),
		Code:      int32(code),
	})
}

// SendMsg writes req to the connection as the mainType/subType message and
// then reads messages until the matching reply arrives, decoding it into rsp.
//
// It returns pb.ErrorCode_SystemError when the request cannot be serialized
// or written, the code carried by a "notify.errornotify" push when the server
// answers with one, and pb.ErrorCode_Success otherwise. Every call is
// reported through SendResult with its elapsed time; a read that takes longer
// than Global.TimeoutMs is additionally reported as
// pb.ErrorCode_TimestampTimeout.
//
// NOTE(review): a request that ProtoMarshal rejects, a failed read and an
// undecodable reply all still return pb.ErrorCode_Success, as before —
// confirm whether callers rely on that.
func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) (code pb.ErrorCode) {
	start := time.Now()
	// code is the named result, so the deferred report sees the final value.
	defer func() {
		r.reportCall(mainType, subType, time.Since(start), code)
	}()

	head := &pb.UserMessage{MainType: mainType, SubType: subType}
	if mainType == "user" && subType == "login" {
		// Login is signed with the credentials carried by the request itself.
		// This assertion panics when req is not a *pb.UserLoginReq.
		loginReq := req.(*pb.UserLoginReq)
		head.Sec = BuildSecStr(loginReq.Sid, loginReq.Account)
	} else {
		head.Sec = BuildSecStr(r.sid, r.account)
	}

	if !ProtoMarshal(req, head) {
		return pb.ErrorCode_Success
	}

	payload, err := proto.Marshal(head)
	if err != nil {
		logrus.WithField("err", err).Error("pb序列化失败")
		return pb.ErrorCode_SystemError
	}
	if err := r.conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
		logrus.WithField("err", err).Error("写数据异常")
		return pb.ErrorCode_SystemError
	}

	// Convert before multiplying: doing the product in int32 overflows for
	// any timeout above ~2.1s.
	timeout := time.Duration(r.config.Global.TimeoutMs) * time.Millisecond

	for {
		// Call status: 0 - pending; 1 - completed; 2 - timed out.
		var callStatus uint32
		timer := time.AfterFunc(timeout, func() {
			if !atomic.CompareAndSwapUint32(&callStatus, 0, 2) {
				return
			}
			logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "大于": r.config.Global.TimeoutMs}).Error("超时了")
			r.reportCall(mainType, subType, timeout, pb.ErrorCode_TimestampTimeout)
		})

		_, frame, err := r.conn.ReadMessage()
		if err != nil {
			// A broken connection is not a timeout: settle the call and
			// disarm the timer so it cannot report one later.
			atomic.CompareAndSwapUint32(&callStatus, 0, 1)
			timer.Stop()
			logrus.WithField("err", err).Error("读数据异常")
			r.ai.Obs.Notify(EVENT_REPORT, true)
			r.conn.Close()
			return pb.ErrorCode_Success
		}
		if !atomic.CompareAndSwapUint32(&callStatus, 0, 1) {
			// The timer already reported this call as timed out.
			return code
		}
		timer.Stop()

		msg := &pb.UserMessage{}
		if err := proto.Unmarshal(frame, msg); err != nil {
			logrus.Error("pb解析失败")
			return pb.ErrorCode_Success
		}

		switch {
		case msg.MainType == mainType && msg.SubType == subType:
			if !ProtoUnmarshal(msg, rsp) {
				logrus.Error("pb解析失败")
				return pb.ErrorCode_Success
			}
			return code
		case msg.MainType == "notify" && msg.SubType == "errornotify":
			push := &pb.NotifyErrorNotifyPush{}
			if !ProtoUnmarshal(msg, push) {
				logrus.Error("pb解析失败")
				return pb.ErrorCode_Success
			}
			return push.Code
		}
		// Any other message is an unrelated push: keep waiting for the reply
		// with a fresh timeout.
	}
}

// SetScenes fills the scene queue from the configured scenes, ordered by
// their Num. A scene with Loop 0 or 1 is queued once, a scene with Loop > 1
// is queued Loop times, and a negative Loop queues nothing. The configured
// scene slice is sorted in place.
func (r *Robot) SetScenes() {
	confs := r.config.Scenes
	sort.SliceStable(confs, func(i, j int) bool {
		return confs[i].Num < confs[j].Num
	})

	for _, conf := range confs {
		for _, scene := range r.ai.iscenes {
			if conf.Name != scene.Info().Name {
				continue
			}
			repeat := conf.Loop
			if repeat == 0 {
				repeat = 1
			}
			for i := int32(0); i < repeat; i++ {
				r.sceneQueue.Add(scene)
			}
		}
	}
}

// getSortNum returns the configured sequence number of the scene called name,
// or 0 when no such scene is configured.
func (r *Robot) getSortNum(name string) int {
	for _, v := range r.config.Scenes {
		if v.Name == name {
			return v.Num
		}
	}
	return 0
}

// Start connects to the configured websocket address and runs every queued
// scene on the calling goroutine. It returns false when no scene is queued,
// the connection cannot be established, or the robot is neither in its
// original nor in its stopped state; otherwise it returns true after the
// scenes have run.
func (r *Robot) Start() bool {
	if len(r.sceneQueue.List()) == 0 {
		logrus.Warn("没有设置场景队列")
		return false
	}

	// Establish the connection.
	dialer := &websocket.Dialer{
		HandshakeTimeout: 2 * time.Second,
	}
	conn, _, err := dialer.Dial(r.config.Global.WsAddr, nil)
	if err != nil {
		logrus.Error(err)
		return false
	}

	// Only a robot that has never run, or has fully stopped, may start.
	if !atomic.CompareAndSwapUint32(&r.status, STATUS_ORIGINAL, STATUS_STARTING) &&
		!atomic.CompareAndSwapUint32(&r.status, STATUS_STOPPED, STATUS_STARTING) {
		// Do not leak the new connection or replace the one in use.
		conn.Close()
		return false
	}

	r.conn = conn
	if err := r.conn.SetReadDeadline(time.Now().Add(600 * time.Second)); err != nil {
		logrus.Errorf("SetReadDeadline %v", err)
	}

	atomic.StoreUint32(&r.status, STATUS_STARTED)

	// Aggregate call results in the background while the scenes run.
	go r.processResult()
	r.syncCall()
	return true
}

// Stop drops every scene that is still queued and returns true. The scene
// that is currently running is not interrupted.
func (r *Robot) Stop() bool {
	for {
		if _, err := r.sceneQueue.Pop(); err != nil {
			return true
		}
	}
}

// syncCall runs the queued scenes sequentially until the queue is empty and
// then stops the robot. A scene that returns an error is logged and skipped.
// A panic raised by a scene is logged, ends the run, and stops the robot.
func (r *Robot) syncCall() {
	// Registered once for the whole run: a defer placed inside the loop would
	// stack one closure per scene and still only fire when syncCall returns.
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		var errMsg string
		if err, ok := p.(error); ok {
			errMsg = fmt.Sprintf("业务模块异常! (error: %s)", err)
		} else {
			errMsg = fmt.Sprintf("调用时Panic! (clue: %#v)", p)
		}
		logrus.Error(errMsg)
		// The run loop is gone; close the result channel so processResult
		// ends instead of waiting forever.
		r.prepareToStop()
	}()

	for {
		scene, err := r.sceneQueue.Pop()
		if err != nil {
			logrus.WithField("err", err).Debug("所有场景执行结束")
			r.prepareToStop()
			return
		}
		r.scene = scene
		info := r.scene.Info()

		start := time.Now()
		// Running a scene can take a long time.
		if err := scene.Run(r); err != nil {
			logrus.WithField("err", err).Error("执行业务时发生错误")
			continue
		}
		elapsed := time.Since(start)
		r.elipseTotal += elapsed
		logrus.WithField("t", elapsed.String()).Debug("场景【" + info.Name + "】执行完毕耗时统计")
		r.ai.Obs.Notify(EVENT_PROGRESS, int32(1))
	}
}

// prepareToStop moves a started robot to the stopped state and closes the
// result channel. It does nothing unless the robot is currently started, so
// calling it more than once cannot close the channel twice.
func (r *Robot) prepareToStop() {
	if !atomic.CompareAndSwapUint32(&r.status, STATUS_STARTED, STATUS_STOPPING) {
		return
	}
	close(r.resultCh)
	atomic.StoreUint32(&r.status, STATUS_STOPPED)
}

// SendResult queues result for aggregation without blocking. It returns false,
// after logging the dropped result, when the robot is not started or the
// result channel is full.
func (r *Robot) SendResult(result *CallResult) bool {
	if atomic.LoadUint32(&r.status) != STATUS_STARTED {
		r.printIgnoredResult(result, "停止发送")
		return false
	}
	select {
	case r.resultCh <- result:
		return true
	default:
		r.printIgnoredResult(result, "通道满了")
		return false
	}
}

// processResult consumes call results until the result channel is closed and
// folds them into ReportMap, keyed by scene number and then by
// "MainType.SubType".
//
// For every route CallCount counts all reported results and TimeoutCount the
// timed-out ones. ElapseTotal, MaxElapse, MinElapse and AvgElapse (the mean)
// cover only the results that did not time out.
func (r *Robot) processResult() {
	for res := range r.resultCh {
		head := fmt.Sprintf("%s.%s", res.MainType, res.SubType)

		routes, ok := r.ReportMap[res.Num]
		if !ok {
			routes = make(map[string]*Statistics)
			r.ReportMap[res.Num] = routes
		}
		stat, ok := routes[head]
		if !ok {
			stat = &Statistics{Route: head}
			routes[head] = stat
		}
		stat.SceneName = res.SceneName

		// Update the record in place so earlier counters are kept.
		stat.CallCount++
		if res.Code == int32(pb.ErrorCode_TimestampTimeout) {
			stat.TimeoutCount++
			continue
		}

		// Number of non-timeout samples, including this one.
		samples := int64(stat.CallCount) - int64(stat.TimeoutCount)
		stat.ElapseTotal += res.Elapse
		if samples == 1 || res.Elapse > stat.MaxElapse {
			stat.MaxElapse = res.Elapse
		}
		// The first real sample seeds the minimum; otherwise a record
		// created by a timeout would keep MinElapse at zero forever.
		if samples == 1 || res.Elapse < stat.MinElapse {
			stat.MinElapse = res.Elapse
		}
		stat.AvgElapse = stat.ElapseTotal / time.Duration(samples)
	}
}

// printIgnoredResult logs a result that was dropped, together with the cause.
func (r *Robot) printIgnoredResult(result *CallResult, cause string) {
	resultMsg := fmt.Sprintf(
		"MainType=%s, SubType=%s, Elapse=%v",
		result.MainType, result.SubType, result.Elapse)
	logrus.Warnf("Ignored result: %s. (cause: %s)\n", resultMsg, cause)
}