diff --git a/README.md b/README.md deleted file mode 100644 index 7d85b2e65..000000000 --- a/README.md +++ /dev/null @@ -1,927 +0,0 @@ -# go_dreamfactory - -梦工厂服务端项目 - -### 项目构架说明 -```shell -.vscode #vscode 编辑器环境配置项 包含服务调试运行配置 -bin #服务运行和发布环境 包含服务的执行文件以及相关依赖配置 - conf - gateway_1.yaml #服务配置 网关服务的配置依赖文件 - worker_1.yaml #服务配置 工作服务的配置依赖文件 - json #项目策划配置文件 - log #服务运行输出日志文件目录 -cmd #命令行工具集 -comm #项目通用常量和类型定义 - core #模块名以及rpc方法名等等的定义 - usersession #用户的会话对象实现 实现跨服与用户通信 -lego #lego 微服务框架包 - base #基础服务容器的定义 - core #lego服务和模块设计相关 - sys #一些通用的系统工具集合 - utils #一些公用的方法工具集合 -modules #项目业务功能模块集合 - friend #好友模块 提供好像系统相关业务功能 在worker服务下运行 - gateway #网关模块 提供网关服务以及用户连接对象管理功能 在gateway服务容器下运行 - mail #邮件模块 提供邮箱相关业务功能 在worker服务下运行 - pack #背包模块 提供用户背包管理相关业务功能 在worker服务下运行 - user #用户模块 提供登录祖册相关业务功能 在worker服务下运行 - web #web模块 提供一些http服务支持 在web服务下运行 - core.go #一些模块集合内通用的接口和类型定义 - gate_comp.go #模块网关组件 用户注册接收用户的消息 业务模块自定义api组件继承此组件即可 - modulebase.go #基础模块 业务模块都应继承此模块 内部封装有一些通用的接口 方便业务开发 -pb #服务协议以及数据类型定义 - proto #proto的定义文件目录 -services #服务定义目录 - gateway #网关服务 定义网关服务容器 只运行网关服务模块 - web #web服务 提供一些http服务功能 前期测试和后期运维可能需要 - worker #工作服务 游戏的业务模块都会在这个服务容器中运行 - servicebase.go #基础服务容器定义 可以实现服务的底层通用接口的实现 - comp_gateroute.go #网关服务组件 用于接收从网关服务发出的用户协议并分发给本服务内各个模块处理 -sys #公用系统工具集 - cache #缓存管理系统 封装游戏类缓存数据的处理接口 - db #存储管理系统 封装游戏mgo数据库相关数据处理接口 - configure #配置管理中心 负责加载更新管理项目中使用到的各类配置文件数据 -utils #公用方法工具集 -``` - - - - - -### 启动服务 -- 调试模式下 工具VSCode 配置 .vscode\launch.json - ```shell - { - // 使用 IntelliSense 了解相关属性。 - // 悬停以查看现有属性的描述。 - // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 - "version": "0.2.0", - "configurations": [ - { - "name": "gateway_1", - "type": "go", - "request": "launch", - "mode": "debug", - "program": "${workspaceFolder}/services/gateway", //配置Go项目启动文件路径,即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录 - "args": ["-conf","./conf/gateway_1.yaml"], //指定服务启动的依赖配置文件 配置文件默认放在bin/conf 目录下 - "cwd": "${workspaceFolder}/bin", //设置工作目录 - "internalConsoleOptions": "openOnSessionStart", - "output": "${workspaceFolder}/bin/vsdebug_gateway", //设置vscode调试时生成文件的路径 - "showGlobalVariables": true, - "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定,如果有多个gopath,用英文冒号:来连接多个gopath - }, - { - "name": "worker_1", - "type": "go", - "request": "launch", - "mode": "debug", - "program": "${workspaceFolder}/services/worker", //配置Go项目启动文件路径,即main函数所在的.go文件的路径,${workspaceRoot}代表项目的根目录,也就是 /bin /pkg /src这三个文件夹所在的目录 - "args": ["-conf","./conf/worker_1.yaml"], //指定服务启动的依赖配置文件 - "cwd": "${workspaceFolder}/bin", //设置工作目录 - "internalConsoleOptions": "openOnSessionStart", - "output": "${workspaceFolder}/bin/vsdebug_worker", //设置vscode调试时生成文件的路径 - "showGlobalVariables": true, - "env": {}, //可以用来配置调试启动时所用的环境变量参数,比如gopath临时设置为某个参数就可以在这里指定,如果有多个gopath,用英文冒号:来连接多个gopath - } - ] - } - ``` -- 编译启动 执行 linux-build.bat 文件 生成执行文件 暂时只编译了linux版本的执行文件 - ``` - cd ./bin - ./gateway -conf ./conf/gateway_1.yaml - ``` - -### 如何快速开发模块 -- 定义模块类型名称 在comm/core.go 中定义定义自己的业务模块名称 - ```go - const ( - SM_GateModule core.M_Modules = "gateway" //gate模块 网关服务模块 - SM_WebModule core.M_Modules = "web" //web模块 - SM_UserModule core.M_Modules = "user" //用户模块 - SM_PackModule core.M_Modules = "pack" //背包模块 - SM_MailModule core.M_Modules = "mail" //邮件模块 - SM_FriendModule core.M_Modules = "friend" //好友模块 - //定义自己的模块名称 - ) - ``` -- 创建自己的模块目录 在modules/ 目录下创建自己的模块目录 -- 定义自己的模块对象 在modules/自己的模块目录 创建module.go 文件 - ```go - package pack //修改成自己的包名 - - import ( - "go_dreamfactory/comm" - "go_dreamfactory/modules" - - "go_dreamfactory/lego/core" - ) - - /* - 模块名:Pack - 描述:背包系统模块 - 开发:李伟 - */ - func NewModule() core.IModule { - m := new(Pack) - return m - } - - //换成自己的模块对象 - type Pack struct { - modules.ModuleBase - } - - func (this *Pack) GetType() core.M_Modules { - return comm.SM_PackModule //换成自己的模块名 - } - - //组装组件的地方 - func (this *Pack) OnInstallComp() { - this.ModuleBase.OnInstallComp() - } - - ``` -- 定义自己的用户请求处理组件 在modules/自己的模块目录 创建api_comp.go 文件 名字根据自己的情况定义, - ```go - //api_comp.go - //必须继承 modules.MComp_GateComp - type Api_Comp struct { - modules.MComp_GateComp - } - - //定义具体的消息处理函数 - func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) { - - } - - - - //将组件装备到模块上 - //module.go - func (this *Pack) OnInstallComp() { - this.ModuleBase.OnInstallComp() - this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) - } - ``` -- 定义自己的协议 在pb/proto 下定义自己的proto协议文件,并将文件加入到pb_2.7.py编译工具中 - ```py - buildProto('./pb/proto','./pb','comm') - buildProto('./pb/proto','./pb','errorcode') - buildProto('./pb/proto','./pb','user_db') - buildProto('./pb/proto','./pb','user_msg') - buildProto('./pb/proto','./pb','pack_db') - buildProto('./pb/proto','./pb','pack_msg') - //加入知己的协议文件 - ``` -- 将模块装备到worker服务容器中 services/worker/main.go - ```go - func main() { - flag.Parse() - s := NewService( - rpcx.SetConfPath(*conf), - rpcx.SetVersion("1.0.0.0"), - ) - s.OnInstallComp( //装备组件 - services.NewGateRouteComp(), - ) - lego.Run(s, //运行模块 - // web.NewModule(), - user.NewModule(), - pack.NewModule(), - mail.NewModule(), - //在这里吧自己的模块注册进去 - ) - - } - ``` - -### 服务容器的定义说明 -- service 容器定义 项目可更具情况组合拆分业务到具体的服务中 -```go -var ( - conf = flag.String("conf", "./conf/worker.yaml", "获取需要启动的服务配置文件") //启动服务的Id -) - -func main() { - flag.Parse() - s := NewService( - rpcx.SetConfPath(*conf), //指定配置文件路径 - rpcx.SetVersion("1.0.0.0"), //设置服务版本号 - ) - //每一个服务都只是一个进程容器,服务提供的功能业务取决服务绑定了哪些组件和模块 - s.OnInstallComp( //装备组件 - services.NewGateRouteComp(), //装备接收用户消息的组件 - ) - lego.Run(s, //装备模块 - user.NewModule(), //装备用户模块 - pack.NewModule(), //装备背包模块 - mail.NewModule(), //装备邮件模块 - ) - -} - -func NewService(ops ...rpcx.Option) core.IService { - s := new(Service) - s.Configure(ops...) - return s -} - -//继承基础服务对象 方便后期通用接口的扩展 -type Service struct { - services.ServiceBase -} - -//初始化服务中需要用到的系统工具 -func (this *Service) InitSys() { - this.ServiceBase.InitSys() - //初始化缓存系统 - if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil { - panic(fmt.Sprintf("init sys.cache err: %s", err.Error())) - } else { - log.Infof("init sys.cache success!") - } - //初始化存储系统 - if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil { - panic(fmt.Sprintf("init sys.db err: %s", err.Error())) - } else { - log.Infof("init sys.db success!") - } -} -``` - -### 模块容器的定义说明 -pack/module.go 每个模块目录必须有一个模块对象 一般将相同类型的业务分为一个模块中实现, 模块也只是一个容器,模块中具体实现功能的单元为模块组件对象,模块启动是装备具体的业务组件集合实现相关的业务功能 -```go -func NewModule() core.IModule { - m := new(Pack) - return m -} - -//模块结构对象 继承基础模块对象 -type Pack struct { - modules.ModuleBase - api_comp *Api_Comp - configure_comp *Configure_Comp -} - -//模块类型确定 模块名称 -func (this *Pack) GetType() core.M_Modules { - return comm.SM_PackModule -} -//服务启动相关模块时调用的接口 -func (this *Pack) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { - err = this.ModuleBase.Init(service, module, options) - return -} -//模块启动时装备业务组件 可更具自己的业务划分 自己定义业务组件 -func (this *Pack) OnInstallComp() { - this.ModuleBase.OnInstallComp() - //用户背包相关协议处理组件 - this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) - //背包系统的相关配置管理组件 - this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp) -} -``` -pack/api_comp.go 背包用户api组件 用于接收处理用具背包请求协议 -```go -//Api 组件继承于模块网关组件 modules.MComp_GateComp 内部通过反射机制将当前组件内处理协议的接口注册到 comp_gateroute.go 网关服务组件中,之后服务组件就可以把接收到的用户消息分发给当前api组件 -type Api_Comp struct { - modules.MComp_GateComp - module *Pack -} -//每个组件都可以通过组件初始化接口拿到 当前服务容器对象以及所属的模块对象 -func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { - this.MComp_GateComp.Init(service, module, comp, options) - this.module = module.(*Pack) - return -} - -///查询用户背包数据 处理用处的消息接口 统一 func(ctx context.Context, session comm.IUserSession, req 消息结构对象)(err error)/////ctx 上下文控制器 可以管理方法调用中生成的多个协程 以及传递一些元数据 -//session 用户的会话对象 可以使用此对象给用户发送消息以及关闭用户的连接对象 -//req 协议处理函数接收到具体消息数据对象 -func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) { - var ( - code pb.ErrorCode - pack *pb.DB_UserPackData - grids []*pb.DB_GridData - ) - defer func() { - session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids}) - }() - if !session.IsLogin() { - code = pb.ErrorCode_NoLogin - return - } - //通过缓存读取到用户的背包信息 - if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil { - log.Errorf("QueryUserPackReq err:%v", err) - code = pb.ErrorCode_CacheReadError - return - } else { - grids = this.module.configure_comp.GetPackItemByType(pack, req.IType) - } - return -} -``` - -### 客户端发送消息到服务器处理的流程介绍 - - 前后端通用协议结构定义 pb/proto/comm.proto - ```go - message UserMessage { - string MainType =1; //消息的主id 默认为具体的业务模块名称 - string SubType = 2; //消息的副id 默认为具体处理消息的函数名 - ErrorCode Code = 3; //服务端回应客户端返回的错误码 pb/proto/errorcode.proto 定义 - bytes Data = 4; //具体的消息体内容 - } - ``` - - 网关服务接收用户协议的处理 modules/gateway/agent.go - ```go - //用户代理对象 - type Agent struct { - gateway IGateway - wsConn *websocket.Conn - sessionId string - uId string - writeChan chan *pb.UserMessage - closeSignal chan bool - state int32 //状态 0 关闭 1 运行 2 关闭中 - wg sync.WaitGroup - } - - //读取用户发送过来的消息 - func (this *Agent) readLoop() { - defer this.wg.Done() - var ( - data []byte - msg *pb.UserMessage = &pb.UserMessage{} - err error - ) - locp: - for { - if _, data, err = this.wsConn.ReadMessage(); err != nil { - log.Errorf("agent:%s uId:%s ReadMessage err:%v", this.sessionId, this.uId, err) - go this.Close() - break locp - } - //读取消息 并序列化到 UserMessage 结构对象 - if err = proto.Unmarshal(data, msg); err != nil { - log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err) - go this.Close() - break locp - } else { - //分发处理用户的消息请求 - this.messageDistribution(msg) - } - } - log.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId) - } - - //分发用户消息 将用户消息采用负载的方式分到到 worker 类型的服务 - //此处后期会补充设计用户协议分化系统,暂时采用负载分发实现基本功能线 - func (this *Agent) messageDistribution(msg *pb.UserMessage) { - reply := &pb.RPCMessageReply{} - log.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType) - if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.AgentMessage{ - Ip: this.IP(), - UserSessionId: this.sessionId, - UserId: this.uId, - GatewayServiceId: this.gateway.Service().GetId(), - Method: fmt.Sprintf("%s.%s", msg.MainType, msg.SubType), - Message: msg.Data, - }, reply); err != nil { - log.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err) - } else { - log.Debugf("agent:%s uId:%s MessageDistribution reply:%v", this.sessionId, this.uId, reply) - } - } - - ``` - - Worker 服务如何接受用户的消息 services/comp_gateroute.go - ```go - //网关服务组件启动时会注册Rpc方法 接收网关服务那边发过来的用户消息 - func (this *SComp_GateRouteComp) Start() (err error) { - this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口 - err = this.ServiceCompBase.Start() - return - } - - //具体接收用户消息的方法 此处消息分发需要当前服务内有模块注册处理消息的接口 没有注册的话 此处会打印错误日志 - //msghandles 为消息处理函数的管理对象 - func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error { - log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%d MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method) - this.mrlock.RLock() - msghandle, ok := this.msghandles[args.Method] - this.mrlock.RUnlock() - if ok { - session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId) - msg := reflect.New(msghandle.msgType.Elem()).Interface() - if err := proto.Unmarshal(args.Message, msg.(proto.Message)); err != nil { - log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err) - return err - } - msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)}) - } else { - reply.Code = pb.ErrorCode_ReqParameterError - // reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError) - } - return nil - } - ``` - - 业务模块 如何接受处理用户的消息 - ```go - #模块需要接受处理用户发送过来的消息需要装备一个继承modules/gate_comp.go 的模块网关组件,此组件启动是会自动把自身定义的消息处理函数注册到上面提到的服务网关组件services/comp_gateroute.go 中去,实现可以参考pack/api_comp.go 的定义 - /* - 模块 网关组件 接收处理用户传递消息 - */ - type MComp_GateComp struct { - cbase.ModuleCompBase - service base.IRPCXService - module core.IModule - comp core.IModuleComp - } - //组件启动是会通过反射把当前组件上定义的用户处理函数 统一注册到当前服务容器中的网关组件中 - func (this *MComp_GateComp) Start() (err error) { - if err = this.ModuleCompBase.Start(); err != nil { - return - } - var comp core.IServiceComp - //注册远程路由 - if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil { - return - } - this.suitableMethods(comp.(comm.ISC_GateRouteComp), reflect.TypeOf(this.comp)) - return - } - - //反射注册消息处理函数 - func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) { - for m := 0; m < typ.NumMethod(); m++ { - method := typ.Method(m) - mtype := method.Type - mname := method.Name - // Method must be exported. - if method.PkgPath != "" { - continue - } - // Method needs four ins: receiver, context.Context, *args, *reply. - if mtype.NumIn() != 4 { - continue - } - // First arg must be context.Context - ctxType := mtype.In(1) - if !ctxType.Implements(typeOfContext) { - continue - } - - // Second arg need not be a pointer. - argType := mtype.In(2) - if !argType.Implements(typeOfSession) { - continue - } - // Third arg must be a pointer. - replyType := mtype.In(3) - if replyType.Kind() != reflect.Ptr { - continue - } - // Reply type must be exported. - if !this.isExportedOrBuiltinType(replyType) { - continue - } - // Method needs one out. - if mtype.NumOut() != 1 { - continue - } - // The return type of the method must be error. - if returnType := mtype.Out(0); returnType != typeOfError { - continue - } - scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method) - } - } - ``` - - 消息处理函数如何给用户回复消息 - ```go - //每一个消息处理函数我们都会传递一个 IUserSession的对象 开发人员可以使用此对象向当前用户回复消息以及关闭连接等操作 - type IUserSession interface { - GetSessionId() string - GetUserId() string - GetIP() string - GetGatewayServiceId() string - IsLogin() bool - Build(uid string) (err error) - UnBuild(ServiceMethod string, msg proto.Message) (err error) - SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error) - Close() (err error) - ToString() string - } - - - ///查询用户背包数据 - func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) { - var ( - code pb.ErrorCode - pack *pb.DB_UserPackData - grids []*pb.DB_GridData - ) - defer func() { - session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids}) - }() - if !session.IsLogin() { - code = pb.ErrorCode_NoLogin - return - } - if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil { - log.Errorf("QueryUserPackReq err:%v", err) - code = pb.ErrorCode_CacheReadError - return - } else { - grids = this.module.configure_comp.GetPackItemByType(pack, req.IType) - } - return - } - ``` - - 消息处理函数如何给其他用户发送消息 - ```go - modules/modulebase.go 是所有业务模块的基础模块,大家在定义业务模块时注意继承此模块,此模块封装有给指定用户发送消息以及给多个用户同时发送消息的接口,后续一些通用的接口都会封装到这一层 方便上层业务模块快速调用,组件可以在Init方法中通过断言的方式获取到模块对象,在调用底层接口即可 - - //向目标用户发送消息 - func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error) { - reply := &pb.RPCMessageReply{} - data, _ := proto.Marshal(msg) - if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{ - UserSessionId: user.SessionId, - MainType: mainType, - SubType: subType, - Data: data, - }, reply); err != nil { - log.Errorf("SendMsgToUser%d:%s [%s.%s] err:%v", user.UserData.UserId, user.SessionId, mainType, subType, err) - } - return - } - //向多个目标用户发送消息 - func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.Cache_UserData) (err error) { - var ( - gateways map[string][]string = make(map[string][]string) - gateway []string - ok bool - ) - for _, v := range user { - if gateway, ok = gateways[v.GatewayServiceId]; !ok { - gateway = make([]string, 0) - gateways[v.GatewayServiceId] = gateway - } - gateway = append(gateway, v.SessionId) - } - reply := &pb.RPCMessageReply{} - data, _ := proto.Marshal(msg) - for k, v := range gateways { - if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{ - UserSessionIds: v, - MainType: mainType, - SubType: subType, - Data: data, - }, reply); err != nil { - log.Errorf("SendMsgToUsers:%s->%s.%s err:%v", k, mainType, subType, err) - } - } - return - } - ``` - -### lego 微服务构架 服务发现 registry 注册表系统 -- 服务节点定义 registry/core.go -```go - //服务发现系统 定义每一个服务节点的结构对象 - ServiceNode struct { - Tag string `json:"Tag"` //服务集群标签 - Type string `json:"Type"` //服务类型 - Category core.S_Category `json:"Category"` //服务类别 - Id string `json:"Id"` //服务Id - Version string `json:"Version"` //服务版本 - IP string `json:"Ip"` //服务Ip - Port int `json:"Port"` //端口 - RpcId string `json:"RpcId"` //服务通信Id - PreWeight float64 `json:"PreWeight"` //服务负载权重 - } -``` -- 集群服务启动时 底层会默认启动 registry 系统 实现集群内的服务节点互相发现 -```go - // lego/base/rpcx/service.go - func (this *RPCXService) InitSys() { - //初始化日志系统 - if err := log.OnInit(this.opts.Setting.Sys["log"]); err != nil { - panic(fmt.Sprintf("Sys log Init err:%v", err)) - } else { - log.Infof("Sys log Init success !") - } - //初始化事件系统 - if err := event.OnInit(this.opts.Setting.Sys["event"]); err != nil { - log.Panicf(fmt.Sprintf("Sys event Init err:%v", err)) - } else { - log.Infof("Sys event Init success !") - } - //初始化注册表系统 - if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil { - log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err)) - } else { - log.Infof("Sys registry Init success !") - } - //初始化rpcx 系统 - if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil { - log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err)) - } else { - log.Infof("Sys rpcx Init success !") - } - //监听服务初始化完毕后 启动注册表和rpc服务 - event.Register(core.Event_ServiceStartEnd, func() { //阻塞 先注册服务集群 保证其他服务能及时发现 - if err := rpcx.Start(); err != nil { - log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err)) - } - if err := registry.Start(); err != nil { - log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err)) - } - }) - } - - //registry/coire.go 注册表系统的接口定义 - ISys interface { - ///启动注册表系统 向第三方服务注册当前服务信息 - Start() error - ///停止注册表系统 向第三方服务注销当前服务 - Stop() error - ///推送当前服务信息到 第三方服务 - PushServiceInfo() (err error) - ///通过id 获取集群内服务的节点信息 - GetServiceById(sId string) (n *ServiceNode, err error) - ///通过服务类型 获取当前集群下的所有服务节点信息 - GetServiceByType(sType string) (n []*ServiceNode) - ///获取当前集群下所有的服务节点信息 - GetAllServices() (n []*ServiceNode) - ///通过服务类别 获取当前集群下的所有服务节点信息 - GetServiceByCategory(category core.S_Category) (n []*ServiceNode) - ///通过RPC接口获取 提供此接口的所有服务节点信息 - GetRpcSubById(rId core.Rpc_Key) (n []*ServiceNode) - } -``` -- registry 发现集群下服务变动 通知 service -```go - //registry/options.go 启动是需要设置监听对象 - registry.SetListener(this.rpcxService.(registry.IListener)) - - // /registry/coire.go 监听注册表发送的集群变更信息 - IListener interface { - FindServiceHandlefunc(snode ServiceNode) - UpDataServiceHandlefunc(snode ServiceNode) - LoseServiceHandlefunc(sId string) - } - - - // lego/base/rpcx/service.go 服务对象实现上面三个接口 既可以接收到集群变化事件 - //注册服务会话 当有新的服务加入时 - func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) { - this.lock.Lock() - if _, ok := this.serverList.Load(node.Id); !ok { - if s, err := NewServiceSession(&node); err != nil { - log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err) - } else { - this.serverList.Store(node.Id, s) - } - } - this.lock.Unlock() - if this.IsInClustered { - event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 - } else { - if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功 - this.IsInClustered = true - event.TriggerEvent(core.Event_RegistryStart) - } - } - } - - //更新服务会话 当有新的服务加入时 - func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) { - if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息 - session := ss.(base.IRPCXServiceSession) - if session.GetRpcId() != node.RpcId { - if s, err := NewServiceSession(&node); err != nil { - log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err) - } else { - this.serverList.Store(node.Id, s) - } - event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 - } else { - if session.GetVersion() != node.Version { - session.SetVersion(node.Version) - } - if session.GetPreWeight() != node.PreWeight { - session.SetPreWeight(node.PreWeight) - } - event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件 - } - } - } - - //注销服务会话 - func (this *RPCXService) LoseServiceHandlefunc(sId string) { - this.lock.Lock() - session, ok := this.serverList.Load(sId) - if ok && session != nil { - session.(base.IRPCXServiceSession).Done() - this.serverList.Delete(sId) - } - this.lock.Unlock() - event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件 - } - -``` - -### lego 微服务构架 服务通信 rpcx -- 服务启动时启动rpcx服务 提供rpcx服务监听 -```go - //rpcx 系统接口定义 - ISys interface { - IRPCXServer - NewRpcClient(addr, sId string) (clent IRPCXClient, err error) //创建rpc客户端对象 - } - - IRPCXServer interface { - Start() (err error) //启动rpcx服务 - Stop() (err error) //停止rpcx服务 - Register(rcvr interface{}) (err error) //注册rpc服务接口对象 - RegisterFunction(fn interface{}) (err error) //注册rpc服务接口 - RegisterFunctionName(name string, fn interface{}) (err error) //注册rpc服务接口 - UnregisterAll() (err error) //注销全部rpc服务接口 - } - //rpc客户端对象接口定义 - IRPCXClient interface { - Stop() (err error) - Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error - Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error) - } - - //lego/sys/rpcx/service.go - /*RPC 服务对象*/ - type Service struct { - server *server.Server - options Options - } - - func (this *Service) Start() (err error) { - go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port)) - return - } - - func (this *Service) Stop() (err error) { - err = this.server.Close() - return - } - - func (this *Service) Register(rcvr interface{}) (err error) { - err = this.server.RegisterName(this.options.ServiceId, rcvr, "") - return - } - func (this *Service) RegisterFunction(fn interface{}) (err error) { - err = this.server.RegisterFunction(this.options.ServiceId, fn, "") - return - } - func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { - err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "") - return - } - func (this *Service) UnregisterAll() (err error) { - err = this.server.UnregisterAll() - return - } -``` -- 发现新的服务节点信息是 创建服务会话对象 lego/base/rpcx/servicesession.go - ```go - //注册服务会话 当有新的服务加入时 lego/base/rpcx/service.go - func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) { - this.lock.Lock() - if _, ok := this.serverList.Load(node.Id); !ok { - if s, err := NewServiceSession(&node); err != nil { - log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err) - } else { - this.serverList.Store(node.Id, s) - } - } - this.lock.Unlock() - if this.IsInClustered { - event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 - } else { - if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功 - this.IsInClustered = true - event.TriggerEvent(core.Event_RegistryStart) - } - } - } - - //创建服务会话 lego/base/rpcx/servicesession.go - func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) { - session := new(ServiceSession) - session.node = node - session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id) - ss = session - return - } - - ``` -- service对象注册rpc服务接口 和调用其他服务的rpc接口 lego/base/rpcx/service.go - ```go - //注册服务对象 - func (this *RPCXService) Register(rcvr interface{}) (err error) { - err = rpcx.Register(rcvr) - return - } - - //注册服务方法 - func (this *RPCXService) RegisterFunction(fn interface{}) (err error) { - err = rpcx.RegisterFunction(fn) - return - } - - //注册服务方法 自定义方法名称 - func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err error) { - err = rpcx.RegisterFunctionName(name, fn) - return - } - - //同步 执行目标远程服务方法 - func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { - defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) - this.lock.RLock() - defer this.lock.RUnlock() - ss, ok := this.serverList.Load(sId) - if !ok { - if node, err := registry.GetServiceById(sId); err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) - return fmt.Errorf("No Found " + sId) - } else { - ss, err = NewServiceSession(node) - if err != nil { - return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) - } else { - this.serverList.Store(node.Id, ss) - } - } - } - err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply) - return - } - - //异步 执行目标远程服务方法 - func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { - defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) - this.lock.RLock() - defer this.lock.RUnlock() - ss, ok := this.serverList.Load(sId) - if !ok { - if node, err := registry.GetServiceById(sId); err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) - return nil, fmt.Errorf("No Found " + sId) - } else { - ss, err = NewServiceSession(node) - if err != nil { - return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) - } else { - this.serverList.Store(node.Id, ss) - } - } - } - call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply) - return - } - - - /* - DefauleRpcRouteRules 此方法采用的是默认负载方式 版本信息和权重信息排序 找到最佳服务节点 - 新版本正在偶合rpcx系统中的 selector 设计 支持 随机 轮询 权重 和 网络质量 等 服务策略 - */ - func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { - defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) - this.lock.RLock() - defer this.lock.RUnlock() - ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) - if err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) - return err - } - err = ss.Call(ctx, serviceMethod, args, reply) - return - } - - func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { - defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) - this.lock.RLock() - defer this.lock.RUnlock() - ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) - if err != nil { - log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) - return nil, err - } - call, err = ss.Go(ctx, serviceMethod, args, reply) - return - } - ``` \ No newline at end of file diff --git a/bin/cmd b/bin/cmd new file mode 100755 index 000000000..584652188 Binary files /dev/null and b/bin/cmd differ diff --git a/bin/gateway b/bin/gateway index a43147fdb..77d8a00e9 100755 Binary files a/bin/gateway and b/bin/gateway differ diff --git a/bin/gmcorr.json b/bin/gmcorr.json new file mode 100644 index 000000000..1e25fb230 --- /dev/null +++ b/bin/gmcorr.json @@ -0,0 +1,27 @@ +{ + "AreaId": "dfcross_1", + "Loglevel": 5, + "MaxAgeTime": 7, + "ConsulAddr": [ + "10.0.0.9:10012" + ], + "IsCross": true, + "BelongCrossServerId": "dfcross_1", + "LoaclDB": { + "RedisIsCluster": false, + "RedisAddr": [ + "10.0.0.9:10011" + ], + "RedisPassword": "li13451234", + "RedisDB": 11, + "MongodbUrl": "mongodb://10.0.0.9:10013", + "MongodbDatabase": "dreamfactory11" + }, + "Mainte": "127.0.0.1:9571", + "MaintePort": 8001, + "Gateways": [], + "GatewayPorts": [], + "Workers": [ + "127.0.0.1:9570" + ] +} \ No newline at end of file diff --git a/bin/mainte b/bin/mainte index 05db630b5..08144d2f0 100755 Binary files a/bin/mainte and b/bin/mainte differ diff --git a/bin/start.sh b/bin/start.sh index 8d3862fe6..f0ca97539 100755 --- a/bin/start.sh +++ b/bin/start.sh @@ -1,7 +1,9 @@ -./stup.sh start worker_cross worker ./conf/worker_cross.yaml +./stup.sh start dfcross_1_worker0 worker ./conf/dfcross_1_worker0.yaml sleep 1 -./stup.sh start mainte mainte ./conf/mainte.yaml +./stup.sh start dfcross_1_mainte mainte ./conf/dfcross_1_mainte.yaml sleep 1 -./stup.sh start worker_1 worker ./conf/worker_1.yaml +./stup.sh start df01_mainte mainte ./conf/df01_mainte.yaml sleep 1 -./stup.sh start gateway_1 gateway ./conf/gateway_1.yaml +./stup.sh start df01_worker0 worker ./conf/df01_worker0.yaml +sleep 1 +./stup.sh start df01_gateway0 gateway ./conf/df01_gateway0.yaml diff --git a/bin/stop.sh b/bin/stop.sh index f4f421867..dae93470b 100755 --- a/bin/stop.sh +++ b/bin/stop.sh @@ -1,7 +1,9 @@ -./stup.sh stop gateway_1 +./stup.sh stop df01_gateway0 -./stup.sh stop mainte +./stup.sh stop df01_mainte -./stup.sh stop worker_1 +./stup.sh stop df01_worker0 -./stup.sh stop worker_cross +./stup.sh stop dfcross_1_worker0 + +./stup.sh stop dfcross_1_mainte diff --git a/bin/stup.sh b/bin/stup.sh index ceed5aff4..4f98e0eca 100755 --- a/bin/stup.sh +++ b/bin/stup.sh @@ -7,8 +7,8 @@ num=`ps -ef | grep conf | grep $SERVICE | grep -v grep | grep -v /bin/bash | wc -l` if [ $num -eq 0 ] then - # nohup $CMD > /dev/null 2>&1 & - nohup $CMD > $SERVICE.log 2>&1 & + nohup $CMD > /dev/null 2>&1 & + # nohup $CMD > $SERVICE.log 2>&1 & if [ $? -ne 0 ] then echo "start failed, please check the log!" diff --git a/bin/worker b/bin/worker index 66b081d8f..e730094df 100755 Binary files a/bin/worker and b/bin/worker differ