保存rpcx代码优化

This commit is contained in:
liwei1dao 2022-06-10 19:22:02 +08:00
parent cdf146f287
commit d4d8c074e0
17 changed files with 702 additions and 361 deletions

3
go.mod
View File

@ -13,6 +13,7 @@ require (
github.com/mitchellh/hashstructure v1.1.0
github.com/nacos-group/nacos-sdk-go v1.0.8
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/robfig/cron/v3 v3.0.1
github.com/rs/xid v1.3.0
github.com/satori/go.uuid v1.2.0
@ -53,6 +54,7 @@ require (
github.com/go-ping/ping v0.0.0-20211130115550-779d1e919534 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-redis/redis_rate/v9 v9.1.2 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
@ -68,6 +70,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.9.7 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/juju/ratelimit v1.0.1 // indirect
github.com/julienschmidt/httprouter v1.3.0 // indirect

3
go.sum
View File

@ -170,6 +170,7 @@ github.com/go-redis/redis/v8 v8.8.2/go.mod h1:F7resOH5Kdug49Otu24RjHWwgK7u9AmtqW
github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis_rate/v9 v9.1.2 h1:H0l5VzoAtOE6ydd38j8MCq3ABlGLnvvbA1xDSVVCHgQ=
github.com/go-redis/redis_rate/v9 v9.1.2/go.mod h1:oam2de2apSgRG8aJzwJddXbNu91Iyz1m8IKJE2vpvlQ=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
@ -359,6 +360,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
@ -548,6 +550,7 @@ github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=

View File

@ -4,29 +4,22 @@ import (
"context"
"fmt"
"runtime"
"sync"
"go_dreamfactory/lego"
"go_dreamfactory/lego/base"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/cron"
"go_dreamfactory/lego/sys/event"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/sys/registry"
"go_dreamfactory/lego/sys/rpcx"
"go_dreamfactory/lego/utils/container/sortslice"
"go_dreamfactory/lego/utils/container/version"
"github.com/smallnest/rpcx/client"
)
type RPCXService struct {
cbase.ServiceBase //继承服务基类
cbase.ServiceBase //继承服务基类+
opts *Options //服务启动的配置信息数据
serverList sync.Map //集群服务会话管理列表对象
rpcxService base.IRPCXService //服务自身 通过接口可以实现上层服务类重构底层接口
IsInClustered bool //当前服务是否已加入到集群中
}
func (this *RPCXService) GetTag() string {
@ -89,12 +82,7 @@ func (this *RPCXService) InitSys() {
} else {
log.Infof("Sys event Init success !")
}
if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil {
log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err))
} else {
log.Infof("Sys registry Init success !")
}
if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil {
if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceTag(this.GetTag()), rpcx.SetServiceId(this.GetId()), rpcx.SetServiceType(this.GetType()), rpcx.SetServiceVersion(this.GetVersion()), rpcx.SetServiceAddr(fmt.Sprintf("%s:%d", this.GetIp(), this.GetPort()))); err != nil {
log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err))
} else {
log.Infof("Sys rpcx Init success !")
@ -103,9 +91,6 @@ func (this *RPCXService) InitSys() {
if err := rpcx.Start(); err != nil {
log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err))
}
if err := registry.Start(); err != nil {
log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err))
}
})
}
@ -126,142 +111,11 @@ func (this *RPCXService) Destroy() (err error) {
if err = rpcx.Stop(); err != nil {
return
}
if err = registry.Stop(); err != nil {
return
}
cron.Stop()
err = this.ServiceBase.Destroy()
return
}
//注册服务会话 当有新的服务加入时
func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) {
if _, ok := this.serverList.Load(node.Id); !ok {
if s, err := NewServiceSession(&node); err != nil {
log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err)
} else {
this.serverList.Store(node.Id, s)
}
}
if this.IsInClustered {
event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
} else {
if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功
this.IsInClustered = true
event.TriggerEvent(core.Event_RegistryStart)
}
}
}
//更新服务会话 当有新的服务加入时
func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) {
if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息
session := ss.(base.IRPCXServiceSession)
if session.GetRpcId() != node.RpcId {
if s, err := NewServiceSession(&node); err != nil {
log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err)
} else {
this.serverList.Store(node.Id, s)
}
event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
} else {
if session.GetVersion() != node.Version {
session.SetVersion(node.Version)
}
if session.GetPreWeight() != node.PreWeight {
session.SetPreWeight(node.PreWeight)
}
event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件
}
}
}
//注销服务会话
func (this *RPCXService) LoseServiceHandlefunc(sId string) {
session, ok := this.serverList.Load(sId)
if ok && session != nil {
session.(base.IRPCXServiceSession).Done()
this.serverList.Delete(sId)
}
event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件
}
func (this *RPCXService) getServiceSessionByType(sType string, sIp string) (ss []base.IRPCXServiceSession, err error) {
ss = make([]base.IRPCXServiceSession, 0)
if nodes := registry.GetServiceByType(sType); nodes == nil {
log.Errorf("获取目标类型 type【%s】ip [%s] 服务集失败", sType, sIp)
return nil, err
} else {
if sIp == core.AutoIp {
for _, v := range nodes {
if s, ok := this.serverList.Load(v.Id); ok {
ss = append(ss, s.(base.IRPCXServiceSession))
} else {
s, err = NewServiceSession(v)
if err != nil {
log.Errorf("创建服务会话失败【%s】 err:%v", v.Id, err)
continue
} else {
this.serverList.Store(v.Id, s)
ss = append(ss, s.(base.IRPCXServiceSession))
}
}
}
} else {
for _, v := range nodes {
if v.IP == sIp {
if s, ok := this.serverList.Load(v.Id); ok {
ss = append(ss, s.(base.IRPCXServiceSession))
} else {
s, err = NewServiceSession(v)
if err != nil {
log.Errorf("创建服务会话失败【%s】 err:%v", v.Id, err)
continue
} else {
this.serverList.Store(v.Id, s)
ss = append(ss, s.(base.IRPCXServiceSession))
}
}
}
}
}
}
return
}
//默认路由规则
func (this *RPCXService) DefauleRpcRouteRules(stype string, sip string) (ss base.IRPCXServiceSession, err error) {
if s, e := this.getServiceSessionByType(stype, sip); e != nil {
return nil, e
} else {
ss := make([]interface{}, len(s))
for i, v := range s {
ss[i] = v
}
if len(ss) > 0 {
//排序找到最优服务
sortslice.Sort(ss, func(a interface{}, b interface{}) int8 {
as := a.(base.IRPCXServiceSession)
bs := b.(base.IRPCXServiceSession)
if iscompare := version.CompareStrVer(as.GetVersion(), bs.GetVersion()); iscompare != 0 {
return iscompare
} else {
if as.GetPreWeight() < bs.GetPreWeight() {
return 1
} else if as.GetPreWeight() > bs.GetPreWeight() {
return -1
} else {
return 0
}
}
})
return ss[0].(base.IRPCXServiceSession), nil
} else {
return nil, fmt.Errorf("未找到IP[%s]类型%s】的服务信息", sip, stype)
}
}
}
//注册服务对象
func (this *RPCXService) Register(rcvr interface{}) (err error) {
err = rpcx.Register(rcvr)
@ -281,65 +135,11 @@ func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err
}
//同步 执行目标远程服务方法
func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
ss, ok := this.serverList.Load(sId)
if !ok {
if node, err := registry.GetServiceById(sId); err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
return fmt.Errorf("No Found " + sId)
} else {
ss, err = NewServiceSession(node)
if err != nil {
return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
} else {
this.serverList.Store(node.Id, ss)
}
}
}
err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply)
return
func (this *RPCXService) RpcCall(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return rpcx.Call(ctx, servicePath, serviceMethod, args, reply)
}
//异步 执行目标远程服务方法
func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
ss, ok := this.serverList.Load(sId)
if !ok {
if node, err := registry.GetServiceById(sId); err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
return nil, fmt.Errorf("No Found " + sId)
} else {
ss, err = NewServiceSession(node)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
} else {
this.serverList.Store(node.Id, ss)
}
}
}
call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply)
return
}
func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
if err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
return err
}
err = ss.Call(ctx, serviceMethod, args, reply)
return
}
func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
if err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
return nil, err
}
call, err = ss.Go(ctx, serviceMethod, args, reply)
return
func (this *RPCXService) RpcGo(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (call *client.Call, err error) {
return rpcx.Go(ctx, servicePath, serviceMethod, args, reply, nil)
}

View File

@ -1,64 +0,0 @@
package rpcx
import (
"context"
"fmt"
"go_dreamfactory/lego/base"
"go_dreamfactory/lego/sys/registry"
"go_dreamfactory/lego/sys/rpcx"
"github.com/smallnest/rpcx/client"
)
func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) {
session := new(ServiceSession)
session.node = node
session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id)
ss = session
return
}
type ServiceSession struct {
node *registry.ServiceNode
client rpcx.IRPCXClient
}
func (this *ServiceSession) GetId() string {
return this.node.Id
}
func (this *ServiceSession) GetIp() string {
return this.node.IP
}
func (this *ServiceSession) GetRpcId() string {
return this.node.RpcId
}
func (this *ServiceSession) GetType() string {
return this.node.Type
}
func (this *ServiceSession) GetVersion() string {
return this.node.Version
}
func (this *ServiceSession) SetVersion(v string) {
this.node.Version = v
}
func (this *ServiceSession) GetPreWeight() float64 {
return this.node.PreWeight
}
func (this *ServiceSession) SetPreWeight(p float64) {
this.node.PreWeight = p
}
func (this *ServiceSession) Done() {
this.client.Stop()
}
func (this *ServiceSession) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
return this.client.Call(ctx, serviceMethod, args, reply)
}
func (this *ServiceSession) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (*client.Call, error) {
return this.client.Go(ctx, serviceMethod, args, reply, nil)
}

