梦工厂服务端项目
Go to file
2022-06-29 17:34:11 +08:00
bin/json 更新配置 2022-06-29 11:23:39 +08:00
cmd 修复卡叠加逻辑 2022-06-29 17:15:31 +08:00
comm 添加叠加字段 2022-06-29 10:27:59 +08:00
lego 初始化英雄 2022-06-28 12:03:14 +08:00
modules 增加注释 2022-06-29 17:34:11 +08:00
pb 修复卡叠加逻辑 2022-06-29 17:15:31 +08:00
services Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-28 18:20:05 +08:00
sys 更新配置 2022-06-29 11:23:39 +08:00
test 添加叠加字段 2022-06-29 10:27:59 +08:00
utils 解决冲突 2022-06-27 19:08:54 +08:00
.gitignore Merge branch 'master' of http://git.legu.cc/liwei_3d/go_dreamfactory 2022-06-01 15:55:45 +08:00
build.bat 打包脚本 2022-06-29 17:15:31 +08:00
go.mod merge 2022-06-27 19:48:39 +08:00
go.sum 解决冲突 2022-06-27 19:08:54 +08:00
linux-build.bat 上传代码 2022-06-01 14:52:33 +08:00
pb.bat 上传武器相关配置和接口重构 2022-06-27 17:45:02 +08:00
probuf转go.cmd update 2022-06-22 19:31:18 +08:00
py_consul.py 上传配置以及pyhont 工具脚本 2022-05-31 16:59:28 +08:00
README.md 日志写表 2022-06-13 17:40:10 +08:00

go_dreamfactory

梦工厂服务端项目

项目构架说明

.vscode                     #vscode 编辑器环境配置项 包含服务调试运行配置
bin                         #服务运行和发布环境 包含服务的执行文件以及相关依赖配置
    conf
        gateway_1.yaml      #服务配置 网关服务的配置依赖文件
        worker_1.yaml       #服务配置 工作服务的配置依赖文件
    json                    #项目策划配置文件
    log                     #服务运行输出日志文件目录
cmd                         #命令行工具集
comm                        #项目通用常量和类型定义
    core                        #模块名以及rpc方法名等等的定义
    usersession                 #用户的会话对象实现 实现跨服与用户通信
lego                        #lego 微服务框架包
    base                        #基础服务容器的定义
    core                        #lego服务和模块设计相关
    sys                         #一些通用的系统工具集合
    utils                       #一些公用的方法工具集合
modules                     #项目业务功能模块集合
    friend                      #好友模块 提供好像系统相关业务功能 在worker服务下运行
    gateway                     #网关模块 提供网关服务以及用户连接对象管理功能 在gateway服务容器下运行
    mail                        #邮件模块 提供邮箱相关业务功能 在worker服务下运行
    pack                        #背包模块 提供用户背包管理相关业务功能 在worker服务下运行
    user                        #用户模块 提供登录祖册相关业务功能 在worker服务下运行
    web                         #web模块 提供一些http服务支持 在web服务下运行
    core.go                     #一些模块集合内通用的接口和类型定义
    gate_comp.go                #模块网关组件 用户注册接收用户的消息 业务模块自定义api组件继承此组件即可
    modulebase.go               #基础模块 业务模块都应继承此模块 内部封装有一些通用的接口 方便业务开发
pb                          #服务协议以及数据类型定义
    proto                       #proto的定义文件目录
services                    #服务定义目录
    gateway                     #网关服务   定义网关服务容器 只运行网关服务模块
    web                         #web服务    提供一些http服务功能 前期测试和后期运维可能需要
    worker                      #工作服务   游戏的业务模块都会在这个服务容器中运行
    servicebase.go              #基础服务容器定义 可以实现服务的底层通用接口的实现
    comp_gateroute.go           #网关服务组件 用于接收从网关服务发出的用户协议并分发给本服务内各个模块处理
