package service import ( "encoding/base64" "go_dreamfactory/cmd/v2/lib" "go_dreamfactory/cmd/v2/lib/common" "go_dreamfactory/comm" "go_dreamfactory/lego/sys/log" "go_dreamfactory/pb" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" "github.com/tidwall/gjson" "google.golang.org/protobuf/proto" ) type WsCli struct { addr string conn *websocket.Conn // reqData []byte //请求数据 uid string reqDatas *lib.Queue[[]byte] conns sync.Map flag bool reqData []byte } func NewWsCli(addr string, timeout time.Duration) (lib.Handler, error) { cli := &WsCli{ addr: addr, reqDatas: lib.NewQueue[[]byte](), } // if err := cli.connect(timeout); err != nil { // return nil, err // } return cli, nil } func (cli *WsCli) GetConnMap() sync.Map { return cli.conns } func (cli *WsCli) connect(timeout time.Duration) error { if cli.conn == nil { dialer := &websocket.Dialer{ HandshakeTimeout: 2 * time.Second, } conn, _, err := dialer.Dial(cli.addr, nil) if err != nil { logrus.Errorf("websocket conn err:%v", err) return err } cli.conn = conn } // ping // go func() { // timer := time.NewTimer(2 * time.Second) // for { // timer.Reset(2 * time.Second) // <-timer.C // if err := cli.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { // break // } // } // }() return nil } // 检查相应 func (cli *WsCli) checkResp(data []byte) bool { msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("结果解析失败") return false } if msg.MainType == "user" && msg.SubType == "login" { rsp := &pb.UserLoginResp{} if !comm.ProtoUnmarshal(msg, rsp) { logrus.Error("unmarshal err") return false } if rsp.Data != nil { if rsp.Data.Uid != "" { logrus.WithField("uid", rsp.Data.Uid).Debug("登录响应") cli.uid = rsp.Data.Uid return true } } return true } else { if cli.uid != "" { return true } } return false } // 检查推送(包括错误推送) func (cli *WsCli) checkPush(data []byte) bool { msg := &pb.UserMessage{} if err := proto.Unmarshal(data, msg); err != nil { logrus.Error("结果解析失败") return false } methodStr := msg.Data.TypeUrl methodName := common.SubStr(methodStr, 20, len(methodStr)) if strings.HasSuffix(methodName, "Push") { if methodName == "NotifyErrorNotifyPush" { push := &pb.NotifyErrorNotifyPush{} if !comm.ProtoUnmarshal(msg, push) { logrus.Error("unmarsh err") return false } logrus.WithField("methodName", methodName).WithField("code", push.Code).Debug("收到错误码") } else { logrus.WithField("methodName", methodName).Debug("收到推送") } return true } return false } func (cli *WsCli) SetReq(data []byte, flag bool) { cli.flag = flag if flag { cli.reqData = data } cli.reqDatas.Add(data) } func (cli *WsCli) BuildReq() lib.RawReq { id := time.Now().UnixNano() var ( data []byte err error ) data, err = cli.reqDatas.Pop() if err != nil { if cli.flag { data = cli.reqData } else { return lib.RawReq{} } } rawReq := lib.RawReq{ID: id, Req: data} return rawReq } func (cli *WsCli) Call(req []byte) ([]byte, error) { msg := &pb.UserMessage{} if err := proto.Unmarshal(req, msg); err != nil { logrus.Error("结果解析失败") return nil, err } base64Str := msg.Sec dec, err := base64.StdEncoding.DecodeString(base64Str[35:]) if err != nil { log.Errorf("base64 decode err %v", err) return nil, err } jsonRet := gjson.Parse(string(dec)) account := jsonRet.Get("account").String() dialer := &websocket.Dialer{ HandshakeTimeout: 2 * time.Second, } var conn *websocket.Conn if c, ok := cli.conns.Load(account); ok { conn = c.(*websocket.Conn) } else { conn, _, err = dialer.Dial(cli.addr, nil) if err != nil { logrus.Errorf("websocket conn err:%v", err) return nil, err } } if msg.MainType == "user" && msg.SubType == "login" { cli.conns.Store(account, conn) } // 向连接写数据 conn.WriteMessage(websocket.BinaryMessage, req) // 读数据 var res []byte for { _, data, err := conn.ReadMessage() if err != nil { logrus.Errorf("readMessage err:%v", err) break } // if cli.checkResp(data) { // return data, nil // } else { if !cli.checkPush(data) { return data, nil } // } } return res, nil } func (cli *WsCli) Check(req lib.RawReq, resp lib.RawResp) *lib.CallResult { var result lib.CallResult result.Id = resp.ID result.Req = req result.Resp = resp msg := &pb.UserMessage{} if err := proto.Unmarshal(resp.Resp, msg); err != nil { logrus.Error("结果解析失败") return &result } // logrus.WithFields(logrus.Fields{"msg": msg}).Debug("检查结果") return &result }