go_dreamfactory/modules/robot/client.go
2023-08-23 16:14:10 +08:00

136 lines
2.6 KiB
Go

package robot
import (
"go_dreamfactory/pb"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// Client wraps a single websocket connection together with the two
// goroutines (readLoop and writeLoop) that service it. Events are reported
// back to the IClient supplied to Init.
type Client struct {
client IClient // callback target: its Close, Receive, Heartbeat and OnClose are invoked by this type
wsConn *websocket.Conn // connection dialed in Init
writeChan chan []byte // marshaled messages queued for writeLoop (created with capacity 2)
state int32 // lifecycle state: 0 closed, 1 running, 2 closing
closeSignal chan bool // Close sends on this channel to stop writeLoop
wg sync.WaitGroup // tracks readLoop and writeLoop so Close can wait for both
account, sid string // not referenced by the methods visible in this part of the file
}
// Init dials the websocket endpoint at addr with the default gorilla dialer,
// stores client as the callback target, marks the Client as running and
// starts the read and write loops.
//
// If the dial fails its error is returned and the Client is left untouched.
func (this *Client) Init(addr string, client IClient) (err error) {
	conn, _, dialErr := websocket.DefaultDialer.Dial(addr, nil)
	if dialErr != nil {
		return dialErr
	}

	this.wsConn = conn
	this.client = client
	this.writeChan = make(chan []byte, 2)
	this.closeSignal = make(chan bool)
	// Set before the loops are launched below, so a plain store is enough here.
	this.state = 1

	// One WaitGroup slot per loop; Close waits on both.
	this.wg.Add(2)
	go this.readLoop()
	go this.writeLoop()
	return nil
}
// readLoop reads frames from the websocket connection, decodes each one as a
// pb.UserMessage and hands it to client.Receive. It exits on the first read
// or decode error, after asking the IClient to close.
//
// Every frame is decoded into its own pb.UserMessage, so a Receive
// implementation that keeps the message or processes it on another goroutine
// is not affected by the next frame being decoded.
func (this *Client) readLoop() {
	defer this.wg.Done()

	for {
		_, data, err := this.wsConn.ReadMessage()
		if err != nil {
			// Close is started on its own goroutine: Client.Close waits on wg,
			// which this loop is still part of (presumably IClient.Close ends
			// up there — confirm against the IClient implementation).
			go this.client.Close()
			return
		}

		msg := &pb.UserMessage{}
		if err = proto.Unmarshal(data, msg); err != nil {
			go this.client.Close()
			return
		}
		this.client.Receive(msg)
	}
}
// writeLoop sends every payload queued on writeChan as a binary websocket
// message and calls client.Heartbeat every 30 seconds. It returns when a
// value arrives on closeSignal.
//
// A failed write, or writeChan being closed, asks the IClient to close but
// does not end the loop: Close performs a blocking send on closeSignal, so
// this goroutine has to stay around to receive it.
func (this *Client) writeLoop() {
	defer this.wg.Done()

	ticker := time.NewTicker(time.Second * 30)
	// Stop the ticker on exit so its resources are released once the loop ends.
	defer ticker.Stop()

	for {
		select {
		case <-this.closeSignal:
			return
		case msg, ok := <-this.writeChan:
			if !ok {
				go this.client.Close()
				continue
			}
			if err := this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
				// Started on its own goroutine: Client.Close waits on wg, which
				// this loop is still part of.
				go this.client.Close()
			}
		case <-ticker.C:
			this.client.Heartbeat()
		}
	}
}
// WriteMsg marshals msg and queues the bytes on writeChan for writeLoop.
//
// When the client is not in the running state (state != 1) the message is
// dropped and nil is returned. A marshal failure is returned to the caller
// and nothing is queued. The channel send blocks while writeChan is full.
func (this *Client) WriteMsg(msg *pb.UserMessage) (err error) {
	running := atomic.LoadInt32(&this.state) == 1
	if !running {
		return nil
	}

	payload, marshalErr := proto.Marshal(msg)
	if marshalErr != nil {
		return marshalErr
	}

	this.writeChan <- payload
	return nil
}
// Close shuts the client down; it is intended to be called from outside the
// read and write loops, because it waits for both of them to finish.
//
// Only the call that moves state from running (1) to closing (2) does any
// work; every other call returns immediately. The winning call closes the
// websocket, signals writeLoop, waits for both loops, sets state to closed (0)
// and finally invokes client.OnClose.
func (this *Client) Close() {
	if won := atomic.CompareAndSwapInt32(&this.state, 1, 2); won {
		// Closing the connection first makes the pending ReadMessage in
		// readLoop fail so that loop can exit.
		this.wsConn.Close()
		// Unbuffered send: returns once writeLoop has picked up the signal.
		this.closeSignal <- true
		this.wg.Wait()
		atomic.StoreInt32(&this.state, 0)
		this.client.OnClose()
	}
}
// Receive dispatches a user message. This implementation does nothing with
// msg and always returns nil.
func (this *Client) Receive(msg *pb.UserMessage) (err error) {
	return nil
}
// SendMessage packs msg into an Any, wraps it in a pb.UserMessage carrying
// the given main type and sub type, and queues it through WriteMsg.
//
// It returns the packing error if msg cannot be converted to an Any (nothing
// is queued in that case), otherwise whatever WriteMsg returns.
func (this *Client) SendMessage(mtype, stype string, msg proto.Message) (err error) {
	var data *anypb.Any
	// Do not send a message with an empty payload when packing fails.
	if data, err = anypb.New(msg); err != nil {
		return err
	}

	return this.WriteMsg(&pb.UserMessage{
		MainType: mtype,
		SubType:  stype,
		Data:     data,
	})
}
// Heartbeat is a no-op on Client itself. Note that writeLoop calls Heartbeat
// on the IClient passed to Init, once per 30-second tick, rather than calling
// this method directly.
func (this *Client) Heartbeat() {
}