diff --git a/lego/base/core.go b/lego/base/core.go index 3dc29c3d2..6a4b708f5 100644 --- a/lego/base/core.go +++ b/lego/base/core.go @@ -78,4 +78,6 @@ type IRPCXService interface { RegisterFunctionName(name string, fn interface{}) (err error) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) + AcrossClusterRpcCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) + AcrossClusterRpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) } diff --git a/lego/base/rpcx/service.go b/lego/base/rpcx/service.go index 63677ed99..771fde1be 100644 --- a/lego/base/rpcx/service.go +++ b/lego/base/rpcx/service.go @@ -12,6 +12,7 @@ import ( "go_dreamfactory/lego/sys/event" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/rpcx" + "github.com/smallnest/rpcx/client" ) @@ -156,3 +157,25 @@ func (this *RPCXService) RpcCall(ctx context.Context, servicePath string, servic func (this *RPCXService) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) { return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil) } + +///跨集群 同步 执行目标远程服务方法 +//clusterTag 集群标签 +///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法 +///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法 +///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法 +///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法 +///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法 +func (this *RPCXService) AcrossClusterRpcCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return rpcx.Call(ctx, servicePath, serviceMethod, args, reply) +} + +///跨集群 异步 执行目标远程服务方法 +//clusterTag 集群标签 +///servicePath = worker 表示采用负载的方式调用 worker类型服务执行rpc方法 +///servicePath = worker/worker_1 表示寻找目标服务节点调用rpc方法 +///servicePath = worker/!worker_1 表示选择非worker_1的节点随机选择节点执行rpc方法 +///servicePath = worker/[worker_1,worker_2] 表示随机选择[]里面的服务节点执行rpc方法 +///servicePath = worker/![worker_1,worker_2] 表示随机选择非[]里面的服务节点执行rpc方法 +func (this *RPCXService) AcrossClusterRpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) { + return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil) +} diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 64c337669..f67c99b08 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -18,32 +18,34 @@ import ( "github.com/smallnest/rpcx/share" ) -func newClient(options Options) (sys *Client, err error) { +func newClient(options *Options) (sys *Client, err error) { sys = &Client{ - options: options, - metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), - clients: make(map[string]client.XClient), - conns: make(map[string]net.Conn), - connecting: make(map[string]struct{}), - serviceMap: make(map[string]*service), - msgChan: make(chan *protocol.Message, 1000), + options: options, + metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), + clients: make(map[string]client.XClient), + otherClusterClients: make(map[string]map[string]client.XClient), + conns: make(map[string]net.Conn), + connecting: make(map[string]struct{}), + serviceMap: make(map[string]*service), + msgChan: make(chan *protocol.Message, 1000), } return } type Client struct { - options Options - metadata string - writeTimeout time.Duration - AsyncWrite bool - clients map[string]client.XClient - connsMapMu sync.RWMutex - conns map[string]net.Conn - connectMapMu sync.RWMutex - connecting map[string]struct{} - serviceMapMu sync.RWMutex - serviceMap map[string]*service - msgChan chan *protocol.Message // 接收rpcXServer推送消息 + options *Options + metadata string + writeTimeout time.Duration + AsyncWrite bool + clients map[string]client.XClient + otherClusterClients map[string]map[string]client.XClient //其他集群客户端 + connsMapMu sync.RWMutex + conns map[string]net.Conn + connectMapMu sync.RWMutex + connecting map[string]struct{} + serviceMapMu sync.RWMutex + serviceMap map[string]*service + msgChan chan *protocol.Message // 接收rpcXServer推送消息 } // DoMessage 服务端消息处理 @@ -132,9 +134,10 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ - CallRoutRulesKey: servicePath, - ServiceAddrKey: "tcp@" + this.options.ServiceAddr, - ServiceMetaKey: this.metadata, + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, }) err = c.Call(ctx, serviceMethod, args, reply) return @@ -164,9 +167,93 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ - CallRoutRulesKey: servicePath, - ServiceAddrKey: "tcp@" + this.options.ServiceAddr, - ServiceMetaKey: this.metadata, + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + return c.Go(ctx, string(serviceMethod), args, reply, done) +} + +//跨集群 同步调用 +func (this *Client) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool + ) + spath = strings.Split(servicePath, "/") + if clients, ok = this.otherClusterClients[clusterTag]; !ok { + this.otherClusterClients[clusterTag] = make(map[string]client.XClient) + clients = this.otherClusterClients[clusterTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(clusterTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } + clients[spath[0]] = c + } + } + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + ServiceClusterTag: clusterTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + err = c.Call(ctx, serviceMethod, args, reply) + return +} + +//跨集群 异步调用 +func (this *Client) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + clients map[string]client.XClient + d *client.ConsulDiscovery + c client.XClient + ok bool + ) + spath = strings.Split(servicePath, "/") + if clients, ok = this.otherClusterClients[clusterTag]; !ok { + this.otherClusterClients[clusterTag] = make(map[string]client.XClient) + clients = this.otherClusterClients[clusterTag] + } else { + if c, ok = clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(clusterTag, spath[0], this.options.ConsulServers, nil); err != nil { + return + } + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) + if this.options.RpcxStartType == RpcxStartByClient && this.options.AutoConnect { + c.SetSelector(newSelector(this.UpdateServer)) + } else { + c.SetSelector(newSelector(nil)) + } + clients[spath[0]] = c + } + } + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + ServiceClusterTag: this.options.ServiceTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, }) return c.Go(ctx, string(serviceMethod), args, reply, done) } diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index 848247357..b76d68227 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -10,9 +10,10 @@ import ( ) const ( - ServiceMetaKey = "smeta" - ServiceAddrKey = "addr" - CallRoutRulesKey = "callrules" + ServiceClusterTag = "ctag" + ServiceMetaKey = "smeta" + ServiceAddrKey = "addr" + CallRoutRulesKey = "callrules" ) const RpcX_ShakeHands = "RpcX_ShakeHands" //握手 @@ -26,6 +27,8 @@ type ( UnregisterAll() (err error) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) + AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) + AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) } ) @@ -33,18 +36,18 @@ var ( defsys ISys ) -func OnInit(config map[string]interface{}, option ...Option) (err error) { - var options Options - if options, err = newOptions(config, option...); err != nil { +func OnInit(config map[string]interface{}, opt ...Option) (err error) { + var options *Options + if options, err = newOptions(config, opt...); err != nil { return } defsys, err = newSys(options) return } -func NewSys(option ...Option) (sys ISys, err error) { - var options Options - if options, err = newOptionsByOption(option...); err != nil { +func NewSys(opt ...Option) (sys ISys, err error) { + var options *Options + if options, err = newOptionsByOption(opt...); err != nil { return } sys, err = newSys(options) @@ -92,6 +95,12 @@ func smetaToServiceNode(meta string) (node *ServiceNode, err error) { data[k] = v[0] } } + if stag, ok := data["stag"]; !ok { + err = fmt.Errorf("no found stag") + return + } else { + node.ServiceTag = stag + } if sid, ok := data["sid"]; !ok { err = fmt.Errorf("no found sid") return diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index c5cbae3d2..486a730c8 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -85,37 +85,45 @@ func SetLog(v log.ILog) Option { } } -func newOptions(config map[string]interface{}, opts ...Option) (Options, error) { - options := Options{ +func newOptions(config map[string]interface{}, opts ...Option) (options *Options, err error) { + options = &Options{ AutoConnect: true, SerializeType: protocol.MsgPack, - Debug: true, - Log: log.Clone(log.SetLoglayer(2)), } if config != nil { - mapstructure.Decode(config, &options) + mapstructure.Decode(config, options) } for _, o := range opts { - o(&options) + o(options) } if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 { return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常") } + if options.Debug && options.Log == nil { + if options.Log = log.Clone(log.SetLoglayer(2)); options.Log == nil { + err = errors.New("log is nil") + } + } return options, nil } -func newOptionsByOption(opts ...Option) (Options, error) { - options := Options{ +func newOptionsByOption(opts ...Option) (options *Options, err error) { + options = &Options{ AutoConnect: true, SerializeType: protocol.MsgPack, Debug: true, Log: log.Clone(log.SetLoglayer(2)), } for _, o := range opts { - o(&options) + o(options) } if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 { return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常") } + if options.Debug && options.Log == nil { + if options.Log = log.Clone(log.SetLoglayer(2)); options.Log == nil { + err = errors.New("log is nil") + } + } return options, nil } diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 364189803..ba6e9a641 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -6,7 +6,7 @@ import ( "github.com/smallnest/rpcx/client" ) -func newSys(options Options) (sys ISys, err error) { +func newSys(options *Options) (sys ISys, err error) { if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 sys, err = newService(options) return @@ -69,3 +69,13 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) } + +//跨服同步调用 +func (this *RPCX) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return this.client.AcrossClusterCall(ctx, clusterTag, servicePath, serviceMethod, args, reply) +} + +//跨服异步调用 +func (this *RPCX) AcrossClusterGo(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + return this.client.AcrossClusterGo(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) +} diff --git a/lego/sys/rpcx/selector.go b/lego/sys/rpcx/selector.go index abc89600c..bc00870c8 100644 --- a/lego/sys/rpcx/selector.go +++ b/lego/sys/rpcx/selector.go @@ -24,6 +24,7 @@ func newSelector(fn func(map[string]*ServiceNode)) *Selector { } type ServiceNode struct { + ServiceTag string `json:"stag"` //服务集群标签 ServiceId string `json:"sid"` //服务id ServiceType string `json:"stype"` //服务类型 Version string `json:"version"` //服务版本 diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index b8ee128d3..17da4c096 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -18,12 +18,12 @@ import ( "github.com/smallnest/rpcx/share" ) -func newService(options Options) (sys *Service, err error) { +func newService(options *Options) (sys *Service, err error) { sys = &Service{ options: options, - metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), + metadata: fmt.Sprintf("stag=%s&stype=%s&sid=%s&version=%s&addr=%s", options.ServiceTag, options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), server: server.NewServer(), - selector: newSelector(nil), + selectors: make(map[string]client.Selector), clients: make(map[string]net.Conn), clientmeta: make(map[string]string), pending: make(map[uint64]*client.Call), @@ -46,10 +46,10 @@ func newService(options Options) (sys *Service, err error) { } type Service struct { - options Options + options *Options metadata string server *server.Server - selector client.Selector + selectors map[string]client.Selector clientmutex sync.Mutex clients map[string]net.Conn clientmeta map[string]string @@ -100,7 +100,7 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod ) seq := new(uint64) ctx = context.WithValue(ctx, seqKey{}, seq) - if conn, done, err = this.call(ctx, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { + if conn, done, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { return } select { @@ -130,26 +130,79 @@ func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod //异步调用 远程服务 func (this *Service) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { - _, _call, err = this.call(ctx, servicePath, serviceMethod, args, reply, done) + _, _call, err = this.call(ctx, this.options.ServiceTag, servicePath, serviceMethod, args, reply, done) + return +} + +//跨服 同步调用 远程服务 +func (this *Service) AcrossClusterCall(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + done *client.Call + conn net.Conn + ) + seq := new(uint64) + ctx = context.WithValue(ctx, seqKey{}, seq) + if conn, done, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { + return + } + select { + case <-ctx.Done(): // cancel by context + this.mutex.Lock() + call := this.pending[*seq] + delete(this.pending, *seq) + this.mutex.Unlock() + if call != nil { + call.Error = ctx.Err() + call.Done <- call + } + return ctx.Err() + case call := <-done.Done: + err = call.Error + meta := ctx.Value(share.ResMetaDataKey) + if meta != nil && len(call.ResMetadata) > 0 { + resMeta := meta.(map[string]string) + for k, v := range call.ResMetadata { + resMeta[k] = v + } + resMeta[share.ServerAddress] = conn.RemoteAddr().String() + } + } + return +} + +//跨服 异步调用 远程服务 +func (this *Service) AcrossClusterGo(ctx context.Context, clusterTag, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { + _, _call, err = this.call(ctx, clusterTag, servicePath, serviceMethod, args, reply, done) return } //监听客户端链接到服务上 保存客户端的连接对象 func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { + var ( + stag string + selector client.Selector + ok bool + ) req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string) - if addr, ok := req_metadata[ServiceAddrKey]; ok { - if _, ok = this.clientmeta[addr]; !ok { - if smeta, ok := req_metadata[ServiceMetaKey]; ok { - servers := make(map[string]string) - this.clientmutex.Lock() - this.clientmeta[addr] = smeta - this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) - for k, v := range this.clientmeta { - servers[k] = v + if stag, ok = req_metadata[ServiceClusterTag]; ok { + if selector, ok = this.selectors[stag]; !ok { + this.selectors[stag] = newSelector(nil) + selector = this.selectors[stag] + } + if addr, ok := req_metadata[ServiceAddrKey]; ok { + if _, ok = this.clientmeta[addr]; !ok { + if smeta, ok := req_metadata[ServiceMetaKey]; ok { + servers := make(map[string]string) + this.clientmutex.Lock() + this.clientmeta[addr] = smeta + this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) + for k, v := range this.clientmeta { + servers[k] = v + } + this.clientmutex.Unlock() + selector.UpdateServer(servers) + this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) } - this.clientmutex.Unlock() - this.selector.UpdateServer(servers) - this.Debugf("fond new node addr:%s smeta:%s \n", addr, smeta) } } } @@ -247,11 +300,12 @@ func (this *Service) Fatalf(format string, a ...interface{}) { } //执行远程调用 -func (this *Service) call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) { +func (this *Service) call(ctx context.Context, clusterTag string, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) { var ( spath []string clientaddr string metadata map[string]string + selector client.Selector ok bool ) if servicePath == "" { @@ -259,9 +313,10 @@ func (this *Service) call(ctx context.Context, servicePath string, serviceMethod return } metadata = map[string]string{ - CallRoutRulesKey: servicePath, - ServiceAddrKey: "tcp@" + this.options.ServiceAddr, - ServiceMetaKey: this.metadata, + ServiceClusterTag: clusterTag, + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, } spath = strings.Split(servicePath, "/") ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ @@ -269,7 +324,10 @@ func (this *Service) call(ctx context.Context, servicePath string, serviceMethod ServiceAddrKey: "tcp@" + this.options.ServiceAddr, ServiceMetaKey: this.metadata, }) - if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { + if selector, ok = this.selectors[clusterTag]; !ok { + err = fmt.Errorf("on found serviceTag:%s", clusterTag) + } + if clientaddr = selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { err = fmt.Errorf("on found servicePath:%s", servicePath) return } diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index 07fad20ce..188e8b2f2 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -7,6 +7,7 @@ import ( "go_dreamfactory/comm" "go_dreamfactory/pb" "go_dreamfactory/utils" + "strings" "sync" "sync/atomic" "time" @@ -64,13 +65,13 @@ func (this *Agent) readLoop() { locp: for { if _, data, err = this.wsConn.ReadMessage(); err != nil { - log.Errorf("agent:%s uId:%s ReadMessage err:%v", this.sessionId, this.uId, err) + this.gateway.Errorf("agent:%s uId:%s ReadMessage err:%v", this.sessionId, this.uId, err) go this.Close() break locp } if err = proto.Unmarshal(data, msg); err != nil { - log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err) + this.gateway.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err) go this.Close() break locp } else { @@ -93,7 +94,7 @@ locp: } } } - log.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId) + this.gateway.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId) } func (this *Agent) writeLoop() { @@ -111,7 +112,7 @@ locp: if ok { data, err = proto.Marshal(msg) if err = this.wsConn.WriteMessage(websocket.BinaryMessage, data); err != nil { - log.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) + this.gateway.Errorf("agent:%s uId:%d WriteMessage err:%v", this.sessionId, this.uId, err) go this.Close() } } else { @@ -119,7 +120,7 @@ locp: } } } - log.Debugf("agent:%s uId:%s writeLoop end!", this.sessionId, this.uId) + this.gateway.Debugf("agent:%s uId:%s writeLoop end!", this.sessionId, this.uId) } //安全认证 所有协议 @@ -213,28 +214,58 @@ func (this *Agent) Close() { //分发用户消息 func (this *Agent) messageDistribution(msg *pb.UserMessage) (err error) { - reply := &pb.RPCMessageReply{} - log.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType) + var ( + reply *pb.RPCMessageReply = &pb.RPCMessageReply{} + serviceTag string = "" + servicePath string = comm.Service_Worker + rule string + ok bool + ) - servicePath := comm.Service_Worker - if rule, ok := this.gateway.GetMsgDistribute(msg.MainType, msg.SubType); ok { - servicePath = rule + this.gateway.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType) + if rule, ok = this.gateway.GetMsgDistribute(msg.MainType, msg.SubType); ok { + paths := strings.Split(rule, "/") + if len(paths) == 3 { + serviceTag = paths[0] + servicePath = fmt.Sprintf("%s/%s", paths[1], paths[2]) + } else if len(paths) < 3 && len(paths) > 0 { + servicePath = rule + } else { + this.gateway.Errorf("messageDistribution rule is empty!") + return + } } else { - if len(this.wId) > 0 { + if len(this.wId) > 0 { //已经绑定worker 服务器 servicePath = fmt.Sprintf("%s/%s", comm.Service_Worker, this.wId) } } - if err = this.gateway.Service().RpcCall(context.Background(), servicePath, string(comm.Rpc_GatewayRoute), &pb.AgentMessage{ - Ip: this.IP(), - UserSessionId: this.sessionId, - UserId: this.uId, - GatewayServiceId: this.gateway.Service().GetId(), - MainType: msg.MainType, - SubType: msg.SubType, - Message: msg.Data, - }, reply); err != nil { - log.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err) - return + + if len(serviceTag) == 0 { + if err = this.gateway.Service().RpcCall(context.Background(), servicePath, string(comm.Rpc_GatewayRoute), &pb.AgentMessage{ + Ip: this.IP(), + UserSessionId: this.sessionId, + UserId: this.uId, + GatewayServiceId: this.gateway.Service().GetId(), + MainType: msg.MainType, + SubType: msg.SubType, + Message: msg.Data, + }, reply); err != nil { + this.gateway.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err) + return + } + } else { //跨集群调用 + if err = this.gateway.Service().AcrossClusterRpcCall(context.Background(), serviceTag, servicePath, string(comm.Rpc_GatewayRoute), &pb.AgentMessage{ + Ip: this.IP(), + UserSessionId: this.sessionId, + UserId: this.uId, + GatewayServiceId: this.gateway.Service().GetId(), + MainType: msg.MainType, + SubType: msg.SubType, + Message: msg.Data, + }, reply); err != nil { + this.gateway.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err) + return + } } if reply.Code != pb.ErrorCode_Success { data, _ := anypb.New(&pb.NotifyErrorNotifyPush{ReqMainType: msg.MainType, ReqSubType: msg.SubType, Code: pb.ErrorCode(reply.Code.Number())}) diff --git a/modules/gateway/core.go b/modules/gateway/core.go index 7fd4749a6..3605f84dd 100644 --- a/modules/gateway/core.go +++ b/modules/gateway/core.go @@ -5,6 +5,7 @@ import ( "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" + "go_dreamfactory/lego/sys/log" ) type ( @@ -22,6 +23,7 @@ type ( // IGateway 网关模块 接口定义 IGateway interface { core.IModule + log.Ilogf Service() base.IRPCXService Connect(a IAgent) DisConnect(a IAgent) diff --git a/modules/gateway/module.go b/modules/gateway/module.go index 7e7019342..634689e6c 100644 --- a/modules/gateway/module.go +++ b/modules/gateway/module.go @@ -1,6 +1,7 @@ package gateway import ( + "fmt" "go_dreamfactory/comm" "go_dreamfactory/lego/base" @@ -22,6 +23,7 @@ func NewModule() core.IModule { type Gateway struct { cbase.ModuleBase + options *Options service base.IRPCXService // rpcx服务接口 主要client->server wsService *WSServiceComp // websocket服务 监听websocket连接 agentMgr *AgentMgrComp // 客户端websocket连接管理 @@ -46,6 +48,7 @@ func (this *Gateway) Service() base.IRPCXService { // Init 模块初始化函数 func (this *Gateway) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { err = this.ModuleBase.Init(service, module, options) + this.options = options.(*Options) this.service = service.(base.IRPCXService) return } @@ -98,3 +101,35 @@ func (this *Gateway) DisConnect(a IAgent) { func (this *Gateway) GetMsgDistribute(mtype, stype string) (rule string, ok bool) { return this.configure.GetMsgDistribute(mtype, stype) } + +//日志 +func (this *Gateway) Debugf(format string, a ...interface{}) { + if this.options.GetDebug() { + this.options.GetLog().Debugf(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} +func (this *Gateway) Infof(format string, a ...interface{}) { + if this.options.GetDebug() { + this.options.GetLog().Infof(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} +func (this *Gateway) Warnf(format string, a ...interface{}) { + if this.options.Debug { + this.options.GetLog().Warnf(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} +func (this *Gateway) Errorf(format string, a ...interface{}) { + if this.options.GetLog() != nil { + this.options.GetLog().Errorf(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} +func (this *Gateway) Panicf(format string, a ...interface{}) { + if this.options.GetLog() != nil { + this.options.GetLog().Panicf(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} +func (this *Gateway) Fatalf(format string, a ...interface{}) { + if this.options.GetLog() != nil { + this.options.GetLog().Fatalf(fmt.Sprintf("[Module:%s] ", this.GetType())+format, a...) + } +} diff --git a/modules/gateway/options.go b/modules/gateway/options.go index 12075275f..6fb187a13 100644 --- a/modules/gateway/options.go +++ b/modules/gateway/options.go @@ -2,6 +2,7 @@ package gateway import ( "go_dreamfactory/lego/utils/mapstructure" + "go_dreamfactory/modules" ) /* @@ -10,7 +11,7 @@ import ( type ( Options struct { - Debug bool //日志开关 + modules.Options GinDebug bool //web引擎日志开关 ListenPort int //websocket 监听端口 } @@ -19,6 +20,9 @@ type ( // LoadConfig 配置文件序列化为Options func (this *Options) LoadConfig(settings map[string]interface{}) (err error) { if settings != nil { + if err = this.Options.LoadConfig(settings); err != nil { + return + } err = mapstructure.Decode(settings, this) } return diff --git a/modules/modulebase.go b/modules/modulebase.go index 52b611ccc..81bd1d956 100644 --- a/modules/modulebase.go +++ b/modules/modulebase.go @@ -253,11 +253,17 @@ func (this *ModuleBase) Warnf(format string, a ...interface{}) { } } func (this *ModuleBase) Errorf(format string, a ...interface{}) { - this.options.GetLog().Errorf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + if this.options.GetLog() != nil { + this.options.GetLog().Errorf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + } } func (this *ModuleBase) Panicf(format string, a ...interface{}) { - this.options.GetLog().Panicf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + if this.options.GetLog() != nil { + this.options.GetLog().Panicf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + } } func (this *ModuleBase) Fatalf(format string, a ...interface{}) { - this.options.GetLog().Fatalf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + if this.options.GetLog() != nil { + this.options.GetLog().Fatalf(fmt.Sprintf("[Module:%s] ", this.module.GetType())+format, a...) + } } diff --git a/utils/base64.go b/utils/base64.go index f67b82908..207ed098b 100644 --- a/utils/base64.go +++ b/utils/base64.go @@ -28,7 +28,7 @@ func ValidSecretKey(secStr string) bool { clientMd5Key := secStr[3:35] rawmsg := secStr[35:] - log.Debugf("data base: %s", rawmsg) - serverMd5Key := MD5Str(rawmsg) //这里可以再加上客户端和服务端的秘钥再MD5 + // log.Debugf("data base: %s", rawmsg) + serverMd5Key := MD5Str(rawmsg) //这里可以再加上客户端和服务端的秘钥再MD5 return strings.EqualFold(strings.ToLower(serverMd5Key), strings.ToLower(clientMd5Key)) }