协议加锁
This commit is contained in:
parent
690c219463
commit
6c44a4632e
@ -37,6 +37,7 @@ func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
|
||||
writeChan: make(chan []byte, 2),
|
||||
closeSignal: make(chan bool),
|
||||
state: 1,
|
||||
protoMsg: map[string]int64{},
|
||||
}
|
||||
agent.wg.Add(2)
|
||||
go agent.readLoop()
|
||||
@ -55,6 +56,8 @@ type Agent struct {
|
||||
closeSignal chan bool
|
||||
state int32 //状态 0 关闭 1 运行 2 关闭中
|
||||
wg sync.WaitGroup
|
||||
hlock sync.RWMutex
|
||||
protoMsg map[string]int64
|
||||
}
|
||||
|
||||
func (this *Agent) readLoop() {
|
||||
@ -111,6 +114,14 @@ locp:
|
||||
break locp
|
||||
}
|
||||
}
|
||||
if this.uId != "" {
|
||||
key := this.uId + msg.MainType + msg.SubType
|
||||
this.hlock.Lock()
|
||||
if v, ok := this.protoMsg[key]; ok && v != 0 {
|
||||
v = 0
|
||||
}
|
||||
this.hlock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
this.gateway.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId)
|
||||
@ -129,11 +140,12 @@ locp:
|
||||
break locp
|
||||
case msg, ok := <-this.writeChan:
|
||||
if ok {
|
||||
// data, err = proto.Marshal(msg)
|
||||
//data, err = proto.Marshal(msg)
|
||||
if err = this.wsConn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
|
||||
this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err)
|
||||
go this.Close()
|
||||
}
|
||||
|
||||
} else {
|
||||
go this.Close()
|
||||
}
|
||||
@ -340,6 +352,34 @@ func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) {
|
||||
log.Field{Key: "req", Value: fmt.Sprintf("%s:%s %v", req.MainType, req.SubType, req.Message.String())},
|
||||
log.Field{Key: "reply", Value: reply.String()},
|
||||
)
|
||||
if this.uId != "" {
|
||||
key := this.uId + msg.MainType + msg.SubType // 加锁
|
||||
if v, ok := this.protoMsg[key]; ok {
|
||||
if v != 0 && configure.Now().Unix()-v < 2 {
|
||||
// 返回错误码
|
||||
this.hlock.Lock()
|
||||
this.protoMsg[key] = configure.Now().Unix()
|
||||
this.hlock.Unlock()
|
||||
|
||||
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
|
||||
ReqMainType: msg.MainType,
|
||||
ReqSubType: msg.SubType,
|
||||
Arg: msg.Data,
|
||||
Code: pb.ErrorCode_InsufficientPermissions})
|
||||
err = this.WriteMsg(&pb.UserMessage{
|
||||
MainType: comm.MainTypeNotify,
|
||||
SubType: comm.SubTypeErrorNotify,
|
||||
Data: data,
|
||||
})
|
||||
return
|
||||
}
|
||||
}
|
||||
this.hlock.Lock()
|
||||
this.protoMsg[key] = configure.Now().Unix()
|
||||
this.hlock.Unlock()
|
||||
|
||||
}
|
||||
|
||||
if reply.Code != pb.ErrorCode_Success {
|
||||
data, _ := anypb.New(&pb.NotifyErrorNotifyPush{
|
||||
ReqMainType: msg.MainType,
|
||||
|
Loading…
Reference in New Issue
Block a user