package battle

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"go_dreamfactory/lego/core"
	"go_dreamfactory/lego/core/cbase"
	"go_dreamfactory/lego/sys/log"
	"go_dreamfactory/pb"

	"github.com/gorilla/websocket"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"
)

// clientComp is the battle-service client component. It owns one websocket
// client per entry of Options.BattleServerAddr and hands them out round-robin.
type clientComp struct {
	cbase.ModuleCompBase
	options *Options
	service core.IService
	module  *Battle
	mutexs  []sync.Mutex // mutexs[n] guards clinets[n]
	clinets []*client    // one slot per configured address; nil while disconnected
	imutex  sync.Mutex   // guards i
	i       int          // round-robin cursor used by selector
}

// Init is the component initialisation hook. It stores the service, options
// and owning module, and allocates one mutex and one client slot per
// configured battle server address.
func (this *clientComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	// NOTE(review): the return value (if any) of ModuleCompBase.Init is not
	// visible from this file, so it is left unchecked as in the original.
	this.ModuleCompBase.Init(service, module, comp, options)
	this.service = service
	this.options = options.(*Options)
	this.module = module.(*Battle)
	this.mutexs = make([]sync.Mutex, len(this.options.BattleServerAddr))
	this.clinets = make([]*client, len(this.options.BattleServerAddr))
	return
}

// Start starts the embedded base component and then dials every configured
// battle server. It stops at, and returns, the first error encountered.
func (this *clientComp) Start() (err error) {
	if err = this.ModuleCompBase.Start(); err != nil {
		return
	}
	for i, v := range this.options.BattleServerAddr {
		// newClient returns a nil client on failure, so a failed slot stays nil.
		if this.clinets[i], err = newClient(v, i, this, this.options.Log); err != nil {
			return
		}
	}
	return
}

// Shutdown releases the slot held by client c so that a later selector call
// can reconnect to that server. It is invoked by client.Close.
func (this *clientComp) Shutdown(c *client) {
	this.mutexs[c.index].Lock()
	defer this.mutexs[c.index].Unlock()
	this.clinets[c.index] = nil
	this.module.Errorf("战斗校验服务%d 被关闭! ", c.index)
}

// CheckBattle sends req to a battle server using the "Check" method and
// returns the decoded result. The returned reply is always non-nil; err is
// set when no client is available or the call fails or is cancelled via ctx.
func (this *clientComp) CheckBattle(ctx context.Context, req proto.Message) (reply *pb.BattleCheckResults, err error) {
	var (
		c *client
	)
	reply = &pb.BattleCheckResults{}
	if c, err = this.selector(); err != nil {
		return
	}
	err = c.callBattle(ctx, "Check", req, reply)
	return
}

// selector picks the next battle server client in round-robin order. When the
// chosen slot is empty it tries to reconnect; on failure the error is logged
// and returned and the slot stays empty. A nil error always comes with a
// non-nil client.
func (this *clientComp) selector() (c *client, err error) {
	n := len(this.clinets)
	if n == 0 {
		// Guard the modulo below against division by zero.
		err = errors.New("no battle server configured")
		return
	}

	this.imutex.Lock()
	i := this.i % n
	this.i = i + 1
	this.imutex.Unlock()

	// Same lock as Shutdown, so a slot cannot be cleared while it is being
	// read or refilled here.
	this.mutexs[i].Lock()
	defer this.mutexs[i].Unlock()
	if c = this.clinets[i]; c != nil {
		return
	}
	if c, err = newClient(this.options.BattleServerAddr[i], i, this, this.options.Log); err != nil {
		this.module.Errorln(err)
		return
	}
	this.clinets[i] = c
	return
}

// newClient dials addr and returns a running-state client bound to slot i of
// mgr. On dial failure it returns a nil client together with the error.
//
// NOTE(review): nothing in this file starts client.run, the loop that reads
// responses; without it callBattle can only end through ctx cancellation.
// Confirm where the read loop is meant to be launched.
func newClient(addr string, i int, mgr IClientMgr, log log.ILogger) (c *client, err error) {
	var conn *websocket.Conn
	dialer := websocket.Dialer{}
	if conn, _, err = dialer.Dial(addr, nil); err != nil {
		return nil, err
	}
	c = &client{
		mgr:     mgr,
		log:     log,
		index:   i,
		conn:    conn,
		state:   1,
		pending: make(map[uint64]*MessageCall),
	}
	return
}

