package robot

import (
	"fmt"
	"go_dreamfactory/pb"
	"sync"
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
	"google.golang.org/protobuf/proto"
)

// Client owns one websocket connection and the two goroutines that service
// it: readLoop decodes incoming frames and hands them to the IClient, and
// writeLoop drains writeChan onto the socket and fires periodic heartbeats.
type Client struct {
	client       IClient         // callbacks: Receive, Heartbeat, Close, OnClose
	wsConn       *websocket.Conn // underlying connection, set by Init
	writeChan    chan []byte     // marshalled messages waiting to be written
	state        int32           // 0 closed, 1 running, 2 closing (accessed atomically)
	closeSignal  chan bool       // closed by Close to tell writers to stop
	wg           sync.WaitGroup  // tracks readLoop and writeLoop
	account, sid string
}

// Init dials addr, stores the connection and callback handler, and starts the
// read and write goroutines. It returns the dial error unchanged on failure,
// in which case the Client is left untouched.
func (this *Client) Init(addr string, client IClient) (err error) {
	var comm *websocket.Conn
	if comm, _, err = websocket.DefaultDialer.Dial(addr, nil); err != nil {
		return
	}
	this.client = client
	this.writeChan = make(chan []byte, 2)
	this.closeSignal = make(chan bool)
	this.wsConn = comm
	// Publish the running state only after every field is in place, and
	// atomically, because WriteMsg and Close read it with atomic operations.
	atomic.StoreInt32(&this.state, 1)
	this.wg.Add(2)
	go this.readLoop()
	go this.writeLoop()
	return
}

// readLoop reads frames until the connection fails or a frame cannot be
// decoded, passing every decoded message to IClient.Receive. On exit it asks
// the IClient to close.
func (this *Client) readLoop() {
	defer this.wg.Done()
	for {
		_, data, err := this.wsConn.ReadMessage()
		if err != nil {
			break
		}
		// A fresh message per frame, so a Receive implementation that keeps
		// the pointer is not overwritten by the next Unmarshal.
		msg := &pb.UserMessage{}
		if err = proto.Unmarshal(data, msg); err != nil {
			break
		}
		this.client.Receive(msg)
	}
	// Must run on its own goroutine: Client.Close waits on wg, which this
	// goroutine has not released yet (presumably IClient.Close ends up there).
	go this.client.Close()
}

// writeLoop writes queued messages to the socket and calls IClient.Heartbeat
// every 30 seconds until closeSignal fires.
//
// NOTE(review): Heartbeat runs on this goroutine. If its implementation calls
// WriteMsg while writeChan is full, the loop stalls until Close is called.
func (this *Client) writeLoop() {
	defer this.wg.Done()

	timer := time.NewTicker(time.Second * 30)
	defer timer.Stop()

	writes := this.writeChan
	for {
		select {
		case <-this.closeSignal:
			return
		case msg, ok := <-writes:
			if !ok {
				// A closed channel is always ready; disable this case so the
				// loop does not spin while waiting for closeSignal.
				writes = nil
				go this.client.Close()
				continue
			}
			if err := this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
				go this.client.Close()
			}
		case <-timer.C:
			this.client.Heartbeat()
		}
	}
}

// WriteMsg marshals msg and queues it for writeLoop. It returns an error when
// the client is not running, when marshalling fails, or when the client is
// closed while the call is waiting for room in the queue.
func (this *Client) WriteMsg(msg *pb.UserMessage) (err error) {
	if atomic.LoadInt32(&this.state) != 1 {
		err = fmt.Errorf("Client state closed !")
		return
	}
	var data []byte
	if data, err = proto.Marshal(msg); err != nil {
		return
	}
	// The state can change between the check above and the send, so also
	// watch closeSignal; otherwise a full queue with no writer blocks forever.
	select {
	case this.writeChan <- data:
	case <-this.closeSignal:
		err = fmt.Errorf("Client state closed !")
	}
	return
}

// Close shuts the client down; it is meant to be called from outside the
// read/write goroutines. Only the first call while running does any work: it
// closes the socket, signals the goroutines, waits for them, marks the client
// closed and finally invokes IClient.OnClose.
func (this *Client) Close() {
	if !atomic.CompareAndSwapInt32(&this.state, 1, 2) {
		return
	}
	// Best effort: the connection is being torn down either way.
	_ = this.wsConn.Close()
	// Closing (rather than sending on) the channel never blocks and wakes
	// every waiter: writeLoop as well as any WriteMsg stuck on a full queue.
	close(this.closeSignal)
	this.wg.Wait()
	atomic.StoreInt32(&this.state, 0)
	this.client.OnClose()
}

// Receive is the default handler for incoming user messages; it does nothing
// and returns nil.
func (this *Client) Receive(msg *pb.UserMessage) (err error) {
	return
}

// Heartbeat is the default heartbeat handler; it does nothing.
func (this *Client) Heartbeat() {
}