package service

import (
	"errors"
	"strings"
	"time"

	"go_dreamfactory/cmd/v2/lib/common"
	"go_dreamfactory/cmd/v2/model"
	"go_dreamfactory/cmd/v2/service/observer"
	"go_dreamfactory/comm"
	"go_dreamfactory/pb"

	"github.com/gorilla/websocket"
	"github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"
)

var (
	// conn is the package-wide instance set by NewConnService and returned by
	// GetConnService.
	conn *ConnServiceImpl
)

// ConnService is the websocket client API exposed by this package.
type ConnService interface {
	Connect(wsUrl string) error
	SendMsg(msg *pb.UserMessage, rsp proto.Message) (err error)
	ReceiveMsg() (code pb.ErrorCode, msg *pb.UserMessage)
	ListenerPush()
}

// ConnServiceImpl implements ConnService on top of a gorilla websocket
// connection and reports events through an observer.
type ConnServiceImpl struct {
	ws  *websocket.Conn   // nil until Connect succeeds
	obs observer.Observer // receives ping failures and decoded messages
}

// NewConnService creates the service, stores it as the package-wide instance
// and returns it. Calling it again replaces the previously stored instance.
func NewConnService(obs observer.Observer) ConnService {
	conn = &ConnServiceImpl{obs: obs}
	return conn
}

// GetConnService returns the instance created by the most recent
// NewConnService call, or nil if NewConnService has not been called.
func GetConnService() *ConnServiceImpl {
	return conn
}

// Connect dials wsUrl and, on success, stores the connection and starts a
// goroutine that sends a websocket ping every two seconds. When a ping fails
// the error is reported as observer.EVENT_PING and the goroutine exits.
func (c *ConnServiceImpl) Connect(wsUrl string) error {
	const (
		handshakeTimeout = 5 * time.Second
		pingInterval     = 2 * time.Second
	)

	dialer := &websocket.Dialer{
		HandshakeTimeout: handshakeTimeout,
	}
	ws, _, err := dialer.Dial(wsUrl, nil)
	if err != nil {
		logrus.Errorf("websocket conn err:%v", err)
		return err
	}
	c.ws = ws

	// The pinger is bound to the connection it was started for, so a later
	// Connect call cannot redirect it to a different connection.
	go func() {
		ticker := time.NewTicker(pingInterval)
		// Release the ticker once pinging stops.
		defer ticker.Stop()

		for range ticker.C {
			// WriteControl may be called concurrently with other write
			// methods, so the ping cannot collide with SendMsg.
			deadline := time.Now().Add(pingInterval)
			if err := ws.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
				c.obs.Notify(observer.EVENT_PING, err)
				return
			}
		}
	}()
	return nil
}

// ListenerPush starts a goroutine that reads messages from the connection,
// decodes them as pb.UserMessage and forwards them to the observer:
//
//   - method names ending in "Push" are sent as EVENT_APP_MONI and
//     EVENT_USER_CHANGE; "NotifyErrorNotifyPush" is additionally sent as
//     EVENT_REQ_RSP;
//   - every other message is sent as EVENT_REQ_RSP.
//
// The goroutine stops on the first read or decode error. If Connect has not
// succeeded yet, nothing is started.
func (c *ConnServiceImpl) ListenerPush() {
	const (
		// Pause between reads, kept from the original implementation.
		// NOTE(review): presumably paces UI updates — confirm.
		readDelay = 200 * time.Millisecond
		// Equals len("type.googleapis.com/"); presumably strips that prefix
		// from the Any type URL — confirm against the server's type URLs.
		typeURLPrefixLen = 20
	)

	if c.ws == nil {
		logrus.Error("listener push: websocket not connected")
		return
	}

	go func() {
		for {
			time.Sleep(readDelay)

			_, data, err := c.ws.ReadMessage()
			if err != nil {
				logrus.Errorf("readMessage err:%v", err)
				return
			}

			msg := &pb.UserMessage{}
			if err = proto.Unmarshal(data, msg); err != nil {
				logrus.Errorf("unmarshal push message err:%v", err)
				return
			}

			// Data is a pointer; a message without a payload must not panic.
			var methodStr string
			if msg.Data != nil {
				methodStr = msg.Data.TypeUrl
			}
			methodName := common.SubStr(methodStr, typeURLPrefixLen, len(methodStr))

			p := &model.PushModel{
				MethodName: methodName,
				DataTime:   time.Now().Format(time.RFC3339),
				Msg:        msg,
			}
			logrus.WithFields(
				logrus.Fields{"MainType": msg.MainType, "SubType": msg.SubType},
			).Debug(methodName)

			if !strings.HasSuffix(methodName, "Push") {
				// Plain response: render it in the test-case response panel.
				c.obs.Notify(observer.EVENT_REQ_RSP, p.Msg)
				continue
			}

			c.obs.Notify(observer.EVENT_APP_MONI, p)
			c.obs.Notify(observer.EVENT_USER_CHANGE, p)
			if methodName == "NotifyErrorNotifyPush" {
				// Error notifications are also shown in the response panel.
				c.obs.Notify(observer.EVENT_REQ_RSP, p.Msg)
			}
		}
	}()
}