sys                         #公用系统工具集
    cache                       #缓存管理系统 封装游戏类缓存数据的处理接口
    db                          #存储管理系统 封装游戏mgo数据库相关数据处理接口
    configure                   #配置管理中心 负责加载更新管理项目中使用到的各类配置文件数据
utils                       #公用方法工具集

启动服务

  • 调试模式下 工具VSCode 配置 .vscode\launch.json
    {
      // 使用 IntelliSense 了解相关属性。 
      // 悬停以查看现有属性的描述。
      // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387
      "version": "0.2.0",
      "configurations": [
        {
          "name": "gateway_1",
          "type": "go",
          "request": "launch",
          "mode": "debug",
          "program": "${workspaceFolder}/services/gateway",  //配置Go项目启动文件路径即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录
          "args": ["-conf","./conf/gateway_1.yaml"],         //指定服务启动的依赖配置文件 配置文件默认放在bin/conf 目录下
          "cwd": "${workspaceFolder}/bin", //设置工作目录
          "internalConsoleOptions": "openOnSessionStart",
          "output": "${workspaceFolder}/bin/vsdebug_gateway", //设置vscode调试时生成文件的路径
          "showGlobalVariables": true,
          "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定如果有多个gopath用英文冒号:来连接多个gopath
        },
        {
          "name": "worker_1",
          "type": "go",
          "request": "launch",
          "mode": "debug",
          "program": "${workspaceFolder}/services/worker", //配置Go项目启动文件路径即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录
          "args": ["-conf","./conf/worker_1.yaml"],        //指定服务启动的依赖配置文件
          "cwd": "${workspaceFolder}/bin", //设置工作目录
          "internalConsoleOptions": "openOnSessionStart",
          "output": "${workspaceFolder}/bin/vsdebug_worker", //设置vscode调试时生成文件的路径
          "showGlobalVariables": true,
          "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定如果有多个gopath用英文冒号:来连接多个gopath
        }
      ]
    }
    
  • 编译启动 执行 linux-build.bat 文件 生成执行文件 暂时只编译了linux版本的执行文件
      cd ./bin
      ./gateway -conf ./conf/gateway_1.yaml  
    

如何快速开发模块

  • 定义模块类型名称 在comm/core.go 中定义定义自己的业务模块名称
    const (
        SM_GateModule   core.M_Modules = "gateway" //gate模块 	网关服务模块
        SM_WebModule    core.M_Modules = "web"     //web模块
        SM_UserModule   core.M_Modules = "user"    //用户模块
        SM_PackModule   core.M_Modules = "pack"    //背包模块
        SM_MailModule   core.M_Modules = "mail"    //邮件模块
        SM_FriendModule core.M_Modules = "friend"  //好友模块
        //定义自己的模块名称
    )
    
  • 创建自己的模块目录 在modules/ 目录下创建自己的模块目录
  • 定义自己的模块对象 在modules/自己的模块目录 创建module.go 文件
    package pack    //修改成自己的包名
    
    import (
        "go_dreamfactory/comm"
        "go_dreamfactory/modules"
    
        "go_dreamfactory/lego/core"
    )
    
    /*
    模块名:Pack
    描述:背包系统模块
    开发:李伟
    */
    func NewModule() core.IModule {
        m := new(Pack)
        return m
    }
    
    //换成自己的模块对象
    type Pack struct {
        modules.ModuleBase
    }
    
    func (this *Pack) GetType() core.M_Modules {
        return comm.SM_PackModule               //换成自己的模块名
    }
    
    //组装组件的地方
    func (this *Pack) OnInstallComp() {
        this.ModuleBase.OnInstallComp()
    }
    
    
  • 定义自己的用户请求处理组件 在modules/自己的模块目录 创建api_comp.go 文件 名字根据自己的情况定义,
    //api_comp.go
    //必须继承  modules.MComp_GateComp
    type Api_Comp struct {
        modules.MComp_GateComp
    }
    
    //定义具体的消息处理函数
    func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
    
    }
    
    
    
    //将组件装备到模块上
    //module.go
    func (this *Pack) OnInstallComp() {
        this.ModuleBase.OnInstallComp()
        this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
    }
    
  • 定义自己的协议 在pb/proto 下定义自己的proto协议文件并将文件加入到pb_2.7.py编译工具中
        buildProto('./pb/proto','./pb','comm')
        buildProto('./pb/proto','./pb','errorcode')
        buildProto('./pb/proto','./pb','user_db')
        buildProto('./pb/proto','./pb','user_msg')
        buildProto('./pb/proto','./pb','pack_db')
        buildProto('./pb/proto','./pb','pack_msg')
        //加入知己的协议文件
    
  • 将模块装备到worker服务容器中 services/worker/main.go
    func main() {
        flag.Parse()
        s := NewService(
            rpcx.SetConfPath(*conf),
            rpcx.SetVersion("1.0.0.0"),
        )
        s.OnInstallComp( //装备组件
            services.NewGateRouteComp(),
        )
        lego.Run(s, //运行模块
            // web.NewModule(),
            user.NewModule(),
            pack.NewModule(),
            mail.NewModule(),
            //在这里吧自己的模块注册进去
        )
    
    }
    

