package gateway import ( "context" "go_dreamfactory/comm" "go_dreamfactory/pb" "sync" "sync/atomic" "github.com/gorilla/websocket" "github.com/liwei1dao/lego/sys/log" "github.com/liwei1dao/lego/utils/container/id" "google.golang.org/protobuf/proto" ) func newAgent(gateway IGateway, conn *websocket.Conn) *Agent { agent := &Agent{ gateway: gateway, wsConn: conn, sessionId: id.NewUUId(), uId: 0, writeChan: make(chan *pb.Message, 2), closeSignal: make(chan bool), state: 1, } agent.wg.Add(2) go agent.readLoop() go agent.writeLoop() return agent } //用户代理 type Agent struct { gateway IGateway wsConn *websocket.Conn sessionId string uId uint32 writeChan chan *pb.Message closeSignal chan bool state int32 //状态 0 关闭 1 运行 2 关闭中 wg sync.WaitGroup } func (this *Agent) readLoop() { defer this.wg.Done() var ( data []byte msg *pb.Message = &pb.Message{} err error ) locp: for { if _, data, err = this.wsConn.ReadMessage(); err != nil { log.Errorf("agent:%s uId:%d ReadMessage err:%v", this.sessionId, this.uId, err) go this.Close() break locp } if err = proto.Unmarshal(data, msg); err != nil { log.Errorf("agent:%s uId:%d Unmarshal err:%v", this.sessionId, this.uId, err) go this.Close() break locp } else { this.messageDistribution(msg) } } log.Debugf("agent:%s uId:%d readLoop end!", this.sessionId, this.uId) } func (this *Agent) writeLoop() { defer this.wg.Done() var ( data []byte err error ) locp: for { select { case <-this.closeSignal: break locp case msg, ok := <-this.writeChan: if ok { data, err = proto.Marshal(msg) if err = this.wsConn.WriteMessage(websocket.BinaryMessage, data); err != nil { log.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) go this.Close() } } else { go this.Close() } } } log.Debugf("agent:%s uId:%d writeLoop end!", this.sessionId, this.uId) } func (this *Agent) SessionId() string { return this.sessionId } func (this *Agent) IP() string { return this.wsConn.RemoteAddr().String() } func (this *Agent) UserId() uint32 { return this.uId } func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) { return } //外部代用关闭 func (this *Agent) Close() { if !atomic.CompareAndSwapInt32(&this.state, 1, 2) { return } this.wsConn.Close() this.closeSignal <- true this.wg.Wait() atomic.StoreInt32(&this.state, 0) this.gateway.DisConnect(this) } //分发用户消息 func (this *Agent) messageDistribution(msg *pb.Message) { reply := &pb.UserMessageReply{} log.Debugf("agent:%s uId:%d MessageDistribution msg:%s", this.sessionId, this.uId, msg.Head.ServiceMethod) if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.UserMessage{ Ip: this.IP(), UserSessionId: this.sessionId, UserId: this.uId, GatewayServiceId: this.gateway.Service().GetId(), Method: msg.Head.ServiceMethod, Message: msg.Data, }, reply); err != nil { log.Errorf("agent:%s uId:%d MessageDistribution err:%v", this.sessionId, this.uId, err) } else { log.Debugf("agent:%s uId:%d MessageDistribution reply:%v", this.sessionId, this.uId, reply) } }