View File

@ -2,30 +2,102 @@ package rpcx
import (
"context"
"errors"
"strings"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/share"
)
func newClient(addr string, sId string) (c *Client, err error) {
c = &Client{}
d, err := client.NewPeer2PeerDiscovery("tcp@"+addr, "")
c.xclient = client.NewXClient(sId, client.Failfast, client.RandomSelect, d, client.DefaultOption)
func newClient(rpcx *RPCX) (c *Client) {
c = &Client{
rpcx: rpcx,
clients: make(map[string]client.XClient),
// msgChan: make(chan *protocol.Message, 1000),
}
return
}
type Client struct {
xclient client.XClient
rpcx *RPCX
clients map[string]client.XClient
// msgChan chan *protocol.Message // 接收rpcXServer推送消息
}
// DoMessage 服务端消息处理
func (this *Client) DoMessage() {
// for msg := range this.msgChan {
// }
}
func (this *Client) Start() (err error) {
return
}
func (this *Client) Stop() (err error) {
err = this.xclient.Close()
for _, v := range this.clients {
v.Close()
}
return
}
func (this *Client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
err = this.xclient.Call(ctx, string(serviceMethod), args, reply)
//同步调用
func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
if servicePath == "" {
err = errors.New("servicePath no cant null")
return
}
var (
spath []string
d *client.ConsulDiscovery
c client.XClient
ok bool
)
spath = strings.Split(servicePath, "/")
if c, ok = this.clients[spath[0]]; !ok {
if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil {
return
}
c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption)
c.SetSelector(newSelector())
this.clients[spath[0]] = c
}
ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
CallRoutRulesKey: servicePath,
ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr,
ServiceMetaKey: this.rpcx.metadata,
})
err = c.Call(ctx, serviceMethod, args, reply)
return
}
func (this *Client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error) {
return this.xclient.Go(ctx, string(serviceMethod), args, reply, done)
//异步调用
func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
// return this.xclient.Go(ctx, string(serviceMethod), args, reply, done)
if servicePath == "" {
err = errors.New("servicePath no cant null")
return
}
var (
spath []string
d *client.ConsulDiscovery
c client.XClient
ok bool
)
spath = strings.Split(servicePath, "/")
if c, ok = this.clients[spath[0]]; !ok {
if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil {
return
}
c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption)
c.SetSelector(newSelector())
this.clients[spath[0]] = c
}
ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
CallRoutRulesKey: servicePath,
ServiceAddrKey: "tcp@" + this.rpcx.options.ServiceAddr,
ServiceMetaKey: this.rpcx.metadata,
})
return c.Go(ctx, string(serviceMethod), args, reply, done)
}

