368 lines
11 KiB
Go
368 lines
11 KiB
Go
package robot
|
|
|
|
import (
|
|
"fmt"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/sys/log"
|
|
"go_dreamfactory/pb"
|
|
cfg "go_dreamfactory/sys/configure/structs"
|
|
"go_dreamfactory/utils"
|
|
"math/rand"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
"github.com/nacos-group/nacos-sdk-go/util"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
)
|
|
|
|
//机器人统计数据
|
|
type RobotStatistics struct {
|
|
message string
|
|
time int64
|
|
reqsize int64
|
|
}
|
|
|
|
type Robot struct {
|
|
Client
|
|
monitor IRobotMonitor
|
|
debug bool
|
|
index int32 //编号
|
|
account, serverId string
|
|
curreq string //当前请求
|
|
await chan *MessageResp
|
|
modules map[core.M_Modules]IModuleRobot
|
|
cycle bool
|
|
pipeline []*Pipeline //执行流水线
|
|
statistics []*RobotStatistics
|
|
receiveNum map[string]int32 //接收消息统计
|
|
receiveSize map[string]int64 //接收消息统计
|
|
wtaskerror error
|
|
awaitState int32 //状态 1没有等待 2等待回应 3关闭
|
|
}
|
|
|
|
func (this *Robot) Account() string {
|
|
return this.account
|
|
}
|
|
|
|
func (this *Robot) ServerId() string {
|
|
return this.serverId
|
|
}
|
|
|
|
func (this *Robot) GetModule(module core.M_Modules) IModuleRobot {
|
|
return this.modules[module]
|
|
}
|
|
|
|
//初始化
|
|
func (this *Robot) Init(addr string, client IClient) (err error) {
|
|
if err = this.Client.Init(addr, client); err != nil {
|
|
return
|
|
}
|
|
this.awaitState = 1
|
|
this.await = make(chan *MessageResp)
|
|
this.modules = make(map[core.M_Modules]IModuleRobot)
|
|
this.statistics = make([]*RobotStatistics, 0, 100)
|
|
this.receiveNum = make(map[string]int32)
|
|
this.receiveSize = make(map[string]int64)
|
|
this.modules[comm.ModuleUser] = new(ModuleRobot_User)
|
|
this.modules[comm.ModuleSys] = new(ModuleRobot_Sys)
|
|
this.modules[comm.ModuleGM] = new(ModuleRobot_GM)
|
|
this.modules[comm.ModuleHero] = new(ModuleRobot_Hero)
|
|
this.modules[comm.ModuleEquipment] = new(ModuleRobot_Equipment)
|
|
this.modules[comm.ModuleItems] = new(ModuleRobot_Item)
|
|
this.modules[comm.ModuleWtask] = new(ModuleRobot_WTask)
|
|
this.modules[comm.ModulePractice] = new(ModuleRobot_Practice)
|
|
this.modules[comm.ModuleMainline] = new(ModuleRobot_MainLine)
|
|
this.modules[comm.ModuleArena] = new(ModuleRobot_Arena)
|
|
this.modules[comm.ModulePagoda] = new(ModuleRobot_Pagoda)
|
|
this.modules[comm.ModuleViking] = new(ModuleRobot_Viking)
|
|
this.modules[comm.ModuleChat] = new(ModuleRobot_Chat)
|
|
this.modules[comm.ModuleShop] = new(ModuleRobot_Shop)
|
|
this.modules[comm.ModuleCaravan] = new(ModuleRobot_Caravan)
|
|
this.modules[comm.ModuleHoroscope] = new(ModuleRobot_Horoscope)
|
|
this.modules[comm.ModuleCombat] = new(ModuleRobot_Combat)
|
|
this.modules[comm.ModuleSmithy] = new(ModuleRobot_Smithy)
|
|
this.modules[comm.ModuleFriend] = new(ModuleRobot_Friend)
|
|
this.modules[comm.ModulePasson] = new(ModuleRobot_Passon)
|
|
this.modules[comm.ModuleStoryLine] = new(ModuleRobot_Storyline)
|
|
this.modules[comm.ModuleDailytask] = new(ModuleRobot_Dailytask)
|
|
this.modules[comm.ModuleDragon] = new(ModuleRobot_Dragon)
|
|
this.modules[comm.ModuleIntegral] = new(ModuleRobot_Integral)
|
|
this.modules[comm.ModuleMail] = new(ModuleRobot_Mail)
|
|
|
|
for _, v := range this.modules {
|
|
v.Init()
|
|
}
|
|
go this.run()
|
|
return
|
|
}
|
|
|
|
//接收消息
|
|
func (this *Robot) Receive(msg *pb.UserMessage) (err error) {
|
|
var (
|
|
msgpath string
|
|
message proto.Message
|
|
module IModuleRobot
|
|
ok bool
|
|
)
|
|
msgpath = fmt.Sprintf("%s.%s", msg.MainType, msg.SubType)
|
|
|
|
if msgpath == "gateway.heartbeat" { //心跳 屏蔽掉
|
|
return
|
|
}
|
|
this.receiveNum[msgpath]++
|
|
this.receiveSize[msgpath] += int64(len(msg.Data.Value))
|
|
// log.Debug("[机器人 Resp]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "message", Value: msgpath})
|
|
//序列化用户消息对象
|
|
if message, err = msg.Data.UnmarshalNew(); err != nil {
|
|
log.Errorf("[Robot Receive] UserMessage:%s Unmarshal err:%v", msgpath, err)
|
|
return err
|
|
}
|
|
if msgpath == "notify.errornotify" { //错误通知
|
|
resp := message.(*pb.NotifyErrorNotifyPush)
|
|
reqpath := fmt.Sprintf("%s.%s", resp.ReqMainType, resp.ReqSubType)
|
|
if reqpath == this.curreq { //收到回应
|
|
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断
|
|
this.await <- &MessageResp{
|
|
resp: nil,
|
|
errdata: resp.Err,
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
if module, ok = this.modules[core.M_Modules(msg.MainType)]; ok {
|
|
module.Receive(this, msg.SubType, message)
|
|
} else {
|
|
// log.Errorf("[Robot Receive] no register module:%s", msg.MainType)
|
|
return
|
|
}
|
|
|
|
if msgpath == this.curreq { //收到回应
|
|
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 1) { //状态说判断
|
|
this.await <- &MessageResp{
|
|
resp: message,
|
|
errdata: nil,
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (this *Robot) WriteMsg(msg *pb.UserMessage) (err error) {
|
|
msg.Sec = this.buildSecStr()
|
|
return this.Client.WriteMsg(msg)
|
|
}
|
|
|
|
//发送消息
|
|
func (this *Robot) SendMessage(mtype, stype string, msg proto.Message) (resp proto.Message, errdata *pb.ErrorData) {
|
|
var (
|
|
messageresp *MessageResp
|
|
err error
|
|
)
|
|
stime := time.Now()
|
|
data, _ := anypb.New(msg)
|
|
message := &pb.UserMessage{
|
|
MainType: mtype,
|
|
SubType: stype,
|
|
Data: data,
|
|
}
|
|
|
|
if mtype != "gateway" {
|
|
this.curreq = fmt.Sprintf("%s.%s", mtype, stype)
|
|
if err = this.WriteMsg(message); err != nil {
|
|
errdata = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Title: pb.ErrorCode_ClientError.String(),
|
|
Message: err.Error(),
|
|
}
|
|
return
|
|
}
|
|
if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断
|
|
errdata = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Title: pb.ErrorCode_ClientError.String(),
|
|
Message: err.Error(),
|
|
}
|
|
return
|
|
}
|
|
messageresp = <-this.await //等待回应
|
|
resp = messageresp.resp
|
|
errdata = messageresp.errdata
|
|
if this.debug {
|
|
if errdata == nil {
|
|
log.Debug("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata == nil})
|
|
} else {
|
|
log.Error("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata.String()})
|
|
}
|
|
}
|
|
this.statistics = append(this.statistics, &RobotStatistics{
|
|
message: fmt.Sprintf("%s.%s", mtype, stype),
|
|
time: time.Since(stime).Milliseconds(),
|
|
reqsize: int64(len(data.Value)),
|
|
})
|
|
time.Sleep(time.Millisecond * time.Duration(50+rand.Int31n(300)))
|
|
} else {
|
|
if err = this.WriteMsg(message); err != nil {
|
|
errdata = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Title: pb.ErrorCode_ClientError.String(),
|
|
Message: err.Error(),
|
|
}
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
//发送消息
|
|
func (this *Robot) SendTaskMessage(task, comdi int32, mtype, stype string, msg proto.Message) (resp proto.Message, errdata *pb.ErrorData) {
|
|
var (
|
|
messageresp *MessageResp
|
|
err error
|
|
)
|
|
stime := time.Now()
|
|
data, _ := anypb.New(msg)
|
|
message := &pb.UserMessage{
|
|
MainType: mtype,
|
|
SubType: stype,
|
|
Data: data,
|
|
}
|
|
if err = this.WriteMsg(message); err != nil {
|
|
errdata = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Title: pb.ErrorCode_ClientError.String(),
|
|
Message: err.Error(),
|
|
}
|
|
return
|
|
}
|
|
if mtype != "gateway" {
|
|
this.curreq = fmt.Sprintf("%s.%s", mtype, stype)
|
|
if !atomic.CompareAndSwapInt32(&this.awaitState, 1, 2) { //状态说判断
|
|
errdata = &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Title: pb.ErrorCode_ClientError.String(),
|
|
Message: err.Error(),
|
|
}
|
|
return
|
|
}
|
|
messageresp = <-this.await //等待回应
|
|
resp = messageresp.resp
|
|
errdata = messageresp.errdata
|
|
if this.debug {
|
|
if errdata == nil {
|
|
log.Debug("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "Task", Value: fmt.Sprintf("[%d-%d]", task, comdi)}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata == nil})
|
|
} else {
|
|
log.Error("[机器人 Message]", log.Field{Key: "t", Value: time.Since(stime).Milliseconds()}, log.Field{Key: "Account", Value: this.account}, log.Field{Key: "Task", Value: fmt.Sprintf("[%d-%d]", task, comdi)}, log.Field{Key: "message", Value: fmt.Sprintf("%s.%s", mtype, stype)}, log.Field{Key: "errdata", Value: errdata.String()})
|
|
}
|
|
}
|
|
|
|
this.statistics = append(this.statistics, &RobotStatistics{
|
|
message: fmt.Sprintf("%s.%s", mtype, stype),
|
|
time: time.Since(stime).Milliseconds(),
|
|
})
|
|
time.Sleep(time.Millisecond * time.Duration(50+rand.Int31n(300)))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (this *Robot) buildSecStr() string {
|
|
jsonByte, _ := jsoniter.Marshal(&SecBuild{
|
|
Account: this.account,
|
|
ServerId: this.serverId,
|
|
TimeStamp: time.Now().Unix(),
|
|
})
|
|
jsonBase64 := utils.Base64Encode(jsonByte)
|
|
// log.Printf("client base64:%s", jsonBase64)
|
|
clientMd5key := util.Md5(jsonBase64)
|
|
// log.Printf("client md5:%s", clientMd5key)
|
|
return fmt.Sprintf("CE:%s%s", clientMd5key, jsonBase64)
|
|
}
|
|
func (this *Robot) Heartbeat() {
|
|
this.SendMessage("gateway", "heartbeat", &pb.GatewayHeartbeatReq{})
|
|
}
|
|
|
|
func (this *Robot) OnClose() {
|
|
log.Debug("[机器人 End]", log.Field{Key: "Account", Value: this.account})
|
|
if atomic.CompareAndSwapInt32(&this.awaitState, 2, 3) { //等待中
|
|
this.await <- &MessageResp{
|
|
resp: nil,
|
|
errdata: &pb.ErrorData{
|
|
Code: pb.ErrorCode_ClientError,
|
|
Message: "client link closed",
|
|
},
|
|
}
|
|
} else {
|
|
atomic.StoreInt32(&this.awaitState, 3)
|
|
}
|
|
|
|
this.monitor.RobotFinishedTest(this)
|
|
}
|
|
|
|
func (this *Robot) DoTask(taskconf *cfg.GameWorldTaskData, condconf *cfg.GameBuriedCondiData, moduleStr core.M_Modules) (err error) {
|
|
var (
|
|
ok bool
|
|
module IModuleRobot
|
|
)
|
|
if module, ok = this.modules[moduleStr]; ok {
|
|
err = module.DoTask(this, taskconf, condconf)
|
|
} else {
|
|
err = fmt.Errorf("no fund module:%s", moduleStr)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
//运行
|
|
func (this *Robot) run() {
|
|
var (
|
|
module IModuleRobot
|
|
err error
|
|
)
|
|
for _, v := range this.pipeline {
|
|
module = this.modules[core.M_Modules(v.Module)]
|
|
if err = module.OncePipeline(this); err != nil {
|
|
log.Error("[机器人 初始执行错误]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()})
|
|
this.Close()
|
|
return
|
|
}
|
|
time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000)))
|
|
}
|
|
|
|
for this.cycle {
|
|
this.cycle = false
|
|
for _, v := range this.pipeline {
|
|
if v.CurrNum < v.Exenum {
|
|
module = this.modules[core.M_Modules(v.Module)]
|
|
if err = module.DoPipeline(this); err != nil {
|
|
|
|
if this.debug {
|
|
log.Error("[机器人 执行异常]", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v}, log.Field{Key: "err", Value: err.Error()})
|
|
}
|
|
|
|
if atomic.LoadInt32(&this.awaitState) == 3 { //已经关闭 直接退出
|
|
if this.debug {
|
|
log.Infof("[机器人 执行异常] 链接已经关闭", log.Field{Key: "Account", Value: this.account}, log.Field{Key: "module", Value: v})
|
|
}
|
|
return
|
|
}
|
|
|
|
if !v.ErrNotStop {
|
|
log.Debug("[机器人 退出循环]")
|
|
this.Close()
|
|
return
|
|
}
|
|
}
|
|
v.CurrNum++
|
|
if v.CurrNum < v.Exenum {
|
|
this.cycle = true
|
|
}
|
|
}
|
|
time.Sleep(time.Millisecond * time.Duration(100+rand.Int31n(1000)))
|
|
}
|
|
}
|
|
this.Close()
|
|
}
|