From 10881f141188c31559e79ec10c4ab5d994a07dc5 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Fri, 22 Jul 2022 13:41:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E9=80=89=E6=8B=A9=E5=99=A8?= =?UTF-8?q?=E9=94=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/service.go | 80 ++++------------------------------------ 1 file changed, 8 insertions(+), 72 deletions(-) diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index bb6714eaa..598f3bf23 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -195,73 +195,6 @@ func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, s return } -// func (this *Service) PreReadRequest(ctx context.Context) error { -// var ( -// stag string -// selector client.Selector -// ok bool -// ) -// req_metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string) -// this.Debugf("PreReadRequest Meta:%v ", ctx.Value(share.ReqMetaDataKey)) -// if stag, ok = req_metadata[ServiceClusterTag]; ok { -// if selector, ok = this.selectors[stag]; !ok { -// this.selectors[stag] = newSelector(nil) -// selector = this.selectors[stag] -// } -// if addr, ok := req_metadata[ServiceAddrKey]; ok { -// if _, ok = this.clientmeta[addr]; !ok { -// if smeta, ok := req_metadata[ServiceMetaKey]; ok { -// servers := make(map[string]string) -// this.clientmutex.Lock() -// this.clientmeta[addr] = smeta -// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) -// for k, v := range this.clientmeta { -// servers[k] = v -// } -// this.clientmutex.Unlock() -// selector.UpdateServer(servers) -// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) -// } -// } -// } -// } -// return nil -// } - -//监听客户端链接到服务上 保存客户端的连接对象 -// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { -// var ( -// stag string -// selector client.Selector -// ok bool -// ) -// req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string) -// this.Debugf("PreHandleRequest ServicePath:%s ServiceMethod:%s Meta:%v ", r.ServicePath, r.ServiceMethod, ctx.Value(share.ReqMetaDataKey)) -// if stag, ok = req_metadata[ServiceClusterTag]; ok { -// if selector, ok = this.selectors[stag]; !ok { -// this.selectors[stag] = newSelector(nil) -// selector = this.selectors[stag] -// } -// if addr, ok := req_metadata[ServiceAddrKey]; ok { -// if _, ok = this.clientmeta[addr]; !ok { -// if smeta, ok := req_metadata[ServiceMetaKey]; ok { -// servers := make(map[string]string) -// this.clientmutex.Lock() -// this.clientmeta[addr] = smeta -// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) -// for k, v := range this.clientmeta { -// servers[k] = v -// } -// this.clientmutex.Unlock() -// selector.UpdateServer(servers) -// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) -// } -// } -// } -// } -// return nil -// } - //监控rpc连接收到的请求消息 处理消息回调请求 func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error { if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage { @@ -273,11 +206,14 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e req_metadata := r.Metadata this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata) if stag, ok = req_metadata[ServiceClusterTag]; ok { - if selector, ok = this.selectors[stag]; !ok { - this.mutex.Lock() - this.selectors[stag] = newSelector(nil) - selector = this.selectors[stag] - this.mutex.Unlock() + this.selectormutex.RLock() + selector, ok = this.selectors[stag] + this.selectormutex.RUnlock() + if !ok { + selector = newSelector(nil) + this.selectormutex.Lock() + this.selectors[stag] = selector + this.selectormutex.Unlock() } if addr, ok := req_metadata[ServiceAddrKey]; ok {