上传rpcx底层服务发现机制优化,以及自定义选择器实现

This commit is contained in:
liwei1dao 2022-06-13 10:09:28 +08:00
parent 33436e7c98
commit 5b419e3ef6
7 changed files with 46 additions and 29 deletions

View File

@ -60,8 +60,8 @@ type IUserSession interface {
GetIP() string GetIP() string
GetGatewayServiceId() string GetGatewayServiceId() string
IsLogin() bool IsLogin() bool
Build(uid string) (err error) Bind(uid string, wokerId string) (err error)
UnBuild(ServiceMethod string, msg proto.Message) (err error) UnBind() (err error)
SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error) SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error)
Close() (err error) Close() (err error)
ToString() string ToString() string

View File

@ -58,25 +58,27 @@ func (this *UserSession) IsLogin() bool {
return this.UserId != "" return this.UserId != ""
} }
//绑定uid 登录后操作 ///绑定uid 登录后操作
func (this *UserSession) Build(uid string) (err error) { ///uid 用户id
///wokerId 用户绑定worker服务id
func (this *UserSession) Bind(uid string, wokerId string) (err error) {
reply := &pb.RPCMessageReply{} reply := &pb.RPCMessageReply{}
if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentBuild), context.Background(), &pb.AgentBuildReq{ if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentBuild), &pb.AgentBuildReq{
UserSessionId: this.SessionId, UserSessionId: this.SessionId,
UserId: uid, UserId: uid,
}, reply); err != nil { }, reply); err != nil {
log.Errorf("UserSession:%s UserId:%s Build:%s err:%v", this.SessionId, this.UserId, err) log.Errorf("Bind UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err)
} }
return return
} }
//解绑uid 注销和切换账号是处理 //解绑uid 注销和切换账号是处理
func (this *UserSession) UnBuild(ServiceMethod string, msg proto.Message) (err error) { func (this *UserSession) UnBind() (err error) {
reply := &pb.RPCMessageReply{} reply := &pb.RPCMessageReply{}
if err := this.service.RpcCallById(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId),, string(Rpc_GatewayAgentUnBuild), context.Background(), &pb.AgentUnBuildReq{ if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentUnBuild), &pb.AgentUnBuildReq{
UserSessionId: this.SessionId, UserSessionId: this.SessionId,
}, reply); err != nil { }, reply); err != nil {
log.Errorf("UserSession:%s UserId:%s UnBuild err:%v", this.SessionId, this.UserId, err) log.Errorf("UnBuild UserSession:%s UserId:%s err:%v", this.SessionId, this.UserId, err)
} }
return return
} }
@ -93,7 +95,7 @@ func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, ms
Code: code, Code: code,
Data: data, Data: data,
}, reply); err != nil { }, reply); err != nil {
log.Errorf("UserSession:%s UserId:%s SendMsg:%s err:%v", this.SessionId, this.UserId, mainType, err) log.Errorf("SendMsg:%s UserSession:%s UserId:%s err:%v", mainType, this.SessionId, this.UserId, err)
} }
return return
} }
@ -104,7 +106,7 @@ func (this *UserSession) Close() (err error) {
if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentCloseeReq{ if err := this.service.RpcCall(context.Background(), fmt.Sprintf("%s/%s", Service_Gateway, this.GatewayServiceId), string(Rpc_GatewayAgentSendMsg), &pb.AgentCloseeReq{
UserSessionId: this.SessionId, UserSessionId: this.SessionId,
}, reply); err != nil { }, reply); err != nil {
log.Errorf("UserSession:%s UserId:%d Close:%s err:%v", this.SessionId, this.UserId, err) log.Errorf("Close UserSession:%s UserId:%d err:%v", this.SessionId, this.UserId, err)
} }
return return
} }

View File

@ -56,7 +56,7 @@ func (this *Client) Call(ctx context.Context, servicePath string, serviceMethod
) )
spath = strings.Split(servicePath, "/") spath = strings.Split(servicePath, "/")
if c, ok = this.clients[spath[0]]; !ok { if c, ok = this.clients[spath[0]]; !ok {
if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil {
return return
} }
c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption)
@ -87,7 +87,7 @@ func (this *Client) Go(ctx context.Context, servicePath string, serviceMethod st
) )
spath = strings.Split(servicePath, "/") spath = strings.Split(servicePath, "/")
if c, ok = this.clients[spath[0]]; !ok { if c, ok = this.clients[spath[0]]; !ok {
if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, this.rpcx.options.ServiceType, this.rpcx.options.ConsulServers, nil); err != nil { if d, err = client.NewConsulDiscovery(this.rpcx.options.ServiceTag, spath[0], this.rpcx.options.ConsulServers, nil); err != nil {
return return
} }
c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption) c = client.NewXClient(spath[0], client.Failfast, client.RandomSelect, d, client.DefaultOption)

