上传选择器锁处理

This commit is contained in:
liwei1dao 2022-07-21 17:01:00 +08:00
parent 253386266f
commit 39230faea3

View File

@ -46,16 +46,17 @@ func newService(options *Options) (sys *Service, err error) {
}
type Service struct {
options *Options
metadata string
server *server.Server
selectors map[string]ISelector
clientmutex sync.RWMutex
clients map[string]net.Conn
clientmeta map[string]string
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*client.Call
options *Options
metadata string
server *server.Server
selectormutex sync.RWMutex
selectors map[string]ISelector
clientmutex sync.RWMutex
clients map[string]net.Conn
clientmeta map[string]string
mutex sync.Mutex // protects following
seq uint64
pending map[uint64]*client.Call
}
//RPC 服务启动
@ -266,16 +267,22 @@ func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e
if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage {
var (
stag string
selector client.Selector
selector ISelector
ok bool
)
req_metadata := r.Metadata
this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata)
if stag, ok = req_metadata[ServiceClusterTag]; ok {
if selector, ok = this.selectors[stag]; !ok {
this.selectors[stag] = newSelector(nil)
selector = this.selectors[stag]
this.selectormutex.RLock()
selector, ok = this.selectors[stag]
this.selectormutex.RUnlock()
if !ok {
selector = newSelector(nil)
this.selectormutex.Lock()
this.selectors[stag] = selector
this.selectormutex.Unlock()
}
if addr, ok := req_metadata[ServiceAddrKey]; ok {
if _, ok = this.clientmeta[addr]; !ok {
if smeta, ok := req_metadata[ServiceMetaKey]; ok {
@ -405,8 +412,12 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st
ServiceAddrKey: "tcp@" + this.options.ServiceAddr,
ServiceMetaKey: this.metadata,
})
if selector, ok = this.selectors[clusterTag]; !ok {
this.selectormutex.RLock()
selector, ok = this.selectors[clusterTag]
this.selectormutex.RUnlock()
if !ok {
err = fmt.Errorf("on found serviceTag:%s", clusterTag)
return
}
this.clientmutex.RLock()
if clientaddr = selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" {
@ -462,7 +473,10 @@ func (this *Service) broadcast(ctx context.Context, clusterTag string, servicePa
ServiceAddrKey: "tcp@" + this.options.ServiceAddr,
ServiceMetaKey: this.metadata,
})
if selector, ok = this.selectors[clusterTag]; !ok {
this.selectormutex.RLock()
selector, ok = this.selectors[clusterTag]
this.selectormutex.RUnlock()
if !ok {
err = fmt.Errorf("on found serviceTag:%s", clusterTag)
}
if clientaddrs = selector.Find(ctx, spath[0], serviceMethod, args); clientaddrs == nil || len(clientaddrs) == 0 {
@ -556,6 +570,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
ServiceMetaKey: this.metadata,
})
addrs := make(map[string]struct{})
this.selectormutex.RLock()
for _, v := range this.selectors {
if clientaddrs = v.Find(ctx, spath[0], serviceMethod, args); clientaddrs != nil && len(clientaddrs) >= 0 {
for _, v1 := range clientaddrs {
@ -563,6 +578,7 @@ func (this *Service) clusterbroadcast(ctx context.Context, servicePath string, s
}
}
}
this.selectormutex.RUnlock()
if len(addrs) == 0 {
err = fmt.Errorf("on found service:%s", spath[0])
return