381 lines
9.5 KiB
Go
381 lines
9.5 KiB
Go
package lib
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io/fs"
|
|
"io/ioutil"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/sirupsen/logrus"
|
|
"google.golang.org/protobuf/proto"
|
|
"legu.airobot/pb"
|
|
"legu.airobot/storage"
|
|
)
|
|
|
|
type IRobot interface {
|
|
// 发送消息
|
|
SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode
|
|
// 存储数据
|
|
Store(key string, data interface{})
|
|
// 获取数据
|
|
Get(key string) interface{}
|
|
// 停止运行
|
|
Stop() bool
|
|
}
|
|
|
|
type Robot struct {
|
|
IStore
|
|
scene IScene //场景
|
|
account string
|
|
sid string
|
|
conn *websocket.Conn
|
|
data map[string]interface{} //机器人缓存数据
|
|
status uint32 //状态
|
|
lock sync.Mutex //数据缓存锁
|
|
sceneQueue *Queue[IScene] //场景队列
|
|
config *storage.Config //配置
|
|
resultCh chan *CallResult //请求结果通道
|
|
sceneResultCh chan *CallResult //场景结果通道
|
|
ReportMap map[int]map[string]*Statistics //测试报告 key1:场景序号 key2:协议
|
|
elipseTotal time.Duration
|
|
}
|
|
|
|
func NewRobot(config *storage.Config) *Robot {
|
|
robot := &Robot{
|
|
data: make(map[string]interface{}),
|
|
sceneQueue: NewQueue[IScene](),
|
|
config: config,
|
|
resultCh: make(chan *CallResult, 100),
|
|
sceneResultCh: make(chan *CallResult, 50),
|
|
ReportMap: make(map[int]map[string]*Statistics),
|
|
}
|
|
robot.Store("sid", config.Global.SId)
|
|
return robot
|
|
}
|
|
|
|
//存数据
|
|
func (a *Robot) Store(key string, data interface{}) {
|
|
defer a.lock.Unlock()
|
|
a.lock.Lock()
|
|
a.data[key] = data
|
|
}
|
|
|
|
//取数据
|
|
func (a *Robot) Get(key string) interface{} {
|
|
defer a.lock.Unlock()
|
|
a.lock.Lock()
|
|
return a.data[key]
|
|
}
|
|
|
|
// 发送消息
|
|
func (r *Robot) SendMsg(mainType, subType string, req proto.Message, rsp proto.Message) pb.ErrorCode {
|
|
start := time.Now()
|
|
defer func() {
|
|
t := time.Since(start)
|
|
logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "rsp": rsp, "since": t.String()}).Debug("接收消息")
|
|
var name string
|
|
if r.scene != nil {
|
|
name = r.scene.Info().Name
|
|
}
|
|
// 发送请求结果
|
|
r.SendResult(&CallResult{
|
|
SceneName: name,
|
|
MainType: mainType,
|
|
SubType: subType,
|
|
Elapse: t,
|
|
Num: r.getSortNum(name),
|
|
})
|
|
}()
|
|
|
|
head := &pb.UserMessage{MainType: mainType, SubType: subType}
|
|
if mainType == "user" && subType == "login" {
|
|
loginReq := req.(*pb.UserLoginReq)
|
|
head.Sec = BuildSecStr(loginReq.Sid, loginReq.Account)
|
|
} else {
|
|
head.Sec = BuildSecStr(r.sid, r.account)
|
|
}
|
|
|
|
if ProtoMarshal(req, head) {
|
|
data, _ := proto.Marshal(head)
|
|
if err := r.conn.WriteMessage(websocket.BinaryMessage, data); err != nil {
|
|
logrus.WithField("err", err).Error("写数据异常")
|
|
return pb.ErrorCode_SystemError
|
|
}
|
|
|
|
logrus.WithFields(logrus.Fields{"MainType": mainType, "SubType": subType, "req": req}).Debug("发送消息")
|
|
for {
|
|
_, data, err := r.conn.ReadMessage()
|
|
if err != nil {
|
|
logrus.WithField("err", err).Error("读数据异常")
|
|
break
|
|
}
|
|
|
|
msg := &pb.UserMessage{}
|
|
if err := proto.Unmarshal(data, msg); err != nil {
|
|
logrus.Error("pb解析失败")
|
|
break
|
|
}
|
|
|
|
if msg.MainType == mainType && msg.SubType == subType {
|
|
if !ProtoUnmarshal(msg, rsp) {
|
|
break
|
|
}
|
|
|
|
return pb.ErrorCode_Success
|
|
}
|
|
}
|
|
}
|
|
|
|
return pb.ErrorCode_Success
|
|
}
|
|
|
|
// 设置场景队列
|
|
func (a *Robot) SetScenes(scenes []IScene) {
|
|
scensConf := a.config.Scenes
|
|
sort.SliceStable(scensConf, func(i, j int) bool {
|
|
return scensConf[i].Num < scensConf[j].Num
|
|
})
|
|
for _, conf := range scensConf {
|
|
for _, v := range scenes {
|
|
info := v.Info()
|
|
if conf.Name == info.Name {
|
|
if conf.Loop == 0 || conf.Loop == 1 {
|
|
a.sceneQueue.Add(v)
|
|
continue
|
|
} else if conf.Loop > 1 {
|
|
for i := int32(0); i < conf.Loop; i++ {
|
|
a.sceneQueue.Add(v)
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 根据场景名称获取顺序号
|
|
func (a *Robot) getSortNum(name string) int {
|
|
var num int
|
|
for _, v := range a.config.Scenes {
|
|
if v.Name == name {
|
|
return v.Num
|
|
}
|
|
}
|
|
return num
|
|
}
|
|
|
|
func (m *Robot) Start() bool {
|
|
if len(m.sceneQueue.List()) == 0 {
|
|
logrus.Warn("没有设置场景队列")
|
|
return false
|
|
}
|
|
|
|
// 创建链接
|
|
dialer := &websocket.Dialer{
|
|
HandshakeTimeout: 2 * time.Second,
|
|
}
|
|
conn, _, err := dialer.Dial(m.config.Global.WsAddr, nil)
|
|
if err != nil {
|
|
logrus.Error(err)
|
|
return false
|
|
}
|
|
|
|
m.conn = conn
|
|
|
|
//检查具备启动的状态
|
|
if !atomic.CompareAndSwapUint32(
|
|
&m.status, STATUS_ORIGINAL, STATUS_STARTING) {
|
|
if !atomic.CompareAndSwapUint32(&m.status, STATUS_STOPPED, STATUS_STARTING) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
//设置状态为已启动
|
|
atomic.StoreUint32(&m.status, STATUS_STARTED)
|
|
|
|
m.syncCall()
|
|
|
|
return true
|
|
}
|
|
|
|
func (m *Robot) Stop() bool {
|
|
return false
|
|
}
|
|
|
|
func (m *Robot) syncCall() {
|
|
for {
|
|
scene, err := m.sceneQueue.Pop()
|
|
if err != nil {
|
|
logrus.WithField("err", err).Debug("所有场景执行结束")
|
|
m.prepareToStop()
|
|
return
|
|
}
|
|
m.scene = scene
|
|
info := m.scene.Info()
|
|
|
|
start := time.Now()
|
|
//这里执行会花很长时间
|
|
if err := scene.Run(m); err != nil {
|
|
logrus.WithField("err", err).Error("执行业务时发生错误")
|
|
break
|
|
}
|
|
elapsedTime := time.Since(start)
|
|
|
|
m.elipseTotal += elapsedTime
|
|
logrus.WithField("t", elapsedTime.String()).Debug("场景【" + info.Name + "】执行完毕耗时统计")
|
|
//显示场景结果
|
|
m.processResult()
|
|
}
|
|
}
|
|
|
|
func (m *Robot) prepareToStop() {
|
|
atomic.CompareAndSwapUint32(&m.status, STATUS_STARTED, STATUS_STOPPING)
|
|
logrus.Debug("关闭结果通道...")
|
|
close(m.resultCh)
|
|
atomic.StoreUint32(&m.status, STATUS_STOPPED)
|
|
}
|
|
|
|
func (m *Robot) SendResult(result *CallResult) bool {
|
|
if atomic.LoadUint32(&m.status) != STATUS_STARTED {
|
|
m.printIgnoredResult(result, "停止发送")
|
|
return false
|
|
}
|
|
select {
|
|
case m.resultCh <- result:
|
|
logrus.WithField("res", result).Debug("发送结果")
|
|
return true
|
|
default:
|
|
m.printIgnoredResult(result, "通道满了")
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (m *Robot) processResult() {
|
|
go func() {
|
|
defer m.lock.Unlock()
|
|
m.lock.Lock()
|
|
for r := range m.resultCh {
|
|
head := fmt.Sprintf("%s.%s", r.MainType, r.SubType)
|
|
if routes, ok := m.ReportMap[r.Num]; ok {
|
|
if route, y := routes[head]; y {
|
|
max := route.MaxElapse
|
|
min := route.MinElapse
|
|
if r.Elapse > max {
|
|
max = r.Elapse
|
|
}
|
|
|
|
if r.Elapse < min {
|
|
min = r.Elapse
|
|
}
|
|
statis := &Statistics{
|
|
Route: head,
|
|
SceneName: r.SceneName,
|
|
ElapseTotal: route.ElapseTotal + r.Elapse,
|
|
MaxElapse: max,
|
|
MinElapse: min,
|
|
CallCount: route.CallCount + 1,
|
|
}
|
|
avg := (statis.MaxElapse + statis.MinElapse) / 2
|
|
statis.AvgElapse = avg
|
|
routes[head] = statis
|
|
} else {
|
|
statis := &Statistics{
|
|
Route: head,
|
|
SceneName: r.SceneName,
|
|
ElapseTotal: r.Elapse,
|
|
MaxElapse: r.Elapse,
|
|
MinElapse: r.Elapse,
|
|
CallCount: 1,
|
|
}
|
|
avg := (statis.MaxElapse + statis.MinElapse) / 2
|
|
statis.AvgElapse = avg
|
|
routes[head] = statis
|
|
}
|
|
m.ReportMap[r.Num] = routes
|
|
} else {
|
|
route := make(map[string]*Statistics)
|
|
statis := &Statistics{
|
|
Route: head,
|
|
SceneName: r.SceneName,
|
|
ElapseTotal: r.Elapse,
|
|
MaxElapse: r.Elapse,
|
|
MinElapse: r.Elapse,
|
|
CallCount: 1,
|
|
}
|
|
avg := (statis.MaxElapse + statis.MinElapse) / 2
|
|
statis.AvgElapse = avg
|
|
route[head] = statis
|
|
m.ReportMap[r.Num] = route
|
|
}
|
|
}
|
|
|
|
}()
|
|
}
|
|
|
|
// 将统计结果写入文件
|
|
// Deprecated
|
|
func (m *Robot) genReport(e time.Duration) {
|
|
var buf bytes.Buffer
|
|
buf.WriteString("测试报告\n")
|
|
buf.WriteString(fmt.Sprintf("用户总数:%d 场景总耗时:%s\n",
|
|
m.config.Global.UserCountTotal, e.String()))
|
|
var msgs []string
|
|
for k, routes := range m.ReportMap {
|
|
// buf.WriteString(fmt.Sprintf("【%s】\n", k))
|
|
for r, d := range routes {
|
|
msgs = append(msgs, fmt.Sprintf("【%s】协议:%s 调用次数:%d 总耗时:%v 平均耗时:%v 最大耗时:%v 最小耗时:%v",
|
|
k, r, d.CallCount, d.ElapseTotal.String(), d.AvgElapse.String(), d.MaxElapse.String(), d.MinElapse.String()))
|
|
}
|
|
|
|
}
|
|
record := strings.Join(msgs, "\n")
|
|
buf.WriteString(record)
|
|
buf.WriteString("\n------------------------------------------------------------------------------\n")
|
|
// logrus.WithField("res", buf.String()).Debug("报告内容")
|
|
if err := ioutil.WriteFile(filepath.Join("./", "report.log"), buf.Bytes(), fs.ModePerm); err != nil {
|
|
logrus.WithField("err", err).Error("测试报告")
|
|
}
|
|
}
|
|
|
|
func (m *Robot) printIgnoredResult(result *CallResult, cause string) {
|
|
resultMsg := fmt.Sprintf(
|
|
"MainType=%s, SubType=%s, Elapse=%v",
|
|
result.MainType, result.SubType, result.Elapse)
|
|
logrus.Warnf("Ignored result: %s. (cause: %s)\n", resultMsg, cause)
|
|
}
|
|
|
|
// Deprecated
|
|
func (m *Robot) checkResp(data []byte, rsp proto.Message) bool {
|
|
msg := &pb.UserMessage{}
|
|
if err := proto.Unmarshal(data, msg); err != nil {
|
|
logrus.Error("pb解析失败")
|
|
return false
|
|
}
|
|
methodStr := msg.Data.TypeUrl
|
|
methodName := SubStr(methodStr, 20, len(methodStr))
|
|
|
|
if strings.HasSuffix(methodName, "Push") {
|
|
if methodName == "NotifyErrorNotifyPush" {
|
|
push := &pb.NotifyErrorNotifyPush{}
|
|
if !ProtoUnmarshal(msg, push) {
|
|
logrus.Error("pb解析失败")
|
|
return false
|
|
}
|
|
logrus.WithField("methodName", methodName).WithField("code", push.Code).Debug("收到错误推送")
|
|
}
|
|
return false
|
|
} else {
|
|
if !ProtoUnmarshal(msg, rsp) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|