go_dreamfactory/modules/battle/clients.go
2022-11-28 15:26:39 +08:00

139 lines
3.2 KiB
Go

package battle
import (
"context"
"go_dreamfactory/modules"
"go_dreamfactory/pb"
"sync"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/log"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
/*
战斗服务客户端组件
*/
type clientComp struct {
modules.MCompGate
options *Options
service core.IService
module *Battle
clinets []*client
i int
}
//组件初始化接口
func (this *clientComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.MCompGate.Init(service, module, comp, options)
this.options = options.(*Options)
this.module = module.(*Battle)
this.clinets = make([]*client, len(this.options.BattleServerAddr))
return
}
func (this *clientComp) Start() (err error) {
err = this.MCompGate.Start()
for i, v := range this.options.BattleServerAddr {
if this.clinets[i], err = newClient(v, this.options.Log); err != nil {
return
}
}
return
}
func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
i := this.i % len(this.clinets)
this.i += 1
err = this.clinets[i].callBattle(ctx, method, req, reply)
return
}
func newClient(addr string, log log.ILogger) (c *client, err error) {
c = &client{log: log, pending: make(map[uint64]*MessageCall)}
dialer := websocket.Dialer{}
c.conn, _, err = dialer.Dial(addr, nil)
return
}
type client struct {
log log.ILogger
conn *websocket.Conn
seq uint64
pendingmutex sync.Mutex
pending map[uint64]*MessageCall //高并发回调
}
//校验战斗过程
func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
call := new(MessageCall)
call.Done = make(chan *MessageCall, 10)
call.Args = req
call.Reply = reply
this.pendingmutex.Lock()
seq := this.seq
this.seq++
this.pending[seq] = call
this.pendingmutex.Unlock()
msg := &pb.BattleRpcMessage{
Rid: seq,
Method: method,
}
msg.Data, _ = anypb.New(req)
data, _ := proto.Marshal(msg)
if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil {
this.pendingmutex.Lock()
delete(this.pending, seq)
this.pendingmutex.Unlock()
return
}
select {
case <-ctx.Done(): // cancel by context
this.pendingmutex.Lock()
call := this.pending[seq]
delete(this.pending, seq)
this.pendingmutex.Unlock()
if call != nil {
call.Error = ctx.Err()
call.done(this.log)
}
return ctx.Err()
case call := <-call.Done:
err = call.Error
}
return
}
func (this *client) run() {
var (
data []byte
msg *pb.BattleRpcMessage = &pb.BattleRpcMessage{}
err error
)
locp:
for {
if _, data, err = this.conn.ReadMessage(); err != nil {
this.log.Errorf("client err:%v", err)
break locp
}
if err = proto.Unmarshal(data, msg); err != nil {
this.log.Errorf("client Unmarshal err:%v", err)
break locp
}
go this.handleresponse(msg)
}
}
func (this *client) handleresponse(resp *pb.BattleRpcMessage) {
var call *MessageCall
this.pendingmutex.Lock()
call = this.pending[resp.Rid]
delete(this.pending, resp.Rid)
this.pendingmutex.Unlock()
call.Error = resp.Data.UnmarshalTo(call.Reply)
call.done(this.log)
}