From 46922d0579d6c1ac2b0d09a799313c6211b231c4 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Wed, 15 Jun 2022 14:45:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90rpcx=20=E8=B7=AF=E7=94=B1?= =?UTF-8?q?=E8=A7=84=E5=88=99=E8=B0=83=E7=94=A8=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/base/rpcx/service.go | 14 ++++- lego/core/cbase/servicecompoptions.go | 1 + lego/sys/rpcx/rpcx_test.go | 10 +++ lego/sys/rpcx/selector.go | 88 +++++++++++++++++++++++++-- 4 files changed, 107 insertions(+), 6 deletions(-) diff --git a/lego/base/rpcx/service.go b/lego/base/rpcx/service.go index b3d4f7aac..d2e774891 100644 --- a/lego/base/rpcx/service.go +++ b/lego/base/rpcx/service.go @@ -139,12 +139,22 @@ func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err return } -//同步 执行目标远程服务方法 +///同步 执行目标远程服务方法 +///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法 +///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法 +///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法 +///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法 +///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法 func (this *RPCXService) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { return rpcx.Call(ctx, servicePath, serviceMethod, args, reply) } -//异步 执行目标远程服务方法 +///异步 执行目标远程服务方法 +///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法 +///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法 +///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法 +///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法 +///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法 func (this *RPCXService) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) { return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil) } diff --git a/lego/core/cbase/servicecompoptions.go b/lego/core/cbase/servicecompoptions.go index 2e81b8779..96689af18 100644 --- a/lego/core/cbase/servicecompoptions.go +++ b/lego/core/cbase/servicecompoptions.go @@ -5,6 +5,7 @@ import ( "go_dreamfactory/lego/utils/mapstructure" ) +//服务组件的基础参数对象设计 主要需要实现 LoadConfig 序列化参数接口 type ServiceCompOptions struct { core.ICompOptions } diff --git a/lego/sys/rpcx/rpcx_test.go b/lego/sys/rpcx/rpcx_test.go index 9893ee54e..fc470b2c1 100644 --- a/lego/sys/rpcx/rpcx_test.go +++ b/lego/sys/rpcx/rpcx_test.go @@ -6,6 +6,7 @@ import ( "net" "os" "os/signal" + "regexp" "syscall" "testing" "time" @@ -190,4 +191,13 @@ func Test_Regular(t *testing.T) { // result1 := reg1.FindAllStringSubmatch(str1, -1) // fmt.Println("result1 = ", result1) // strings.Split() + + str := "worker/!worker_1" + // rex := regexp.MustCompile(`\!\[([^)]+)\]`) + rex := regexp.MustCompile(`\!([^)]+)`) + out := rex.FindAllStringSubmatch(str, -1) + + for _, i := range out { + fmt.Println(i[1]) + } } diff --git a/lego/sys/rpcx/selector.go b/lego/sys/rpcx/selector.go index 83a5fa089..13a045c06 100644 --- a/lego/sys/rpcx/selector.go +++ b/lego/sys/rpcx/selector.go @@ -2,13 +2,18 @@ package rpcx import ( "context" - "fmt" "go_dreamfactory/lego/sys/log" + "regexp" "strings" "github.com/smallnest/rpcx/share" + "github.com/valyala/fastrand" ) +var rex_nogather = regexp.MustCompile(`\!\[([^)]+)\]`) +var rex_noid = regexp.MustCompile(`\!([^)]+)`) +var rex_gather = regexp.MustCompile(`\[([^)]+)\]`) + func newSelector() *Selector { return &Selector{ servers: make(map[string]*ServiceNode), @@ -30,9 +35,8 @@ type Selector struct { i map[string]int } -///servicePath = [stype] or [stype/sid] +///servicePath = (worker)/(worker/worker_1)/(worker/!worker_1)/(worker/[worker_1,worker_2])/(worker/![worker_1,worker_2]) func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string { - fmt.Printf("Select servicePath:%v serviceMethod:%v ReqMetaData:%v \n", servicePath, serviceMethod, ctx.Value(share.ReqMetaDataKey)) routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)[CallRoutRulesKey] service := strings.Split(routrules, "/") leng := len(service) @@ -47,13 +51,21 @@ func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod str return nodes[i].ServiceAddr } } else if leng == 2 { - if node, ok := this.servers[service[1]]; ok { + result := this.ParseRoutRules(service[1]) + if len(result) == 0 { + log.Errorf("Select no found any node servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, routrules) + return "" + } + i := fastrand.Uint32n(uint32(len(result))) + if node, ok := this.servers[result[i]]; ok { return node.ServiceAddr } } + log.Errorf("Select no found any node servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, routrules) return "" } +//更新服务列表 func (this *Selector) UpdateServer(servers map[string]string) { ss := make(map[string]*ServiceNode) sst := make(map[string][]*ServiceNode) @@ -75,3 +87,71 @@ func (this *Selector) UpdateServer(servers map[string]string) { this.servers = ss this.serversType = sst } + +//路由规则解析 +func (this *Selector) ParseRoutRules(rules string) (result []string) { + result = make([]string, 0) + + //解析 ![sid,sid] 格式规则 + if out := rex_nogather.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 { + if nogather := strings.Split(out[0][1], ","); len(nogather) > 0 { + for k, _ := range this.servers { + iskeep := false + for _, v := range nogather { + if k == v { + iskeep = true + break + } + } + if !iskeep { + result = append(result, k) + } + } + return + } + } + //解析 !sid 格式规则 + if out := rex_noid.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 { + for k, _ := range this.servers { + iskeep := false + if k == out[0][1] { + iskeep = true + break + } + if !iskeep { + result = append(result, k) + } + } + return + } + //解析 [sid,sid] 格式规则 + if out := rex_gather.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 { + if nogather := strings.Split(out[0][1], ","); len(nogather) > 0 { + for k, _ := range this.servers { + iskeep := false + for _, v := range nogather { + if k == v { + iskeep = true + break + } + } + if iskeep { + result = append(result, k) + } + } + return + } + } + + for k, _ := range this.servers { + iskeep := false + if k == rules { + iskeep = true + break + } + if !iskeep { + result = append(result, k) + } + } + return +}