View File

@ -2,16 +2,23 @@ package rpcx
import (
"context"
"fmt"
"net/url"
"github.com/smallnest/rpcx/client"
)
const (
ServiceMetaKey = "smeta"
ServiceAddrKey = "addr"
CallRoutRulesKey = "callrules"
)
type (
ISys interface {
IRPCXServer
NewRpcClient(addr, sId string) (clent IRPCXClient, err error)
IRPCXClient
}
IRPCXServer interface {
Start() (err error)
Stop() (err error)
@ -22,9 +29,10 @@ type (
}
IRPCXClient interface {
Start() (err error)
Stop() (err error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error)
Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error)
Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error)
}
)
@ -64,6 +72,51 @@ func UnregisterAll() (err error) {
return defsys.UnregisterAll()
}
func NewRpcClient(addr, sId string) (clent IRPCXClient, err error) {
return defsys.NewRpcClient(addr, sId)
func Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return defsys.Call(ctx, servicePath, serviceMethod, args, reply)
}
func Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
return defsys.Go(ctx, servicePath, serviceMethod, args, reply, done)
}
//服务元数据转服务节点信息
func smetaToServiceNode(meta string) (node *ServiceNode, err error) {
if meta == "" {
fmt.Errorf("meta is nill")
return
}
node = &ServiceNode{}
data := make(map[string]string)
metadata, _ := url.ParseQuery(meta)
for k, v := range metadata {
if len(v) > 0 {
data[k] = v[0]
}
}
if sid, ok := data["sid"]; !ok {
err = fmt.Errorf("no found sid")
return
} else {
node.ServiceId = sid
}
if stype, ok := data["stype"]; !ok {
err = fmt.Errorf("no found stype")
return
} else {
node.ServiceType = stype
}
if version, ok := data["version"]; !ok {
err = fmt.Errorf("no found version")
return
} else {
node.Version = version
}
if addr, ok := data["addr"]; !ok {
err = fmt.Errorf("no found addr")
return
} else {
node.ServiceAddr = addr
}
return
}

