138 lines
4.4 KiB
Go
138 lines
4.4 KiB
Go
package rpcx
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/smallnest/rpcx/client"
|
|
)
|
|
|
|
func newSys(options *Options) (sys ISys, err error) {
|
|
if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端
|
|
sys, err = newService(options)
|
|
return
|
|
}
|
|
|
|
if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端
|
|
sys, err = newClient(options)
|
|
return
|
|
}
|
|
var (
|
|
service ISys
|
|
client ISys
|
|
)
|
|
service, err = newService(options)
|
|
client, err = newClient(options)
|
|
sys = &RPCX{
|
|
options: options,
|
|
service: service,
|
|
client: client,
|
|
}
|
|
return
|
|
}
|
|
|
|
type RPCX struct {
|
|
options *Options
|
|
service ISys
|
|
client ISys
|
|
}
|
|
|
|
func (this *RPCX) Start() (err error) {
|
|
this.service.Start()
|
|
this.client.Start()
|
|
|
|
return
|
|
}
|
|
|
|
func (this *RPCX) Stop() (err error) {
|
|
this.service.Stop()
|
|
this.client.Stop()
|
|
return
|
|
}
|
|
|
|
// 获取服务集群列表
|
|
func (this *RPCX) GetServiceTags() []string {
|
|
return this.GetServiceTags()
|
|
}
|
|
func (this *RPCX) RegisterFunction(fn interface{}) (err error) {
|
|
this.service.RegisterFunction(fn)
|
|
this.client.RegisterFunction(fn)
|
|
return
|
|
}
|
|
|
|
func (this *RPCX) RegisterFunctionName(name string, fn interface{}) (err error) {
|
|
this.service.RegisterFunctionName(name, fn)
|
|
this.client.RegisterFunctionName(name, fn)
|
|
return
|
|
}
|
|
|
|
func (this *RPCX) UnregisterAll() (err error) {
|
|
err = this.service.UnregisterAll()
|
|
err = this.client.UnregisterAll()
|
|
return
|
|
}
|
|
|
|
// 同步调用
|
|
func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
|
//先排查下 服务端是否存在连接对象 不存在 在使用客户端对象连接
|
|
err = this.service.Call(ctx, servicePath, serviceMethod, args, reply)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.Call(ctx, servicePath, serviceMethod, args, reply)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 广播调用
|
|
func (this *RPCX) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
|
err = this.service.Broadcast(ctx, servicePath, serviceMethod, args, reply)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.Broadcast(ctx, servicePath, serviceMethod, args, reply)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 异步调用
|
|
func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
|
|
call, err = this.service.Go(ctx, servicePath, serviceMethod, args, reply, done)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 跨服同步调用
|
|
func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
|
err = this.service.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 跨集群 广播
|
|
func (this *RPCX) AcrossClusterBroadcast(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
|
err = this.service.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.AcrossClusterBroadcast(ctx, clusterTag, servicePath, serviceMethod, args, reply)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 跨服异步调用
|
|
func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
|
|
call, err = this.service.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done)
|
|
}
|
|
return
|
|
}
|
|
|
|
// 全集群广播
|
|
func (this *RPCX) ClusterBroadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
|
|
err = this.service.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
|
|
if err != nil && strings.Contains(err.Error(), "on found") {
|
|
return this.client.ClusterBroadcast(ctx, servicePath, serviceMethod, args, reply)
|
|
}
|
|
return
|
|
}
|