修复上传消息推送bug以及服务发现bug

This commit is contained in:
liwei1dao 2022-07-21 11:56:22 +08:00
parent 28d91331f5
commit 253386266f
5 changed files with 128 additions and 65 deletions

View File

@ -20,9 +20,8 @@ import (
func newClient(options *Options) (sys *Client, err error) {
sys = &Client{
options: options,
metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr),
// clients: make(map[string]client.XClient),
options: options,
metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr),
clusterClients: make(map[string]map[string]client.XClient),
conns: make(map[string]net.Conn),
connecting: make(map[string]struct{}),
@ -33,11 +32,10 @@ func newClient(options *Options) (sys *Client, err error) {
}
type Client struct {
options *Options
metadata string
writeTimeout time.Duration
AsyncWrite bool
// clients map[string]client.XClient
options *Options
metadata string
writeTimeout time.Duration
AsyncWrite bool
clusterClients map[string]map[string]client.XClient //其他集群客户端
connsMapMu sync.RWMutex
conns map[string]net.Conn
@ -52,19 +50,21 @@ type Client struct {
func (this *Client) DoMessage() {
for msg := range this.msgChan {
go func(req *protocol.Message) {
this.Debugf("DoMessage ServicePath:%s ServiceMethod:%s", req.ServicePath, req.ServiceMethod)
addr, ok := req.Metadata[ServiceAddrKey]
if !ok {
this.Errorf("Metadata no found ServiceAddrKey!")
return
if req.ServicePath != "" && req.ServiceMethod != "" {
this.Debugf("DoMessage :%v", req)
addr, ok := req.Metadata[ServiceAddrKey]
if !ok {
this.Errorf("Metadata no found ServiceAddrKey!")
return
}
conn, ok := this.conns[addr]
if !ok {
this.Errorf("no found conn addr:%s", addr)
return
}
res, _ := this.handleRequest(context.Background(), req)
this.sendResponse(conn, req, res)
}
conn, ok := this.conns[addr]
if !ok {
this.Errorf("no found conn addr:%s", addr)
return
}
res, _ := this.handleRequest(context.Background(), req)
this.sendResponse(conn, req, res)
}(msg)
}
}
@ -242,6 +242,7 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) {
this.connecting[v.ServiceAddr] = struct{}{}
this.connectMapMu.Unlock()
if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{
ServiceTag: this.options.ServiceTag,
ServiceId: this.options.ServiceId,
ServiceType: this.options.ServiceType,
ServiceAddr: this.options.ServiceAddr},
@ -250,6 +251,8 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) {
this.connectMapMu.Lock()
delete(this.connecting, v.ServiceAddr)
this.connectMapMu.Unlock()
} else {
this.Debugf("UpdateServer addr:%s ", v.ServiceAddr)
}
}
}
@ -262,9 +265,6 @@ func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) {
this.connsMapMu.Lock()
this.conns[addr] = conn
this.connsMapMu.Unlock()
this.connectMapMu.Lock()
delete(this.connecting, addr)
this.connectMapMu.Unlock()
this.Debugf("ClientConnected addr:%v", addr)
return conn, nil
}

View File