View File

@ -3,29 +3,65 @@ package rpcx
import (
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/utils/mapstructure"
)
"github.com/smallnest/rpcx/client"
type RpcxStartType int8
const (
RpcxStartByService RpcxStartType = iota //启动服务端
RpcxStartByClient //启动客户端
RpcxStartByAll //服务端客户端都启动
)
type Option func(*Options)
type Options struct {
ServiceTag string //集群标签
ServiceType string //服务类型
ServiceId string //服务id
Port int //监听地址
FailMode client.FailMode //失败模式
ServiceVersion string //服务版本
ServiceAddr string //服务地址
ConsulServers []string //Consul集群服务地址
RpcxStartType RpcxStartType //Rpcx启动类型
Debug bool //日志是否开启
Log log.ILog
}
func SetServiceTag(v string) Option {
return func(o *Options) {
o.ServiceTag = v
}
}
func SetServiceType(v string) Option {
return func(o *Options) {
o.ServiceType = v
}
}
func SetServiceId(v string) Option {
return func(o *Options) {
o.ServiceId = v
}
}
func SetPort(v int) Option {
func SetServiceVersion(v string) Option {
return func(o *Options) {
o.Port = v
o.ServiceVersion = v
}
}
func SetServiceAddr(v string) Option {
return func(o *Options) {
o.ServiceAddr = v
}
}
func SetConsulServers(v []string) Option {
return func(o *Options) {
o.ConsulServers = v
}
}
func SetDebug(v bool) Option {
return func(o *Options) {
o.Debug = v

View File

@ -1,40 +1,60 @@
package rpcx
import (
"context"
"fmt"
"github.com/smallnest/rpcx/client"
)
func newSys(options Options) (sys *RPCX, err error) {
sys = &RPCX{
options: options,
service: newService(options),
metadata: fmt.Sprintf("stype=%s&sid=%s&version=%s&addr=%s", options.ServiceType, options.ServiceId, options.ServiceVersion, "tcp@"+options.ServiceAddr),
}
sys.service = newService(sys)
sys.client = newClient(sys)
// if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByService { //创建RPCX 服务端
// }
// if options.RpcxStartType == RpcxStartByAll || options.RpcxStartType == RpcxStartByClient { //创建RPCX 客户端
// }
return
}
type RPCX struct {
options Options
metadata string
service IRPCXServer
client IRPCXClient
}
func (this *RPCX) Start() (err error) {
this.service.Start()
this.client.Start()
return
}
func (this *RPCX) Stop() (err error) {
err = this.service.Stop()
this.service.Stop()
this.client.Stop()
return
}
func (this *RPCX) Register(rcvr interface{}) (err error) {
err = this.service.Register(rcvr)
this.service.Register(rcvr)
return
}
func (this *RPCX) RegisterFunction(fn interface{}) (err error) {
err = this.service.RegisterFunction(fn)
this.service.RegisterFunction(fn)
return
}
func (this *RPCX) RegisterFunctionName(name string, fn interface{}) (err error) {
err = this.service.RegisterFunctionName(name, fn)
this.service.RegisterFunctionName(name, fn)
return
}
@ -42,6 +62,54 @@ func (this *RPCX) UnregisterAll() (err error) {
return this.service.UnregisterAll()
}
func (this *RPCX) NewRpcClient(addr, sId string) (clent IRPCXClient, err error) {
return newClient(addr, sId)
//同步调用
func (this *RPCX) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) (err error) {
return this.client.Call(ctx, servicePath, serviceMethod, args, reply)
}
//异步调用
func (this *RPCX) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (call *client.Call, err error) {
return this.client.Go(ctx, servicePath, serviceMethod, args, reply, done)
}
// func (this *RPCX) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RemoteAddr:%v \n", serviceName, methodName, clientConn.RemoteAddr().String())
// return args, nil
// }
///日志***********************************************************************
func (this *RPCX) Debug() bool {
return this.options.Debug
}
func (this *RPCX) Debugf(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Debugf("[SYS RPCX] "+format, a...)
}
}
func (this *RPCX) Infof(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Infof("[SYS RPCX] "+format, a...)
}
}
func (this *RPCX) Warnf(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Warnf("[SYS RPCX] "+format, a...)
}
}
func (this *RPCX) Errorf(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Errorf("[SYS RPCX] "+format, a...)
}
}
func (this *RPCX) Panicf(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Panicf("[SYS RPCX] "+format, a...)
}
}
func (this *RPCX) Fatalf(format string, a ...interface{}) {
if this.options.Debug {
this.options.Log.Fatalf("[SYS RPCX] "+format, a...)
}
}

