From f28acb2f29f478f34eb9c16fcd405b7955518c1b Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Mon, 17 Oct 2022 11:52:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0rpc=20worker=20=E7=AB=AF?= =?UTF-8?q?=E9=80=9A=E4=BF=A1=E6=8E=A5=E5=8F=A3=E5=BC=80=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/rpcx.go | 44 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 5f8d27e2e..670977a69 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -2,6 +2,7 @@ package rpcx import ( "context" + "strings" "github.com/smallnest/rpcx/client" ) @@ -66,36 +67,65 @@ func (this *RPCX) UnregisterAll() (err error) { //同步调用 func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.Call(ctx, servicePath, serviceMethod, args, reply) + //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接 + err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.Call(ctx, servicePath, serviceMethod, args, reply) + } + return } //广播调用 func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) + err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) + } + return } //异步调用 func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { - return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) + call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done) + if strings.Contains(err.Error(), "on found") { + return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) + } + return } //跨服同步调用 func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + } + return } //跨集群 广播 func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + } return } //跨服异步调用 func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { - return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + } + return } //全集群广播 func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + + err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + } + return }