@ -194,42 +194,105 @@ func (this *Service) ClusterBroadcast(ctx context.Context, servicePath string, s
return
}
// func (this *Service) PreReadRequest(ctx context.Context) error {
// var (
// stag string
// selector client.Selector
// ok bool
// )
// req_metadata, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// this.Debugf("PreReadRequest Meta:%v ", ctx.Value(share.ReqMetaDataKey))
// if stag, ok = req_metadata[ServiceClusterTag]; ok {
// if selector, ok = this.selectors[stag]; !ok {
// this.selectors[stag] = newSelector(nil)
// selector = this.selectors[stag]
// }
// if addr, ok := req_metadata[ServiceAddrKey]; ok {
// if _, ok = this.clientmeta[addr]; !ok {
// if smeta, ok := req_metadata[ServiceMetaKey]; ok {
// servers := make(map[string]string)
// this.clientmutex.Lock()
// this.clientmeta[addr] = smeta
// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
// for k, v := range this.clientmeta {
// servers[k] = v
// }
// this.clientmutex.Unlock()
// selector.UpdateServer(servers)
// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
// }
// }
// }
// }
// return nil
// }
//监听客户端链接到服务上 保存客户端的连接对象
func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
var (
stag string
selector client.Selector
ok bool
)
req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string)
if stag, ok = req_metadata[ServiceClusterTag]; ok {
if selector, ok = this.selectors[stag]; !ok {
this.selectors[stag] = newSelector(nil)
selector = this.selectors[stag]
}
if addr, ok := req_metadata[ServiceAddrKey]; ok {
if _, ok = this.clientmeta[addr]; !ok {
if smeta, ok := req_metadata[ServiceMetaKey]; ok {
servers := make(map[string]string)
this.clientmutex.Lock()
this.clientmeta[addr] = smeta
this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
for k, v := range this.clientmeta {
servers[k] = v
}
this.clientmutex.Unlock()
selector.UpdateServer(servers)
this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
}
}
}
}
return nil
}
// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
// var (
// stag string
// selector client.Selector
// ok bool
// )
// req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// this.Debugf("PreHandleRequest ServicePath:%s ServiceMethod:%s Meta:%v ", r.ServicePath, r.ServiceMethod, ctx.Value(share.ReqMetaDataKey))
// if stag, ok = req_metadata[ServiceClusterTag]; ok {
// if selector, ok = this.selectors[stag]; !ok {
// this.selectors[stag] = newSelector(nil)
// selector = this.selectors[stag]
// }
// if addr, ok := req_metadata[ServiceAddrKey]; ok {
// if _, ok = this.clientmeta[addr]; !ok {
// if smeta, ok := req_metadata[ServiceMetaKey]; ok {
// servers := make(map[string]string)
// this.clientmutex.Lock()
// this.clientmeta[addr] = smeta
// this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
// for k, v := range this.clientmeta {
// servers[k] = v
// }
// this.clientmutex.Unlock()
// selector.UpdateServer(servers)
// this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
// }
// }
// }
// }
// return nil
// }
//监控rpc连接收到的请求消息 处理消息回调请求
func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error {
if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage {
var (
stag string
selector client.Selector
ok bool
)
req_metadata := r.Metadata
this.Debugf("PreReadRequest ServicePath:%s ServicePath:%s Metadata:%v ", r.ServicePath, r.ServiceMethod, r.Metadata)
if stag, ok = req_metadata[ServiceClusterTag]; ok {
if selector, ok = this.selectors[stag]; !ok {
this.selectors[stag] = newSelector(nil)
selector = this.selectors[stag]
}
if addr, ok := req_metadata[ServiceAddrKey]; ok {
if _, ok = this.clientmeta[addr]; !ok {
if smeta, ok := req_metadata[ServiceMetaKey]; ok {
servers := make(map[string]string)
this.clientmutex.Lock()
this.clientmeta[addr] = smeta
this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn)
for k, v := range this.clientmeta {
servers[k] = v
}
this.clientmutex.Unlock()
selector.UpdateServer(servers)
this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta)
}
}
}
}
return nil
}
e = errors.New("is callMessage")
@ -326,8 +389,8 @@ func (this *Service) call(ctx context.Context, clusterTag string, servicePath st
selector client.Selector
ok bool
)
if servicePath == "" {
err = errors.New("servicePath no cant null")
if clusterTag == "" || servicePath == "" {
err = fmt.Errorf("clusterTag:%s servicePath:%s no cant null", clusterTag, servicePath)
return
}
metadata = map[string]string{

View File

@ -81,6 +81,7 @@ func (this *apiComp) Login(session comm.IUserSession, req *pb.UserLoginReq) (cod
err = this.module.modelSession.Change(user.Uid, map[string]interface{}{
"uid": user.Uid,
"sessionId": session.GetSessionId(),
"serviceTag": session.GetServiecTag(),
"gatewayServiceId": session.GetGatewayServiceId(),
"ip": session.GetIP(),
},

View File

@ -2,12 +2,11 @@ syntax = "proto3";
option go_package = ".;pb";
message CacheUser {
string uid = 1; //id
string SessionId = 2; //id
string ServiceTag = 3; // id
string GatewayServiceId = 4; //id
string ip = 5; //ip
// DB_UserData UserData = 4; //@go_tags(`json:",inline"`)
string uid = 1; //@go_tags(`json:"uid"`) id
string SessionId = 2; //@go_tags(`json:"sessionId"`) id
string ServiceTag = 3; //@go_tags(`json:"serviceTag"`) id
string GatewayServiceId = 4; //@go_tags(`json:"gatewayServiceId"`) id
string ip = 5; //@go_tags(`json:"ip"`) ip
}
message DBUser {

View File

@ -26,9 +26,9 @@ type CacheUser struct {
unknownFields protoimpl.UnknownFields
Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid"` //用户id
SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"SessionId"` //会话id
ServiceTag string `protobuf:"bytes,3,opt,name=ServiceTag,proto3" json:"ServiceTag"` //所在服务集群 区服id
GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"GatewayServiceId"` //所在网关服务id
SessionId string `protobuf:"bytes,2,opt,name=SessionId,proto3" json:"sessionId"` //会话id
ServiceTag string `protobuf:"bytes,3,opt,name=ServiceTag,proto3" json:"serviceTag"` //所在服务集群 区服id
GatewayServiceId string `protobuf:"bytes,4,opt,name=GatewayServiceId,proto3" json:"gatewayServiceId"` //所在网关服务id
Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip"` //远程ip
}