178
lego/sys/rpcx/rpcx_test.go Normal file
View File

@ -0,0 +1,178 @@
package rpcx
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"testing"
"time"
"go_dreamfactory/lego/sys/log"
"github.com/rcrowley/go-metrics"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/protocol"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
"github.com/smallnest/rpcx/share"
)
func Test_Sys(t *testing.T) {
if err := log.OnInit(nil); err != nil {
fmt.Printf("err:%v", err)
return
}
if sys, err := NewSys(
SetServiceTag("rpcx_test"),
SetServiceType("worker"),
SetServiceId("worker_1"),
SetServiceVersion("1.0.0"),
SetServiceAddr("127.0.0.1:9978"),
SetConsulServers([]string{"10.0.0.9:8500"}),
); err != nil {
fmt.Printf("err:%v", err)
return
} else {
if err = sys.Register(new(Arith)); err != nil {
fmt.Printf("err:%v", err)
return
}
if err = sys.Start(); err != nil {
fmt.Printf("err:%v", err)
return
}
go func() {
time.Sleep(time.Second * 3)
if err = sys.Call(context.Background(), "worker/worker_1", "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil {
fmt.Printf("Call:%v \n", err)
return
}
}()
}
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigterm:
fmt.Printf("terminating: via signal\n")
}
}
var addr = "127.0.0.1:9978"
// go server.go
func Test_RPCX(t *testing.T) {
s := server.NewServer()
if err := addRegistryPlugin(s); err != nil {
fmt.Printf("err:%v", err)
return
}
go func() {
time.Sleep(time.Second)
s.RegisterName("worker", new(Arith), "stype=worker&sid=worker_1&version=1.0.0")
}()
go func() {
time.Sleep(time.Second * 3)
if d, err := client.NewConsulDiscovery("rpcx_test", "worker", []string{"10.0.0.9:8500"}, nil); err != nil {
fmt.Printf("NewConsulDiscovery err:%v", err)
return
} else {
xclient := client.NewXClient("worker", client.Failfast, client.RandomSelect, d, client.DefaultOption)
xclient.SetSelector(newSelector())
ctx := context.WithValue(context.Background(), share.ReqMetaDataKey, map[string]string{"RoutRules": "worker/worker_1"})
if err = xclient.Call(ctx, "Mul", &Args{A: 1, B: 2}, &Reply{}); err != nil {
fmt.Printf("Call:%v \n", err)
return
}
}
}()
s.Serve("tcp", addr)
}
func addRegistryPlugin(s *server.Server) (err error) {
r := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + addr,
ConsulServers: []string{"10.0.0.9:8500"},
BasePath: "rpcx_test",
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err = r.Start()
if err != nil {
return
}
s.Plugins.Add(r)
s.Plugins.Add(&call{})
return
}
type call struct{}
// func (this *call) PostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}) (interface{}, error) {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules")
// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String())
// return args, nil
// }
// func (this *call) PreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules").(string)
// fmt.Printf("PostCall servicePath:%v serviceMethod:%v RoutRules:%s RemoteAddr:%v \n", serviceName, methodName, RoutRules, clientConn.RemoteAddr().String())
// return args, nil
// }
// func (this *call) PreReadRequest(ctx context.Context) error {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string)
// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
// return nil
// }
// func (this *call) PreWriteResponse(ctx context.Context, args *protocol.Message, repy *protocol.Message, errInter error) error {
// clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// RoutRules := ctx.Value("RoutRules").(string)
// fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
// return nil
// }
func (this *call) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
RoutRules := ctx.Value(share.ReqMetaDataKey).(map[string]string)
fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
return nil
}
type Args struct {
A int
B int
}
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A + args.B
fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
return nil
}
func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
*reply = "hello " + *args
return nil
}

