From 253386266f792189d767cd23fd2fc2271327d77a Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Thu, 21 Jul 2022 11:56:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=B8=8A=E4=BC=A0=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8E=A8=E9=80=81bug=E4=BB=A5=E5=8F=8A=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E5=8F=91=E7=8E=B0bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/rpcx/client.go | 46 ++++++------- lego/sys/rpcx/service.go | 129 +++++++++++++++++++++++++++--------- modules/user/api_login.go | 1 + pb/proto/user/user_db.proto | 11 ++- pb/user_db.pb.go | 6 +- 5 files changed, 128 insertions(+), 65 deletions(-) diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 2d746f807..eed13bf6c 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -20,9 +20,8 @@ import ( func newClient(options *Options) (sys *Client, err error) { sys = &Client{ - options: options, - metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), - // clients: make(map[string]client.XClient), + options: options, + metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), clusterClients: make(map[string]map[string]client.XClient), conns: make(map[string]net.Conn), connecting: make(map[string]struct{}), @@ -33,11 +32,10 @@ func newClient(options *Options) (sys *Client, err error) { } type Client struct { - options *Options - metadata string - writeTimeout time.Duration - AsyncWrite bool - // clients map[string]client.XClient + options *Options + metadata string + writeTimeout time.Duration + AsyncWrite bool clusterClients map[string]map[string]client.XClient //其他集群客户端 connsMapMu sync.RWMutex conns map[string]net.Conn @@ -52,19 +50,21 @@ type Client struct { func (this *Client) DoMessage() { for msg := range this.msgChan { go func(req *protocol.Message) { - this.Debugf("DoMessage ServicePath:%s ServiceMethod:%s", req.ServicePath, req.ServiceMethod) - addr, ok := req.Metadata[ServiceAddrKey] - if !ok { - this.Errorf("Metadata no found ServiceAddrKey!") - return + if req.ServicePath != "" && req.ServiceMethod != "" { + this.Debugf("DoMessage :%v", req) + addr, ok := req.Metadata[ServiceAddrKey] + if !ok { + this.Errorf("Metadata no found ServiceAddrKey!") + return + } + conn, ok := this.conns[addr] + if !ok { + this.Errorf("no found conn addr:%s", addr) + return + } + res, _ := this.handleRequest(context.Background(), req) + this.sendResponse(conn, req, res) } - conn, ok := this.conns[addr] - if !ok { - this.Errorf("no found conn addr:%s", addr) - return - } - res, _ := this.handleRequest(context.Background(), req) - this.sendResponse(conn, req, res) }(msg) } } @@ -242,6 +242,7 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) { this.connecting[v.ServiceAddr] = struct{}{} this.connectMapMu.Unlock() if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{ + ServiceTag: this.options.ServiceTag, ServiceId: this.options.ServiceId, ServiceType: this.options.ServiceType, ServiceAddr: this.options.ServiceAddr}, @@ -250,6 +251,8 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) { this.connectMapMu.Lock() delete(this.connecting, v.ServiceAddr) this.connectMapMu.Unlock() + } else { + this.Debugf("UpdateServer addr:%s ", v.ServiceAddr) } } } @@ -262,9 +265,6 @@ func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) { this.connsMapMu.Lock() this.conns[addr] = conn this.connsMapMu.Unlock() - this.connectMapMu.Lock() - delete(this.connecting, addr) - this.connectMapMu.Unlock() this.Debugf("ClientConnected addr:%v", addr) return conn, nil } diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index f58217054..d9bd0e8dc 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -194,42 +194,105 @@ func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, s return } +// func (this *Service) PreReadRequest(ctx context.Context) error { +// var ( +// stag string +// selector client.Selector +// ok bool +// ) +// req_metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string) +// this.Debugf("PreReadRequest Meta:%v ", ctx.Value(share.ReqMetaDataKey)) +// if stag, ok = req_metadata[ServiceClusterTag]; ok { +// if selector, ok = this.selectors[stag]; !ok { +// this.selectors[stag] = newSelector(nil) +// selector = this.selectors[stag] +// } +// if addr, ok := req_metadata[ServiceAddrKey]; ok { +// if _, ok = this.clientmeta[addr]; !ok { +// if smeta, ok := req_metadata[ServiceMetaKey]; ok { +// servers := make(map[string]string) +// this.clientmutex.Lock() +// this.clientmeta[addr] = smeta +// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) +// for k, v := range this.clientmeta { +// servers[k] = v +// } +// this.clientmutex.Unlock() +// selector.UpdateServer(servers) +// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) +// } +// } +// } +// } +// return nil +// } + //监听客户端链接到服务上 保存客户端的连接对象 -func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { - var ( - stag string - selector client.Selector - ok bool - ) - req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string) - if stag, ok = req_metadata[ServiceClusterTag]; ok { - if selector, ok = this.selectors[stag]; !ok { - this.selectors[stag] = newSelector(nil) - selector = this.selectors[stag] - } - if addr, ok := req_metadata[ServiceAddrKey]; ok { - if _, ok = this.clientmeta[addr]; !ok { - if smeta, ok := req_metadata[ServiceMetaKey]; ok { - servers := make(map[string]string) - this.clientmutex.Lock() - this.clientmeta[addr] = smeta - this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) - for k, v := range this.clientmeta { - servers[k] = v - } - this.clientmutex.Unlock() - selector.UpdateServer(servers) - this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) - } - } - } - } - return nil -} +// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { +// var ( +// stag string +// selector client.Selector +// ok bool +// ) +// req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string) +// this.Debugf("PreHandleRequest ServicePath:%s ServiceMethod:%s Meta:%v ", r.ServicePath, r.ServiceMethod, ctx.Value(share.ReqMetaDataKey)) +// if stag, ok = req_metadata[ServiceClusterTag]; ok { +// if selector, ok = this.selectors[stag]; !ok { +// this.selectors[stag] = newSelector(nil) +// selector = this.selectors[stag] +// } +// if addr, ok := req_metadata[ServiceAddrKey]; ok { +// if _, ok = this.clientmeta[addr]; !ok { +// if smeta, ok := req_metadata[ServiceMetaKey]; ok { +// servers := make(map[string]string) +// this.clientmutex.Lock() +// this.clientmeta[addr] = smeta +// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) +// for k, v := range this.clientmeta { +// servers[k] = v +// } +// this.clientmutex.Unlock() +// selector.UpdateServer(servers) +// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) +// } +// } +// } +// } +// return nil +// } //监控rpc连接收到的请求消息 处理消息回调请求 func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error { if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage { + var ( + stag string + selector client.Selector + ok bool + ) + req_metadata := r.Metadata + this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata) + if stag, ok = req_metadata[ServiceClusterTag]; ok { + if selector, ok = this.selectors[stag]; !ok { + this.selectors[stag] = newSelector(nil) + selector = this.selectors[stag] + } + if addr, ok := req_metadata[ServiceAddrKey]; ok { + if _, ok = this.clientmeta[addr]; !ok { + if smeta, ok := req_metadata[ServiceMetaKey]; ok { + servers := make(map[string]string) + this.clientmutex.Lock() + this.clientmeta[addr] = smeta + this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) + for k, v := range this.clientmeta { + servers[k] = v + } + this.clientmutex.Unlock() + selector.UpdateServer(servers) + this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) + } + } + } + } return nil } e = errors.New("is callMessage") @@ -326,8 +389,8 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st selector client.Selector ok bool ) - if servicePath == "" { - err = errors.New("servicePath no cant null") + if clusterTag == "" || servicePath == "" { + err = fmt.Errorf("clusterTag:%s servicePath:%s no cant null", clusterTag, servicePath) return } metadata = map[string]string{ diff --git a/modules/user/api_login.go b/modules/user/api_login.go index 6cdc4a090..6dd4eb0a1 100644 --- a/modules/user/api_login.go +++ b/modules/user/api_login.go @@ -81,6 +81,7 @@ func (this *apiComp) Login(session comm.IUserSession, req *pb.UserLoginReq) (cod err = this.module.modelSession.Change(user.Uid, map[string]interface{}{ "uid": user.Uid, "sessionId": session.GetSessionId(), + "serviceTag": session.GetServiecTag(), "gatewayServiceId": session.GetGatewayServiceId(), "ip": session.GetIP(), }, diff --git a/pb/proto/user/user_db.proto b/pb/proto/user/user_db.proto index 1ff69d0e4..ece622ea8 100644 --- a/pb/proto/user/user_db.proto +++ b/pb/proto/user/user_db.proto @@ -2,12 +2,11 @@ syntax = "proto3"; option go_package = ".;pb"; message CacheUser { - string uid = 1; //用户id - string SessionId = 2; //会话id - string ServiceTag = 3; //所在服务集群 区服id - string GatewayServiceId = 4; //所在网关服务id - string ip = 5; //远程ip - // DB_UserData UserData = 4; //@go_tags(`json:",inline"`) + string uid = 1; //@go_tags(`json:"uid"`) 用户id + string SessionId = 2; //@go_tags(`json:"sessionId"`) 会话id + string ServiceTag = 3; //@go_tags(`json:"serviceTag"`) 所在服务集群 区服id + string GatewayServiceId = 4; //@go_tags(`json:"gatewayServiceId"`) 所在网关服务id + string ip = 5; //@go_tags(`json:"ip"`) 远程ip } message DBUser { diff --git a/pb/user_db.pb.go b/pb/user_db.pb.go index 05d5af8b1..d47f1ee01 100644 --- a/pb/user_db.pb.go +++ b/pb/user_db.pb.go @@ -26,9 +26,9 @@ type CacheUser struct { unknownFields protoimpl.UnknownFields Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"` //用户id - SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"SessionId"` //会话id - ServiceTag string `protobuf:"bytes,3,opt,name=ServiceTag,proto3" json:"ServiceTag"` //所在服务集群 区服id - GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` //所在网关服务id + SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"sessionId"` //会话id + ServiceTag string `protobuf:"bytes,3,opt,name=ServiceTag,proto3" json:"serviceTag"` //所在服务集群 区服id + GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"gatewayServiceId"` //所在网关服务id Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip"` //远程ip }