服务容器的定义说明

  • service 容器定义 项目可更具情况组合拆分业务到具体的服务中
var (
	conf = flag.String("conf", "./conf/worker.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)

func main() {
	flag.Parse()
	s := NewService(
		rpcx.SetConfPath(*conf),            //指定配置文件路径
		rpcx.SetVersion("1.0.0.0"),         //设置服务版本号
	)
    //每一个服务都只是一个进程容器,服务提供的功能业务取决服务绑定了哪些组件和模块
	s.OnInstallComp( //装备组件
		services.NewGateRouteComp(),                //装备接收用户消息的组件     
	)
	lego.Run(s, //装备模块
		user.NewModule(),                           //装备用户模块          
		pack.NewModule(),                           //装备背包模块          
		mail.NewModule(),                           //装备邮件模块   
	)

}

func NewService(ops ...rpcx.Option) core.IService {
	s := new(Service)
	s.Configure(ops...)
	return s
}

//继承基础服务对象 方便后期通用接口的扩展
type Service struct {
	services.ServiceBase
}

//初始化服务中需要用到的系统工具
func (this *Service) InitSys() {
	this.ServiceBase.InitSys()
    //初始化缓存系统
	if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil {
		panic(fmt.Sprintf("init sys.cache err: %s", err.Error()))
	} else {
		log.Infof("init sys.cache success!")
	}
    //初始化存储系统
	if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {
		panic(fmt.Sprintf("init sys.db err: %s", err.Error()))
	} else {
		log.Infof("init sys.db success!")
	}
}

模块容器的定义说明

pack/module.go 每个模块目录必须有一个模块对象 一般将相同类型的业务分为一个模块中实现, 模块也只是一个容器,模块中具体实现功能的单元为模块组件对象,模块启动是装备具体的业务组件集合实现相关的业务功能

func NewModule() core.IModule {
	m := new(Pack)
	return m
}

//模块结构对象 继承基础模块对象
type Pack struct {
	modules.ModuleBase
	api_comp       *Api_Comp
	configure_comp *Configure_Comp
}

//模块类型确定 模块名称
func (this *Pack) GetType() core.M_Modules {
	return comm.SM_PackModule
}
//服务启动相关模块时调用的接口
func (this *Pack) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
	err = this.ModuleBase.Init(service, module, options)	
	return
}
//模块启动时装备业务组件 可更具自己的业务划分 自己定义业务组件
func (this *Pack) OnInstallComp() {
	this.ModuleBase.OnInstallComp()
    //用户背包相关协议处理组件
	this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
    //背包系统的相关配置管理组件             
	this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp)
}

pack/api_comp.go 背包用户api组件 用于接收处理用具背包请求协议

