diff --git a/README.md b/README.md index 2e5ecac12..091a12f13 100644 --- a/README.md +++ b/README.md @@ -541,4 +541,340 @@ func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSe } return } + ``` + +### lego 微服务构架 服务发现 registry 注册表系统 +- 服务节点定义 registry/core.go +```go + //服务发现系统 定义每一个服务节点的结构对象 + ServiceNode struct { + Tag string `json:"Tag"` //服务集群标签 + Type string `json:"Type"` //服务类型 + Category core.S_Category `json:"Category"` //服务类别 + Id string `json:"Id"` //服务Id + Version string `json:"Version"` //服务版本 + IP string `json:"Ip"` //服务Ip + Port int `json:"Port"` //端口 + RpcId string `json:"RpcId"` //服务通信Id + PreWeight float64 `json:"PreWeight"` //服务负载权重 + } +``` +- 集群服务启动时 底层会默认启动 registry 系统 实现集群内的服务节点互相发现 +```go + // lego/base/rpcx/service.go + func (this *RPCXService) InitSys() { + //初始化日志系统 + if err := log.OnInit(this.opts.Setting.Sys["log"]); err != nil { + panic(fmt.Sprintf("Sys log Init err:%v", err)) + } else { + log.Infof("Sys log Init success !") + } + //初始化事件系统 + if err := event.OnInit(this.opts.Setting.Sys["event"]); err != nil { + log.Panicf(fmt.Sprintf("Sys event Init err:%v", err)) + } else { + log.Infof("Sys event Init success !") + } + //初始化注册表系统 + if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil { + log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err)) + } else { + log.Infof("Sys registry Init success !") + } + //初始化rpcx 系统 + if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil { + log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err)) + } else { + log.Infof("Sys rpcx Init success !") + } + //监听服务初始化完毕后 启动注册表和rpc服务 + event.Register(core.Event_ServiceStartEnd, func() { //阻塞 先注册服务集群 保证其他服务能及时发现 + if err := rpcx.Start(); err != nil { + log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err)) + } + if err := registry.Start(); err != nil { + log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err)) + } + }) + } + + //registry/coire.go 注册表系统的接口定义 + ISys interface { + ///启动注册表系统 向第三方服务注册当前服务信息 + Start() error + ///停止注册表系统 向第三方服务注销当前服务 + Stop() error + ///推送当前服务信息到 第三方服务 + PushServiceInfo() (err error) + ///通过id 获取集群内服务的节点信息 + GetServiceById(sId string) (n *ServiceNode, err error) + ///通过服务类型 获取当前集群下的所有服务节点信息 + GetServiceByType(sType string) (n []*ServiceNode) + ///获取当前集群下所有的服务节点信息 + GetAllServices() (n []*ServiceNode) + ///通过服务类别 获取当前集群下的所有服务节点信息 + GetServiceByCategory(category core.S_Category) (n []*ServiceNode) + ///通过RPC接口获取 提供此接口的所有服务节点信息 + GetRpcSubById(rId core.Rpc_Key) (n []*ServiceNode) + } +``` +- registry 发现集群下服务变动 通知 service +```go + //registry/options.go 启动是需要设置监听对象 + registry.SetListener(this.rpcxService.(registry.IListener)) + + // /registry/coire.go 监听注册表发送的集群变更信息 + IListener interface { + FindServiceHandlefunc(snode ServiceNode) + UpDataServiceHandlefunc(snode ServiceNode) + LoseServiceHandlefunc(sId string) + } + + + // lego/base/rpcx/service.go 服务对象实现上面三个接口 既可以接收到集群变化事件 + //注册服务会话 当有新的服务加入时 + func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) { + this.lock.Lock() + if _, ok := this.serverList.Load(node.Id); !ok { + if s, err := NewServiceSession(&node); err != nil { + log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err) + } else { + this.serverList.Store(node.Id, s) + } + } + this.lock.Unlock() + if this.IsInClustered { + event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 + } else { + if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功 + this.IsInClustered = true + event.TriggerEvent(core.Event_RegistryStart) + } + } + } + + //更新服务会话 当有新的服务加入时 + func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) { + if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息 + session := ss.(base.IRPCXServiceSession) + if session.GetRpcId() != node.RpcId { + if s, err := NewServiceSession(&node); err != nil { + log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err) + } else { + this.serverList.Store(node.Id, s) + } + event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 + } else { + if session.GetVersion() != node.Version { + session.SetVersion(node.Version) + } + if session.GetPreWeight() != node.PreWeight { + session.SetPreWeight(node.PreWeight) + } + event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件 + } + } + } + + //注销服务会话 + func (this *RPCXService) LoseServiceHandlefunc(sId string) { + this.lock.Lock() + session, ok := this.serverList.Load(sId) + if ok && session != nil { + session.(base.IRPCXServiceSession).Done() + this.serverList.Delete(sId) + } + this.lock.Unlock() + event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件 + } + +``` + +### lego 微服务构架 服务通信 rpcx +- 服务启动时启动rpcx服务 提供rpcx服务监听 +```go + //rpcx 系统接口定义 + ISys interface { + IRPCXServer + NewRpcClient(addr, sId string) (clent IRPCXClient, err error) //创建rpc客户端对象 + } + + IRPCXServer interface { + Start() (err error) //启动rpcx服务 + Stop() (err error) //停止rpcx服务 + Register(rcvr interface{}) (err error) //注册rpc服务接口对象 + RegisterFunction(fn interface{}) (err error) //注册rpc服务接口 + RegisterFunctionName(name string, fn interface{}) (err error) //注册rpc服务接口 + UnregisterAll() (err error) //注销全部rpc服务接口 + } + //rpc客户端对象接口定义 + IRPCXClient interface { + Stop() (err error) + Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error + Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error) + } + + //lego/sys/rpcx/service.go + /*RPC 服务对象*/ + type Service struct { + server *server.Server + options Options + } + + func (this *Service) Start() (err error) { + go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port)) + return + } + + func (this *Service) Stop() (err error) { + err = this.server.Close() + return + } + + func (this *Service) Register(rcvr interface{}) (err error) { + err = this.server.RegisterName(this.options.ServiceId, rcvr, "") + return + } + func (this *Service) RegisterFunction(fn interface{}) (err error) { + err = this.server.RegisterFunction(this.options.ServiceId, fn, "") + return + } + func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) { + err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "") + return + } + func (this *Service) UnregisterAll() (err error) { + err = this.server.UnregisterAll() + return + } +``` +- 发现新的服务节点信息是 创建服务会话对象 lego/base/rpcx/servicesession.go + ```go + //注册服务会话 当有新的服务加入时 lego/base/rpcx/service.go + func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) { + this.lock.Lock() + if _, ok := this.serverList.Load(node.Id); !ok { + if s, err := NewServiceSession(&node); err != nil { + log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err) + } else { + this.serverList.Store(node.Id, s) + } + } + this.lock.Unlock() + if this.IsInClustered { + event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件 + } else { + if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功 + this.IsInClustered = true + event.TriggerEvent(core.Event_RegistryStart) + } + } + } + + //创建服务会话 lego/base/rpcx/servicesession.go + func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) { + session := new(ServiceSession) + session.node = node + session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id) + ss = session + return + } + + ``` +- service对象注册rpc服务接口 和调用其他服务的rpc接口 lego/base/rpcx/service.go + ```go + //注册服务对象 + func (this *RPCXService) Register(rcvr interface{}) (err error) { + err = rpcx.Register(rcvr) + return + } + + //注册服务方法 + func (this *RPCXService) RegisterFunction(fn interface{}) (err error) { + err = rpcx.RegisterFunction(fn) + return + } + + //注册服务方法 自定义方法名称 + func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err error) { + err = rpcx.RegisterFunctionName(name, fn) + return + } + + //同步 执行目标远程服务方法 + func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { + defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) + this.lock.RLock() + defer this.lock.RUnlock() + ss, ok := this.serverList.Load(sId) + if !ok { + if node, err := registry.GetServiceById(sId); err != nil { + log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) + return fmt.Errorf("No Found " + sId) + } else { + ss, err = NewServiceSession(node) + if err != nil { + return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) + } else { + this.serverList.Store(node.Id, ss) + } + } + } + err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply) + return + } + + //异步 执行目标远程服务方法 + func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { + defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args)) + this.lock.RLock() + defer this.lock.RUnlock() + ss, ok := this.serverList.Load(sId) + if !ok { + if node, err := registry.GetServiceById(sId); err != nil { + log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err) + return nil, fmt.Errorf("No Found " + sId) + } else { + ss, err = NewServiceSession(node) + if err != nil { + return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err)) + } else { + this.serverList.Store(node.Id, ss) + } + } + } + call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply) + return + } + + + /* + DefauleRpcRouteRules 此方法采用的是默认负载方式 版本信息和权重信息排序 找到最佳服务节点 + 新版本正在偶合rpcx系统中的 selector 设计 支持 随机 轮询 权重 和 网络质量 等 服务策略 + */ + func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) { + defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) + this.lock.RLock() + defer this.lock.RUnlock() + ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) + if err != nil { + log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) + return err + } + err = ss.Call(ctx, serviceMethod, args, reply) + return + } + + func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) { + defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args)) + this.lock.RLock() + defer this.lock.RUnlock() + ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp) + if err != nil { + log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err) + return nil, err + } + call, err = ss.Go(ctx, serviceMethod, args, reply) + return + } ``` \ No newline at end of file