上传选择器锁处理

This commit is contained in:
liwei1dao 2022-07-22 13:41:44 +08:00
parent 680d6f5c0e
commit 10881f1411

View File

@ -195,73 +195,6 @@ func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, s
return
}
// func (this *Service) PreReadRequest(ctx context.Context) error {
// var (
// stag string
// selector client.Selector
// ok bool
// )
// req_metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// this.Debugf("PreReadRequest Meta:%v ", ctx.Value(share.ReqMetaDataKey))
// if stag, ok = req_metadata[ServiceClusterTag]; ok {
// if selector, ok = this.selectors[stag]; !ok {
// this.selectors[stag] = newSelector(nil)
// selector = this.selectors[stag]
// }
// if addr, ok := req_metadata[ServiceAddrKey]; ok {
// if _, ok = this.clientmeta[addr]; !ok {
// if smeta, ok := req_metadata[ServiceMetaKey]; ok {
// servers := make(map[string]string)
// this.clientmutex.Lock()
// this.clientmeta[addr] = smeta
// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
// for k, v := range this.clientmeta {
// servers[k] = v
// }
// this.clientmutex.Unlock()
// selector.UpdateServer(servers)
// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
// }
// }
// }
// }
// return nil
// }
//监听客户端链接到服务上 保存客户端的连接对象
// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
// var (
// stag string
// selector client.Selector
// ok bool
// )
// req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// this.Debugf("PreHandleRequest ServicePath:%s ServiceMethod:%s Meta:%v ", r.ServicePath, r.ServiceMethod, ctx.Value(share.ReqMetaDataKey))
// if stag, ok = req_metadata[ServiceClusterTag]; ok {
// if selector, ok = this.selectors[stag]; !ok {
// this.selectors[stag] = newSelector(nil)
// selector = this.selectors[stag]
// }
// if addr, ok := req_metadata[ServiceAddrKey]; ok {
// if _, ok = this.clientmeta[addr]; !ok {
// if smeta, ok := req_metadata[ServiceMetaKey]; ok {
// servers := make(map[string]string)
// this.clientmutex.Lock()
// this.clientmeta[addr] = smeta
// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
// for k, v := range this.clientmeta {
// servers[k] = v
// }
// this.clientmutex.Unlock()
// selector.UpdateServer(servers)
// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
// }
// }
// }
// }
// return nil
// }
//监控rpc连接收到的请求消息 处理消息回调请求
func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage {
@ -273,11 +206,14 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e
req_metadata := r.Metadata
this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata)
if stag, ok = req_metadata[ServiceClusterTag]; ok {
if selector, ok = this.selectors[stag]; !ok {
this.mutex.Lock()
this.selectors[stag] = newSelector(nil)
selector = this.selectors[stag]
this.mutex.Unlock()
this.selectormutex.RLock()
selector, ok = this.selectors[stag]
this.selectormutex.RUnlock()
if !ok {
selector = newSelector(nil)
this.selectormutex.Lock()
this.selectors[stag] = selector
this.selectormutex.Unlock()
}
if addr, ok := req_metadata[ServiceAddrKey]; ok {