//Api 组件继承于模块网关组件 modules.MComp_GateComp 内部通过反射机制将当前组件内处理协议的接口注册到 comp_gateroute.go 网关服务组件中之后服务组件就可以把接收到的用户消息分发给当前api组件
type Api_Comp struct {
	modules.MComp_GateComp
	module *Pack
}
//每个组件都可以通过组件初始化接口拿到 当前服务容器对象以及所属的模块对象
func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	this.MComp_GateComp.Init(service, module, comp, options)
	this.module = module.(*Pack)
	return
}

///查询用户背包数据 处理用处的消息接口 统一 func(ctx context.Context, session comm.IUserSession, req 消息结构对象)(err error)/////ctx 上下文控制器 可以管理方法调用中生成的多个协程 以及传递一些元数据
//session 用户的会话对象 可以使用此对象给用户发送消息以及关闭用户的连接对象
//req 协议处理函数接收到具体消息数据对象
func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
	var (
		code  pb.ErrorCode
		pack  *pb.DB_UserPackData
		grids []*pb.DB_GridData
	)
	defer func() {
		session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids})
	}()
	if !session.IsLogin() {
		code = pb.ErrorCode_NoLogin
		return
	}
    //通过缓存读取到用户的背包信息
	if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil {
		log.Errorf("QueryUserPackReq err:%v", err)
		code = pb.ErrorCode_CacheReadError
		return
	} else {
		grids = this.module.configure_comp.GetPackItemByType(pack, req.IType)
	}
	return
}