// SendMsg packs rsp into msg via comm.ProtoMarshal, serializes msg and writes
// it to the connection as a binary frame. It returns an error when the
// connection has not been established, when packing or serialization fails,
// or when the write fails.
func (c *ConnServiceImpl) SendMsg(msg *pb.UserMessage, rsp proto.Message) (err error) {
	if c.ws == nil {
		return errors.New("websocket not connected")
	}
	// msg.Sec = r.BuildSecStr()
	if !comm.ProtoMarshal(rsp, msg) {
		// Previously this case returned nil although nothing was sent.
		return errors.New("proto marshal of message payload failed")
	}
	data, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return c.ws.WriteMessage(websocket.BinaryMessage, data)
}

// ReceiveMsg reads one message from the connection and decodes it.
//
// It returns pb.ErrorCode_SystemError when the connection is missing or the
// read fails, pb.ErrorCode_PbError when the payload cannot be decoded, and
// otherwise the code produced by handleNotify. msg is never nil but is only
// populated when decoding succeeded.
func (c *ConnServiceImpl) ReceiveMsg() (code pb.ErrorCode, msg *pb.UserMessage) {
	msg = &pb.UserMessage{}
	if c.ws == nil {
		logrus.Error("readMessage err: websocket not connected")
		return pb.ErrorCode_SystemError, msg
	}

	_, data, err := c.ws.ReadMessage()
	if err != nil {
		logrus.Errorf("readMessage err:%v", err)
		return pb.ErrorCode_SystemError, msg
	}
	if err = proto.Unmarshal(data, msg); err != nil {
		// Report the failure instead of returning the zero-value code.
		logrus.Errorf("unmarshal message err:%v", err)
		return pb.ErrorCode_PbError, msg
	}
	return c.handleNotify(msg), msg
}

// handleNotify extracts the error code from a "notify"/"errornotify" message.
// It returns pb.ErrorCode_PbError if that payload cannot be unpacked, and the
// zero-value code for every other message.
func (c *ConnServiceImpl) handleNotify(msg *pb.UserMessage) (code pb.ErrorCode) {
	if msg.MainType != "notify" || msg.SubType != "errornotify" {
		return code
	}
	rsp := &pb.NotifyErrorNotifyPush{}
	if !comm.ProtoUnmarshal(msg, rsp) {
		return pb.ErrorCode_PbError
	}
	return rsp.Code
}

// RespHandle is disabled; the previous implementation is kept for reference.
//
// func (c *ConnServiceImpl) RespHandle(t *model.TestCase) {
// 	for {
// 		if code, msg := c.ReceiveMsg(); code != pb.ErrorCode_Success {
// 			c.obs.Notify(observer.EVENT_APP_LOG, cast.ToString(code))
// 		} else {
// 			if msg.MainType == t.MainType && msg.SubType == t.SubType {
// 				if t.Print == nil {
// 					if a, err := ptypes.MarshalAny(msg.Data); err != nil {
// 						c.obs.Notify(observer.EVENT_APP_LOG, err.Error())
// 					} else {
// 						c.obs.Notify(observer.EVENT_APP_LOG, a.String())
// 					}
// 				} else {
// 					if !comm.ProtoUnmarshal(msg, t.Rsp) {
// 						return
// 					}
// 					c.obs.Notify(observer.EVENT_APP_LOG, t.Print(t.Rsp))
// 				}
// 				break
// 			}
// 		}
// 	}
// }