75
lego/sys/rpcx/selector.go Normal file
View File

@ -0,0 +1,75 @@
package rpcx
import (
"context"
"fmt"
"strings"
"github.com/smallnest/rpcx/share"
)
func newSelector() *Selector {
return &Selector{
servers: make(map[string]*ServiceNode),
serversType: make(map[string][]*ServiceNode),
i: make(map[string]int),
}
}
type ServiceNode struct {
ServiceId string `json:"sid"` //服务id
ServiceType string `json:"stype"` //服务类型
Version string `json:"version"` //服务版本
ServiceAddr string `json:"addr"` //服务地址
}
type Selector struct {
servers map[string]*ServiceNode
serversType map[string][]*ServiceNode
i map[string]int
}
///servicePath = [stype] or [stype/sid]
func (this *Selector) Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string {
fmt.Printf("Select servicePath:%v serviceMethod:%v RoutRules:%v \n", servicePath, serviceMethod, ctx.Value("RoutRules"))
routrules := ctx.Value(share.ReqMetaDataKey).(map[string]string)["RoutRules"]
service := strings.Split(routrules, "/")
leng := len(service)
if leng == 1 {
if nodes, ok := this.serversType[service[0]]; ok {
i, ok := this.i[service[0]]
if !ok {
i = 0
}
i = i % len(nodes)
this.i[service[0]] = i + 1
return nodes[i].ServiceAddr
}
} else if leng == 2 {
if node, ok := this.servers[service[1]]; ok {
return node.ServiceAddr
}
}
return ""
}
func (this *Selector) UpdateServer(servers map[string]string) {
ss := make(map[string]*ServiceNode)
sst := make(map[string][]*ServiceNode)
for _, v := range servers {
if node, err := smetaToServiceNode(v); err != nil {
continue
} else {
ss[node.ServiceId] = node
if ssts, ok := sst[node.ServiceType]; !ok {
sst[node.ServiceType] = make([]*ServiceNode, 0)
sst[node.ServiceType] = append(sst[node.ServiceType], node)
} else {
ssts = append(ssts, node)
}
}
}
this.servers = ss
this.serversType = sst
}

View File