客户端发送消息到服务器处理的流程介绍

  • 前后端通用协议结构定义 pb/proto/comm.proto
    message UserMessage  {
        string MainType =1;         //消息的主id 默认为具体的业务模块名称
        string SubType = 2;         //消息的副id 默认为具体处理消息的函数名
        ErrorCode Code = 3;         //服务端回应客户端返回的错误码 pb/proto/errorcode.proto 定义
        bytes Data = 4;             //具体的消息体内容
    }
    
  • 网关服务接收用户协议的处理 modules/gateway/agent.go
        //用户代理对象
        type Agent struct {
            gateway     IGateway
            wsConn      *websocket.Conn
            sessionId   string
            uId         string
            writeChan   chan *pb.UserMessage
            closeSignal chan bool
            state       int32 //状态 0 关闭 1 运行 2 关闭中
            wg          sync.WaitGroup
        }
    
        //读取用户发送过来的消息
        func (this *Agent) readLoop() {
            defer this.wg.Done()
            var (
                data []byte
                msg  *pb.UserMessage = &pb.UserMessage{}
                err  error
            )
        locp:
            for {
                if _, data, err = this.wsConn.ReadMessage(); err != nil {
                    log.Errorf("agent:%s uId:%s ReadMessage err:%v", this.sessionId, this.uId, err)
                    go this.Close()
                    break locp
                }
                //读取消息 并序列化到 UserMessage 结构对象
                if err = proto.Unmarshal(data, msg); err != nil {
                    log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err)
                    go this.Close()
                    break locp
                } else {
                    //分发处理用户的消息请求
                    this.messageDistribution(msg)
                }
            }
            log.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId)
        }
    
        //分发用户消息 将用户消息采用负载的方式分到到 worker 类型的服务
        //此处后期会补充设计用户协议分化系统,暂时采用负载分发实现基本功能线
        func (this *Agent) messageDistribution(msg *pb.UserMessage) {
            reply := &pb.RPCMessageReply{}
            log.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType)
            if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.AgentMessage{
                Ip:               this.IP(),
                UserSessionId:    this.sessionId,
                UserId:           this.uId,
                GatewayServiceId: this.gateway.Service().GetId(),
                Method:           fmt.Sprintf("%s.%s", msg.MainType, msg.SubType),
                Message:          msg.Data,
            }, reply); err != nil {
                log.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err)
            } else {
                log.Debugf("agent:%s uId:%s MessageDistribution reply:%v", this.sessionId, this.uId, reply)
            }
        }
    
    
  • Worker 服务如何接受用户的消息 services/comp_gateroute.go
    //网关服务组件启动时会注册Rpc方法 接收网关服务那边发过来的用户消息
    func (this *SComp_GateRouteComp) Start() (err error) {
        this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
        err = this.ServiceCompBase.Start()
        return
    }
    
    //具体接收用户消息的方法 此处消息分发需要当前服务内有模块注册处理消息的接口 没有注册的话 此处会打印错误日志
    //msghandles 为消息处理函数的管理对象
    func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
        log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%d MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
        this.mrlock.RLock()
        msghandle, ok := this.msghandles[args.Method]
        this.mrlock.RUnlock()
        if ok {
            session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
            msg := reflect.New(msghandle.msgType.Elem()).Interface()
            if err := proto.Unmarshal(args.Message, msg.(proto.Message)); err != nil {
                log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
                return err
            }
            msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
        } else {
            reply.Code = pb.ErrorCode_ReqParameterError
            // reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError)
        }
        return nil
    }
    
  • 业务模块 如何接受处理用户的消息
    #模块需要接受处理用户发送过来的消息需要装备一个继承modules/gate_comp.go 的模块网关组件此组件启动是会自动把自身定义的消息处理函数注册到上面提到的服务网关组件services/comp_gateroute.go 中去实现可以参考pack/api_comp.go 的定义
    /*
    模块 网关组件 接收处理用户传递消息
    */
    type MComp_GateComp struct {
        cbase.ModuleCompBase
        service base.IRPCXService
        module  core.IModule
        comp    core.IModuleComp
    }
    //组件启动是会通过反射把当前组件上定义的用户处理函数 统一注册到当前服务容器中的网关组件中
    func (this *MComp_GateComp) Start() (err error) {
        if err = this.ModuleCompBase.Start(); err != nil {
            return
        }
        var comp core.IServiceComp
        //注册远程路由
        if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil {
            return
        }
        this.suitableMethods(comp.(comm.ISC_GateRouteComp), reflect.TypeOf(this.comp))
        return
    }
    
    //反射注册消息处理函数
    func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) {
        for m := 0; m < typ.NumMethod(); m++ {
            method := typ.Method(m)
            mtype := method.Type
            mname := method.Name
            // Method must be exported.
            if method.PkgPath != "" {
                continue
            }
            // Method needs four ins: receiver, context.Context, *args, *reply.
            if mtype.NumIn() != 4 {
                continue
            }
            // First arg must be context.Context
            ctxType := mtype.In(1)
            if !ctxType.Implements(typeOfContext) {
                continue
            }
    
            // Second arg need not be a pointer.
            argType := mtype.In(2)
            if !argType.Implements(typeOfSession) {
                continue
            }
            // Third arg must be a pointer.
            replyType := mtype.In(3)
            if replyType.Kind() != reflect.Ptr {
                continue
            }
            // Reply type must be exported.
            if !this.isExportedOrBuiltinType(replyType) {
                continue
            }
            // Method needs one out.
            if mtype.NumOut() != 1 {
                continue
            }
            // The return type of the method must be error.
            if returnType := mtype.Out(0); returnType != typeOfError {
                continue
            }
            scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method)
        }
    }
    
  • 消息处理函数如何给用户回复消息
    //每一个消息处理函数我们都会传递一个 IUserSession的对象 开发人员可以使用此对象向当前用户回复消息以及关闭连接等操作
    type IUserSession interface {
        GetSessionId() string
        GetUserId() string
        GetIP() string
        GetGatewayServiceId() string
        IsLogin() bool
        Build(uid string) (err error)
        UnBuild(ServiceMethod string, msg proto.Message) (err error)
        SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error)
        Close() (err error)
        ToString() string
    }
    
    
    ///查询用户背包数据 
    func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
        var (
            code  pb.ErrorCode
            pack  *pb.DB_UserPackData
            grids []*pb.DB_GridData
        )
        defer func() {
            session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids})
        }()
        if !session.IsLogin() {
            code = pb.ErrorCode_NoLogin
            return
        }
        if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil {
            log.Errorf("QueryUserPackReq err:%v", err)
            code = pb.ErrorCode_CacheReadError
            return
        } else {
            grids = this.module.configure_comp.GetPackItemByType(pack, req.IType)
        }
        return
    }
    
  • 消息处理函数如何给其他用户发送消息
    modules/modulebase.go 是所有业务模块的基础模块,大家在定义业务模块时注意继承此模块,此模块封装有给指定用户发送消息以及给多个用户同时发送消息的接口后续一些通用的接口都会封装到这一层 方便上层业务模块快速调用,组件可以在Init方法中通过断言的方式获取到模块对象,在调用底层接口即可
    
    //向目标用户发送消息
    func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error) {
        reply := &pb.RPCMessageReply{}
        data, _ := proto.Marshal(msg)
        if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{
            UserSessionId: user.SessionId,
            MainType:      mainType,
            SubType:       subType,
            Data:          data,
        }, reply); err != nil {
            log.Errorf("SendMsgToUser%d:%s [%s.%s] err:%v", user.UserData.UserId, user.SessionId, mainType, subType, err)
        }
        return
    }
    //向多个目标用户发送消息
    func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.Cache_UserData) (err error) {
        var (
            gateways map[string][]string = make(map[string][]string)
            gateway  []string
            ok       bool
        )
        for _, v := range user {
            if gateway, ok = gateways[v.GatewayServiceId]; !ok {
                gateway = make([]string, 0)
                gateways[v.GatewayServiceId] = gateway
            }
            gateway = append(gateway, v.SessionId)
        }
        reply := &pb.RPCMessageReply{}
        data, _ := proto.Marshal(msg)
        for k, v := range gateways {
            if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{
                UserSessionIds: v,
                MainType:       mainType,
                SubType:        subType,
                Data:           data,
            }, reply); err != nil {
                log.Errorf("SendMsgToUsers:%s->%s.%s err:%v", k, mainType, subType, err)
            }
        }
        return
    }
    

