go_dreamfactory/lego/sys/rpcx/rpcx_test.go
2022-06-15 14:45:49 +08:00

204 lines
5.6 KiB
Go

package rpcx
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"regexp"
"syscall"
"testing"
"time"
"go_dreamfactory/lego/sys/log"
"github.com/rcrowley/go-metrics"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
"github.com/smallnest/rpcx/share"
)
func Test_Sys(t *testing.T) {
if err := log.OnInit(nil); err != nil {
fmt.Printf("err:%v", err)
return
}
if sys, err := NewSys(
SetServiceTag("dreamfactory"),
SetServiceType("worker"),
SetServiceId("worker_1"),
SetServiceVersion("1.0.0"),
SetServiceAddr("127.0.0.1:9978"),
SetConsulServers([]string{"10.0.0.9:8500"}),
); err != nil {
fmt.Printf("err:%v", err)
return
} else {
if err = sys.Register(new(Arith)); err != nil {
fmt.Printf("err:%v", err)
return
}
if err = sys.Start(); err != nil {
fmt.Printf("err:%v", err)
return
}
go func() {
time.Sleep(time.Second * 3)
if err = sys.Call(context.Background(), "worker/worker_1", "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil {
fmt.Printf("Call:%v \n", err)
return
}
}()
}
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigterm:
fmt.Printf("terminating: via signal\n")
}
}
var addr = "127.0.0.1:9978"
// go server.go
func Test_RPCX(t *testing.T) {
s := server.NewServer()
if err := addRegistryPlugin(s); err != nil {
fmt.Printf("err:%v", err)
return
}
go func() {
time.Sleep(time.Second)
s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0&addr=tcp@127.0.0.1:9978")
}()
go func() {
time.Sleep(time.Second * 3)
if d, err := client.NewConsulDiscovery("rpcx_test", "worker", []string{"10.0.0.9:8500"}, nil); err != nil {
fmt.Printf("NewConsulDiscovery err:%v", err)
return
} else {
xclient := client.NewXClient("worker", client.Failfast, client.RandomSelect, d, client.DefaultOption)
xclient.SetSelector(newSelector())
ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{"RoutRules": "worker/worker_1"})
if err = xclient.Call(ctx, "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil {
fmt.Printf("Call:%v \n", err)
return
}
}
}()
s.Serve("tcp", addr)
}
func addRegistryPlugin(s *server.Server) (err error) {
r := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + addr,
ConsulServers: []string{"10.0.0.9:8500"},
BasePath: "rpcx_test",
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err = r.Start()
if err != nil {
return
}
s.Plugins.Add(r)
s.Plugins.Add(&call{})
return
}
type call struct{}
// func (this *call) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules")
// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String())
// return args, nil
// }
// func (this *call) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules").(string)
// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String())
// return args, nil
// }
// func (this *call) PreReadRequest(ctx context.Context) error {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
// return nil
// }
// func (this *call) PreWriteResponse(ctx context.Context, args *protocol.Message, repy *protocol.Message, errInter error) error {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules").(string)
// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
// return nil
// }
func (this *call) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string)
fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
return nil
}
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A + args.B
fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
*reply = "hello " + *args
return nil
}
///正则测试
func Test_Regular(t *testing.T) {
// str1 := "worker"
// str2 := "worker/worker_1"
// str3 := "worker/!worker_1"
// str4 := "worker/[worker_1,worker_2]"
// str5 := "worker/![worker_1,worker_2]"
// reg1 := regexp.MustCompile(`/`)
//根据规则提取关键信息
// result1 := reg1.FindAllStringSubmatch(str1, -1)
// fmt.Println("result1 = ", result1)
// strings.Split()
str := "worker/!worker_1"
// rex := regexp.MustCompile(`\!\[([^)]+)\]`)
rex := regexp.MustCompile(`\!([^)]+)`)
out := rex.FindAllStringSubmatch(str, -1)
for _, i := range out {
fmt.Println(i[1])
}
}