From ea84fb817cbff6bead43137532bc42ff8cadf6ee Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Fri, 17 Jun 2022 17:32:25 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0rpc=20=E5=8F=8C=E5=90=91?= =?UTF-8?q?=E9=80=9A=E4=BF=A1=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/base/core.go | 1 - lego/base/rpcx/service.go | 6 - lego/sys/rpcx/client.go | 408 +++++++++++++++++++++++++++++++++++-- lego/sys/rpcx/core.go | 17 +- lego/sys/rpcx/options.go | 22 +- lego/sys/rpcx/pools.go | 68 +++++++ lego/sys/rpcx/rpcx.go | 84 ++------ lego/sys/rpcx/rpcx_test.go | 144 +------------ lego/sys/rpcx/service.go | 356 +++++++++++++++++++++++++------- lego/sys/rpcx/utils.go | 87 ++++++++ services/worker/main.go | 2 - 11 files changed, 869 insertions(+), 326 deletions(-) create mode 100644 lego/sys/rpcx/pools.go create mode 100644 lego/sys/rpcx/utils.go diff --git a/lego/base/core.go b/lego/base/core.go index 372f62a16..3dc29c3d2 100644 --- a/lego/base/core.go +++ b/lego/base/core.go @@ -74,7 +74,6 @@ type IRPCXServiceSession interface { type IRPCXService interface { IClusterServiceBase - Register(rcvr interface{}) (err error) RegisterFunction(fn interface{}) (err error) RegisterFunctionName(name string, fn interface{}) (err error) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) diff --git a/lego/base/rpcx/service.go b/lego/base/rpcx/service.go index d2e774891..2df101975 100644 --- a/lego/base/rpcx/service.go +++ b/lego/base/rpcx/service.go @@ -121,12 +121,6 @@ func (this *RPCXService) Destroy() (err error) { return } -//注册服务对象 -func (this *RPCXService) Register(rcvr interface{}) (err error) { - err = rpcx.Register(rcvr) - return -} - //注册服务方法 func (this *RPCXService) RegisterFunction(fn interface{}) (err error) { err = rpcx.RegisterFunction(fn) diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 440a2ea67..1e52de33f 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -3,45 +3,105 @@ package rpcx import ( "context" "errors" + "fmt" + "net" + "reflect" + "runtime" "strings" + "sync" + "time" + "unicode" + "unicode/utf8" "github.com/smallnest/rpcx/client" + "github.com/smallnest/rpcx/protocol" "github.com/smallnest/rpcx/share" ) -func newClient(rpcx *RPCX) (c *Client) { - c = &Client{ - rpcx: rpcx, - clients: make(map[string]client.XClient), - // msgChan: make(chan *protocol.Message, 1000), +func newClient(options Options) (sys *Client, err error) { + sys = &Client{ + options: options, + metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), + clients: make(map[string]client.XClient), + conns: make(map[string]net.Conn), + serviceMap: make(map[string]*service), + msgChan: make(chan *protocol.Message, 1000), } return } type Client struct { - rpcx *RPCX - clients map[string]client.XClient - // msgChan chan *protocol.Message // 接收rpcXServer推送消息 + options Options + metadata string + writeTimeout time.Duration + AsyncWrite bool + clients map[string]client.XClient + connsMapMu sync.RWMutex + conns map[string]net.Conn + serviceMapMu sync.RWMutex + serviceMap map[string]*service + msgChan chan *protocol.Message // 接收rpcXServer推送消息 } // DoMessage 服务端消息处理 func (this *Client) DoMessage() { - // for msg := range this.msgChan { - - // } + for msg := range this.msgChan { + go func(req *protocol.Message) { + this.Debugf("DoMessage ServicePath:%s ServiceMethod:%s", req.ServicePath, req.ServiceMethod) + addr, ok := req.Metadata[ServiceAddrKey] + if !ok { + this.Errorf("Metadata no found ServiceAddrKey!") + return + } + conn, ok := this.conns[addr] + if !ok { + this.Errorf("no found conn addr:%s", addr) + return + } + res, _ := this.handleRequest(context.Background(), req) + this.sendResponse(conn, req, res) + }(msg) + } } +//启动RPC 服务 接收消息处理 func (this *Client) Start() (err error) { + go this.DoMessage() return } +//停止RPC 服务 func (this *Client) Stop() (err error) { for _, v := range this.clients { v.Close() } + close(this.msgChan) //关闭消息处理 return } +//注册Rpc 服务 +func (this *Client) RegisterFunction(fn interface{}) (err error) { + _, err = this.registerFunction(this.options.ServiceType, fn, "", false) + if err != nil { + return err + } + return +} + +//注册Rpc 服务 +func (this *Client) RegisterFunctionName(name string, fn interface{}) (err error) { + _, err = this.registerFunction(this.options.ServiceType, fn, name, true) + if err != nil { + return err + } + return +} + +//注销 暂不处理 +func (this *Client) UnregisterAll() (err error) { + return nil +} + //同步调用 func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { if servicePath == "" { @@ -56,17 +116,19 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod ) spath = strings.Split(servicePath, "/") if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { return } - c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) + c = client.NewBidirectionalXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption, this.msgChan) + c.GetPlugins().Add(this) c.SetSelector(newSelector()) this.clients[spath[0]] = c + } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ CallRoutRulesKey: servicePath, - ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr, - ServiceMetaKey: this.rpcx.metadata, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, }) err = c.Call(ctx, serviceMethod, args, reply) return @@ -87,17 +149,327 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st ) spath = strings.Split(servicePath, "/") if c, ok = this.clients[spath[0]]; !ok { - if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil { + if d, err = client.NewConsulDiscovery(this.options.ServiceTag, spath[0], this.options.ConsulServers, nil); err != nil { return } c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) + c.GetPlugins().Add(this) c.SetSelector(newSelector()) this.clients[spath[0]] = c } ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ CallRoutRulesKey: servicePath, - ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr, - ServiceMetaKey: this.rpcx.metadata, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, }) return c.Go(ctx, string(serviceMethod), args, reply, done) } + +//监控连接建立 +func (this *Client) ClientConnected(conn net.Conn) (net.Conn, error) { + addr := "tcp@" + conn.RemoteAddr().String() + this.connsMapMu.Lock() + this.conns[addr] = conn + this.connsMapMu.Unlock() + return conn, nil +} + +//监听连接关闭 +func (this *Client) ClientConnectionClose(conn net.Conn) error { + addr := "tcp@" + conn.RemoteAddr().String() + this.connsMapMu.Lock() + delete(this.conns, addr) + this.connsMapMu.Unlock() + return nil +} + +///日志*********************************************************************** +func (this *Client) Debug() bool { + return this.options.Debug +} + +func (this *Client) Debugf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Debugf("[SYS RPCX] "+format, a...) + } +} + +func (this *Client) Infof(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Infof("[SYS RPCX] "+format, a...) + } +} + +func (this *Client) Warnf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Warnf("[SYS RPCX] "+format, a...) + } +} + +func (this *Client) Errorf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Errorf("[SYS RPCX] "+format, a...) + } +} + +func (this *Client) Panicf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Panicf("[SYS RPCX] "+format, a...) + } +} + +func (this *Client) Fatalf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Fatalf("[SYS RPCX] "+format, a...) + } +} + +//注册服务方法 +func (this *Client) registerFunction(servicePath string, fn interface{}, name string, useName bool) (string, error) { + this.serviceMapMu.Lock() + defer this.serviceMapMu.Unlock() + + ss := this.serviceMap[servicePath] + if ss == nil { + ss = new(service) + ss.name = servicePath + ss.function = make(map[string]*functionType) + } + + f, ok := fn.(reflect.Value) + if !ok { + f = reflect.ValueOf(fn) + } + if f.Kind() != reflect.Func { + return "", errors.New("function must be func or bound method") + } + + fname := runtime.FuncForPC(reflect.Indirect(f).Pointer()).Name() + if fname != "" { + i := strings.LastIndex(fname, ".") + if i >= 0 { + fname = fname[i+1:] + } + } + if useName { + fname = name + } + if fname == "" { + errorStr := "rpcx.registerFunction: no func name for type " + f.Type().String() + this.Errorf(errorStr) + return fname, errors.New(errorStr) + } + + t := f.Type() + if t.NumIn() != 3 { + return fname, fmt.Errorf("rpcx.registerFunction: has wrong number of ins: %s", f.Type().String()) + } + if t.NumOut() != 1 { + return fname, fmt.Errorf("rpcx.registerFunction: has wrong number of outs: %s", f.Type().String()) + } + + // First arg must be context.Context + ctxType := t.In(0) + if !ctxType.Implements(typeOfContext) { + return fname, fmt.Errorf("function %s must use context as the first parameter", f.Type().String()) + } + + argType := t.In(1) + if !isExportedOrBuiltinType(argType) { + return fname, fmt.Errorf("function %s parameter type not exported: %v", f.Type().String(), argType) + } + + replyType := t.In(2) + if replyType.Kind() != reflect.Ptr { + return fname, fmt.Errorf("function %s reply type not a pointer: %s", f.Type().String(), replyType) + } + if !isExportedOrBuiltinType(replyType) { + return fname, fmt.Errorf("function %s reply type not exported: %v", f.Type().String(), replyType) + } + + // The return type of the method must be error. + if returnType := t.Out(0); returnType != typeOfError { + return fname, fmt.Errorf("function %s returns %s, not error", f.Type().String(), returnType.String()) + } + + // Install the methods + ss.function[fname] = &functionType{fn: f, ArgType: argType, ReplyType: replyType} + this.serviceMap[servicePath] = ss + + // init pool for reflect.Type of args and reply + reflectTypePools.Init(argType) + reflectTypePools.Init(replyType) + return fname, nil +} + +//处理远程服务请求 +func (this *Client) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) { + serviceName := req.ServicePath + methodName := req.ServiceMethod + + res = req.Clone() + res.SetMessageType(protocol.Response) + this.serviceMapMu.RLock() + service := this.serviceMap[serviceName] + this.serviceMapMu.RUnlock() + if service == nil { + err = errors.New("rpcx: can't find service " + serviceName) + return handleError(res, err) + } + mtype := service.method[methodName] + if mtype == nil { + if service.function[methodName] != nil { // check raw functions + return this.handleRequestForFunction(ctx, req) + } + err = errors.New("rpcx: can't find method " + methodName) + return handleError(res, err) + } + // get a argv object from object pool + argv := reflectTypePools.Get(mtype.ArgType) + + codec := share.Codecs[req.SerializeType()] + if codec == nil { + err = fmt.Errorf("can not find codec for %d", req.SerializeType()) + return handleError(res, err) + } + err = codec.Decode(req.Payload, argv) + if err != nil { + return handleError(res, err) + } + + // and get a reply object from object pool + replyv := reflectTypePools.Get(mtype.ReplyType) + if mtype.ArgType.Kind() != reflect.Ptr { + err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv)) + } else { + err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv)) + } + reflectTypePools.Put(mtype.ArgType, argv) + if err != nil { + if replyv != nil { + data, err := codec.Encode(replyv) + // return reply to object pool + reflectTypePools.Put(mtype.ReplyType, replyv) + if err != nil { + return handleError(res, err) + } + res.Payload = data + } + return handleError(res, err) + } + + if !req.IsOneway() { + data, err := codec.Encode(replyv) + // return reply to object pool + reflectTypePools.Put(mtype.ReplyType, replyv) + if err != nil { + return handleError(res, err) + } + res.Payload = data + } else if replyv != nil { + reflectTypePools.Put(mtype.ReplyType, replyv) + } + this.Debugf("server called service %+v for an request %+v", service, req) + return res, nil +} + +//处理远程服务请求 for 方法 +func (this *Client) handleRequestForFunction(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) { + res = req.Clone() + + res.SetMessageType(protocol.Response) + + serviceName := req.ServicePath + methodName := req.ServiceMethod + this.serviceMapMu.RLock() + service := this.serviceMap[serviceName] + this.serviceMapMu.RUnlock() + if service == nil { + err = errors.New("rpcx: can't find service for func raw function") + return handleError(res, err) + } + mtype := service.function[methodName] + if mtype == nil { + err = errors.New("rpcx: can't find method " + methodName) + return handleError(res, err) + } + + argv := reflectTypePools.Get(mtype.ArgType) + + codec := share.Codecs[req.SerializeType()] + if codec == nil { + err = fmt.Errorf("can not find codec for %d", req.SerializeType()) + return handleError(res, err) + } + + err = codec.Decode(req.Payload, argv) + if err != nil { + return handleError(res, err) + } + + replyv := reflectTypePools.Get(mtype.ReplyType) + + if mtype.ArgType.Kind() != reflect.Ptr { + err = service.callForFunction(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv)) + } else { + err = service.callForFunction(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv)) + } + + reflectTypePools.Put(mtype.ArgType, argv) + + if err != nil { + reflectTypePools.Put(mtype.ReplyType, replyv) + return handleError(res, err) + } + + if !req.IsOneway() { + data, err := codec.Encode(replyv) + reflectTypePools.Put(mtype.ReplyType, replyv) + if err != nil { + return handleError(res, err) + } + res.Payload = data + } else if replyv != nil { + reflectTypePools.Put(mtype.ReplyType, replyv) + } + + return res, nil +} + +//发送远程服务调用 回应消息 +func (this *Client) sendResponse(conn net.Conn, req, res *protocol.Message) { + if len(res.Payload) > 1024 && req.CompressType() != protocol.None { + res.SetCompressType(req.CompressType()) + } + data := res.EncodeSlicePointer() + if this.writeTimeout != 0 { + conn.SetWriteDeadline(time.Now().Add(this.writeTimeout)) + } + conn.Write(*data) + protocol.PutData(data) +} + +//请求错误 封装回应消息 +func handleError(res *protocol.Message, err error) (*protocol.Message, error) { + res.SetMessageStatusType(protocol.Error) + if res.Metadata == nil { + res.Metadata = make(map[string]string) + } + res.Metadata[protocol.ServiceError] = err.Error() + return res, err +} + +//服务注册 类型判断 +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" +} + +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) +} diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index cfd0a0d1c..5368e45ec 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -16,21 +16,12 @@ const ( type ( ISys interface { - IRPCXServer - IRPCXClient - } - IRPCXServer interface { Start() (err error) Stop() (err error) - Register(rcvr interface{}) (err error) + // Register(rcvr interface{}) (err error) RegisterFunction(fn interface{}) (err error) RegisterFunctionName(name string, fn interface{}) (err error) UnregisterAll() (err error) - } - - IRPCXClient interface { - Start() (err error) - Stop() (err error) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) } @@ -66,9 +57,9 @@ func Stop() (err error) { return defsys.Stop() } -func Register(rcvr interface{}) (err error) { - return defsys.Register(rcvr) -} +// func Register(rcvr interface{}) (err error) { +// return defsys.Register(rcvr) +// } func RegisterFunction(fn interface{}) (err error) { return defsys.RegisterFunction(fn) } diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index 675fdced9..c7c188460 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -4,6 +4,8 @@ import ( "errors" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/utils/mapstructure" + + "github.com/smallnest/rpcx/protocol" ) type RpcxStartType int8 @@ -23,7 +25,8 @@ type Options struct { ServiceAddr string //服务地址 ConsulServers []string //Consul集群服务地址 RpcxStartType RpcxStartType //Rpcx启动类型 - Debug bool //日志是否开启 + SerializeType protocol.SerializeType + Debug bool //日志是否开启 Log log.ILog } @@ -63,6 +66,13 @@ func SetConsulServers(v []string) Option { } } +//设置启动类型 +func SetRpcxStartType(v RpcxStartType) Option { + return func(o *Options) { + o.RpcxStartType = v + } +} + func SetDebug(v bool) Option { return func(o *Options) { o.Debug = v @@ -76,8 +86,9 @@ func SetLog(v log.ILog) Option { func newOptions(config map[string]interface{}, opts ...Option) (Options, error) { options := Options{ - Debug: true, - Log: log.Clone(log.SetLoglayer(2)), + SerializeType: protocol.MsgPack, + Debug: true, + Log: log.Clone(log.SetLoglayer(2)), } if config != nil { mapstructure.Decode(config, &options) @@ -93,8 +104,9 @@ func newOptions(config map[string]interface{}, opts ...Option) (Options, error) func newOptionsByOption(opts ...Option) (Options, error) { options := Options{ - Debug: true, - Log: log.Clone(log.SetLoglayer(2)), + SerializeType: protocol.MsgPack, + Debug: true, + Log: log.Clone(log.SetLoglayer(2)), } for _, o := range opts { o(&options) diff --git a/lego/sys/rpcx/pools.go b/lego/sys/rpcx/pools.go new file mode 100644 index 000000000..32fb732ce --- /dev/null +++ b/lego/sys/rpcx/pools.go @@ -0,0 +1,68 @@ +package rpcx + +import ( + "reflect" + "sync" +) + +var UsePool bool + +type Reset interface { + Reset() +} + +var reflectTypePools = &typePools{ + pools: make(map[reflect.Type]*sync.Pool), + New: func(t reflect.Type) interface{} { + var argv reflect.Value + + if t.Kind() == reflect.Ptr { // reply must be ptr + argv = reflect.New(t.Elem()) + } else { + argv = reflect.New(t) + } + + return argv.Interface() + }, +} + +type typePools struct { + mu sync.RWMutex + pools map[reflect.Type]*sync.Pool + New func(t reflect.Type) interface{} +} + +func (p *typePools) Init(t reflect.Type) { + tp := &sync.Pool{} + tp.New = func() interface{} { + return p.New(t) + } + p.mu.Lock() + defer p.mu.Unlock() + p.pools[t] = tp +} + +func (p *typePools) Put(t reflect.Type, x interface{}) { + if !UsePool { + return + } + if o, ok := x.(Reset); ok { + o.Reset() + } + + p.mu.RLock() + pool := p.pools[t] + p.mu.RUnlock() + pool.Put(x) +} + +func (p *typePools) Get(t reflect.Type) interface{} { + if !UsePool { + return p.New(t) + } + p.mu.RLock() + pool := p.pools[t] + p.mu.RUnlock() + + return pool.Get() +} diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 65ff7ae93..364189803 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -2,33 +2,36 @@ package rpcx import ( "context" - "fmt" "github.com/smallnest/rpcx/client" ) -func newSys(options Options) (sys *RPCX, err error) { - sys = &RPCX{ - options: options, - metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), +func newSys(options Options) (sys ISys, err error) { + if options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 + sys, err = newService(options) + return } - sys.service, err = newService(sys) - sys.client = newClient(sys) - // if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 - // } - - // if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端 - - // } + if options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端 + sys, err = newClient(options) + return + } + var ( + service ISys + client ISys + ) + service, err = newService(options) + client, err = newClient(options) + sys = &RPCX{ + service: service, + client: client, + } return } type RPCX struct { - options Options - metadata string - service IRPCXServer - client IRPCXClient + service ISys + client ISys } func (this *RPCX) Start() (err error) { @@ -43,11 +46,6 @@ func (this *RPCX) Stop() (err error) { return } -func (this *RPCX) Register(rcvr interface{}) (err error) { - this.service.Register(rcvr) - return -} - func (this *RPCX) RegisterFunction(fn interface{}) (err error) { this.service.RegisterFunction(fn) return @@ -71,45 +69,3 @@ func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod st func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) } - -// func (this *RPCX) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) { -// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) -// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RemoteAddr:%v \n", serviceName, methodName, clientConn.RemoteAddr().String()) -// return args, nil -// } - -///日志*********************************************************************** -func (this *RPCX) Debug() bool { - return this.options.Debug -} - -func (this *RPCX) Debugf(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Debugf("[SYS RPCX] "+format, a...) - } -} -func (this *RPCX) Infof(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Infof("[SYS RPCX] "+format, a...) - } -} -func (this *RPCX) Warnf(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Warnf("[SYS RPCX] "+format, a...) - } -} -func (this *RPCX) Errorf(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Errorf("[SYS RPCX] "+format, a...) - } -} -func (this *RPCX) Panicf(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Panicf("[SYS RPCX] "+format, a...) - } -} -func (this *RPCX) Fatalf(format string, a ...interface{}) { - if this.options.Debug { - this.options.Log.Fatalf("[SYS RPCX] "+format, a...) - } -} diff --git a/lego/sys/rpcx/rpcx_test.go b/lego/sys/rpcx/rpcx_test.go index fc470b2c1..2d7618601 100644 --- a/lego/sys/rpcx/rpcx_test.go +++ b/lego/sys/rpcx/rpcx_test.go @@ -3,22 +3,13 @@ package rpcx import ( "context" "fmt" - "net" "os" "os/signal" - "regexp" "syscall" "testing" "time" "go_dreamfactory/lego/sys/log" - - "github.com/rcrowley/go-metrics" - "github.com/smallnest/rpcx/client" - "github.com/smallnest/rpcx/protocol" - "github.com/smallnest/rpcx/server" - "github.com/smallnest/rpcx/serverplugin" - "github.com/smallnest/rpcx/share" ) func Test_Sys(t *testing.T) { @@ -37,7 +28,7 @@ func Test_Sys(t *testing.T) { fmt.Printf("err:%v", err) return } else { - if err = sys.Register(new(Arith)); err != nil { + if err = sys.RegisterFunction(RpcxTestHandle); err != nil { fmt.Printf("err:%v", err) return } @@ -62,142 +53,15 @@ func Test_Sys(t *testing.T) { } } -var addr = "127.0.0.1:9978" - -// go server.go -func Test_RPCX(t *testing.T) { - s := server.NewServer() - if err := addRegistryPlugin(s); err != nil { - fmt.Printf("err:%v", err) - return - } - go func() { - time.Sleep(time.Second) - s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0&addr=tcp@127.0.0.1:9978") - }() - - go func() { - time.Sleep(time.Second * 3) - if d, err := client.NewConsulDiscovery("rpcx_test", "worker", []string{"10.0.0.9:8500"}, nil); err != nil { - fmt.Printf("NewConsulDiscovery err:%v", err) - return - } else { - xclient := client.NewXClient("worker", client.Failfast, client.RandomSelect, d, client.DefaultOption) - xclient.SetSelector(newSelector()) - ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{"RoutRules": "worker/worker_1"}) - if err = xclient.Call(ctx, "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil { - fmt.Printf("Call:%v \n", err) - return - } - } - - }() - - s.Serve("tcp", addr) -} - -func addRegistryPlugin(s *server.Server) (err error) { - r := &serverplugin.ConsulRegisterPlugin{ - ServiceAddress: "tcp@" + addr, - ConsulServers: []string{"10.0.0.9:8500"}, - BasePath: "rpcx_test", - Metrics: metrics.NewRegistry(), - UpdateInterval: time.Minute, - } - err = r.Start() - if err != nil { - return - } - s.Plugins.Add(r) - s.Plugins.Add(&call{}) - return -} - -type call struct{} - -// func (this *call) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) { -// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) -// RoutRules := ctx.Value("RoutRules") -// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String()) -// return args, nil -// } - -// func (this *call) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) { -// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) -// RoutRules := ctx.Value("RoutRules").(string) -// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String()) -// return args, nil -// } - -// func (this *call) PreReadRequest(ctx context.Context) error { -// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) -// RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string) -// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) -// return nil -// } - -// func (this *call) PreWriteResponse(ctx context.Context, args *protocol.Message, repy *protocol.Message, errInter error) error { -// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) -// RoutRules := ctx.Value("RoutRules").(string) -// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) -// return nil -// } - -func (this *call) PreHandleRequest(ctx context.Context, r *protocol.Message) error { - clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) - RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string) - fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) - return nil -} - type Args struct { A int B int } - type Reply struct { - C int + Error string } -type Arith int - -func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error { - reply.C = args.A * args.B - fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C) +func RpcxTestHandle(ctx context.Context, args *Args, reply *Reply) error { + fmt.Printf("A:%d + B:%d = %d", args.A, args.B, args.A+args.B) return nil } - -func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error { - reply.C = args.A + args.B - fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C) - return nil -} - -func (t *Arith) Say(ctx context.Context, args *string, reply *string) error { - *reply = "hello " + *args - return nil -} - -///正则测试 -func Test_Regular(t *testing.T) { - // str1 := "worker" - // str2 := "worker/worker_1" - // str3 := "worker/!worker_1" - // str4 := "worker/[worker_1,worker_2]" - // str5 := "worker/![worker_1,worker_2]" - - // reg1 := regexp.MustCompile(`/`) - //根据规则提取关键信息 - // result1 := reg1.FindAllStringSubmatch(str1, -1) - // fmt.Println("result1 = ", result1) - // strings.Split() - - str := "worker/!worker_1" - // rex := regexp.MustCompile(`\!\[([^)]+)\]`) - rex := regexp.MustCompile(`\!([^)]+)`) - out := rex.FindAllStringSubmatch(str, -1) - - for _, i := range out { - fmt.Println(i[1]) - } -} diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index 05b1062da..ebb991570 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -2,136 +2,338 @@ package rpcx import ( "context" - "go_dreamfactory/lego/sys/log" + "errors" + "fmt" + "log" "net" + "strings" + "sync" "time" "github.com/rcrowley/go-metrics" "github.com/smallnest/rpcx/client" + "github.com/smallnest/rpcx/protocol" "github.com/smallnest/rpcx/server" "github.com/smallnest/rpcx/serverplugin" + "github.com/smallnest/rpcx/share" ) -func newService(rpcx *RPCX) (s *Service, err error) { - s = &Service{ - server: server.NewServer(), - rpcx: rpcx, - // clients: make(map[string]net.Conn), - // clientmeta: make(map[string]string), +func newService(options Options) (sys *Service, err error) { + sys = &Service{ + options: options, + metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), + server: server.NewServer(), + selector: newSelector(), + clients: make(map[string]net.Conn), + clientmeta: make(map[string]string), + pending: make(map[uint64]*client.Call), } r := &serverplugin.ConsulRegisterPlugin{ - ServiceAddress: "tcp@" + rpcx.options.ServiceAddr, - ConsulServers: rpcx.options.ConsulServers, - BasePath: rpcx.options.ServiceTag, + ServiceAddress: "tcp@" + options.ServiceAddr, + ConsulServers: options.ConsulServers, + BasePath: options.ServiceTag, Metrics: metrics.NewRegistry(), UpdateInterval: time.Minute, } if err = r.Start(); err != nil { return } - s.server.Plugins.Add(r) + sys.server.Plugins.Add(r) + sys.server.Plugins.Add(sys) return } type Service struct { - rpcx *RPCX - server *server.Server - selector client.Selector - clients map[string]net.Conn - clientmeta map[string]string + options Options + metadata string + server *server.Server + selector client.Selector + clientmutex sync.Mutex + clients map[string]net.Conn + clientmeta map[string]string + mutex sync.Mutex // protects following + seq uint64 + pending map[uint64]*client.Call } +//RPC 服务启动 func (this *Service) Start() (err error) { - go func() { - if err = this.server.Serve("tcp", this.rpcx.options.ServiceAddr); err != nil { - log.Errorf("rpcx server exit!") + if err = this.server.Serve("tcp", this.options.ServiceAddr); err != nil { + this.Errorf("rpcx server exit!") } }() return } +//服务停止 func (this *Service) Stop() (err error) { err = this.server.Close() return } -func (this *Service) Register(rcvr interface{}) (err error) { - err = this.server.RegisterName(this.rpcx.options.ServiceType, rcvr, this.rpcx.metadata) - return -} - +//注册RPC 服务 func (this *Service) RegisterFunction(fn interface{}) (err error) { - err = this.server.RegisterFunction(this.rpcx.options.ServiceType, fn, this.rpcx.metadata) + err = this.server.RegisterFunction(this.options.ServiceType, fn, this.metadata) return } + +//注册RPC 服务 func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { - err = this.server.RegisterFunctionName(this.rpcx.options.ServiceType, name, fn, this.rpcx.metadata) + err = this.server.RegisterFunctionName(this.options.ServiceType, name, fn, this.metadata) return } + +//注销 暂时不处理 func (this *Service) UnregisterAll() (err error) { - err = this.server.UnregisterAll() + // err = this.server.UnregisterAll() return } -//同步调用 -func (this *Service) Call(servicePath string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { - // var ( - // spath string - // clientaddr string - // conn net.Conn - // ok bool - // ) - // if servicePath == "" { - // err = errors.New("servicePath no cant null") - // return - // } - // spath := strings.Split(servicePath, "/") - // ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ - // CallRoutRulesKey: servicePath, - // ServiceAddrKey: "tcp@" + this.options.ServiceAddr, - // ServiceMetaKey: this.metadata, - // }) - // if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { - // err = fmt.Errorf("on found routRules:%s", routRules) - // return - // } - // if conn, ok = this.clients[clientaddr]; !ok { - // err = fmt.Errorf("on found clientaddr:%s", clientaddr) - // return - // } - // err := this.server.SendMessage(conn, spath[0], serviceMethod, nil, []byte("abcde")){ - - // } +//同步调用远程服务 +func (this *Service) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + var ( + done *client.Call + conn net.Conn + ) + seq := new(uint64) + ctx = context.WithValue(ctx, seqKey{}, seq) + if conn, done, err = this.call(ctx, servicePath, serviceMethod, args, reply, make(chan *client.Call, 1)); err != nil { + return + } + select { + case <-ctx.Done(): // cancel by context + this.mutex.Lock() + call := this.pending[*seq] + delete(this.pending, *seq) + this.mutex.Unlock() + if call != nil { + call.Error = ctx.Err() + call.Done <- call + } + return ctx.Err() + case call := <-done.Done: + err = call.Error + meta := ctx.Value(share.ResMetaDataKey) + if meta != nil && len(call.ResMetadata) > 0 { + resMeta := meta.(map[string]string) + for k, v := range call.ResMetadata { + resMeta[k] = v + } + resMeta[share.ServerAddress] = conn.RemoteAddr().String() + } + } return } -//异步调用 -func (this *Service) Go(routRules string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { +//异步调用 远程服务 +func (this *Service) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (_call *client.Call, err error) { + _, _call, err = this.call(ctx, servicePath, serviceMethod, args, reply, done) return } -//发现服务 -func (this *Service) Discovery(addr string, conn net.Conn, meta string) { - this.clientmeta[addr] = meta - this.clients[addr] = conn - this.selector.UpdateServer(this.clientmeta) +//监听客户端链接到服务上 保存客户端的连接对象 +func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { + req_metadata := ctx.Value(share.ReqMetaDataKey).(map[string]string) + if addr, ok := req_metadata[ServiceAddrKey]; ok { + if _, ok = this.clientmeta[addr]; !ok { + if smeta, ok := req_metadata[ServiceMetaKey]; ok { + servers := make(map[string]string) + this.clientmutex.Lock() + this.clientmeta[addr] = smeta + this.clients[addr] = ctx.Value(server.RemoteConnContextKey).(net.Conn) + for k, v := range this.clientmeta { + servers[k] = v + } + this.clientmutex.Unlock() + this.selector.UpdateServer(servers) + this.Debugf("PreReadRequest addr:%s smeta:%s \n", addr, smeta) + } + } + } + return nil } -// //监听客户端链接到服务上 保存客户端的连接对象 -// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { -// if smeta, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)[ServiceAddrKey]; ok { -// // log.Errorf("smeta:%s err:%v", smeta, ok) -// if node, err := smetaToServiceNode(smeta); err == nil { -// if _, ok = this.clientmeta[node.ServiceId]; !ok { -// this.clientmeta[node.ServiceId] = smeta -// } -// } -// } +//监控rpc连接收到的请求消息 处理消息回调请求 +func (this *Service) PostReadRequest(ctx context.Context, r *protocol.Message, e error) error { + if isCallMessage := (r.MessageType() == protocol.Request); isCallMessage { + return nil + } + e = errors.New("is callMessage") + seq := r.Seq() + this.mutex.Lock() + call := this.pending[seq] + delete(this.pending, seq) + this.mutex.Unlock() + switch { + case call == nil: + this.Errorf("callmessage no found call:%d", seq) + case r.MessageStatusType() == protocol.Error: + if len(r.Metadata) > 0 { + call.ResMetadata = r.Metadata + call.Error = errors.New(r.Metadata[protocol.ServiceError]) + } + if len(r.Payload) > 0 { + data := r.Payload + codec := share.Codecs[r.SerializeType()] + if codec != nil { + _ = codec.Decode(data, call.Reply) + } + } + call.Done <- call + default: + data := r.Payload + if len(data) > 0 { + codec := share.Codecs[r.SerializeType()] + if codec == nil { + call.Error = errors.New(client.ErrUnsupportedCodec.Error()) + } else { + err := codec.Decode(data, call.Reply) + if err != nil { + call.Error = err + } + } + } + if len(r.Metadata) > 0 { + call.ResMetadata = r.Metadata + } + call.Done <- call + } + return nil +} -// // clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +///日志*********************************************************************** +func (this *Service) Debug() bool { + return this.options.Debug +} -// // fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) -// return nil -// } +func (this *Service) Debugf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Debugf("[SYS RPCX] "+format, a...) + } +} +func (this *Service) Infof(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Infof("[SYS RPCX] "+format, a...) + } +} +func (this *Service) Warnf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Warnf("[SYS RPCX] "+format, a...) + } +} +func (this *Service) Errorf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Errorf("[SYS RPCX] "+format, a...) + } +} +func (this *Service) Panicf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Panicf("[SYS RPCX] "+format, a...) + } +} +func (this *Service) Fatalf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Fatalf("[SYS RPCX] "+format, a...) + } +} + +//执行远程调用 +func (this *Service) call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (conn net.Conn, _call *client.Call, err error) { + var ( + spath []string + clientaddr string + metadata map[string]string + ok bool + ) + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + metadata = map[string]string{ + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + } + spath = strings.Split(servicePath, "/") + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + ServiceMetaKey: this.metadata, + }) + if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { + err = fmt.Errorf("on found servicePath:%s", servicePath) + return + } + if conn, ok = this.clients[clientaddr]; !ok { + err = fmt.Errorf("on found clientaddr:%s", clientaddr) + return + } + + _call = new(client.Call) + _call.ServicePath = servicePath + _call.ServiceMethod = serviceMethod + _call.Args = args + _call.Reply = reply + if done == nil { + done = make(chan *client.Call, 10) // buffered. + } else { + if cap(done) == 0 { + log.Panic("rpc: done channel is unbuffered") + } + } + _call.Done = done + this.send(ctx, conn, spath[0], serviceMethod, metadata, _call) + return +} + +//发送远程调用请求 +func (this *Service) send(ctx context.Context, conn net.Conn, servicePath string, serviceMethod string, metadata map[string]string, call *client.Call) { + defer func() { + if call.Error != nil { + call.Done <- call + } + }() + serializeType := this.options.SerializeType + codec := share.Codecs[serializeType] + if codec == nil { + call.Error = client.ErrUnsupportedCodec + return + } + data, err := codec.Encode(call.Args) + if err != nil { + call.Error = err + return + } + + this.mutex.Lock() + seq := this.seq + this.seq++ + this.pending[seq] = call + this.mutex.Unlock() + if cseq, ok := ctx.Value(seqKey{}).(*uint64); ok { + *cseq = seq + } + req := protocol.GetPooledMsg() + req.SetMessageType(protocol.Request) + req.SetSeq(seq) + req.SetOneway(true) + req.SetSerializeType(this.options.SerializeType) + req.ServicePath = servicePath + req.ServiceMethod = serviceMethod + req.Metadata = metadata + req.Payload = data + + b := req.EncodeSlicePointer() + if _, err = conn.Write(*b); err != nil { + call.Error = err + this.mutex.Lock() + delete(this.pending, seq) + this.mutex.Unlock() + return + } + protocol.PutData(b) + protocol.FreeMsg(req) + return +} diff --git a/lego/sys/rpcx/utils.go b/lego/sys/rpcx/utils.go new file mode 100644 index 000000000..f007f2060 --- /dev/null +++ b/lego/sys/rpcx/utils.go @@ -0,0 +1,87 @@ +package rpcx + +import ( + "context" + "fmt" + "go_dreamfactory/lego/sys/log" + "reflect" + "runtime" + "sync" +) + +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() +var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem() + +type seqKey struct{} +type methodType struct { + sync.Mutex // protects counters + method reflect.Method + ArgType reflect.Type + ReplyType reflect.Type + // numCalls uint +} + +type functionType struct { + sync.Mutex // protects counters + fn reflect.Value + ArgType reflect.Type + ReplyType reflect.Type +} + +type service struct { + name string // name of service + rcvr reflect.Value // receiver of methods for the service + typ reflect.Type // type of the receiver + method map[string]*methodType // registered methods + function map[string]*functionType // registered functions +} + +func (this *service) call(ctx context.Context, mtype *methodType, argv, replyv reflect.Value) (err error) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + buf = buf[:n] + + err = fmt.Errorf("[service internal error]: %v, method: %s, argv: %+v, stack: %s", + r, mtype.method.Name, argv.Interface(), buf) + log.Errorf("err%v", err) + } + }() + + function := mtype.method.Func + // Invoke the method, providing a new value for the reply. + returnValues := function.Call([]reflect.Value{this.rcvr, reflect.ValueOf(ctx), argv, replyv}) + // The return value for the method is an error. + errInter := returnValues[0].Interface() + if errInter != nil { + return errInter.(error) + } + return nil +} + +//执行注册方法 +func (this *service) callForFunction(ctx context.Context, ft *functionType, argv, replyv reflect.Value) (err error) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + buf = buf[:n] + + // log.Errorf("failed to invoke service: %v, stacks: %s", r, string(debug.Stack())) + err = fmt.Errorf("[service internal error]: %v, function: %s, argv: %+v, stack: %s", + r, runtime.FuncForPC(ft.fn.Pointer()), argv.Interface(), buf) + log.Errorf("err:%v", err) + } + }() + + // Invoke the function, providing a new value for the reply. + returnValues := ft.fn.Call([]reflect.Value{reflect.ValueOf(ctx), argv, replyv}) + // The return value for the method is an error. + errInter := returnValues[0].Interface() + if errInter != nil { + return errInter.(error) + } + + return nil +} diff --git a/services/worker/main.go b/services/worker/main.go index cca649e65..3458ede0f 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "go_dreamfactory/modules/dbservice" "go_dreamfactory/modules/friend" "go_dreamfactory/modules/mail" "go_dreamfactory/modules/pack" @@ -41,7 +40,6 @@ func main() { pack.NewModule(), mail.NewModule(), friend.NewModule(), - dbservice.NewModule(), ) }