This commit is contained in:
meixiongfeng 2022-06-15 14:47:34 +08:00
commit 1ea06ac54d
4 changed files with 107 additions and 6 deletions

View File

@ -139,12 +139,22 @@ func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err
return
}
//同步 执行目标远程服务方法
///同步 执行目标远程服务方法
///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法
///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法
///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法
///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法
///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法
func (this *RPCXService) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return rpcx.Call(ctx, servicePath, serviceMethod, args, reply)
}
//异步 执行目标远程服务方法
///异步 执行目标远程服务方法
///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法
///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法
///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法
///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法
///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法
func (this *RPCXService) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) {
return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil)
}

View File

@ -5,6 +5,7 @@ import (
"go_dreamfactory/lego/utils/mapstructure"
)
//服务组件的基础参数对象设计 主要需要实现 LoadConfig 序列化参数接口
type ServiceCompOptions struct {
core.ICompOptions
}

View File

@ -6,6 +6,7 @@ import (
"net"
"os"
"os/signal"
"regexp"
"syscall"
"testing"
"time"
@ -190,4 +191,13 @@ func Test_Regular(t *testing.T) {
// result1 := reg1.FindAllStringSubmatch(str1, -1)
// fmt.Println("result1 = ", result1)
// strings.Split()
str := "worker/!worker_1"
// rex := regexp.MustCompile(`\!\[([^)]+)\]`)
rex := regexp.MustCompile(`\!([^)]+)`)
out := rex.FindAllStringSubmatch(str, -1)
for _, i := range out {
fmt.Println(i[1])
}
}

View File

@ -2,13 +2,18 @@ package rpcx
import (
"context"
"fmt"
"go_dreamfactory/lego/sys/log"
"regexp"
"strings"
"github.com/smallnest/rpcx/share"
"github.com/valyala/fastrand"
)
var rex_nogather = regexp.MustCompile(`\!\[([^)]+)\]`)
var rex_noid = regexp.MustCompile(`\!([^)]+)`)
var rex_gather = regexp.MustCompile(`\[([^)]+)\]`)
func newSelector() *Selector {
return &Selector{
servers: make(map[string]*ServiceNode),
@ -30,9 +35,8 @@ type Selector struct {
i map[string]int
}
///servicePath = [stype] or [stype/sid]
///servicePath = (worker)/(worker/worker_1)/(worker/!worker_1)/(worker/[worker_1,worker_2])/(worker/![worker_1,worker_2])
func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
fmt.Printf("Select servicePath:%v serviceMethod:%v ReqMetaData:%v \n", servicePath, serviceMethod, ctx.Value(share.ReqMetaDataKey))
routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)[CallRoutRulesKey]
service := strings.Split(routrules, "/")
leng := len(service)
@ -47,13 +51,21 @@ func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod str
return nodes[i].ServiceAddr
}
} else if leng == 2 {
if node, ok := this.servers[service[1]]; ok {
result := this.ParseRoutRules(service[1])
if len(result) == 0 {
log.Errorf("Select no found any node servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, routrules)
return ""
}
i := fastrand.Uint32n(uint32(len(result)))
if node, ok := this.servers[result[i]]; ok {
return node.ServiceAddr
}
}
log.Errorf("Select no found any node servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, routrules)
return ""
}
//更新服务列表
func (this *Selector) UpdateServer(servers map[string]string) {
ss := make(map[string]*ServiceNode)
sst := make(map[string][]*ServiceNode)
@ -75,3 +87,71 @@ func (this *Selector) UpdateServer(servers map[string]string) {
this.servers = ss
this.serversType = sst
}
//路由规则解析
func (this *Selector) ParseRoutRules(rules string) (result []string) {
result = make([]string, 0)
//解析 ![sid,sid] 格式规则
if out := rex_nogather.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 {
if nogather := strings.Split(out[0][1], ","); len(nogather) > 0 {
for k, _ := range this.servers {
iskeep := false
for _, v := range nogather {
if k == v {
iskeep = true
break
}
}
if !iskeep {
result = append(result, k)
}
}
return
}
}
//解析 !sid 格式规则
if out := rex_noid.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 {
for k, _ := range this.servers {
iskeep := false
if k == out[0][1] {
iskeep = true
break
}
if !iskeep {
result = append(result, k)
}
}
return
}
//解析 [sid,sid] 格式规则
if out := rex_gather.FindAllStringSubmatch(rules, -1); len(out) == 1 && len(out[0]) == 2 {
if nogather := strings.Split(out[0][1], ","); len(nogather) > 0 {
for k, _ := range this.servers {
iskeep := false
for _, v := range nogather {
if k == v {
iskeep = true
break
}
}
if iskeep {
result = append(result, k)
}
}
return
}
}
for k, _ := range this.servers {
iskeep := false
if k == rules {
iskeep = true
break
}
if !iskeep {
result = append(result, k)
}
}
return
}