package lib import ( "errors" "fmt" "strings" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" "legu.airobot/pb" ) type IRobot interface { // 启动机器人 Start() bool // 选择场景 SelScene(scene *scene) // 当前场景 GetCurrentScene() *scene } type Robot struct { IStore scene *scene conn *websocket.Conn data map[string][]byte //caller缓存数据 status uint32 //状态 lock sync.Mutex // } func NewRobot() *Robot { robot := &Robot{ data: make(map[string][]byte), } return robot } //存数据 func (a *Robot) Store(key string, data []byte) { defer a.lock.Unlock() a.lock.Lock() a.data[key] = data } //取数据 func (a *Robot) Get(key string) []byte { defer a.lock.Unlock() a.lock.Lock() return a.data[key] } func (m *Robot) SelScene(scene *scene) { m.scene = scene } func (m *Robot) GetCurrentScene() *scene { return m.scene } func (m *Robot) Start() bool { if m.scene == nil { logrus.Warn("选择一个测试场景") return false } if len(m.scene.CallerList()) == 0 { logrus.Warn("还没有给场景添加调用器") return false } // 创建链接 dialer := &websocket.Dialer{ HandshakeTimeout: 2 * time.Second, } conn, _, err := dialer.Dial("", nil) if err != nil { logrus.Error(err) return false } m.conn = conn //检查具备启动的状态 if !atomic.CompareAndSwapUint32( &m.status, STATUS_ORIGINAL, STATUS_STARTING) { if !atomic.CompareAndSwapUint32(&m.status, STATUS_STOPPED, STATUS_STARTING) { return false } } //设置状态为已启动 atomic.StoreUint32(&m.status, STATUS_STARTED) m.syncCall() logrus.Debug("机器人运行了") return true } func (m *Robot) syncCall() { for { caller, err := m.scene.callerQueue.Pop() if err != nil { return } req := caller.BuildReq(m, &pb.UserMessage{}) m.callOne(&req) } } func (m *Robot) callOne(rawReq *RawReq) *RawResp { start := time.Now().UnixNano() rsp, err := m.call(rawReq) end := time.Now().UnixNano() elapsedTime := time.Duration(end - start) var rawResp RawResp if err != nil { errMsg := fmt.Sprintf("Call Error: %s.", err) rawResp = RawResp{ ID: rawReq.ID, Err: errors.New(errMsg), Elapse: elapsedTime} } else { rawResp = RawResp{ ID: rawReq.ID, Resp: rsp, Elapse: elapsedTime} } return &rawResp } func (m *Robot) call(rawReq *RawReq) ([]byte, error) { m.conn.WriteMessage(websocket.BinaryMessage, rawReq.Req) var ( data []byte err error ) for { _, data, err = m.conn.ReadMessage() if err != nil { logrus.Errorf("readMessage err:%v", err) return nil, err } if !m.checkPush(data) { return data, nil } } } func (m *Robot) checkPush(data []byte) bool { msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("结果解析失败") return false } methodStr := msg.Data.TypeUrl methodName := SubStr(methodStr, 20, len(methodStr)) if strings.HasSuffix(methodName, "Push") { if methodName == "NotifyErrorNotifyPush" { push := &pb.NotifyErrorNotifyPush{} if !ProtoUnmarshal(msg, push) { logrus.Error("unmarsh err") return false } logrus.WithField("methodName", methodName).WithField("code", push.Code).Debug("收到错误码") } else { logrus.WithField("methodName", methodName).Debug("收到推送") } return true } return false }