152 lines
3.5 KiB
Go
152 lines
3.5 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/pb"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/liwei1dao/lego/sys/log"
|
|
"github.com/liwei1dao/lego/utils/container/id"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
|
|
agent := &Agent{
|
|
gateway: gateway,
|
|
wsConn: conn,
|
|
sessionId: id.NewUUId(),
|
|
uId: 0,
|
|
writeChan: make(chan *pb.UserMessage, 2),
|
|
closeSignal: make(chan bool),
|
|
state: 1,
|
|
}
|
|
agent.wg.Add(2)
|
|
go agent.readLoop()
|
|
go agent.writeLoop()
|
|
return agent
|
|
}
|
|
|
|
//用户代理
|
|
type Agent struct {
|
|
gateway IGateway
|
|
wsConn *websocket.Conn
|
|
sessionId string
|
|
uId uint32
|
|
writeChan chan *pb.UserMessage
|
|
closeSignal chan bool
|
|
state int32 //状态 0 关闭 1 运行 2 关闭中
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func (this *Agent) readLoop() {
|
|
defer this.wg.Done()
|
|
var (
|
|
data []byte
|
|
msg *pb.UserMessage = &pb.UserMessage{}
|
|
err error
|
|
)
|
|
locp:
|
|
for {
|
|
if _, data, err = this.wsConn.ReadMessage(); err != nil {
|
|
log.Errorf("agent:%s uId:%d ReadMessage err:%v", this.sessionId, this.uId, err)
|
|
go this.Close()
|
|
break locp
|
|
}
|
|
if err = proto.Unmarshal(data, msg); err != nil {
|
|
log.Errorf("agent:%s uId:%d Unmarshal err:%v", this.sessionId, this.uId, err)
|
|
go this.Close()
|
|
break locp
|
|
} else {
|
|
this.messageDistribution(msg)
|
|
}
|
|
}
|
|
log.Debugf("agent:%s uId:%d readLoop end!", this.sessionId, this.uId)
|
|
}
|
|
|
|
func (this *Agent) writeLoop() {
|
|
defer this.wg.Done()
|
|
var (
|
|
data []byte
|
|
err error
|
|
)
|
|
locp:
|
|
for {
|
|
select {
|
|
case <-this.closeSignal:
|
|
break locp
|
|
case msg, ok := <-this.writeChan:
|
|
if ok {
|
|
data, err = proto.Marshal(msg)
|
|
if err = this.wsConn.WriteMessage(websocket.BinaryMessage, data); err != nil {
|
|
log.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err)
|
|
go this.Close()
|
|
}
|
|
} else {
|
|
go this.Close()
|
|
}
|
|
}
|
|
}
|
|
log.Debugf("agent:%s uId:%d writeLoop end!", this.sessionId, this.uId)
|
|
}
|
|
|
|
func (this *Agent) SessionId() string {
|
|
return this.sessionId
|
|
}
|
|
|
|
func (this *Agent) IP() string {
|
|
return this.wsConn.RemoteAddr().String()
|
|
}
|
|
func (this *Agent) UserId() uint32 {
|
|
return this.uId
|
|
}
|
|
|
|
func (this *Agent) Build(uId uint32) {
|
|
this.uId = uId
|
|
}
|
|
|
|
func (this *Agent) UnBuild() {
|
|
this.uId = 0
|
|
}
|
|
|
|
func (this *Agent) WriteMsg(msg *pb.UserMessage) (err error) {
|
|
if atomic.LoadInt32(&this.state) != 1 {
|
|
return
|
|
}
|
|
this.writeChan <- msg
|
|
return
|
|
}
|
|
|
|
//外部代用关闭
|
|
func (this *Agent) Close() {
|
|
if !atomic.CompareAndSwapInt32(&this.state, 1, 2) {
|
|
return
|
|
}
|
|
this.wsConn.Close()
|
|
this.closeSignal <- true
|
|
this.wg.Wait()
|
|
atomic.StoreInt32(&this.state, 0)
|
|
this.gateway.DisConnect(this)
|
|
}
|
|
|
|
//分发用户消息
|
|
func (this *Agent) messageDistribution(msg *pb.UserMessage) {
|
|
reply := &pb.RPCMessageReply{}
|
|
log.Debugf("agent:%s uId:%d MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType)
|
|
if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.AgentMessage{
|
|
Ip: this.IP(),
|
|
UserSessionId: this.sessionId,
|
|
UserId: this.uId,
|
|
GatewayServiceId: this.gateway.Service().GetId(),
|
|
Method: fmt.Sprintf("%s.%s", msg.MainType, msg.SubType),
|
|
Message: msg.Data,
|
|
}, reply); err != nil {
|
|
log.Errorf("agent:%s uId:%d MessageDistribution err:%v", this.sessionId, this.uId, err)
|
|
} else {
|
|
log.Debugf("agent:%s uId:%d MessageDistribution reply:%v", this.sessionId, this.uId, reply)
|
|
}
|
|
}
|