From 6c44a4632ec01ebc502cd28856dbb73dfd979332 Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Wed, 8 Mar 2023 09:41:29 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=8F=E8=AE=AE=E5=8A=A0=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/gateway/agent.go | 42 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index cfea8d01b..411a0bae5 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -37,6 +37,7 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent { writeChan: make(chan []byte, 2), closeSignal: make(chan bool), state: 1, + protoMsg: map[string]int64{}, } agent.wg.Add(2) go agent.readLoop() @@ -55,6 +56,8 @@ type Agent struct { closeSignal chan bool state int32 //状态 0 关闭 1 运行 2 关闭中 wg sync.WaitGroup + hlock sync.RWMutex + protoMsg map[string]int64 } func (this *Agent) readLoop() { @@ -111,6 +114,14 @@ locp: break locp } } + if this.uId != "" { + key := this.uId + msg.MainType + msg.SubType + this.hlock.Lock() + if v, ok := this.protoMsg[key]; ok && v != 0 { + v = 0 + } + this.hlock.Unlock() + } } } this.gateway.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId) @@ -129,11 +140,12 @@ locp: break locp case msg, ok := <-this.writeChan: if ok { - // data, err = proto.Marshal(msg) + //data, err = proto.Marshal(msg) if err = this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil { this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) go this.Close() } + } else { go this.Close() } @@ -340,6 +352,34 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { log.Field{Key: "req", Value: fmt.Sprintf("%s:%s %v", req.MainType, req.SubType, req.Message.String())}, log.Field{Key: "reply", Value: reply.String()}, ) + if this.uId != "" { + key := this.uId + msg.MainType + msg.SubType // 加锁 + if v, ok := this.protoMsg[key]; ok { + if v != 0 && configure.Now().Unix()-v < 2 { + // 返回错误码 + this.hlock.Lock() + this.protoMsg[key] = configure.Now().Unix() + this.hlock.Unlock() + + data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ + ReqMainType: msg.MainType, + ReqSubType: msg.SubType, + Arg: msg.Data, + Code: pb.ErrorCode_InsufficientPermissions}) + err = this.WriteMsg(&pb.UserMessage{ + MainType: comm.MainTypeNotify, + SubType: comm.SubTypeErrorNotify, + Data: data, + }) + return + } + } + this.hlock.Lock() + this.protoMsg[key] = configure.Now().Unix() + this.hlock.Unlock() + + } + if reply.Code != pb.ErrorCode_Success { data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ ReqMainType: msg.MainType,