diff --git a/go.mod b/go.mod index c632988c2..ca004b778 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/mitchellh/hashstructure v1.1.0 github.com/nacos-group/nacos-sdk-go v1.0.8 github.com/natefinch/lumberjack v2.0.0+incompatible + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/robfig/cron/v3 v3.0.1 github.com/rs/xid v1.3.0 github.com/satori/go.uuid v1.2.0 @@ -53,6 +54,7 @@ require ( github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect + github.com/go-redis/redis_rate/v9 v9.1.2 // indirect github.com/go-stack/stack v1.8.0 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -68,6 +70,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/serf v0.9.7 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/juju/ratelimit v1.0.1 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect diff --git a/go.sum b/go.sum index b304dc2d4..992ed8e23 100644 --- a/go.sum +++ b/go.sum @@ -170,6 +170,7 @@ github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqW github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis_rate/v9 v9.1.2 h1:H0l5VzoAtOE6ydd38j8MCq3ABlGLnvvbA1xDSVVCHgQ= github.com/go-redis/redis_rate/v9 v9.1.2/go.mod h1:oam2de2apSgRG8aJzwJddXbNu91Iyz1m8IKJE2vpvlQ= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= @@ -359,6 +360,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= @@ -548,6 +550,7 @@ github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= diff --git a/lego/base/rpcx/service.go b/lego/base/rpcx/service.go index fb0462c5b..28b09d2c5 100644 --- a/lego/base/rpcx/service.go +++ b/lego/base/rpcx/service.go @@ -4,29 +4,22 @@ import ( "context" "fmt" "runtime" - "sync" - "go_dreamfactory/lego" "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/cron" "go_dreamfactory/lego/sys/event" "go_dreamfactory/lego/sys/log" - "go_dreamfactory/lego/sys/registry" "go_dreamfactory/lego/sys/rpcx" - "go_dreamfactory/lego/utils/container/sortslice" - "go_dreamfactory/lego/utils/container/version" "github.com/smallnest/rpcx/client" ) type RPCXService struct { - cbase.ServiceBase //继承服务基类 + cbase.ServiceBase //继承服务基类+ opts *Options //服务启动的配置信息数据 - serverList sync.Map //集群服务会话管理列表对象 rpcxService base.IRPCXService //服务自身 通过接口可以实现上层服务类重构底层接口 - IsInClustered bool //当前服务是否已加入到集群中 } func (this *RPCXService) GetTag() string { @@ -89,12 +82,7 @@ func (this *RPCXService) InitSys() { } else { log.Infof("Sys event Init success !") } - if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil { - log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err)) - } else { - log.Infof("Sys registry Init success !") - } - if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil { + if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceTag(this.GetTag()), rpcx.SetServiceId(this.GetId()), rpcx.SetServiceType(this.GetType()), rpcx.SetServiceVersion(this.GetVersion()), rpcx.SetServiceAddr(fmt.Sprintf("%s:%d", this.GetIp(), this.GetPort()))); err != nil { log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err)) } else { log.Infof("Sys rpcx Init success !") @@ -103,9 +91,6 @@ func (this *RPCXService) InitSys() { if err := rpcx.Start(); err != nil { log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err)) } - if err := registry.Start(); err != nil { - log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err)) - } }) } @@ -126,142 +111,11 @@ func (this *RPCXService) Destroy() (err error) { if err = rpcx.Stop(); err != nil { return } - if err = registry.Stop(); err != nil { - return - } cron.Stop() err = this.ServiceBase.Destroy() return } -//注册服务会话 当有新的服务加入时 -func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) { - if _, ok := this.serverList.Load(node.Id); !ok { - if s, err := NewServiceSession(&node); err != nil { - log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err) - } else { - this.serverList.Store(node.Id, s) - } - } - if this.IsInClustered { - event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 - } else { - if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功 - this.IsInClustered = true - event.TriggerEvent(core.Event_RegistryStart) - } - } -} - -//更新服务会话 当有新的服务加入时 -func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) { - if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息 - session := ss.(base.IRPCXServiceSession) - if session.GetRpcId() != node.RpcId { - if s, err := NewServiceSession(&node); err != nil { - log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err) - } else { - this.serverList.Store(node.Id, s) - } - event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 - } else { - if session.GetVersion() != node.Version { - session.SetVersion(node.Version) - } - if session.GetPreWeight() != node.PreWeight { - session.SetPreWeight(node.PreWeight) - } - event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件 - } - } -} - -//注销服务会话 -func (this *RPCXService) LoseServiceHandlefunc(sId string) { - session, ok := this.serverList.Load(sId) - if ok && session != nil { - session.(base.IRPCXServiceSession).Done() - this.serverList.Delete(sId) - } - event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件 -} - -func (this *RPCXService) getServiceSessionByType(sType string, sIp string) (ss []base.IRPCXServiceSession, err error) { - ss = make([]base.IRPCXServiceSession, 0) - if nodes := registry.GetServiceByType(sType); nodes == nil { - log.Errorf("获取目标类型 type【%s】ip [%s] 服务集失败", sType, sIp) - return nil, err - } else { - if sIp == core.AutoIp { - for _, v := range nodes { - if s, ok := this.serverList.Load(v.Id); ok { - ss = append(ss, s.(base.IRPCXServiceSession)) - } else { - s, err = NewServiceSession(v) - if err != nil { - log.Errorf("创建服务会话失败【%s】 err:%v", v.Id, err) - continue - } else { - this.serverList.Store(v.Id, s) - ss = append(ss, s.(base.IRPCXServiceSession)) - } - } - } - } else { - for _, v := range nodes { - if v.IP == sIp { - if s, ok := this.serverList.Load(v.Id); ok { - ss = append(ss, s.(base.IRPCXServiceSession)) - } else { - s, err = NewServiceSession(v) - if err != nil { - log.Errorf("创建服务会话失败【%s】 err:%v", v.Id, err) - continue - } else { - this.serverList.Store(v.Id, s) - ss = append(ss, s.(base.IRPCXServiceSession)) - } - } - } - } - } - } - return -} - -//默认路由规则 -func (this *RPCXService) DefauleRpcRouteRules(stype string, sip string) (ss base.IRPCXServiceSession, err error) { - if s, e := this.getServiceSessionByType(stype, sip); e != nil { - return nil, e - } else { - ss := make([]interface{}, len(s)) - for i, v := range s { - ss[i] = v - } - if len(ss) > 0 { - //排序找到最优服务 - sortslice.Sort(ss, func(a interface{}, b interface{}) int8 { - as := a.(base.IRPCXServiceSession) - bs := b.(base.IRPCXServiceSession) - if iscompare := version.CompareStrVer(as.GetVersion(), bs.GetVersion()); iscompare != 0 { - return iscompare - } else { - if as.GetPreWeight() < bs.GetPreWeight() { - return 1 - } else if as.GetPreWeight() > bs.GetPreWeight() { - return -1 - } else { - return 0 - } - } - }) - return ss[0].(base.IRPCXServiceSession), nil - } else { - return nil, fmt.Errorf("未找到IP[%s]类型%s】的服务信息", sip, stype) - } - } -} - //注册服务对象 func (this *RPCXService) Register(rcvr interface{}) (err error) { err = rpcx.Register(rcvr) @@ -281,65 +135,11 @@ func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err } //同步 执行目标远程服务方法 -func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { - defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) - ss, ok := this.serverList.Load(sId) - if !ok { - if node, err := registry.GetServiceById(sId); err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) - return fmt.Errorf("No Found " + sId) - } else { - ss, err = NewServiceSession(node) - if err != nil { - return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) - } else { - this.serverList.Store(node.Id, ss) - } - } - } - err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply) - return +func (this *RPCXService) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return rpcx.Call(ctx, servicePath, serviceMethod, args, reply) } //异步 执行目标远程服务方法 -func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { - defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) - ss, ok := this.serverList.Load(sId) - if !ok { - if node, err := registry.GetServiceById(sId); err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) - return nil, fmt.Errorf("No Found " + sId) - } else { - ss, err = NewServiceSession(node) - if err != nil { - return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) - } else { - this.serverList.Store(node.Id, ss) - } - } - } - call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply) - return -} - -func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { - defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) - ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) - if err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) - return err - } - err = ss.Call(ctx, serviceMethod, args, reply) - return -} - -func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { - defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) - ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) - if err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) - return nil, err - } - call, err = ss.Go(ctx, serviceMethod, args, reply) - return +func (this *RPCXService) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) { + return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil) } diff --git a/lego/base/rpcx/servicesession.go b/lego/base/rpcx/servicesession.go deleted file mode 100644 index 44085e5af..000000000 --- a/lego/base/rpcx/servicesession.go +++ /dev/null @@ -1,64 +0,0 @@ -package rpcx - -import ( - "context" - "fmt" - - "go_dreamfactory/lego/base" - "go_dreamfactory/lego/sys/registry" - "go_dreamfactory/lego/sys/rpcx" - - "github.com/smallnest/rpcx/client" -) - -func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) { - session := new(ServiceSession) - session.node = node - session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id) - ss = session - return -} - -type ServiceSession struct { - node *registry.ServiceNode - client rpcx.IRPCXClient -} - -func (this *ServiceSession) GetId() string { - return this.node.Id -} -func (this *ServiceSession) GetIp() string { - return this.node.IP -} - -func (this *ServiceSession) GetRpcId() string { - return this.node.RpcId -} - -func (this *ServiceSession) GetType() string { - return this.node.Type -} -func (this *ServiceSession) GetVersion() string { - return this.node.Version -} -func (this *ServiceSession) SetVersion(v string) { - this.node.Version = v -} - -func (this *ServiceSession) GetPreWeight() float64 { - return this.node.PreWeight -} - -func (this *ServiceSession) SetPreWeight(p float64) { - this.node.PreWeight = p -} -func (this *ServiceSession) Done() { - this.client.Stop() -} -func (this *ServiceSession) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { - return this.client.Call(ctx, serviceMethod, args, reply) -} - -func (this *ServiceSession) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (*client.Call, error) { - return this.client.Go(ctx, serviceMethod, args, reply, nil) -} diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index 3a3425bd8..cd0546fcc 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -2,30 +2,102 @@ package rpcx import ( "context" + "errors" + "strings" "github.com/smallnest/rpcx/client" + "github.com/smallnest/rpcx/share" ) -func newClient(addr string, sId string) (c *Client, err error) { - c = &Client{} - d, err := client.NewPeer2PeerDiscovery("tcp@"+addr, "") - c.xclient = client.NewXClient(sId, client.Failfast, client.RandomSelect, d, client.DefaultOption) +func newClient(rpcx *RPCX) (c *Client) { + c = &Client{ + rpcx: rpcx, + clients: make(map[string]client.XClient), + // msgChan: make(chan *protocol.Message, 1000), + } return } type Client struct { - xclient client.XClient + rpcx *RPCX + clients map[string]client.XClient + // msgChan chan *protocol.Message // 接收rpcXServer推送消息 +} + +// DoMessage 服务端消息处理 +func (this *Client) DoMessage() { + // for msg := range this.msgChan { + + // } +} + +func (this *Client) Start() (err error) { + return } func (this *Client) Stop() (err error) { - err = this.xclient.Close() + for _, v := range this.clients { + v.Close() + } return } -func (this *Client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { - err = this.xclient.Call(ctx, string(serviceMethod), args, reply) +//同步调用 +func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + d *client.ConsulDiscovery + c client.XClient + ok bool + ) + spath = strings.Split(servicePath, "/") + if c, ok = this.clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { + return + } + c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) + c.SetSelector(newSelector()) + this.clients[spath[0]] = c + } + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr, + ServiceMetaKey: this.rpcx.metadata, + }) + err = c.Call(ctx, serviceMethod, args, reply) return } -func (this *Client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error) { - return this.xclient.Go(ctx, string(serviceMethod), args, reply, done) + +//异步调用 +func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + // return this.xclient.Go(ctx, string(serviceMethod), args, reply, done) + if servicePath == "" { + err = errors.New("servicePath no cant null") + return + } + var ( + spath []string + d *client.ConsulDiscovery + c client.XClient + ok bool + ) + spath = strings.Split(servicePath, "/") + if c, ok = this.clients[spath[0]]; !ok { + if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { + return + } + c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) + c.SetSelector(newSelector()) + this.clients[spath[0]] = c + } + ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + CallRoutRulesKey: servicePath, + ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr, + ServiceMetaKey: this.rpcx.metadata, + }) + return c.Go(ctx, string(serviceMethod), args, reply, done) } diff --git a/lego/sys/rpcx/core.go b/lego/sys/rpcx/core.go index c1f6f48af..3b2b812f1 100644 --- a/lego/sys/rpcx/core.go +++ b/lego/sys/rpcx/core.go @@ -2,16 +2,23 @@ package rpcx import ( "context" + "fmt" + "net/url" "github.com/smallnest/rpcx/client" ) +const ( + ServiceMetaKey = "smeta" + ServiceAddrKey = "addr" + CallRoutRulesKey = "callrules" +) + type ( ISys interface { IRPCXServer - NewRpcClient(addr, sId string) (clent IRPCXClient, err error) + IRPCXClient } - IRPCXServer interface { Start() (err error) Stop() (err error) @@ -22,9 +29,10 @@ type ( } IRPCXClient interface { + Start() (err error) Stop() (err error) - Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error - Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error) + Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) + Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) } ) @@ -64,6 +72,51 @@ func UnregisterAll() (err error) { return defsys.UnregisterAll() } -func NewRpcClient(addr, sId string) (clent IRPCXClient, err error) { - return defsys.NewRpcClient(addr, sId) +func Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return defsys.Call(ctx, servicePath, serviceMethod, args, reply) +} + +func Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + return defsys.Go(ctx, servicePath, serviceMethod, args, reply, done) +} + +//服务元数据转服务节点信息 +func smetaToServiceNode(meta string) (node *ServiceNode, err error) { + if meta == "" { + fmt.Errorf("meta is nill") + return + } + node = &ServiceNode{} + data := make(map[string]string) + metadata, _ := url.ParseQuery(meta) + for k, v := range metadata { + if len(v) > 0 { + data[k] = v[0] + } + } + if sid, ok := data["sid"]; !ok { + err = fmt.Errorf("no found sid") + return + } else { + node.ServiceId = sid + } + if stype, ok := data["stype"]; !ok { + err = fmt.Errorf("no found stype") + return + } else { + node.ServiceType = stype + } + if version, ok := data["version"]; !ok { + err = fmt.Errorf("no found version") + return + } else { + node.Version = version + } + if addr, ok := data["addr"]; !ok { + err = fmt.Errorf("no found addr") + return + } else { + node.ServiceAddr = addr + } + return } diff --git a/lego/sys/rpcx/options.go b/lego/sys/rpcx/options.go index aab6fd0b1..cd2bd1ce5 100644 --- a/lego/sys/rpcx/options.go +++ b/lego/sys/rpcx/options.go @@ -3,17 +3,39 @@ package rpcx import ( "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/utils/mapstructure" +) - "github.com/smallnest/rpcx/client" +type RpcxStartType int8 + +const ( + RpcxStartByService RpcxStartType = iota //启动服务端 + RpcxStartByClient //启动客户端 + RpcxStartByAll //服务端客户端都启动 ) type Option func(*Options) type Options struct { - ServiceId string //服务id - Port int //监听地址 - FailMode client.FailMode //失败模式 - Debug bool //日志是否开启 - Log log.ILog + ServiceTag string //集群标签 + ServiceType string //服务类型 + ServiceId string //服务id + ServiceVersion string //服务版本 + ServiceAddr string //服务地址 + ConsulServers []string //Consul集群服务地址 + RpcxStartType RpcxStartType //Rpcx启动类型 + Debug bool //日志是否开启 + Log log.ILog +} + +func SetServiceTag(v string) Option { + return func(o *Options) { + o.ServiceTag = v + } +} + +func SetServiceType(v string) Option { + return func(o *Options) { + o.ServiceType = v + } } func SetServiceId(v string) Option { @@ -21,11 +43,25 @@ func SetServiceId(v string) Option { o.ServiceId = v } } -func SetPort(v int) Option { + +func SetServiceVersion(v string) Option { return func(o *Options) { - o.Port = v + o.ServiceVersion = v } } + +func SetServiceAddr(v string) Option { + return func(o *Options) { + o.ServiceAddr = v + } +} + +func SetConsulServers(v []string) Option { + return func(o *Options) { + o.ConsulServers = v + } +} + func SetDebug(v bool) Option { return func(o *Options) { o.Debug = v diff --git a/lego/sys/rpcx/rpcx.go b/lego/sys/rpcx/rpcx.go index 9bc9a1272..86344ec70 100644 --- a/lego/sys/rpcx/rpcx.go +++ b/lego/sys/rpcx/rpcx.go @@ -1,40 +1,60 @@ package rpcx +import ( + "context" + "fmt" + + "github.com/smallnest/rpcx/client" +) + func newSys(options Options) (sys *RPCX, err error) { sys = &RPCX{ - options: options, - service: newService(options), + options: options, + metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr), } + sys.service = newService(sys) + sys.client = newClient(sys) + // if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端 + + // } + + // if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端 + + // } return } type RPCX struct { - options Options - service IRPCXServer + options Options + metadata string + service IRPCXServer + client IRPCXClient } func (this *RPCX) Start() (err error) { this.service.Start() + this.client.Start() return } func (this *RPCX) Stop() (err error) { - err = this.service.Stop() + this.service.Stop() + this.client.Stop() return } func (this *RPCX) Register(rcvr interface{}) (err error) { - err = this.service.Register(rcvr) + this.service.Register(rcvr) return } func (this *RPCX) RegisterFunction(fn interface{}) (err error) { - err = this.service.RegisterFunction(fn) + this.service.RegisterFunction(fn) return } func (this *RPCX) RegisterFunctionName(name string, fn interface{}) (err error) { - err = this.service.RegisterFunctionName(name, fn) + this.service.RegisterFunctionName(name, fn) return } @@ -42,6 +62,54 @@ func (this *RPCX) UnregisterAll() (err error) { return this.service.UnregisterAll() } -func (this *RPCX) NewRpcClient(addr, sId string) (clent IRPCXClient, err error) { - return newClient(addr, sId) +//同步调用 +func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) { + return this.client.Call(ctx, servicePath, serviceMethod, args, reply) +} + +//异步调用 +func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) { + return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done) +} + +// func (this *RPCX) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) { +// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RemoteAddr:%v \n", serviceName, methodName, clientConn.RemoteAddr().String()) +// return args, nil +// } + +///日志*********************************************************************** +func (this *RPCX) Debug() bool { + return this.options.Debug +} + +func (this *RPCX) Debugf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Debugf("[SYS RPCX] "+format, a...) + } +} +func (this *RPCX) Infof(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Infof("[SYS RPCX] "+format, a...) + } +} +func (this *RPCX) Warnf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Warnf("[SYS RPCX] "+format, a...) + } +} +func (this *RPCX) Errorf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Errorf("[SYS RPCX] "+format, a...) + } +} +func (this *RPCX) Panicf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Panicf("[SYS RPCX] "+format, a...) + } +} +func (this *RPCX) Fatalf(format string, a ...interface{}) { + if this.options.Debug { + this.options.Log.Fatalf("[SYS RPCX] "+format, a...) + } } diff --git a/lego/sys/rpcx/rpcx_test.go b/lego/sys/rpcx/rpcx_test.go new file mode 100644 index 000000000..70a592f59 --- /dev/null +++ b/lego/sys/rpcx/rpcx_test.go @@ -0,0 +1,178 @@ +package rpcx + +import ( + "context" + "fmt" + "net" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "go_dreamfactory/lego/sys/log" + + "github.com/rcrowley/go-metrics" + "github.com/smallnest/rpcx/client" + "github.com/smallnest/rpcx/protocol" + "github.com/smallnest/rpcx/server" + "github.com/smallnest/rpcx/serverplugin" + "github.com/smallnest/rpcx/share" +) + +func Test_Sys(t *testing.T) { + if err := log.OnInit(nil); err != nil { + fmt.Printf("err:%v", err) + return + } + if sys, err := NewSys( + SetServiceTag("rpcx_test"), + SetServiceType("worker"), + SetServiceId("worker_1"), + SetServiceVersion("1.0.0"), + SetServiceAddr("127.0.0.1:9978"), + SetConsulServers([]string{"10.0.0.9:8500"}), + ); err != nil { + fmt.Printf("err:%v", err) + return + } else { + if err = sys.Register(new(Arith)); err != nil { + fmt.Printf("err:%v", err) + return + } + if err = sys.Start(); err != nil { + fmt.Printf("err:%v", err) + return + } + go func() { + time.Sleep(time.Second * 3) + if err = sys.Call(context.Background(), "worker/worker_1", "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil { + fmt.Printf("Call:%v \n", err) + return + } + }() + } + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigterm: + fmt.Printf("terminating: via signal\n") + } +} + +var addr = "127.0.0.1:9978" + +// go server.go +func Test_RPCX(t *testing.T) { + s := server.NewServer() + if err := addRegistryPlugin(s); err != nil { + fmt.Printf("err:%v", err) + return + } + go func() { + time.Sleep(time.Second) + s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0") + }() + + go func() { + time.Sleep(time.Second * 3) + if d, err := client.NewConsulDiscovery("rpcx_test", "worker", []string{"10.0.0.9:8500"}, nil); err != nil { + fmt.Printf("NewConsulDiscovery err:%v", err) + return + } else { + xclient := client.NewXClient("worker", client.Failfast, client.RandomSelect, d, client.DefaultOption) + xclient.SetSelector(newSelector()) + ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{"RoutRules": "worker/worker_1"}) + if err = xclient.Call(ctx, "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil { + fmt.Printf("Call:%v \n", err) + return + } + } + + }() + + s.Serve("tcp", addr) +} + +func addRegistryPlugin(s *server.Server) (err error) { + r := &serverplugin.ConsulRegisterPlugin{ + ServiceAddress: "tcp@" + addr, + ConsulServers: []string{"10.0.0.9:8500"}, + BasePath: "rpcx_test", + Metrics: metrics.NewRegistry(), + UpdateInterval: time.Minute, + } + err = r.Start() + if err != nil { + return + } + s.Plugins.Add(r) + s.Plugins.Add(&call{}) + return +} + +type call struct{} + +// func (this *call) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) { +// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +// RoutRules := ctx.Value("RoutRules") +// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String()) +// return args, nil +// } + +// func (this *call) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) { +// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +// RoutRules := ctx.Value("RoutRules").(string) +// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String()) +// return args, nil +// } + +// func (this *call) PreReadRequest(ctx context.Context) error { +// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +// RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string) +// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) +// return nil +// } + +// func (this *call) PreWriteResponse(ctx context.Context, args *protocol.Message, repy *protocol.Message, errInter error) error { +// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) +// RoutRules := ctx.Value("RoutRules").(string) +// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) +// return nil +// } + +func (this *call) PreHandleRequest(ctx context.Context, r *protocol.Message) error { + clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) + RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string) + fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) + return nil +} + +type Args struct { + A int + B int +} + +type Reply struct { + C int +} + +type Arith int + +func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error { + reply.C = args.A * args.B + fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C) + return nil +} + +func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error { + reply.C = args.A + args.B + fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C) + return nil +} + +func (t *Arith) Say(ctx context.Context, args *string, reply *string) error { + *reply = "hello " + *args + return nil +} diff --git a/lego/sys/rpcx/selector.go b/lego/sys/rpcx/selector.go new file mode 100644 index 000000000..4d29ab35f --- /dev/null +++ b/lego/sys/rpcx/selector.go @@ -0,0 +1,75 @@ +package rpcx + +import ( + "context" + "fmt" + "strings" + + "github.com/smallnest/rpcx/share" +) + +func newSelector() *Selector { + return &Selector{ + servers: make(map[string]*ServiceNode), + serversType: make(map[string][]*ServiceNode), + i: make(map[string]int), + } +} + +type ServiceNode struct { + ServiceId string `json:"sid"` //服务id + ServiceType string `json:"stype"` //服务类型 + Version string `json:"version"` //服务版本 + ServiceAddr string `json:"addr"` //服务地址 +} + +type Selector struct { + servers map[string]*ServiceNode + serversType map[string][]*ServiceNode + i map[string]int +} + +///servicePath = [stype] or [stype/sid] +func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string { + fmt.Printf("Select servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, ctx.Value("RoutRules")) + routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)["RoutRules"] + service := strings.Split(routrules, "/") + leng := len(service) + if leng == 1 { + if nodes, ok := this.serversType[service[0]]; ok { + i, ok := this.i[service[0]] + if !ok { + i = 0 + } + i = i % len(nodes) + this.i[service[0]] = i + 1 + return nodes[i].ServiceAddr + } + } else if leng == 2 { + if node, ok := this.servers[service[1]]; ok { + return node.ServiceAddr + } + } + return "" +} + +func (this *Selector) UpdateServer(servers map[string]string) { + ss := make(map[string]*ServiceNode) + sst := make(map[string][]*ServiceNode) + for _, v := range servers { + if node, err := smetaToServiceNode(v); err != nil { + continue + } else { + ss[node.ServiceId] = node + if ssts, ok := sst[node.ServiceType]; !ok { + sst[node.ServiceType] = make([]*ServiceNode, 0) + sst[node.ServiceType] = append(sst[node.ServiceType], node) + } else { + ssts = append(ssts, node) + } + } + + } + this.servers = ss + this.serversType = sst +} diff --git a/lego/sys/rpcx/service.go b/lego/sys/rpcx/service.go index 86c2dc5bb..d89206b2b 100644 --- a/lego/sys/rpcx/service.go +++ b/lego/sys/rpcx/service.go @@ -1,26 +1,49 @@ package rpcx import ( - "fmt" + "context" + "net" + "time" + "github.com/rcrowley/go-metrics" + "github.com/smallnest/rpcx/client" "github.com/smallnest/rpcx/server" + "github.com/smallnest/rpcx/serverplugin" ) -func newService(options Options) (s *Service) { +func newService(rpcx *RPCX) (s *Service) { s = &Service{ - server: server.NewServer(), - options: options, + server: server.NewServer(), + rpcx: rpcx, + // clients: make(map[string]net.Conn), + // clientmeta: make(map[string]string), } return } type Service struct { - server *server.Server - options Options + rpcx *RPCX + server *server.Server + selector client.Selector + clients map[string]net.Conn + clientmeta map[string]string } func (this *Service) Start() (err error) { - go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port)) + r := &serverplugin.ConsulRegisterPlugin{ + ServiceAddress: "tcp@" + this.rpcx.options.ServiceAddr, + ConsulServers: []string{"10.0.0.9:8500"}, + BasePath: this.rpcx.options.ServiceAddr, + Metrics: metrics.NewRegistry(), + UpdateInterval: time.Minute, + } + if err = r.Start(); err != nil { + return + } + this.server.Plugins.Add(r) + go func() { + this.server.Serve("tcp", this.rpcx.options.ServiceAddr) + }() return } @@ -30,18 +53,80 @@ func (this *Service) Stop() (err error) { } func (this *Service) Register(rcvr interface{}) (err error) { - err = this.server.RegisterName(this.options.ServiceId, rcvr, "") + err = this.server.RegisterName(this.rpcx.options.ServiceType, rcvr, this.rpcx.metadata) return } + func (this *Service) RegisterFunction(fn interface{}) (err error) { - err = this.server.RegisterFunction(this.options.ServiceId, fn, "") + err = this.server.RegisterFunction(this.rpcx.options.ServiceType, fn, this.rpcx.metadata) return } func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { - err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "") + err = this.server.RegisterFunctionName(this.rpcx.options.ServiceType, name, fn, this.rpcx.metadata) return } func (this *Service) UnregisterAll() (err error) { err = this.server.UnregisterAll() return } + +//同步调用 +func (this *Service) Call(servicePath string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { + // var ( + // spath string + // clientaddr string + // conn net.Conn + // ok bool + // ) + // if servicePath == "" { + // err = errors.New("servicePath no cant null") + // return + // } + // spath := strings.Split(servicePath, "/") + // ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{ + // CallRoutRulesKey: servicePath, + // ServiceAddrKey: "tcp@" + this.options.ServiceAddr, + // ServiceMetaKey: this.metadata, + // }) + // if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" { + // err = fmt.Errorf("on found routRules:%s", routRules) + // return + // } + // if conn, ok = this.clients[clientaddr]; !ok { + // err = fmt.Errorf("on found clientaddr:%s", clientaddr) + // return + // } + // err := this.server.SendMessage(conn, spath[0], serviceMethod, nil, []byte("abcde")){ + + // } + return +} + +//异步调用 +func (this *Service) Go(routRules string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { + return +} + +//发现服务 +func (this *Service) Discovery(addr string, conn net.Conn, meta string) { + this.clientmeta[addr] = meta + this.clients[addr] = conn + this.selector.UpdateServer(this.clientmeta) +} + +// //监听客户端链接到服务上 保存客户端的连接对象 +// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error { +// if smeta, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)[ServiceAddrKey]; ok { +// // log.Errorf("smeta:%s err:%v", smeta, ok) +// if node, err := smetaToServiceNode(smeta); err == nil { +// if _, ok = this.clientmeta[node.ServiceId]; !ok { +// this.clientmeta[node.ServiceId] = smeta +// } +// } +// } + +// // clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn) + +// // fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String()) +// return nil +// } diff --git a/modules/gate_comp.go b/modules/gate_comp.go index 097ff4dd7..a00e645cc 100644 --- a/modules/gate_comp.go +++ b/modules/gate_comp.go @@ -30,6 +30,7 @@ type MComp_GateComp struct { service base.IRPCXService //rpc服务对象 module core.IModule //当前业务模块 comp core.IModuleComp //网关组件自己 + scomp comm.ISC_GateRouteComp } //组件初始化接口 @@ -51,56 +52,54 @@ func (this *MComp_GateComp) Start() (err error) { if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil { return } - this.suitableMethods(comp.(comm.ISC_GateRouteComp), reflect.TypeOf(this.comp)) + this.scomp = comp.(comm.ISC_GateRouteComp) + this.suitableMethods(reflect.TypeOf(this.comp)) return } //反射注册相关接口道services/comp_gateroute.go 对象中 -func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) { +func (this *MComp_GateComp) suitableMethods(typ reflect.Type) { for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) - mtype := method.Type - mname := method.Name - // Method must be exported. - if method.PkgPath != "" { - continue - } - // Method needs four ins: receiver, context.Context, *args, *reply. - if mtype.NumIn() != 4 { - continue - } - // First arg must be context.Context - ctxType := mtype.In(1) - if !ctxType.Implements(typeOfContext) { - continue - } - - // Second arg need not be a pointer. - argType := mtype.In(2) - if !argType.Implements(typeOfSession) { - continue - } - // Third arg must be a pointer. - replyType := mtype.In(3) - if replyType.Kind() != reflect.Ptr { - continue - } - // Reply type must be exported. - if !this.isExportedOrBuiltinType(replyType) { - continue - } - // Method needs one out. - if mtype.NumOut() != 1 { - continue - } - // The return type of the method must be error. - if returnType := mtype.Out(0); returnType != typeOfError { - continue - } - scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) + this.reflectionRouteHandle(method) } } +//反射路由处理函数 +func (this *MComp_GateComp) reflectionRouteHandle(method reflect.Method) bool { + mtype := method.Type + mname := method.Name + if method.PkgPath != "" { + return false + } + if mtype.NumIn() != 4 { + return false + } + ctxType := mtype.In(1) + if !ctxType.Implements(typeOfContext) { + return false + } + argType := mtype.In(2) + if !argType.Implements(typeOfSession) { + return false + } + replyType := mtype.In(3) + if replyType.Kind() != reflect.Ptr { + return false + } + if !this.isExportedOrBuiltinType(replyType) { + return false + } + if mtype.NumOut() != 1 { + return false + } + if returnType := mtype.Out(0); returnType != typeOfError { + return false + } + this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) + return true +} + func (this *MComp_GateComp) isExportedOrBuiltinType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { t = t.Elem() diff --git a/modules/gateway/agent.go b/modules/gateway/agent.go index 2371e9ba5..f970d7ec0 100644 --- a/modules/gateway/agent.go +++ b/modules/gateway/agent.go @@ -62,6 +62,7 @@ locp: go this.Close() break locp } + if err = proto.Unmarshal(data, msg); err != nil { log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err) go this.Close() diff --git a/modules/pack/api_getlist.go b/modules/pack/api_getlist.go index 4f5921606..4d65750a2 100644 --- a/modules/pack/api_getlist.go +++ b/modules/pack/api_getlist.go @@ -10,7 +10,7 @@ import ( ) //参数校验 -func (this *Api_Comp) getlist_check(ctx context.Context, session comm.IUserSession, req *pb.GetlistReq) (code pb.ErrorCode) { +func (this *Api_Comp) Getlist_Check(ctx context.Context, session comm.IUserSession, req *pb.GetlistReq) (code pb.ErrorCode) { if !session.IsLogin() { code = pb.ErrorCode_NoLogin return @@ -36,7 +36,7 @@ func (this *Api_Comp) Getlist(ctx context.Context, session comm.IUserSession, re }() } }() - if code = this.getlist_check(ctx, session, req); code != pb.ErrorCode_Success { + if code = this.Getlist_Check(ctx, session, req); code != pb.ErrorCode_Success { return } if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil { diff --git a/modules/pack/api_sellItem.go b/modules/pack/api_sellItem.go index 33c9a27cc..da9a3f651 100644 --- a/modules/pack/api_sellItem.go +++ b/modules/pack/api_sellItem.go @@ -7,7 +7,7 @@ import ( ) //参数校验 -func (this *Api_Comp) sellItem_check(ctx context.Context, session comm.IUserSession, req *pb.SellItemReq) (code pb.ErrorCode) { +func (this *Api_Comp) SellItem_Check(ctx context.Context, session comm.IUserSession, req *pb.SellItemReq) (code pb.ErrorCode) { if !session.IsLogin() { code = pb.ErrorCode_NoLogin return @@ -23,7 +23,7 @@ func (this *Api_Comp) SellItem(ctx context.Context, session comm.IUserSession, r defer func() { session.SendMsg(string(this.module.GetType()), SellItemResp, code, &pb.SellItemResp{}) }() - if code = this.sellItem_check(ctx, session, req); code != pb.ErrorCode_Success { + if code = this.SellItem_Check(ctx, session, req); code != pb.ErrorCode_Success { return } return diff --git a/modules/pack/api_useItem.go b/modules/pack/api_useItem.go index 6371c7a9a..930296641 100644 --- a/modules/pack/api_useItem.go +++ b/modules/pack/api_useItem.go @@ -7,7 +7,7 @@ import ( ) //参数校验 -func (this *Api_Comp) useitem_check(ctx context.Context, session comm.IUserSession, req *pb.UseItemReq) (code pb.ErrorCode) { +func (this *Api_Comp) Useitem_Check(ctx context.Context, session comm.IUserSession, req *pb.UseItemReq) (code pb.ErrorCode) { if !session.IsLogin() { code = pb.ErrorCode_NoLogin return @@ -23,7 +23,7 @@ func (this *Api_Comp) Useitem(ctx context.Context, session comm.IUserSession, re defer func() { session.SendMsg(string(this.module.GetType()), UseItemResp, code, &pb.UseItemResp{}) }() - if code = this.useitem_check(ctx, session, req); code != pb.ErrorCode_Success { + if code = this.Useitem_Check(ctx, session, req); code != pb.ErrorCode_Success { return } return diff --git a/services/comp_gateroute.go b/services/comp_gateroute.go index eee7a21b9..9f1ed05c9 100644 --- a/services/comp_gateroute.go +++ b/services/comp_gateroute.go @@ -2,6 +2,7 @@ package services import ( "context" + "fmt" "go_dreamfactory/comm" "go_dreamfactory/pb" "reflect" @@ -36,6 +37,7 @@ type SComp_GateRouteComp struct { cbase.ServiceCompBase service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口 mrlock sync.RWMutex //msghandles 对象的锁 + msgcheck map[string]*msghandle //处理函数的校验接口 msghandles map[string]*msghandle //处理函数的管理对象 } @@ -56,6 +58,15 @@ func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceC func (this *SComp_GateRouteComp) Start() (err error) { this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口 err = this.ServiceCompBase.Start() + for k, v := range this.msghandles { + if v1, ok := this.msgcheck[k]; !ok { + err = fmt.Errorf("注册用户消息处理函数:%s 没有实现参数校验接口", k) + return + } else if v.msgType != v1.msgType { + err = fmt.Errorf("注册用户消息处理函数:%s 实现参数校验接口不一致 请检查代码!", k) + return + } + } return } @@ -78,11 +89,31 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.V this.mrlock.Unlock() } +//业务模块注册用户消息处理路由 +func (this *SComp_GateRouteComp) RegisterRouteCheck(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) { + log.Debugf("注册用户路由校验[%s]", methodName) + this.mrlock.RLock() + _, ok := this.msgcheck[methodName] + this.mrlock.RUnlock() + if ok { + log.Errorf("重复 注册用户路由校验[%s]", methodName) + return + } + this.mrlock.Lock() + this.msgcheck[methodName] = &msghandle{ + rcvr: comp, + msgType: msg, + fn: fn, + } + this.mrlock.Unlock() +} + //Rpc_GatewayRoute服务接口的接收函数 func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error { log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method) this.mrlock.RLock() msghandle, ok := this.msghandles[args.Method] + msgcheck := this.msghandles[args.Method] this.mrlock.RUnlock() if ok { session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId) @@ -91,6 +122,7 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentM log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err) return err } + msghandle.fn.Func.Call([]reflect.Value{msgcheck.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) } else { reply.Code = pb.ErrorCode_ReqParameterError