优化聊天消息广播异常问题

This commit is contained in:
liwei1dao 2022-11-10 20:05:35 +08:00
parent dc30119e55
commit 5eb2783750
4 changed files with 84 additions and 76 deletions

View File

@ -228,27 +228,31 @@ func (this *Client) ClusterBroadcast(ctx context.Context, servicePath string, se
} }
this.clusterMu.RUnlock() this.clusterMu.RUnlock()
l := len(clients) l := len(clients)
done := make(chan error, l) if l > 0 {
for _, v := range clients { done := make(chan error, l)
go func(c client.XClient) { for _, v := range clients {
done <- c.Broadcast(ctx, serviceMethod, args, reply) go func(c client.XClient) {
}(v) done <- c.Broadcast(ctx, serviceMethod, args, reply)
} }(v)
timeout := time.NewTimer(time.Minute) }
check: timeout := time.NewTimer(time.Minute)
for { check:
select { for {
case err = <-done: select {
l-- case err = <-done:
if l == 0 || err != nil { // all returns or some one returns an error l--
if l == 0 || err != nil { // all returns or some one returns an error
break check
}
case <-timeout.C:
err = errors.New(("timeout"))
break check break check
} }
case <-timeout.C:
err = errors.New(("timeout"))
break check
} }
timeout.Stop()
} else {
err = errors.New("on found any service")
} }
timeout.Stop()
return return
} }

View File

@ -127,8 +127,7 @@ func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servic
//全集群广播 //全集群广播
func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
if err != nil && strings.Contains(err.Error(), "on found") { if err != nil && strings.Contains(err.Error(), "on found") {
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
} }

View File

@ -496,66 +496,71 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
} }
l := len(addrs) l := len(addrs)
done := make(chan error, l) if l > 0 {
for v, _ := range addrs { done := make(chan error, l)
go func(addr string) { for v, _ := range addrs {
this.clientmutex.RLock() go func(addr string) {
conn, ok := this.clients[addr] this.clientmutex.RLock()
if !ok { conn, ok := this.clients[addr]
done <- fmt.Errorf("on found clientaddr:%s", addr) if !ok {
done <- fmt.Errorf("on found clientaddr:%s", addr)
this.clientmutex.RUnlock()
return
}
this.clientmutex.RUnlock() this.clientmutex.RUnlock()
return _call := new(client.Call)
} _call.ServicePath = servicePath
this.clientmutex.RUnlock() _call.ServiceMethod = serviceMethod
_call := new(client.Call) _call.Args = args
_call.ServicePath = servicePath _call.Reply = reply
_call.ServiceMethod = serviceMethod _call.Done = make(chan *client.Call, 10)
_call.Args = args this.send(ctx, conn, spath[0], serviceMethod, metadata, _call)
_call.Reply = reply seq, _ := ctx.Value(seqKey{}).(*uint64)
_call.Done = make(chan *client.Call, 10) select {
this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) case <-ctx.Done(): // cancel by context
seq, _ := ctx.Value(seqKey{}).(*uint64) this.mutex.Lock()
select { call := this.pending[*seq]
case <-ctx.Done(): // cancel by context delete(this.pending, *seq)
this.mutex.Lock() this.mutex.Unlock()
call := this.pending[*seq] if call != nil {
delete(this.pending, *seq) call.Error = ctx.Err()
this.mutex.Unlock() call.Done <- call
if call != nil {
call.Error = ctx.Err()
call.Done <- call
}
done <- ctx.Err()
case call := <-_call.Done:
err = call.Error
meta := ctx.Value(share.ResMetaDataKey)
if meta != nil && len(call.ResMetadata) > 0 {
resMeta := meta.(map[string]string)
for k, v := range call.ResMetadata {
resMeta[k] = v
} }
resMeta[share.ServerAddress] = conn.RemoteAddr().String() done <- ctx.Err()
case call := <-_call.Done:
err = call.Error
meta := ctx.Value(share.ResMetaDataKey)
if meta != nil && len(call.ResMetadata) > 0 {
resMeta := meta.(map[string]string)
for k, v := range call.ResMetadata {
resMeta[k] = v
}
resMeta[share.ServerAddress] = conn.RemoteAddr().String()
}
done <- nil
} }
done <- nil }(v)
} }
}(v) timeout := time.NewTimer(time.Minute)
} check:
timeout := time.NewTimer(time.Minute) for {
check: select {
for { case err = <-done:
select { l--
case err = <-done: if l == 0 || err != nil { // all returns or some one returns an error
l-- break check
if l == 0 || err != nil { // all returns or some one returns an error }
case <-timeout.C:
err = errors.New(("timeout"))
break check break check
} }
case <-timeout.C:
err = errors.New(("timeout"))
break check
} }
timeout.Stop()
} else {
err = errors.New("on found any service")
} }
timeout.Stop()
return err return
} }
//发送远程调用请求 //发送远程调用请求

View File

@ -82,7 +82,7 @@ func (this *Chat) OnInstallComp() {
//Event------------------------------------------------------------------------------------------------------------ //Event------------------------------------------------------------------------------------------------------------
func (this *Chat) EventUserOffline(session comm.IUserSession) { func (this *Chat) EventUserOffline(session comm.IUserSession) {
if err := this.modelChat.removeCrossChannelMember(session); err != nil { if err := this.modelChat.removeCrossChannelMember(session); err != nil {
this.Debug("EventUserOffline:", log.Field{"uid", session.GetUserId()}, log.Field{"err", err}) this.Debug("EventUserOffline:", log.Field{Key: "uid", Value: session.GetUserId()}, log.Field{Key: "err", Value: err.Error()})
} }
} }
@ -151,7 +151,7 @@ func (this *Chat) SendWorldChat(msg *pb.DBChat) (code pb.ErrorCode) {
} else { } else {
if _, err = this.service.AcrossClusterRpcGo( if _, err = this.service.AcrossClusterRpcGo(
context.Background(), context.Background(),
msg.Stag, this.GetCrossTag(),
comm.Service_Worker, comm.Service_Worker,
string(comm.Rpc_ModuleChatPushChat), string(comm.Rpc_ModuleChatPushChat),
msg, msg,
@ -182,7 +182,7 @@ func (this *Chat) SendUnionChat(msg *pb.DBChat) (code pb.ErrorCode) {
} else { } else {
if _, err = this.service.AcrossClusterRpcGo( if _, err = this.service.AcrossClusterRpcGo(
context.Background(), context.Background(),
msg.Stag, this.GetCrossTag(),
comm.Service_Worker, comm.Service_Worker,
string(comm.Rpc_ModuleChatPushChat), string(comm.Rpc_ModuleChatPushChat),
msg, msg,
@ -257,7 +257,7 @@ func (this *Chat) SendSysChatToWorld(ctype comm.ChatSystemType, appenddata inter
} else { } else {
if _, err = this.service.AcrossClusterRpcGo( if _, err = this.service.AcrossClusterRpcGo(
context.Background(), context.Background(),
msg.Stag, this.GetCrossTag(),
comm.Service_Worker, comm.Service_Worker,
string(comm.Rpc_ModuleChatPushChat), string(comm.Rpc_ModuleChatPushChat),
msg, msg,