diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 86344ec70..65ff7ae93 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -12,7 +12,7 @@ func newSys(options Options) (sys *RPCX, err error) { options: options, metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), } - sys.service = newService(sys) + sys.service, err = newService(sys) sys.client = newClient(sys) // if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 diff --git a/lego/sys/rpcx/rpcx_test.go b/lego/sys/rpcx/rpcx_test.go index 70a592f59..0e535f86c 100644 --- a/lego/sys/rpcx/rpcx_test.go +++ b/lego/sys/rpcx/rpcx_test.go @@ -26,7 +26,7 @@ func Test_Sys(t *testing.T) { return } if sys, err := NewSys( - SetServiceTag("rpcx_test"), + SetServiceTag("dreamfactory"), SetServiceType("worker"), SetServiceId("worker_1"), SetServiceVersion("1.0.0"), @@ -72,7 +72,7 @@ func Test_RPCX(t *testing.T) { } go func() { time.Sleep(time.Second) - s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0") + s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0&addr=tcp@127.0.0.1:9978") }() go func() { diff --git a/lego/sys/rpcx/selector.go b/lego/sys/rpcx/selector.go index 4d29ab35f..83a5fa089 100644 --- a/lego/sys/rpcx/selector.go +++ b/lego/sys/rpcx/selector.go @@ -3,6 +3,7 @@ package rpcx import ( "context" "fmt" + "go_dreamfactory/lego/sys/log" "strings" "github.com/smallnest/rpcx/share" @@ -31,8 +32,8 @@ type Selector struct { ///servicePath = [stype] or [stype/sid] func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string { - fmt.Printf("Select servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, ctx.Value("RoutRules")) - routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)["RoutRules"] + fmt.Printf("Select servicePath:%v serviceMethod:%v ReqMetaData:%v \n", servicePath, serviceMethod, ctx.Value(share.ReqMetaDataKey)) + routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)[CallRoutRulesKey] service := strings.Split(routrules, "/") leng := len(service) if leng == 1 { @@ -58,6 +59,7 @@ func (this *Selector) UpdateServer(servers map[string]string) { sst := make(map[string][]*ServiceNode) for _, v := range servers { if node, err := smetaToServiceNode(v); err != nil { + log.Errorf("smetaToServiceNode:%s err:%v", v, err) continue } else { ss[node.ServiceId] = node diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index d89206b2b..7a5e36589 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -2,6 +2,7 @@ package rpcx import ( "context" + "go_dreamfactory/lego/sys/log" "net" "time" @@ -11,13 +12,25 @@ import ( "github.com/smallnest/rpcx/serverplugin" ) -func newService(rpcx *RPCX) (s *Service) { +func newService(rpcx *RPCX) (s *Service, err error) { s = &Service{ server: server.NewServer(), rpcx: rpcx, // clients: make(map[string]net.Conn), // clientmeta: make(map[string]string), } + + r := &serverplugin.ConsulRegisterPlugin{ + ServiceAddress: "tcp@" + rpcx.options.ServiceAddr, + ConsulServers: []string{"10.0.0.9:8500"}, + BasePath: rpcx.options.ServiceTag, + Metrics: metrics.NewRegistry(), + UpdateInterval: time.Minute, + } + if err = r.Start(); err != nil { + return + } + s.server.Plugins.Add(r) return } @@ -30,19 +43,11 @@ type Service struct { } func (this *Service) Start() (err error) { - r := &serverplugin.ConsulRegisterPlugin{ - ServiceAddress: "tcp@" + this.rpcx.options.ServiceAddr, - ConsulServers: []string{"10.0.0.9:8500"}, - BasePath: this.rpcx.options.ServiceAddr, - Metrics: metrics.NewRegistry(), - UpdateInterval: time.Minute, - } - if err = r.Start(); err != nil { - return - } - this.server.Plugins.Add(r) + go func() { - this.server.Serve("tcp", this.rpcx.options.ServiceAddr) + if err = this.server.Serve("tcp", this.rpcx.options.ServiceAddr); err != nil { + log.Errorf("rpcx server exit!") + } }() return }