// client is a single websocket connection to a battle server, multiplexing
// concurrent request/response calls by sequence id.
type client struct {
	mgr          IClientMgr
	log          log.ILogger
	index        int // slot index inside the owning manager
	conn         *websocket.Conn
	state        int32      // 0 closed, 1 running, 2 closing
	writemutex   sync.Mutex // serialises writes on conn
	seq          uint64     // next request id, guarded by pendingmutex
	pendingmutex sync.Mutex
	pending      map[uint64]*MessageCall // in-flight calls keyed by request id
}

// callBattle sends req to the server as method and waits until either the
// matching response has been decoded into reply or ctx is done. It returns
// the encode/write error, the response decode error, or ctx.Err().
func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) {
	var (
		payload *anypb.Any
		data    []byte
	)
	// Fail before registering anything if the request cannot be wrapped.
	if payload, err = anypb.New(req); err != nil {
		return
	}

	call := new(MessageCall)
	call.Done = make(chan *MessageCall, 10)
	call.Args = req
	call.Reply = reply

	this.pendingmutex.Lock()
	seq := this.seq
	this.seq++
	this.pending[seq] = call
	this.pendingmutex.Unlock()

	msg := &pb.BattleRpcMessage{
		Rid:    seq,
		Method: method,
		Data:   payload,
	}
	if data, err = proto.Marshal(msg); err == nil {
		// A websocket connection supports only one concurrent writer.
		this.writemutex.Lock()
		err = this.conn.WriteMessage(websocket.BinaryMessage, data)
		this.writemutex.Unlock()
	}
	if err != nil {
		// Nothing was sent, so no response will ever arrive for seq.
		this.pendingmutex.Lock()
		delete(this.pending, seq)
		this.pendingmutex.Unlock()
		return
	}

	select {
	case <-ctx.Done(): // cancelled by context
		this.pendingmutex.Lock()
		pend := this.pending[seq]
		delete(this.pending, seq)
		this.pendingmutex.Unlock()
		if pend != nil {
			pend.Error = ctx.Err()
			pend.done(this.log)
		}
		return ctx.Err()
	case done := <-call.Done:
		err = done.Error
	}
	return
}

// run is the read loop: it reads frames from the connection, decodes each one
// into a BattleRpcMessage and dispatches it to handleresponse in its own
// goroutine. A read error closes the client and ends the loop; a frame that
// fails to decode is logged and skipped.
func (this *client) run() {
	for {
		_, data, err := this.conn.ReadMessage()
		if err != nil {
			this.log.Errorf("client err:%v", err)
			this.Close()
			return
		}
		// A fresh message per frame: the previous one may still be in use by
		// a handleresponse goroutine.
		msg := &pb.BattleRpcMessage{}
		if err = proto.Unmarshal(data, msg); err != nil {
			this.log.Errorf("client Unmarshal err:%v", err)
			continue
		}
		go this.handleresponse(msg)
	}
}

// handleresponse completes the pending call whose id matches resp.Rid by
// decoding resp.Data into the call's reply. Responses with no pending call
// (already cancelled, or unknown id) are dropped.
func (this *client) handleresponse(resp *pb.BattleRpcMessage) {
	this.pendingmutex.Lock()
	call := this.pending[resp.Rid]
	delete(this.pending, resp.Rid)
	this.pendingmutex.Unlock()
	if call == nil {
		return
	}
	call.Error = resp.Data.UnmarshalTo(call.Reply)
	call.done(this.log)
}

// Close shuts the client down when called from outside or from the read loop.
// Only the first call on a running client has any effect: it closes the
// connection, marks the client closed and notifies the manager.
func (this *client) Close() {
	if !atomic.CompareAndSwapInt32(&this.state, 1, 2) {
		return
	}
	this.conn.Close()
	atomic.StoreInt32(&this.state, 0)
	this.mgr.Shutdown(this)
}