lego 微服务构架 服务发现 registry 注册表系统

  • 服务节点定义 registry/core.go
    //服务发现系统 定义每一个服务节点的结构对象
	ServiceNode struct {
		Tag       string          `json:"Tag"`       //服务集群标签
		Type      string          `json:"Type"`      //服务类型
		Category  core.S_Category `json:"Category"`  //服务类别
		Id        string          `json:"Id"`        //服务Id
		Version   string          `json:"Version"`   //服务版本
		IP        string          `json:"Ip"`        //服务Ip
		Port      int             `json:"Port"`      //端口
		RpcId     string          `json:"RpcId"`     //服务通信Id
		PreWeight float64         `json:"PreWeight"` //服务负载权重
	}
  • 集群服务启动时 底层会默认启动 registry 系统 实现集群内的服务节点互相发现
    // lego/base/rpcx/service.go 
    func (this *RPCXService) InitSys() {
        //初始化日志系统
        if err := log.OnInit(this.opts.Setting.Sys["log"]); err != nil {
            panic(fmt.Sprintf("Sys log Init err:%v", err))
        } else {
            log.Infof("Sys log Init success !")
        }
        //初始化事件系统
        if err := event.OnInit(this.opts.Setting.Sys["event"]); err != nil {
            log.Panicf(fmt.Sprintf("Sys event Init err:%v", err))
        } else {
            log.Infof("Sys event Init success !")
        }
        //初始化注册表系统
        if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil {
            log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err))
        } else {
            log.Infof("Sys registry Init success !")
        }
        //初始化rpcx 系统
        if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil {
            log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err))
        } else {
            log.Infof("Sys rpcx Init success !")
        }
        //监听服务初始化完毕后 启动注册表和rpc服务
        event.Register(core.Event_ServiceStartEnd, func() { //阻塞 先注册服务集群 保证其他服务能及时发现
            if err := rpcx.Start(); err != nil {
                log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err))
            }
            if err := registry.Start(); err != nil {
                log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err))
            }
        })
    }

    //registry/coire.go 注册表系统的接口定义
	ISys interface {
        ///启动注册表系统 向第三方服务注册当前服务信息
		Start() error
        ///停止注册表系统 向第三方服务注销当前服务
		Stop() error
        ///推送当前服务信息到 第三方服务
		PushServiceInfo() (err error)
        ///通过id 获取集群内服务的节点信息
		GetServiceById(sId string) (n *ServiceNode, err error)
        ///通过服务类型 获取当前集群下的所有服务节点信息
		GetServiceByType(sType string) (n []*ServiceNode)
        ///获取当前集群下所有的服务节点信息
		GetAllServices() (n []*ServiceNode)
        ///通过服务类别 获取当前集群下的所有服务节点信息
		GetServiceByCategory(category core.S_Category) (n []*ServiceNode)
        ///通过RPC接口获取 提供此接口的所有服务节点信息
		GetRpcSubById(rId core.Rpc_Key) (n []*ServiceNode)
	}
  • registry 发现集群下服务变动 通知 service
    //registry/options.go 启动是需要设置监听对象
    registry.SetListener(this.rpcxService.(registry.IListener))

    // /registry/coire.go  监听注册表发送的集群变更信息
    IListener interface {
		FindServiceHandlefunc(snode ServiceNode)
		UpDataServiceHandlefunc(snode ServiceNode)
		LoseServiceHandlefunc(sId string)
	}


    // lego/base/rpcx/service.go  服务对象实现上面三个接口 既可以接收到集群变化事件
    //注册服务会话 当有新的服务加入时
    func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) {
        this.lock.Lock()
        if _, ok := this.serverList.Load(node.Id); !ok {
            if s, err := NewServiceSession(&node); err != nil {
                log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err)
            } else {
                this.serverList.Store(node.Id, s)
            }
        }
        this.lock.Unlock()
        if this.IsInClustered {
            event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
        } else {
            if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功
                this.IsInClustered = true
                event.TriggerEvent(core.Event_RegistryStart)
            }
        }
    }

    //更新服务会话 当有新的服务加入时
    func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) {
        if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息
            session := ss.(base.IRPCXServiceSession)
            if session.GetRpcId() != node.RpcId {
                if s, err := NewServiceSession(&node); err != nil {
                    log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err)
                } else {
                    this.serverList.Store(node.Id, s)
                }
                event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
            } else {
                if session.GetVersion() != node.Version {
                    session.SetVersion(node.Version)
                }
                if session.GetPreWeight() != node.PreWeight {
                    session.SetPreWeight(node.PreWeight)
                }
                event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件
            }
        }
    }

    //注销服务会话
    func (this *RPCXService) LoseServiceHandlefunc(sId string) {
        this.lock.Lock()
        session, ok := this.serverList.Load(sId)
        if ok && session != nil {
            session.(base.IRPCXServiceSession).Done()
            this.serverList.Delete(sId)
        }
        this.lock.Unlock()
        event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件
    }

