From 5eb2783750fec5bcc675cf13b1351f275f09ef89 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Thu, 10 Nov 2022 20:05:35 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=81=8A=E5=A4=A9=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=B9=BF=E6=92=AD=E5=BC=82=E5=B8=B8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/client.go | 38 ++++++++------ lego/sys/rpcx/rpcx.go | 3 +- lego/sys/rpcx/service.go | 111 ++++++++++++++++++++------------------- modules/chat/module.go | 8 +-- 4 files changed, 84 insertions(+), 76 deletions(-) diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index d54a5c157..d6e888d03 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -228,27 +228,31 @@ func (this *Client) ClusterBroadcast(ctx context.Context, servicePath string, se } this.clusterMu.RUnlock() l := len(clients) - done := make(chan error, l) - for _, v := range clients { - go func(c client.XClient) { - done <- c.Broadcast(ctx, serviceMethod, args, reply) - }(v) - } - timeout := time.NewTimer(time.Minute) -check: - for { - select { - case err = <-done: - l-- - if l == 0 || err != nil { // all returns or some one returns an error + if l > 0 { + done := make(chan error, l) + for _, v := range clients { + go func(c client.XClient) { + done <- c.Broadcast(ctx, serviceMethod, args, reply) + }(v) + } + timeout := time.NewTimer(time.Minute) + check: + for { + select { + case err = <-done: + l-- + if l == 0 || err != nil { // all returns or some one returns an error + break check + } + case <-timeout.C: + err = errors.New(("timeout")) break check } - case <-timeout.C: - err = errors.New(("timeout")) - break check } + timeout.Stop() + } else { + err = errors.New("on found any service") } - timeout.Stop() return } diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 8e3476927..f3c913af8 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -127,8 +127,7 @@ func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servic //全集群广播 func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - - err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) if err != nil && strings.Contains(err.Error(), "on found") { return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) } diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index 2ba304583..cd221f15f 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -496,66 +496,71 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s } l := len(addrs) - done := make(chan error, l) - for v, _ := range addrs { - go func(addr string) { - this.clientmutex.RLock() - conn, ok := this.clients[addr] - if !ok { - done <- fmt.Errorf("on found clientaddr:%s", addr) + if l > 0 { + done := make(chan error, l) + for v, _ := range addrs { + go func(addr string) { + this.clientmutex.RLock() + conn, ok := this.clients[addr] + if !ok { + done <- fmt.Errorf("on found clientaddr:%s", addr) + this.clientmutex.RUnlock() + return + } this.clientmutex.RUnlock() - return - } - this.clientmutex.RUnlock() - _call := new(client.Call) - _call.ServicePath = servicePath - _call.ServiceMethod = serviceMethod - _call.Args = args - _call.Reply = reply - _call.Done = make(chan *client.Call, 10) - this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) - seq, _ := ctx.Value(seqKey{}).(*uint64) - select { - case <-ctx.Done(): // cancel by context - this.mutex.Lock() - call := this.pending[*seq] - delete(this.pending, *seq) - this.mutex.Unlock() - if call != nil { - call.Error = ctx.Err() - call.Done <- call - } - done <- ctx.Err() - case call := <-_call.Done: - err = call.Error - meta := ctx.Value(share.ResMetaDataKey) - if meta != nil && len(call.ResMetadata) > 0 { - resMeta := meta.(map[string]string) - for k, v := range call.ResMetadata { - resMeta[k] = v + _call := new(client.Call) + _call.ServicePath = servicePath + _call.ServiceMethod = serviceMethod + _call.Args = args + _call.Reply = reply + _call.Done = make(chan *client.Call, 10) + this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) + seq, _ := ctx.Value(seqKey{}).(*uint64) + select { + case <-ctx.Done(): // cancel by context + this.mutex.Lock() + call := this.pending[*seq] + delete(this.pending, *seq) + this.mutex.Unlock() + if call != nil { + call.Error = ctx.Err() + call.Done <- call } - resMeta[share.ServerAddress] = conn.RemoteAddr().String() + done <- ctx.Err() + case call := <-_call.Done: + err = call.Error + meta := ctx.Value(share.ResMetaDataKey) + if meta != nil && len(call.ResMetadata) > 0 { + resMeta := meta.(map[string]string) + for k, v := range call.ResMetadata { + resMeta[k] = v + } + resMeta[share.ServerAddress] = conn.RemoteAddr().String() + } + done <- nil } - done <- nil - } - }(v) - } - timeout := time.NewTimer(time.Minute) -check: - for { - select { - case err = <-done: - l-- - if l == 0 || err != nil { // all returns or some one returns an error + }(v) + } + timeout := time.NewTimer(time.Minute) + check: + for { + select { + case err = <-done: + l-- + if l == 0 || err != nil { // all returns or some one returns an error + break check + } + case <-timeout.C: + err = errors.New(("timeout")) break check } - case <-timeout.C: - err = errors.New(("timeout")) - break check } + timeout.Stop() + } else { + err = errors.New("on found any service") } - timeout.Stop() - return err + + return } //发送远程调用请求 diff --git a/modules/chat/module.go b/modules/chat/module.go index 5bd4284b1..c00e73a35 100644 --- a/modules/chat/module.go +++ b/modules/chat/module.go @@ -82,7 +82,7 @@ func (this *Chat) OnInstallComp() { //Event------------------------------------------------------------------------------------------------------------ func (this *Chat) EventUserOffline(session comm.IUserSession) { if err := this.modelChat.removeCrossChannelMember(session); err != nil { - this.Debug("EventUserOffline:", log.Field{"uid", session.GetUserId()}, log.Field{"err", err}) + this.Debug("EventUserOffline:", log.Field{Key: "uid", Value: session.GetUserId()}, log.Field{Key: "err", Value: err.Error()}) } } @@ -151,7 +151,7 @@ func (this *Chat) SendWorldChat(msg *pb.DBChat) (code pb.ErrorCode) { } else { if _, err = this.service.AcrossClusterRpcGo( context.Background(), - msg.Stag, + this.GetCrossTag(), comm.Service_Worker, string(comm.Rpc_ModuleChatPushChat), msg, @@ -182,7 +182,7 @@ func (this *Chat) SendUnionChat(msg *pb.DBChat) (code pb.ErrorCode) { } else { if _, err = this.service.AcrossClusterRpcGo( context.Background(), - msg.Stag, + this.GetCrossTag(), comm.Service_Worker, string(comm.Rpc_ModuleChatPushChat), msg, @@ -257,7 +257,7 @@ func (this *Chat) SendSysChatToWorld(ctype comm.ChatSystemType, appenddata inter } else { if _, err = this.service.AcrossClusterRpcGo( context.Background(), - msg.Stag, + this.GetCrossTag(), comm.Service_Worker, string(comm.Rpc_ModuleChatPushChat), msg,