package battle import ( "context" "go_dreamfactory/lego/sys/log" "go_dreamfactory/pb" "sync" "sync/atomic" "time" "github.com/gorilla/websocket" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" ) func newClient(addr string, mgr IClientMgr, log log.ILogger) (c *client, err error) { msg := &pb.BattleRpcMessage{ Rid: 0, Method: "Heartbeat", } data, _ := proto.Marshal(msg) c = &client{addr: addr, mgr: mgr, log: log, seq: 1, state: 1, pending: make(map[uint64]*MessageCall), heartbeatpack: data} dialer := websocket.Dialer{} ctx, _ := context.WithTimeout(context.Background(), time.Second*5) if c.conn, _, err = dialer.DialContext(ctx, addr, nil); err != nil { return } go c.run() c.heartbeat() return } type client struct { addr string mgr IClientMgr log log.ILogger conn *websocket.Conn state int32 //状态 0 关闭 1 运行 2 关闭中 seq uint64 pendingmutex sync.Mutex pending map[uint64]*MessageCall //高并发回调 heartbeatpack []byte } //校验战斗过程 func (this *client) callBattle(ctx context.Context, method string, req proto.Message, reply proto.Message) (err error) { call := new(MessageCall) call.Done = make(chan *MessageCall, 10) call.Args = req call.Reply = reply this.pendingmutex.Lock() seq := this.seq this.seq++ this.pending[seq] = call this.pendingmutex.Unlock() msg := &pb.BattleRpcMessage{ Rid: seq, Method: method, } msg.Data, _ = anypb.New(req) data, _ := proto.Marshal(msg) if err = this.conn.WriteMessage(websocket.BinaryMessage, data); err != nil { this.pendingmutex.Lock() delete(this.pending, seq) this.pendingmutex.Unlock() return } select { case <-ctx.Done(): // cancel by context this.pendingmutex.Lock() call := this.pending[seq] delete(this.pending, seq) this.pendingmutex.Unlock() if call != nil { call.Error = ctx.Err() call.done(this.log) } return ctx.Err() case call := <-call.Done: err = call.Error } return } func (this *client) run() { var ( data []byte // msg *pb.BattleRpcMessage = &pb.BattleRpcMessage{} err error ) locp: for { if _, data, err = this.conn.ReadMessage(); err != nil { this.log.Errorf("client err:%v", err) this.Close() break locp } msg := &pb.BattleRpcMessage{} if err = proto.Unmarshal(data, msg); err != nil { this.log.Errorf("client Unmarshal err:%v", err) continue } this.log.Debugf("PVP OnMessage:%v", msg) this.log.Debugf("接收战斗校验结果:%v", msg) go this.handleresponse(msg) } } func (this *client) handleresponse(resp *pb.BattleRpcMessage) { var call *MessageCall if resp.Rid != 0 { this.pendingmutex.Lock() call = this.pending[resp.Rid] delete(this.pending, resp.Rid) this.pendingmutex.Unlock() call.Error = resp.Data.UnmarshalTo(call.Reply) call.done(this.log) } else { switch resp.Method { case "BattleOutCmd": //输出指令 if msg, err := resp.Data.UnmarshalNew(); err != nil { this.log.Errorf("C# OutCmd Unmarshal err:%v", err) break } else { this.mgr.BattleOutCmd(msg.(*pb.BattleOutCmdPush)) } break case "BattleFished": //战斗结束指令 if msg, err := resp.Data.UnmarshalNew(); err != nil { this.log.Errorf("C# OutCmd Unmarshal err:%v", err) break } else { this.mgr.BattleFinish(msg.(*pb.BattleFinishPush)) } break default: this.log.Warnf("收到异常消息回包 resp:%v", resp) } } } func (this *client) heartbeat() { go func() { timer := time.NewTicker(time.Second * 30) locp: for { select { case <-timer.C: if err := this.conn.WriteMessage(websocket.BinaryMessage, this.heartbeatpack); err != nil { break locp } } } timer.Stop() }() } //外部代用关闭 func (this *client) Close() { if !atomic.CompareAndSwapInt32(&this.state, 1, 2) { return } this.conn.Close() atomic.StoreInt32(&this.state, 0) this.mgr.Shutdown(this) }