diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 5f8d27e2e..670977a69 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -2,6 +2,7 @@ package rpcx import ( "context" + "strings" "github.com/smallnest/rpcx/client" ) @@ -66,36 +67,65 @@ func (this *RPCX) UnregisterAll() (err error) { //同步调用 func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.Call(ctx, servicePath, serviceMethod, args, reply) + //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接 + err = this.service.Call(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.Call(ctx, servicePath, serviceMethod, args, reply) + } + return } //广播调用 func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) + err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) + } + return } //异步调用 func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { - return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) + call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done) + if strings.Contains(err.Error(), "on found") { + return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) + } + return } //跨服同步调用 func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) + } + return } //跨集群 广播 func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) + } return } //跨服异步调用 func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { - return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + if strings.Contains(err.Error(), "on found") { + return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) + } + return } //全集群广播 func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { - return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + + err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + if strings.Contains(err.Error(), "on found") { + return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply) + } + return }