diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 1e52de33f..64c337669 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -24,6 +24,7 @@ func newClient(options Options) (sys *Client, err error) { metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), clients: make(map[string]client.XClient), conns: make(map[string]net.Conn), + connecting: make(map[string]struct{}), serviceMap: make(map[string]*service), msgChan: make(chan *protocol.Message, 1000), } @@ -38,6 +39,8 @@ type Client struct { clients map[string]client.XClient connsMapMu sync.RWMutex conns map[string]net.Conn + connectMapMu sync.RWMutex + connecting map[string]struct{} serviceMapMu sync.RWMutex serviceMap map[string]*service msgChan chan *protocol.Message // 接收rpcXServer推送消息 @@ -121,9 +124,12 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod } c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) c.GetPlugins().Add(this) - c.SetSelector(newSelector()) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } this.clients[spath[0]] = c - } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ CallRoutRulesKey: servicePath, @@ -154,7 +160,7 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st } c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) c.GetPlugins().Add(this) - c.SetSelector(newSelector()) + c.SetSelector(newSelector(this.UpdateServer)) this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ @@ -165,12 +171,45 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st return c.Go(ctx, string(serviceMethod), args, reply, done) } +//监控服务发现,发现没有连接上的额服务端 就连接上去 +func (this *Client) UpdateServer(servers map[string]*ServiceNode) { + for _, v := range servers { + this.connsMapMu.RLock() + _, ok := this.conns[v.ServiceAddr] + this.connsMapMu.RUnlock() + if !ok { + this.connectMapMu.RLock() + _, ok := this.connecting[v.ServiceAddr] + this.connectMapMu.RUnlock() + if !ok { + this.connectMapMu.Lock() + this.connecting[v.ServiceAddr] = struct{}{} + this.connectMapMu.Unlock() + if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{ + ServiceId: this.options.ServiceId, + ServiceType: this.options.ServiceType, + ServiceAddr: this.options.ServiceAddr}, + &ServiceNode{}); err != nil { + this.Errorf("ShakeHands new node addr:%s err:%v", v.ServiceAddr, err) + this.connectMapMu.Lock() + delete(this.connecting, v.ServiceAddr) + this.connectMapMu.Unlock() + } + } + } + } +} + //监控连接建立 func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) { addr := "tcp@" + conn.RemoteAddr().String() this.connsMapMu.Lock() this.conns[addr] = conn this.connsMapMu.Unlock() + this.connectMapMu.Lock() + delete(this.connecting, addr) + this.connectMapMu.Unlock() + this.Errorf("ClientConnected addr:%v", addr) return conn, nil } diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index 5368e45ec..848247357 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -2,6 +2,7 @@ package rpcx import ( "context" + "errors" "fmt" "net/url" @@ -14,11 +15,12 @@ const ( CallRoutRulesKey = "callrules" ) +const RpcX_ShakeHands = "RpcX_ShakeHands" //握手 + type ( ISys interface { Start() (err error) Stop() (err error) - // Register(rcvr interface{}) (err error) RegisterFunction(fn interface{}) (err error) RegisterFunctionName(name string, fn interface{}) (err error) UnregisterAll() (err error) @@ -57,9 +59,6 @@ func Stop() (err error) { return defsys.Stop() } -// func Register(rcvr interface{}) (err error) { -// return defsys.Register(rcvr) -// } func RegisterFunction(fn interface{}) (err error) { return defsys.RegisterFunction(fn) } @@ -82,7 +81,7 @@ func Go(ctx context.Context, servicePath string, serviceMethod string, args inte //服务元数据转服务节点信息 func smetaToServiceNode(meta string) (node *ServiceNode, err error) { if meta == "" { - fmt.Errorf("meta is nill") + err = errors.New("meta is nill") return } node = &ServiceNode{} diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index c7c188460..c5cbae3d2 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -25,6 +25,7 @@ type Options struct { ServiceAddr string //服务地址 ConsulServers []string //Consul集群服务地址 RpcxStartType RpcxStartType //Rpcx启动类型 + AutoConnect bool //自动连接 客户端启动模式下 主动连接发现的节点服务器 SerializeType protocol.SerializeType Debug bool //日志是否开启 Log log.ILog @@ -86,6 +87,7 @@ func SetLog(v log.ILog) Option { func newOptions(config map[string]interface{}, opts ...Option) (Options, error) { options := Options{ + AutoConnect: true, SerializeType: protocol.MsgPack, Debug: true, Log: log.Clone(log.SetLoglayer(2)), @@ -104,6 +106,7 @@ func newOptions(config map[string]interface{}, opts ...Option) (Options, error) func newOptionsByOption(opts ...Option) (Options, error) { options := Options{ + AutoConnect: true, SerializeType: protocol.MsgPack, Debug: true, Log: log.Clone(log.SetLoglayer(2)), diff --git a/lego/sys/rpcx/selector.go b/lego/sys/rpcx/selector.go index 759f86847..abc89600c 100644 --- a/lego/sys/rpcx/selector.go +++ b/lego/sys/rpcx/selector.go @@ -14,11 +14,12 @@ var rex_nogather = regexp.MustCompile(`\!\[([^)]+)\]`) var rex_noid = regexp.MustCompile(`\!([^)]+)`) var rex_gather = regexp.MustCompile(`\[([^)]+)\]`) -func newSelector() *Selector { +func newSelector(fn func(map[string]*ServiceNode)) *Selector { return &Selector{ - servers: make(map[string]*ServiceNode), - serversType: make(map[string][]*ServiceNode), - i: make(map[string]int), + updateServerEvent: fn, + servers: make(map[string]*ServiceNode), + serversType: make(map[string][]*ServiceNode), + i: make(map[string]int), } } @@ -30,9 +31,10 @@ type ServiceNode struct { } type Selector struct { - servers map[string]*ServiceNode - serversType map[string][]*ServiceNode - i map[string]int + updateServerEvent func(map[string]*ServiceNode) + servers map[string]*ServiceNode + serversType map[string][]*ServiceNode + i map[string]int } ///servicePath = (worker)/(worker/worker_1)/(worker/!worker_1)/(worker/[worker_1,worker_2])/(worker/![worker_1,worker_2]) @@ -86,6 +88,9 @@ func (this *Selector) UpdateServer(servers map[string]string) { } this.servers = ss this.serversType = sst + if this.updateServerEvent != nil { + go this.updateServerEvent(ss) + } } //路由规则解析 diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index ebb991570..fb2310265 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -23,7 +23,7 @@ func newService(options Options) (sys *Service, err error) { options: options, metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), server: server.NewServer(), - selector: newSelector(), + selector: newSelector(nil), clients: make(map[string]net.Conn), clientmeta: make(map[string]string), pending: make(map[uint64]*client.Call), @@ -41,6 +41,7 @@ func newService(options Options) (sys *Service, err error) { } sys.server.Plugins.Add(r) sys.server.Plugins.Add(sys) + sys.RegisterFunctionName(RpcX_ShakeHands, sys.RpcxShakeHands) //注册握手函数 return } @@ -148,7 +149,7 @@ func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) } this.clientmutex.Unlock() this.selector.UpdateServer(servers) - this.Debugf("PreReadRequest addr:%s smeta:%s \n", addr, smeta) + this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) } } } @@ -203,6 +204,12 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e return nil } +//Rpcx握手默认提供给client连接使用 +func (this *Service) RpcxShakeHands(ctx context.Context, args *ServiceNode, reply *ServiceNode) error { + // this.Debugf("RpcxShakeHands:%+v", ctx.Value(share.ReqMetaDataKey).(map[string]string)) + return nil +} + ///日志*********************************************************************** func (this *Service) Debug() bool { return this.options.Debug