airobot/lib/robot.go
2022-12-14 19:39:09 +08:00

342 lines
8.0 KiB
Go

package lib
import (
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"legu.airobot/pb"
"legu.airobot/storage"
)
// IRobot lists the robot operations declared in this file: sending a
// request, reading and writing cached values, and stopping.
type IRobot interface {
	// SendMsg sends req under the mainType/subType route, fills rsp with the
	// reply and returns the resulting error code.
	SendMsg(mainType, subType string, req, rsp proto.Message) pb.ErrorCode
	// Store saves data under key.
	Store(key string, data any)
	// Get returns the value saved under key.
	Get(key string) any
	// Stop stops the robot.
	Stop() bool
}
// Robot holds the state of one robot: its websocket connection, a keyed
// cache, the queue of scenes to run and the per-route statistics collected
// from the calls it makes.
type Robot struct {
	IStore
	ai          *myAI
	scene       IScene // scene currently being executed
	account     string
	sid         string
	conn        *websocket.Conn
	data        map[string]any   // robot cache data
	status      uint32           // lifecycle status, read and written atomically
	cache       sync.Mutex       // guards data
	lock        sync.Mutex       // result lock
	sceneQueue  *Queue[IScene]   // scene queue
	config      *storage.Config  // configuration
	resultCh    chan *CallResult // request result channel
	// sceneResultCh chan *CallResult // scene result channel
	ReportMap   map[int]map[string]*Statistics // test report; key1: scene sequence number, key2: protocol route
	elipseTotal time.Duration                  // summed run time of the scenes that finished without error
}
// NewRobot creates a Robot that shares ai's configuration. The cache, scene
// queue, result channel (buffered to 1000 entries) and report map are
// allocated here, and the globally configured server id is cached under the
// "sid" key.
func NewRobot(ai *myAI) *Robot {
	r := new(Robot)
	r.ai = ai
	r.data = make(map[string]any)
	r.sceneQueue = NewQueue[IScene]()
	r.config = ai.config
	r.resultCh = make(chan *CallResult, 1000)
	r.ReportMap = make(map[int]map[string]*Statistics)

	r.Store("sid", ai.config.Global.SId)
	return r
}
// Store saves data in the robot cache under key. The write is performed
// while holding the cache mutex.
func (a *Robot) Store(key string, data interface{}) {
	a.cache.Lock()
	defer a.cache.Unlock()

	a.data[key] = data
}
// Get returns the value cached under key, or nil when the key is absent.
// The read is performed while holding the cache mutex.
func (a *Robot) Get(key string) interface{} {
	a.cache.Lock()
	defer a.cache.Unlock()

	value := a.data[key]
	return value
}
// SendMsg sends req to the server under the mainType/subType route and then
// reads frames from the connection until the matching reply or a
// "notify"/"errornotify" push arrives. Frames for any other route are
// skipped.
//
// The reply payload is decoded into rsp. Whatever the outcome, the elapsed
// time of the call is reported through SendResult when the function returns.
//
// It returns pb.ErrorCode_Success once the reply has been decoded, the code
// carried by the error push when one is received, and
// pb.ErrorCode_SystemError when the request cannot be built or written or
// when the reply cannot be read or decoded.
func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode {
	start := time.Now()
	defer func() {
		elapsed := time.Since(start)
		var name string
		if r.scene != nil {
			name = r.scene.Info().Name
		}
		// Publish the outcome of this request.
		r.SendResult(&CallResult{
			SceneName: name,
			MainType:  mainType,
			SubType:   subType,
			Elapse:    elapsed,
			Num:       r.getSortNum(name),
		})
	}()

	if r.conn == nil {
		logrus.Error("websocket connection is nil")
		return pb.ErrorCode_SystemError
	}

	head := &pb.UserMessage{MainType: mainType, SubType: subType}
	if mainType == "user" && subType == "login" {
		// The login request carries its own sid/account, so the security
		// string is derived from the request rather than from the robot.
		loginReq, ok := req.(*pb.UserLoginReq)
		if !ok {
			logrus.Errorf("user.login expects *pb.UserLoginReq, got %T", req)
			return pb.ErrorCode_SystemError
		}
		head.Sec = BuildSecStr(loginReq.Sid, loginReq.Account)
	} else {
		head.Sec = BuildSecStr(r.sid, r.account)
	}

	if !ProtoMarshal(req, head) {
		logrus.Error("failed to pack request into message head")
		return pb.ErrorCode_SystemError
	}
	frame, err := proto.Marshal(head)
	if err != nil {
		logrus.WithField("err", err).Error("failed to marshal message head")
		return pb.ErrorCode_SystemError
	}
	if err := r.conn.WriteMessage(websocket.BinaryMessage, frame); err != nil {
		logrus.WithField("err", err).Error("写数据异常")
		return pb.ErrorCode_SystemError
	}

	for {
		_, payload, err := r.conn.ReadMessage()
		if err != nil {
			// The read loop must be abandoned once the connection reports
			// an error.
			logrus.WithField("err", err).Error("读数据异常")
			return pb.ErrorCode_SystemError
		}
		msg := &pb.UserMessage{}
		if err := proto.Unmarshal(payload, msg); err != nil {
			logrus.WithField("err", err).Error("pb解析失败")
			return pb.ErrorCode_SystemError
		}

		switch {
		case msg.MainType == mainType && msg.SubType == subType:
			if !ProtoUnmarshal(msg, rsp) {
				return pb.ErrorCode_SystemError
			}
			return pb.ErrorCode_Success
		case msg.MainType == "notify" && msg.SubType == "errornotify":
			push := &pb.NotifyErrorNotifyPush{}
			if !ProtoUnmarshal(msg, push) {
				return pb.ErrorCode_SystemError
			}
			return push.Code
		}
	}
}
// SetScenes fills the scene queue from scenes, following the configured
// scene list in ascending Num order.
//
// Each configured entry is matched against scenes by name. A Loop of 0 or 1
// enqueues the matching scene once, a Loop greater than 1 enqueues it Loop
// times, and a negative Loop enqueues nothing.
//
// Note that the stable sort is applied to a.config.Scenes itself, so the
// configuration slice is reordered in place.
func (a *Robot) SetScenes(scenes []IScene) {
	configured := a.config.Scenes
	sort.SliceStable(configured, func(x, y int) bool {
		return configured[x].Num < configured[y].Num
	})

	for _, conf := range configured {
		for _, candidate := range scenes {
			info := candidate.Info()
			if conf.Name != info.Name {
				continue
			}

			// Work out how many copies of the scene go into the queue.
			var repeat int32
			switch {
			case conf.Loop == 0 || conf.Loop == 1:
				repeat = 1
			case conf.Loop > 1:
				repeat = conf.Loop
			}
			for n := int32(0); n < repeat; n++ {
				a.sceneQueue.Add(candidate)
			}
		}
	}
}
// getSortNum returns the configured sequence number (Num) of the first scene
// whose name equals name, or 0 when no configured scene matches.
func (a *Robot) getSortNum(name string) int {
	configured := a.config.Scenes
	for i := range configured {
		if configured[i].Name == name {
			return configured[i].Num
		}
	}
	return 0
}
// Start connects the robot to the configured websocket address and runs every
// queued scene on the calling goroutine, returning once syncCall returns.
//
// It returns false without side effects when the scene queue is empty or when
// the robot is neither in its original nor in its stopped state. It also
// returns false, restoring the previous state, when the connection cannot be
// established. Otherwise it returns true after the scenes have been run.
func (m *Robot) Start() bool {
	if len(m.sceneQueue.List()) == 0 {
		logrus.Warn("没有设置场景队列")
		return false
	}

	// Claim the STARTING state before dialing, so that a call on a robot that
	// is already running neither opens a second socket nor replaces the
	// connection in use.
	var prev uint32 = STATUS_ORIGINAL
	if !atomic.CompareAndSwapUint32(&m.status, STATUS_ORIGINAL, STATUS_STARTING) {
		if !atomic.CompareAndSwapUint32(&m.status, STATUS_STOPPED, STATUS_STARTING) {
			return false
		}
		prev = STATUS_STOPPED
	}

	// Open the connection.
	dialer := &websocket.Dialer{
		HandshakeTimeout: 2 * time.Second,
	}
	conn, _, err := dialer.Dial(m.config.Global.WsAddr, nil)
	if err != nil {
		logrus.Error(err)
		// Give the state back so that a later Start can try again.
		atomic.StoreUint32(&m.status, prev)
		return false
	}
	m.conn = conn

	// The result channel is closed when a run finishes; a restart needs a
	// fresh one, otherwise the first reported result would panic.
	if prev == STATUS_STOPPED {
		m.resultCh = make(chan *CallResult, cap(m.resultCh))
	}

	// Mark the robot as started and run the scenes.
	atomic.StoreUint32(&m.status, STATUS_STARTED)
	m.syncCall()
	return true
}
// Stop is currently a stub: it performs no shutdown work and always reports
// false.
func (m *Robot) Stop() bool { return false }
// syncCall pops scenes off the queue and runs them one after another on the
// calling goroutine until the queue reports an error (taken to mean it is
// empty), at which point the robot is moved to the stopped state.
//
// A scene that returns an error is logged and skipped; its run time is not
// added to elipseTotal. A panic raised while running is recovered and logged;
// in that case the function returns without calling prepareToStop, so the
// status and the result channel are left as they were.
func (m *Robot) syncCall() {
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		var errMsg string
		if err, ok := p.(error); ok {
			errMsg = fmt.Sprintf("调用时Panic! (error: %s)", err)
		} else {
			errMsg = fmt.Sprintf("调用时Panic! (clue: %#v)", p)
		}
		logrus.Error(errMsg)
	}()

	// Start exactly one result consumer before any scene runs. Starting one
	// per finished scene would leave several goroutines updating ReportMap at
	// the same time, and would leave nobody draining the channel while the
	// first scene is running.
	m.processResult()

	for {
		scene, err := m.sceneQueue.Pop()
		if err != nil {
			logrus.WithField("err", err).Debug("所有场景执行结束")
			m.prepareToStop()
			return
		}
		m.scene = scene
		info := m.scene.Info()

		start := time.Now()
		// Running a scene can take a long time.
		if err := scene.Run(m); err != nil {
			logrus.WithField("err", err).Error("执行业务时发生错误")
			continue
		}
		elapsedTime := time.Since(start)
		m.elipseTotal += elapsedTime
		logrus.WithField("t", elapsedTime.String()).Debug("场景【" + info.Name + "】执行完毕耗时统计")
	}
}
// prepareToStop moves a started robot to the stopped state: the status goes
// STARTED -> STOPPING, the result channel is closed so that its consumer
// terminates, and the status is finally set to STOPPED.
//
// The call is a no-op unless the robot is currently STARTED. This keeps a
// repeated or out-of-order call from closing the channel twice, which would
// panic.
//
// NOTE(review): the websocket connection is not closed here.
func (m *Robot) prepareToStop() {
	if !atomic.CompareAndSwapUint32(&m.status, STATUS_STARTED, STATUS_STOPPING) {
		return
	}
	logrus.Debug("关闭结果通道...")
	close(m.resultCh)
	atomic.StoreUint32(&m.status, STATUS_STOPPED)
}
// SendResult offers result to the result channel without blocking.
//
// It returns true when the result was queued. The result is logged as ignored
// and false is returned when the robot is not in the STARTED state or when
// the channel buffer is full.
func (m *Robot) SendResult(result *CallResult) bool {
	if status := atomic.LoadUint32(&m.status); status != STATUS_STARTED {
		m.printIgnoredResult(result, "停止发送")
		return false
	}

	queued := false
	select {
	case m.resultCh <- result:
		queued = true
	default:
		m.printIgnoredResult(result, "通道满了")
	}
	return queued
}
// processResult starts a goroutine that drains the result channel and folds
// every result into ReportMap. The goroutine ends when the channel is closed.
func (m *Robot) processResult() {
	go func() {
		for r := range m.resultCh {
			m.mergeCallResult(r)
		}
	}()
}

