package battle import ( "context" "go_dreamfactory/modules" "go_dreamfactory/pb" "sync" "go_dreamfactory/lego/core" "go_dreamfactory/lego/sys/log" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) /* 战斗服务客户端组件 */ type clientComp struct { modules.MCompGate options *Options service core.IService module *Battle clinets []*client i int } //组件初始化接口 func (this *clientComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.MCompGate.Init(service, module, comp, options) this.options = options.(*Options) this.module = module.(*Battle) this.clinets = make([]*client, len(this.options.BattleServerAddr)) return } func (this *clientComp) Start() (err error) { err = this.MCompGate.Start() for i, v := range this.options.BattleServerAddr { if this.clinets[i], err = newClient(v, this.options.Log); err != nil { return } } return } func (this *clientComp) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) { i := this.i % len(this.clinets) this.i += 1 err = this.clinets[i].callBattle(ctx, method, req, reply) return } func newClient(addr string, log log.ILogger) (c *client, err error) { c = &client{log: log, pending: make(map[uint64]*MessageCall)} dialer := websocket.Dialer{} c.conn, _, err = dialer.Dial(addr, nil) return } type client struct { log log.ILogger conn *websocket.Conn seq uint64 pendingmutex sync.Mutex pending map[uint64]*MessageCall //高并发回调 } //校验战斗过程 func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) { call := new(MessageCall) call.Done = make(chan *MessageCall, 10) call.Args = req call.Reply = reply this.pendingmutex.Lock() seq := this.seq this.seq++ this.pending[seq] = call this.pendingmutex.Unlock() msg := &pb.BattleRpcMessage{ Rid: seq, Method: method, } msg.Data, _ = anypb.New(req) data, _ := proto.Marshal(msg) if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil { this.pendingmutex.Lock() delete(this.pending, seq) this.pendingmutex.Unlock() return } select { case <-ctx.Done(): // cancel by context this.pendingmutex.Lock() call := this.pending[seq] delete(this.pending, seq) this.pendingmutex.Unlock() if call != nil { call.Error = ctx.Err() call.done(this.log) } return ctx.Err() case call := <-call.Done: err = call.Error } return } func (this *client) run() { var ( data []byte msg *pb.BattleRpcMessage = &pb.BattleRpcMessage{} err error ) locp: for { if _, data, err = this.conn.ReadMessage(); err != nil { this.log.Errorf("client err:%v", err) break locp } if err = proto.Unmarshal(data, msg); err != nil { this.log.Errorf("client Unmarshal err:%v", err) break locp } go this.handleresponse(msg) } } func (this *client) handleresponse(resp *pb.BattleRpcMessage) { var call *MessageCall this.pendingmutex.Lock() call = this.pending[resp.Rid] delete(this.pending, resp.Rid) this.pendingmutex.Unlock() call.Error = resp.Data.UnmarshalTo(call.Reply) call.done(this.log) }