go_dreamfactory/modules/battle/battleclient.go
2023-03-04 20:59:44 +08:00

167 lines
3.9 KiB
Go

package battle
import (
"context"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/pb"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// newClient dials the battle service at addr over WebSocket and returns a
// client that is already reading responses and sending heartbeats.
//
// The dial and handshake are bounded by a 5 second timeout. mgr receives
// pushed battle messages and the shutdown notification; log is used for all
// diagnostics. A non-nil err is returned when the heartbeat frame cannot be
// encoded or when the dial fails; in the latter case no goroutine has been
// started.
func newClient(addr string, mgr IClientMgr, log log.ILogger) (c *client, err error) {
	// The heartbeat frame never changes, so it is encoded once up front.
	hb, err := proto.Marshal(&pb.BattleRpcMessage{
		Rid:    0,
		Method: "Heartbeat",
	})
	if err != nil {
		return nil, err
	}

	c = &client{
		addr:          addr,
		mgr:           mgr,
		log:           log,
		seq:           1,
		state:         1,
		pending:       make(map[uint64]*MessageCall),
		heartbeatpack: hb,
	}

	// The context only bounds the dial; cancel it as soon as the dial returns
	// so the timeout's timer is released instead of lingering until it fires.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	dialer := websocket.Dialer{}
	if c.conn, _, err = dialer.DialContext(ctx, addr, nil); err != nil {
		return c, err
	}

	go c.run()
	c.heartbeat()
	return c, nil
}
// client is a WebSocket RPC client for the battle service. Requests are
// matched to responses by sequence number (the Rid of BattleRpcMessage).
type client struct {
	// Wiring supplied to newClient.
	addr string
	mgr  IClientMgr
	log  log.ILogger
	conn *websocket.Conn

	// state is accessed atomically: 0 closed, 1 running, 2 closing.
	state int32

	// seq is the Rid assigned to the next outgoing request; it is read and
	// incremented while holding pendingmutex.
	seq          uint64
	pendingmutex sync.Mutex
	// pending holds the in-flight calls keyed by Rid, so that responses
	// arriving concurrently can be routed back to their callers.
	pending map[uint64]*MessageCall

	// heartbeatpack is the pre-encoded heartbeat frame written periodically
	// by heartbeat.
	heartbeatpack []byte
}
// callBattle verifies a battle by sending req to the battle service under the
// given method name and blocking until the matching response has been decoded
// into reply, or until ctx is done.
//
// It returns the encoding error if req cannot be packed or the frame cannot be
// marshalled, the write error if the frame cannot be sent, ctx.Err() if ctx
// ends first, and otherwise the error recorded on the call by the response
// handler.
//
// NOTE(review): gorilla/websocket supports only one concurrent writer per
// connection. This write is not serialized against the heartbeat goroutine or
// against other callBattle callers — confirm that callers serialize, or add a
// write lock to client.
func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
	// Pack the request before registering anything, so an unencodable request
	// fails fast instead of being sent as an empty frame.
	payload, err := anypb.New(req)
	if err != nil {
		return err
	}

	call := new(MessageCall)
	call.Done = make(chan *MessageCall, 10)
	call.Args = req
	call.Reply = reply

	this.pendingmutex.Lock()
	seq := this.seq
	this.seq++
	this.pending[seq] = call
	this.pendingmutex.Unlock()

	// forget unregisters this call and returns it, or nil when the response
	// handler has already claimed it.
	forget := func() *MessageCall {
		this.pendingmutex.Lock()
		defer this.pendingmutex.Unlock()
		pendingCall := this.pending[seq]
		delete(this.pending, seq)
		return pendingCall
	}

	data, err := proto.Marshal(&pb.BattleRpcMessage{
		Rid:    seq,
		Method: method,
		Data:   payload,
	})
	if err != nil {
		forget()
		return err
	}
	if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil {
		forget()
		return err
	}

	select {
	case <-ctx.Done(): // cancelled by context
		if pendingCall := forget(); pendingCall != nil {
			pendingCall.Error = ctx.Err()
			pendingCall.done(this.log)
		}
		return ctx.Err()
	case finished := <-call.Done:
		return finished.Error
	}
}
// run is the read loop of the connection. It reads frames until a read fails,
// decodes each one as a BattleRpcMessage and hands it to handleresponse on a
// new goroutine. A read error is logged, the client is closed and the loop
// ends; a frame that fails to decode is logged and skipped.
func (this *client) run() {
	for {
		_, payload, readErr := this.conn.ReadMessage()
		if readErr != nil {
			this.log.Errorf("client err:%v", readErr)
			this.Close()
			return
		}

		// A fresh message per frame: it is handed off to another goroutine.
		incoming := &pb.BattleRpcMessage{}
		if decodeErr := proto.Unmarshal(payload, incoming); decodeErr != nil {
			this.log.Errorf("client Unmarshal err:%v", decodeErr)
			continue
		}

		this.log.Debugf("PVP OnMessage:%v", incoming)
		this.log.Debugf("接收战斗校验结果:%v", incoming)
		go this.handleresponse(incoming)
	}
}
// handleresponse dispatches one decoded message from the battle service.
//
// A non-zero Rid marks the response to a callBattle request: the pending call
// is removed, the payload is decoded into its Reply and the call is completed.
// A zero Rid marks a server push, routed by Method to the client manager.
// Malformed or unexpected messages are logged and dropped; they never panic
// the goroutine.
func (this *client) handleresponse(resp *pb.BattleRpcMessage) {
	if resp.Rid != 0 {
		this.pendingmutex.Lock()
		call := this.pending[resp.Rid]
		delete(this.pending, resp.Rid)
		this.pendingmutex.Unlock()

		// The call is gone when the caller already gave up (context done) or
		// when the Rid was never issued; there is nobody left to notify.
		if call == nil {
			this.log.Warnf("收到异常消息回包 resp:%v", resp)
			return
		}
		call.Error = resp.Data.UnmarshalTo(call.Reply)
		call.done(this.log)
		return
	}

	switch resp.Method {
	case "BattleOutCmd": // output command push
		msg, err := resp.Data.UnmarshalNew()
		if err != nil {
			this.log.Errorf("C# OutCmd Unmarshal err:%v", err)
			return
		}
		push, ok := msg.(*pb.BattleOutCmdPush)
		if !ok {
			this.log.Errorf("C# OutCmd unexpected message type:%T", msg)
			return
		}
		this.mgr.BattleOutCmd(push)
	case "BattleFished": // battle finished push
		msg, err := resp.Data.UnmarshalNew()
		if err != nil {
			this.log.Errorf("C# Finish Unmarshal err:%v", err)
			return
		}
		push, ok := msg.(*pb.BattleFinishPush)
		if !ok {
			this.log.Errorf("C# Finish unexpected message type:%T", msg)
			return
		}
		this.mgr.BattleFinish(push)
	default:
		this.log.Warnf("收到异常消息回包 resp:%v", resp)
	}
}
// heartbeat starts a goroutine that writes the pre-encoded heartbeat frame to
// the connection every 30 seconds. The goroutine stops its ticker and exits
// the first time a write fails.
func (this *client) heartbeat() {
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			if err := this.conn.WriteMessage(websocket.BinaryMessage, this.heartbeatpack); err != nil {
				return
			}
		}
	}()
}
// Close shuts the client down; it may be called from outside the client.
//
// Only a call that moves state from 1 (running) to 2 (closing) proceeds; any
// other call returns immediately. The winning call closes the connection,
// marks the client closed (state 0) and then notifies the manager through
// Shutdown.
func (this *client) Close() {
	wonTransition := atomic.CompareAndSwapInt32(&this.state, 1, 2)
	if !wonTransition {
		return
	}

	_ = this.conn.Close()
	atomic.StoreInt32(&this.state, 0)
	this.mgr.Shutdown(this)
}