// mergeCallResult folds one call result into ReportMap, keyed first by the
// scene sequence number and then by the "MainType.SubType" route.
//
// The update runs under m.lock because more than one consumer goroutine may
// be draining the result channel. Code that reads ReportMap while a run is in
// progress has to take the same lock.
func (m *Robot) mergeCallResult(r *CallResult) {
	head := fmt.Sprintf("%s.%s", r.MainType, r.SubType)

	m.lock.Lock()
	defer m.lock.Unlock()

	routes, ok := m.ReportMap[r.Num]
	if !ok {
		routes = make(map[string]*Statistics)
		m.ReportMap[r.Num] = routes
	}

	// Start from the statistics of a route seen for the first time, then
	// merge in whatever has been recorded for it so far.
	statis := &Statistics{
		Route:       head,
		SceneName:   r.SceneName,
		ElapseTotal: r.Elapse,
		MaxElapse:   r.Elapse,
		MinElapse:   r.Elapse,
		CallCount:   1,
	}
	if prev, seen := routes[head]; seen {
		statis.ElapseTotal = prev.ElapseTotal + r.Elapse
		statis.CallCount = prev.CallCount + 1
		if prev.MaxElapse > statis.MaxElapse {
			statis.MaxElapse = prev.MaxElapse
		}
		if prev.MinElapse < statis.MinElapse {
			statis.MinElapse = prev.MinElapse
		}
	}

	// The average is the mean over all calls, not the midpoint between the
	// fastest and the slowest one.
	statis.AvgElapse = statis.ElapseTotal / time.Duration(statis.CallCount)
	routes[head] = statis
}
// printIgnoredResult logs, at warning level, a result that was dropped
// instead of being queued, together with the reason given in cause.
func (m *Robot) printIgnoredResult(result *CallResult, cause string) {
	summary := fmt.Sprintf(
		"MainType=%s, SubType=%s, Elapse=%v",
		result.MainType, result.SubType, result.Elapse,
	)
	logrus.Warnf("Ignored result: %s. (cause: %s)\n", summary, cause)
}