diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index d9bd0e8dc..49031e606 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -46,16 +46,17 @@ func newService(options *Options) (sys *Service, err error) { } type Service struct { - options *Options - metadata string - server *server.Server - selectors map[string]ISelector - clientmutex sync.RWMutex - clients map[string]net.Conn - clientmeta map[string]string - mutex sync.Mutex // protects following - seq uint64 - pending map[uint64]*client.Call + options *Options + metadata string + server *server.Server + selectormutex sync.RWMutex + selectors map[string]ISelector + clientmutex sync.RWMutex + clients map[string]net.Conn + clientmeta map[string]string + mutex sync.Mutex // protects following + seq uint64 + pending map[uint64]*client.Call } //RPC 服务启动 @@ -266,16 +267,22 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage { var ( stag string - selector client.Selector + selector ISelector ok bool ) req_metadata := r.Metadata this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata) if stag, ok = req_metadata[ServiceClusterTag]; ok { - if selector, ok = this.selectors[stag]; !ok { - this.selectors[stag] = newSelector(nil) - selector = this.selectors[stag] + this.selectormutex.RLock() + selector, ok = this.selectors[stag] + this.selectormutex.RUnlock() + if !ok { + selector = newSelector(nil) + this.selectormutex.Lock() + this.selectors[stag] = selector + this.selectormutex.Unlock() } + if addr, ok := req_metadata[ServiceAddrKey]; ok { if _, ok = this.clientmeta[addr]; !ok { if smeta, ok := req_metadata[ServiceMetaKey]; ok { @@ -405,8 +412,12 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, }) - if selector, ok = this.selectors[clusterTag]; !ok { + this.selectormutex.RLock() + selector, ok = this.selectors[clusterTag] + this.selectormutex.RUnlock() + if !ok { err = fmt.Errorf("on found serviceTag:%s", clusterTag) + return } this.clientmutex.RLock() if clientaddr = selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { @@ -462,7 +473,10 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, }) - if selector, ok = this.selectors[clusterTag]; !ok { + this.selectormutex.RLock() + selector, ok = this.selectors[clusterTag] + this.selectormutex.RUnlock() + if !ok { err = fmt.Errorf("on found serviceTag:%s", clusterTag) } if clientaddrs = selector.Find(ctx, spath[0], serviceMethod, args); clientaddrs == nil || len(clientaddrs) == 0 { @@ -556,6 +570,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s ServiceMetaKey: this.metadata, }) addrs := make(map[string]struct{}) + this.selectormutex.RLock() for _, v := range this.selectors { if clientaddrs = v.Find(ctx, spath[0], serviceMethod, args); clientaddrs != nil && len(clientaddrs) >= 0 { for _, v1 := range clientaddrs { @@ -563,6 +578,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s } } } + this.selectormutex.RUnlock() if len(addrs) == 0 { err = fmt.Errorf("on found service:%s", spath[0]) return