lego 微服务构架 服务通信 rpcx

  • 服务启动时启动rpcx服务 提供rpcx服务监听
    //rpcx 系统接口定义
	ISys interface {
		IRPCXServer
		NewRpcClient(addr, sId string) (clent IRPCXClient, err error)   //创建rpc客户端对象
	}

	IRPCXServer interface {
		Start() (err error) //启动rpcx服务
		Stop() (err error)  //停止rpcx服务
		Register(rcvr interface{}) (err error)  //注册rpc服务接口对象
		RegisterFunction(fn interface{}) (err error)    //注册rpc服务接口
		RegisterFunctionName(name string, fn interface{}) (err error) //注册rpc服务接口
		UnregisterAll() (err error) //注销全部rpc服务接口
	}
    //rpc客户端对象接口定义
    IRPCXClient interface {
		Stop() (err error)
		Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
		Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error)
	}

    //lego/sys/rpcx/service.go
    /*RPC 服务对象*/  
    type Service struct {
        server  *server.Server
        options Options
    }

    func (this *Service) Start() (err error) {
        go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port))
        return
    }

    func (this *Service) Stop() (err error) {
        err = this.server.Close()
        return
    }

    func (this *Service) Register(rcvr interface{}) (err error) {
        err = this.server.RegisterName(this.options.ServiceId, rcvr, "")
        return
    }
    func (this *Service) RegisterFunction(fn interface{}) (err error) {
        err = this.server.RegisterFunction(this.options.ServiceId, fn, "")
        return
    }
    func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) {
        err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "")
        return
    }
    func (this *Service) UnregisterAll() (err error) {
        err = this.server.UnregisterAll()
        return
    }
  • 发现新的服务节点信息是 创建服务会话对象 lego/base/rpcx/servicesession.go
    //注册服务会话 当有新的服务加入时 lego/base/rpcx/service.go
    func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) {
        this.lock.Lock()
        if _, ok := this.serverList.Load(node.Id); !ok {
            if s, err := NewServiceSession(&node); err != nil {
                log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err)
            } else {
                this.serverList.Store(node.Id, s)
            }
        }
        this.lock.Unlock()
        if this.IsInClustered {
            event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
        } else {
            if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功
                this.IsInClustered = true
                event.TriggerEvent(core.Event_RegistryStart)
            }
        }
    }
    
    //创建服务会话 lego/base/rpcx/servicesession.go
    func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) {
        session := new(ServiceSession)
        session.node = node
        session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id)
        ss = session
        return
    }
    
    
  • service对象注册rpc服务接口 和调用其他服务的rpc接口 lego/base/rpcx/service.go
    //注册服务对象 
    func (this *RPCXService) Register(rcvr interface{}) (err error) {
        err = rpcx.Register(rcvr)
        return
    }
    
    //注册服务方法
    func (this *RPCXService) RegisterFunction(fn interface{}) (err error) {
        err = rpcx.RegisterFunction(fn)
        return
    }
    
    //注册服务方法 自定义方法名称
    func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err error) {
        err = rpcx.RegisterFunctionName(name, fn)
        return
    }
    
    //同步 执行目标远程服务方法
    func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
        defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
        this.lock.RLock()
        defer this.lock.RUnlock()
        ss, ok := this.serverList.Load(sId)
        if !ok {
            if node, err := registry.GetServiceById(sId); err != nil {
                log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
                return fmt.Errorf("No Found " + sId)
            } else {
                ss, err = NewServiceSession(node)
                if err != nil {
                    return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
                } else {
                    this.serverList.Store(node.Id, ss)
                }
            }
        }
        err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply)
        return
    }
    
    //异步 执行目标远程服务方法
    func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
        defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
        this.lock.RLock()
        defer this.lock.RUnlock()
        ss, ok := this.serverList.Load(sId)
        if !ok {
            if node, err := registry.GetServiceById(sId); err != nil {
                log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
                return nil, fmt.Errorf("No Found " + sId)
            } else {
                ss, err = NewServiceSession(node)
                if err != nil {
                    return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
                } else {
                    this.serverList.Store(node.Id, ss)
                }
            }
        }
        call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply)
        return
    }
    
    
    /*
        DefauleRpcRouteRules 此方法采用的是默认负载方式 版本信息和权重信息排序 找到最佳服务节点
        新版本正在偶合rpcx系统中的 selector 设计 支持 随机 轮询 权重 和 网络质量 等 服务策略
    */
    func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
        defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
        this.lock.RLock()
        defer this.lock.RUnlock()
        ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
        if err != nil {
            log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
            return err
        }
        err = ss.Call(ctx, serviceMethod, args, reply)
        return
    }
    
    func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
        defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
        this.lock.RLock()
        defer this.lock.RUnlock()
        ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
        if err != nil {
            log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
            return nil, err
        }
        call, err = ss.Go(ctx, serviceMethod, args, reply)
        return
    }