View File

@ -41,12 +41,20 @@ var (
) )
func OnInit(config map[string]interface{}, option ...Option) (err error) { func OnInit(config map[string]interface{}, option ...Option) (err error) {
defsys, err = newSys(newOptions(config, option...)) var options Options
if options, err = newOptions(config, option...); err != nil {
return
}
defsys, err = newSys(options)
return return
} }
func NewSys(option ...Option) (sys ISys, err error) { func NewSys(option ...Option) (sys ISys, err error) {
sys, err = newSys(newOptionsByOption(option...)) var options Options
if options, err = newOptionsByOption(option...); err != nil {
return
}
sys, err = newSys(options)
return return
} }

View File

@ -1,6 +1,7 @@
package rpcx package rpcx
import ( import (
"errors"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/utils/mapstructure" "go_dreamfactory/lego/utils/mapstructure"
) )
@ -73,7 +74,7 @@ func SetLog(v log.ILog) Option {
} }
} }
func newOptions(config map[string]interface{}, opts ...Option) Options { func newOptions(config map[string]interface{}, opts ...Option) (Options, error) {
options := Options{ options := Options{
Debug: true, Debug: true,
Log: log.Clone(log.SetLoglayer(2)), Log: log.Clone(log.SetLoglayer(2)),
@ -84,10 +85,13 @@ func newOptions(config map[string]interface{}, opts ...Option) Options {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
return options if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 {
return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常")
}
return options, nil
} }
func newOptionsByOption(opts ...Option) Options { func newOptionsByOption(opts ...Option) (Options, error) {
options := Options{ options := Options{
Debug: true, Debug: true,
Log: log.Clone(log.SetLoglayer(2)), Log: log.Clone(log.SetLoglayer(2)),
@ -95,5 +99,8 @@ func newOptionsByOption(opts ...Option) Options {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
return options if len(options.ServiceTag) == 0 || len(options.ServiceType) == 0 || len(options.ServiceId) == 0 || len(options.ConsulServers) == 0 {
return options, errors.New("[Sys.RPCX] newOptions err: 启动参数异常")
}
return options, nil
} }

View File

@ -27,8 +27,8 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
*/ */
type MComp_GateComp struct { type MComp_GateComp struct {
cbase.ModuleCompBase cbase.ModuleCompBase
service base.IRPCXService //rpc服务对象 S base.IRPCXService //rpc服务对象
module core.IModule //当前业务模块 M core.IModule //当前业务模块
comp core.IModuleComp //网关组件自己 comp core.IModuleComp //网关组件自己
scomp comm.ISC_GateRouteComp scomp comm.ISC_GateRouteComp
} }
@ -36,8 +36,8 @@ type MComp_GateComp struct {
//组件初始化接口 //组件初始化接口
func (this *MComp_GateComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { func (this *MComp_GateComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options) this.ModuleCompBase.Init(service, module, comp, options)
this.service = service.(base.IRPCXService) this.S = service.(base.IRPCXService)
this.module = module this.M = module
this.comp = comp this.comp = comp
return return
} }
@ -49,7 +49,7 @@ func (this *MComp_GateComp) Start() (err error) {
} }
var comp core.IServiceComp var comp core.IServiceComp
//注册远程路由 //注册远程路由
if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil { if comp, err = this.S.GetComp(comm.SC_ServiceGateRouteComp); err != nil {
return return
} }
this.scomp = comp.(comm.ISC_GateRouteComp) this.scomp = comp.(comm.ISC_GateRouteComp)
@ -96,7 +96,7 @@ func (this *MComp_GateComp) reflectionRouteHandle(method reflect.Method) bool {
if returnType := mtype.Out(0); returnType != typeOfError { if returnType := mtype.Out(0); returnType != typeOfError {
return false return false
} }
this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) this.scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.M.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method)
return true return true
} }

View File

@ -68,9 +68,9 @@ func (this *LoginComp) Login(ctx context.Context, session comm.IUserSession, req
log.Errorf("User_CreateUser err %v", err) log.Errorf("User_CreateUser err %v", err)
return err return err
} }
session.Build(user.UserId) session.Bind(user.UserId, this.S.GetId())
} else { } else {
session.Build(db_user.UserId) session.Bind(db_user.UserId, this.S.GetId())
} }
cache_user := &pb.Cache_UserData{ cache_user := &pb.Cache_UserData{