上传rpc worker 端通信接口开放

This commit is contained in:
liwei1dao 2022-10-17 11:52:06 +08:00
parent 61b35955c8
commit f28acb2f29

View File

@ -2,6 +2,7 @@ package rpcx
import ( import (
"context" "context"
"strings"
"github.com/smallnest/rpcx/client" "github.com/smallnest/rpcx/client"
) )
@ -66,36 +67,65 @@ func (this *RPCX) UnregisterAll() (err error) {
//同步调用 //同步调用
func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return this.client.Call(ctx, servicePath, serviceMethod, args, reply) //先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接
err = this.service.Call(ctx, servicePath, serviceMethod, args, reply)
if strings.Contains(err.Error(), "on found") {
return this.client.Call(ctx, servicePath, serviceMethod, args, reply)
}
return
} }
//广播调用 //广播调用
func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply) err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply)
if strings.Contains(err.Error(), "on found") {
return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply)
}
return
} }
//异步调用 //异步调用
func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done)
if strings.Contains(err.Error(), "on found") {
return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done)
}
return
} }
//跨服同步调用 //跨服同步调用
func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
}
return
} }
//跨集群 广播 //跨集群 广播
func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply) err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
if strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
}
return return
} }
//跨服异步调用 //跨服异步调用
func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
if strings.Contains(err.Error(), "on found") {
return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
}
return
} }
//全集群广播 //全集群广播
func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
err = this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
if strings.Contains(err.Error(), "on found") {
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
}
return
} }