@ -1,26 +1,49 @@
package rpcx
import (
"fmt"
"context"
"net"
"time"
"github.com/rcrowley/go-metrics"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/server"
"github.com/smallnest/rpcx/serverplugin"
)
func newService(options Options) (s *Service) {
func newService(rpcx *RPCX) (s *Service) {
s = &Service{
server: server.NewServer(),
options: options,
rpcx: rpcx,
// clients: make(map[string]net.Conn),
// clientmeta: make(map[string]string),
}
return
}
type Service struct {
rpcx *RPCX
server *server.Server
options Options
selector client.Selector
clients map[string]net.Conn
clientmeta map[string]string
}
func (this *Service) Start() (err error) {
go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port))
r := &serverplugin.ConsulRegisterPlugin{
ServiceAddress: "tcp@" + this.rpcx.options.ServiceAddr,
ConsulServers: []string{"10.0.0.9:8500"},
BasePath: this.rpcx.options.ServiceAddr,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
if err = r.Start(); err != nil {
return
}
this.server.Plugins.Add(r)
go func() {
this.server.Serve("tcp", this.rpcx.options.ServiceAddr)
}()
return
}
@ -30,18 +53,80 @@ func (this *Service) Stop() (err error) {
}
func (this *Service) Register(rcvr interface{}) (err error) {
err = this.server.RegisterName(this.options.ServiceId, rcvr, "")
err = this.server.RegisterName(this.rpcx.options.ServiceType, rcvr, this.rpcx.metadata)
return
}
func (this *Service) RegisterFunction(fn interface{}) (err error) {
err = this.server.RegisterFunction(this.options.ServiceId, fn, "")
err = this.server.RegisterFunction(this.rpcx.options.ServiceType, fn, this.rpcx.metadata)
return
}
func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) {
err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "")
err = this.server.RegisterFunctionName(this.rpcx.options.ServiceType, name, fn, this.rpcx.metadata)
return
}
func (this *Service) UnregisterAll() (err error) {
err = this.server.UnregisterAll()
return
}
//同步调用
func (this *Service) Call(servicePath string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
// var (
// spath string
// clientaddr string
// conn net.Conn
// ok bool
// )
// if servicePath == "" {
// err = errors.New("servicePath no cant null")
// return
// }
// spath := strings.Split(servicePath, "/")
// ctx = context.WithValue(ctx, share.ReqMetaDataKey, map[string]string{
// CallRoutRulesKey: servicePath,
// ServiceAddrKey: "tcp@" + this.options.ServiceAddr,
// ServiceMetaKey: this.metadata,
// })
// if clientaddr = this.selector.Select(ctx, spath[0], serviceMethod, args); clientaddr == "" {
// err = fmt.Errorf("on found routRules:%s", routRules)
// return
// }
// if conn, ok = this.clients[clientaddr]; !ok {
// err = fmt.Errorf("on found clientaddr:%s", clientaddr)
// return
// }
// err := this.server.SendMessage(conn, spath[0], serviceMethod, nil, []byte("abcde")){
// }
return
}
//异步调用
func (this *Service) Go(routRules string, ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) {
return
}
//发现服务
func (this *Service) Discovery(addr string, conn net.Conn, meta string) {
this.clientmeta[addr] = meta
this.clients[addr] = conn
this.selector.UpdateServer(this.clientmeta)
}
// //监听客户端链接到服务上 保存客户端的连接对象
// func (this *Service) PreHandleRequest(ctx context.Context, r *protocol.Message) error {
// if smeta, ok := ctx.Value(share.ReqMetaDataKey).(map[string]string)[ServiceAddrKey]; ok {
// // log.Errorf("smeta:%s err:%v", smeta, ok)
// if node, err := smetaToServiceNode(smeta); err == nil {
// if _, ok = this.clientmeta[node.ServiceId]; !ok {
// this.clientmeta[node.ServiceId] = smeta
// }
// }
// }
// // clientConn := ctx.Value(server.RemoteConnContextKey).(net.Conn)
// // fmt.Printf("PreReadRequest RoutRules:%s RemoteAddr:%v \n", RoutRules, clientConn.RemoteAddr().String())
// return nil
// }

View File

@ -30,6 +30,7 @@ type MComp_GateComp struct {
service base.IRPCXService //rpc服务对象
module core.IModule //当前业务模块
comp core.IModuleComp //网关组件自己
scomp comm.ISC_GateRouteComp
}
//组件初始化接口
@ -51,54 +52,52 @@ func (this *MComp_GateComp) Start() (err error) {
if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil {
return
}
this.suitableMethods(comp.(comm.ISC_GateRouteComp), reflect.TypeOf(this.comp))
this.scomp = comp.(comm.ISC_GateRouteComp)
this.suitableMethods(reflect.TypeOf(this.comp))
return
}
//反射注册相关接口道services/comp_gateroute.go 对象中
func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) {
func (this *MComp_GateComp) suitableMethods(typ reflect.Type) {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
this.reflectionRouteHandle(method)
}
}
//反射路由处理函数
func (this *MComp_GateComp) reflectionRouteHandle(method reflect.Method) bool {
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
continue
return false
}
// Method needs four ins: receiver, context.Context, *args, *reply.
if mtype.NumIn() != 4 {
continue
return false
}
// First arg must be context.Context
ctxType := mtype.In(1)
if !ctxType.Implements(typeOfContext) {
continue
return false
}
// Second arg need not be a pointer.
argType := mtype.In(2)
if !argType.Implements(typeOfSession) {
continue
return false
}
// Third arg must be a pointer.
replyType := mtype.In(3)
if replyType.Kind() != reflect.Ptr {
continue
return false
}
// Reply type must be exported.
if !this.isExportedOrBuiltinType(replyType) {
continue
return false
}
// Method needs one out.
if mtype.NumOut() != 1 {
continue
return false
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
continue
}
scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method)
return false
}
this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method)
return true
}
func (this *MComp_GateComp) isExportedOrBuiltinType(t reflect.Type) bool {

View File

@ -62,6 +62,7 @@ locp:
go this.Close()
break locp
}
if err = proto.Unmarshal(data, msg); err != nil {
log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err)
go this.Close()

View File

