go_dreamfactory/cmd/v2/service/connService.go
2022-09-08 15:49:17 +08:00

176 lines
3.8 KiB
Go

package service
import (
	"errors"
	"io/ioutil"
	"net/http"
	"strings"
	"time"

	"go_dreamfactory/cmd/v2/lib/common"
	"go_dreamfactory/cmd/v2/model"
	"go_dreamfactory/cmd/v2/service/observer"
	"go_dreamfactory/comm"
	"go_dreamfactory/pb"

	"github.com/gorilla/websocket"
	"github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"
)
// conn is the package-level service instance. It is assigned by
// NewConnService and handed out by GetConnService.
var conn *ConnServiceImpl
// ConnService describes the connection operations offered by this package:
// opening a websocket, issuing a plain HTTP GET, and sending/receiving
// pb.UserMessage frames over the websocket.
type ConnService interface {
// WsConnect dials the websocket endpoint at wsUrl.
WsConnect(wsUrl string) error
// HttpConnect performs an HTTP GET on url and returns the response body.
HttpConnect(url string) ([]byte, error)
// SendMsg packs rsp into msg and writes msg to the websocket as a binary frame.
SendMsg(msg *pb.UserMessage, rsp proto.Message) (err error)
// ReceiveMsg reads one frame from the websocket and decodes it into a
// pb.UserMessage, returning an error code alongside the message.
ReceiveMsg() (code pb.ErrorCode, msg *pb.UserMessage)
// ListenerPush starts a background reader that forwards incoming messages
// to the observer.
ListenerPush()
}
// ConnServiceImpl implements ConnService on top of a gorilla websocket
// connection and reports events through an observer.
type ConnServiceImpl struct {
// ws is the websocket connection; it is assigned by WsConnect.
ws *websocket.Conn
// obs receives the event notifications raised by this service.
obs observer.Observer
}
// NewConnService builds a ConnServiceImpl that reports events to obs, stores
// it in the package-level conn variable (replacing any previous instance) and
// returns it as a ConnService.
func NewConnService(obs observer.Observer) ConnService {
	svc := &ConnServiceImpl{obs: obs}
	conn = svc
	return svc
}
// GetConnService returns the package-level service instance stored by
// NewConnService. The zero value of that variable is nil, so callers get nil
// if nothing has assigned it yet.
func GetConnService() *ConnServiceImpl {
return conn
}
// WsConnect dials the websocket endpoint at wsUrl and, on success, stores the
// connection on the service and starts a background keep-alive goroutine.
//
// The handshake is limited to 5 seconds. The keep-alive goroutine sends a
// ping frame every 2 seconds on the connection opened by this call; on the
// first failed ping it notifies the observer with observer.EVENT_PING and the
// error, then exits.
//
// If the dial fails the error is logged and returned, and no goroutine is
// started.
func (c *ConnServiceImpl) WsConnect(wsUrl string) error {
	const pingInterval = 2 * time.Second

	dialer := &websocket.Dialer{
		HandshakeTimeout: 5 * time.Second,
	}
	ws, _, err := dialer.Dial(wsUrl, nil)
	if err != nil {
		logrus.Errorf("websocket conn err:%v", err)
		return err
	}
	c.ws = ws

	go func() {
		ticker := time.NewTicker(pingInterval)
		defer ticker.Stop()

		for range ticker.C {
			// Ping the connection captured above rather than c.ws, so that a
			// later WsConnect call does not leave this goroutine pinging the
			// new connection as well.
			//
			// WriteControl is used instead of WriteMessage because it may be
			// called concurrently with other writers (e.g. SendMsg), whereas
			// WriteMessage allows only one concurrent writer.
			deadline := time.Now().Add(pingInterval)
			if err := ws.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				c.obs.Notify(observer.EVENT_PING, err)
				return
			}
		}
	}()
	return nil
}
// HttpConnect issues an HTTP GET to url and returns the complete response
// body.
//
// The request is bounded by a 10-second overall timeout (connect, redirects
// and reading the body). The HTTP status code is not inspected: the body of a
// non-2xx response is returned like any other.
//
// It returns a nil slice and the error if the request fails, or whatever was
// read together with the read error if reading the body fails.
func (c *ConnServiceImpl) HttpConnect(url string) ([]byte, error) {
	// http.Get goes through http.DefaultClient, which has no timeout, so an
	// unresponsive server would block the caller indefinitely. A client with
	// a nil Transport still uses http.DefaultTransport, so connections are
	// pooled as before.
	client := &http.Client{Timeout: 10 * time.Second}

	res, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	return ioutil.ReadAll(res.Body)
}
// ListenerPush starts a background goroutine that reads frames from the
// websocket, decodes each one into a pb.UserMessage and dispatches it to the
// observer:
//
//   - messages whose method name ends in "Push" raise observer.EVENT_APP_MONI
//     and observer.EVENT_USER_CHANGE with a model.PushModel; the
//     "NotifyErrorNotifyPush" message additionally raises
//     observer.EVENT_REQ_RSP with the raw message;
//   - every other message raises observer.EVENT_REQ_RSP with the raw message.
//
// The goroutine exits when reading from the websocket fails. A frame that
// cannot be decoded, or that carries no Data payload, is logged and skipped
// so that a single bad frame does not stop the listener.
func (c *ConnServiceImpl) ListenerPush() {
	// renderRespPanel forwards the raw message to the request/response
	// observers. It is loop-invariant, so it is built once.
	renderRespPanel := func(p *model.PushModel) {
		c.obs.Notify(observer.EVENT_REQ_RSP, p.Msg)
	}

	go func() {
		for {
			_, data, err := c.ws.ReadMessage()
			if err != nil {
				logrus.Errorf("readMessage err:%v", err)
				return
			}

			msg := &pb.UserMessage{}
			if err = proto.Unmarshal(data, msg); err != nil {
				logrus.Errorf("unmarshal message err:%v", err)
				continue
			}
			if msg.Data == nil {
				// Dereferencing msg.Data below would panic and take the whole
				// process down with it.
				logrus.WithFields(
					logrus.Fields{"MainType": msg.MainType, "SubType": msg.SubType},
				).Error("message has no data payload")
				continue
			}

			// NOTE(review): the offset 20 presumably strips the
			// "type.googleapis.com/" prefix (20 bytes) from the type URL;
			// confirm against common.SubStr and how it treats shorter input.
			methodStr := msg.Data.TypeUrl
			methodName := common.SubStr(methodStr, 20, len(methodStr))
			p := &model.PushModel{
				MethodName: methodName,
				DataTime:   time.Now().Format(time.RFC3339),
				Msg:        msg,
			}
			logrus.WithFields(
				logrus.Fields{"MainType": msg.MainType, "SubType": msg.SubType},
			).Debug(methodName)

			if strings.HasSuffix(methodName, "Push") {
				c.obs.Notify(observer.EVENT_APP_MONI, p)
				c.obs.Notify(observer.EVENT_USER_CHANGE, p)
				if methodName == "NotifyErrorNotifyPush" {
					renderRespPanel(p)
				}
			} else {
				// Not a push: render it in the test case response panel.
				renderRespPanel(p)
			}
		}
	}()
}
// SendMsg packs rsp into msg via comm.ProtoMarshal, serializes msg and writes
// it to the websocket as a single binary frame.
//
// NOTE(review): despite its name, rsp appears to be the payload being sent;
// confirm against comm.ProtoMarshal.
//
// It returns an error when the websocket has not been connected, when
// comm.ProtoMarshal reports failure, when msg cannot be serialized, or when
// the write fails. msg is left untouched if the websocket is not connected.
func (c *ConnServiceImpl) SendMsg(msg *pb.UserMessage, rsp proto.Message) (err error) {
	if c.ws == nil {
		return errors.New("send message: websocket is not connected")
	}
	// A false result previously fell through to a nil error, so callers were
	// told the message had been sent when nothing was written.
	if !comm.ProtoMarshal(rsp, msg) {
		return errors.New("send message: cannot pack payload into message")
	}

	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return c.ws.WriteMessage(websocket.BinaryMessage, data)
}
// ReceiveMsg blocks until one frame is read from the websocket, decodes it
// into a pb.UserMessage and returns it together with an error code.
//
// The returned code is:
//   - pb.ErrorCode_SystemError when reading from the websocket fails;
//   - pb.ErrorCode_PbError when the frame cannot be decoded;
//   - otherwise whatever handleNotify reports for the decoded message.
//
// msg is never nil; on a read failure it is an empty message.
func (c *ConnServiceImpl) ReceiveMsg() (code pb.ErrorCode, msg *pb.UserMessage) {
	msg = &pb.UserMessage{}

	_, data, err := c.ws.ReadMessage()
	if err != nil {
		logrus.Errorf("readMessage err:%v", err)
		return pb.ErrorCode_SystemError, msg
	}

	if err = proto.Unmarshal(data, msg); err != nil {
		// Previously the zero error code was returned here, so a decode
		// failure could not be told apart from a successful read.
		logrus.Errorf("unmarshal message err:%v", err)
		return pb.ErrorCode_PbError, msg
	}

	return c.handleNotify(msg), msg
}
// handleNotify extracts the error code from an error notification.
//
// Only messages with MainType "notify" and SubType "errornotify" are
// inspected; for any other message the zero pb.ErrorCode is returned. For an
// error notification the payload is decoded into pb.NotifyErrorNotifyPush and
// its Code is returned, or pb.ErrorCode_PbError if decoding fails.
func (c *ConnServiceImpl) handleNotify(msg *pb.UserMessage) (code pb.ErrorCode) {
	isErrorNotify := msg.MainType == "notify" && msg.SubType == "errornotify"
	if !isErrorNotify {
		return code
	}

	push := &pb.NotifyErrorNotifyPush{}
	if comm.ProtoUnmarshal(msg, push) {
		return push.Code
	}
	return pb.ErrorCode_PbError
}