@ -10,7 +10,7 @@ import (
)
//参数校验
func (this *Api_Comp) getlist_check(ctx context.Context, session comm.IUserSession, req *pb.GetlistReq) (code pb.ErrorCode) {
func (this *Api_Comp) Getlist_Check(ctx context.Context, session comm.IUserSession, req *pb.GetlistReq) (code pb.ErrorCode) {
if !session.IsLogin() {
code = pb.ErrorCode_NoLogin
return
@ -36,7 +36,7 @@ func (this *Api_Comp) Getlist(ctx context.Context, session comm.IUserSession, re
}()
}
}()
if code = this.getlist_check(ctx, session, req); code != pb.ErrorCode_Success {
if code = this.Getlist_Check(ctx, session, req); code != pb.ErrorCode_Success {
return
}
if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil {

View File

@ -7,7 +7,7 @@ import (
)
//参数校验
func (this *Api_Comp) sellItem_check(ctx context.Context, session comm.IUserSession, req *pb.SellItemReq) (code pb.ErrorCode) {
func (this *Api_Comp) SellItem_Check(ctx context.Context, session comm.IUserSession, req *pb.SellItemReq) (code pb.ErrorCode) {
if !session.IsLogin() {
code = pb.ErrorCode_NoLogin
return
@ -23,7 +23,7 @@ func (this *Api_Comp) SellItem(ctx context.Context, session comm.IUserSession, r
defer func() {
session.SendMsg(string(this.module.GetType()), SellItemResp, code, &pb.SellItemResp{})
}()
if code = this.sellItem_check(ctx, session, req); code != pb.ErrorCode_Success {
if code = this.SellItem_Check(ctx, session, req); code != pb.ErrorCode_Success {
return
}
return

View File

@ -7,7 +7,7 @@ import (
)
//参数校验
func (this *Api_Comp) useitem_check(ctx context.Context, session comm.IUserSession, req *pb.UseItemReq) (code pb.ErrorCode) {
func (this *Api_Comp) Useitem_Check(ctx context.Context, session comm.IUserSession, req *pb.UseItemReq) (code pb.ErrorCode) {
if !session.IsLogin() {
code = pb.ErrorCode_NoLogin
return
@ -23,7 +23,7 @@ func (this *Api_Comp) Useitem(ctx context.Context, session comm.IUserSession, re
defer func() {
session.SendMsg(string(this.module.GetType()), UseItemResp, code, &pb.UseItemResp{})
}()
if code = this.useitem_check(ctx, session, req); code != pb.ErrorCode_Success {
if code = this.Useitem_Check(ctx, session, req); code != pb.ErrorCode_Success {
return
}
return

View File

@ -2,6 +2,7 @@ package services
import (
"context"
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/pb"
"reflect"
@ -36,6 +37,7 @@ type SComp_GateRouteComp struct {
cbase.ServiceCompBase
service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口
mrlock sync.RWMutex //msghandles 对象的锁
msgcheck map[string]*msghandle //处理函数的校验接口
msghandles map[string]*msghandle //处理函数的管理对象
}
@ -56,6 +58,15 @@ func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceC
func (this *SComp_GateRouteComp) Start() (err error) {
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
err = this.ServiceCompBase.Start()
for k, v := range this.msghandles {
if v1, ok := this.msgcheck[k]; !ok {
err = fmt.Errorf("注册用户消息处理函数:%s 没有实现参数校验接口", k)
return
} else if v.msgType != v1.msgType {
err = fmt.Errorf("注册用户消息处理函数:%s 实现参数校验接口不一致 请检查代码!", k)
return
}
}
return
}
@ -78,11 +89,31 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.V
this.mrlock.Unlock()
}
//业务模块注册用户消息处理路由
func (this *SComp_GateRouteComp) RegisterRouteCheck(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
log.Debugf("注册用户路由校验[%s]", methodName)
this.mrlock.RLock()
_, ok := this.msgcheck[methodName]
this.mrlock.RUnlock()
if ok {
log.Errorf("重复 注册用户路由校验[%s]", methodName)
return
}
this.mrlock.Lock()
this.msgcheck[methodName] = &msghandle{
rcvr: comp,
msgType: msg,
fn: fn,
}
this.mrlock.Unlock()
}
//Rpc_GatewayRoute服务接口的接收函数
func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
this.mrlock.RLock()
msghandle, ok := this.msghandles[args.Method]
msgcheck := this.msghandles[args.Method]
this.mrlock.RUnlock()
if ok {
session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
@ -91,6 +122,7 @@ func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentM
log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
return err
}
msghandle.fn.Func.Call([]reflect.Value{msgcheck.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
} else {
reply.Code = pb.ErrorCode_ReqParameterError