上传项目说明文档补充

This commit is contained in:
liwei1dao 2022-06-08 20:14:55 +08:00
parent 4372d4d283
commit 7e8bafb965

336
README.md
View File

@ -541,4 +541,340 @@ func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSe
}
return
}
```
### lego 微服务构架 服务发现 registry 注册表系统
- 服务节点定义 registry/core.go
```go
//服务发现系统 定义每一个服务节点的结构对象
ServiceNode struct {
Tag string `json:"Tag"` //服务集群标签
Type string `json:"Type"` //服务类型
Category core.S_Category `json:"Category"` //服务类别
Id string `json:"Id"` //服务Id
Version string `json:"Version"` //服务版本
IP string `json:"Ip"` //服务Ip
Port int `json:"Port"` //端口
RpcId string `json:"RpcId"` //服务通信Id
PreWeight float64 `json:"PreWeight"` //服务负载权重
}
```
- 集群服务启动时 底层会默认启动 registry 系统 实现集群内的服务节点互相发现
```go
// lego/base/rpcx/service.go
func (this *RPCXService) InitSys() {
//初始化日志系统
if err := log.OnInit(this.opts.Setting.Sys["log"]); err != nil {
panic(fmt.Sprintf("Sys log Init err:%v", err))
} else {
log.Infof("Sys log Init success !")
}
//初始化事件系统
if err := event.OnInit(this.opts.Setting.Sys["event"]); err != nil {
log.Panicf(fmt.Sprintf("Sys event Init err:%v", err))
} else {
log.Infof("Sys event Init success !")
}
//初始化注册表系统
if err := registry.OnInit(this.opts.Setting.Sys["registry"], registry.SetService(this.rpcxService), registry.SetListener(this.rpcxService.(registry.IListener))); err != nil {
log.Panicf(fmt.Sprintf("Sys registry Init err:%v", err))
} else {
log.Infof("Sys registry Init success !")
}
//初始化rpcx 系统
if err := rpcx.OnInit(this.opts.Setting.Sys["rpcx"], rpcx.SetServiceId(this.GetId()), rpcx.SetPort(this.GetPort())); err != nil {
log.Panicf(fmt.Sprintf("Sys rpcx Init err:%v", err))
} else {
log.Infof("Sys rpcx Init success !")
}
//监听服务初始化完毕后 启动注册表和rpc服务
event.Register(core.Event_ServiceStartEnd, func() { //阻塞 先注册服务集群 保证其他服务能及时发现
if err := rpcx.Start(); err != nil {
log.Panicf(fmt.Sprintf("Sys rpcx Start err:%v", err))
}
if err := registry.Start(); err != nil {
log.Panicf(fmt.Sprintf("Sys registry Start err:%v", err))
}
})
}
//registry/coire.go 注册表系统的接口定义
ISys interface {
///启动注册表系统 向第三方服务注册当前服务信息
Start() error
///停止注册表系统 向第三方服务注销当前服务
Stop() error
///推送当前服务信息到 第三方服务
PushServiceInfo() (err error)
///通过id 获取集群内服务的节点信息
GetServiceById(sId string) (n *ServiceNode, err error)
///通过服务类型 获取当前集群下的所有服务节点信息
GetServiceByType(sType string) (n []*ServiceNode)
///获取当前集群下所有的服务节点信息
GetAllServices() (n []*ServiceNode)
///通过服务类别 获取当前集群下的所有服务节点信息
GetServiceByCategory(category core.S_Category) (n []*ServiceNode)
///通过RPC接口获取 提供此接口的所有服务节点信息
GetRpcSubById(rId core.Rpc_Key) (n []*ServiceNode)
}
```
- registry 发现集群下服务变动 通知 service
```go
//registry/options.go 启动是需要设置监听对象
registry.SetListener(this.rpcxService.(registry.IListener))
// /registry/coire.go 监听注册表发送的集群变更信息
IListener interface {
FindServiceHandlefunc(snode ServiceNode)
UpDataServiceHandlefunc(snode ServiceNode)
LoseServiceHandlefunc(sId string)
}
// lego/base/rpcx/service.go 服务对象实现上面三个接口 既可以接收到集群变化事件
//注册服务会话 当有新的服务加入时
func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) {
this.lock.Lock()
if _, ok := this.serverList.Load(node.Id); !ok {
if s, err := NewServiceSession(&node); err != nil {
log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err)
} else {
this.serverList.Store(node.Id, s)
}
}
this.lock.Unlock()
if this.IsInClustered {
event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
} else {
if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功
this.IsInClustered = true
event.TriggerEvent(core.Event_RegistryStart)
}
}
}
//更新服务会话 当有新的服务加入时
func (this *RPCXService) UpDataServiceHandlefunc(node registry.ServiceNode) {
if ss, ok := this.serverList.Load(node.Id); ok { //已经在缓存中 需要更新节点信息
session := ss.(base.IRPCXServiceSession)
if session.GetRpcId() != node.RpcId {
if s, err := NewServiceSession(&node); err != nil {
log.Errorf("更新服务会话失败【%s】 err:%v", node.Id, err)
} else {
this.serverList.Store(node.Id, s)
}
event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
} else {
if session.GetVersion() != node.Version {
session.SetVersion(node.Version)
}
if session.GetPreWeight() != node.PreWeight {
session.SetPreWeight(node.PreWeight)
}
event.TriggerEvent(core.Event_UpDataOldService, node) //触发发现新的服务事件
}
}
}
//注销服务会话
func (this *RPCXService) LoseServiceHandlefunc(sId string) {
this.lock.Lock()
session, ok := this.serverList.Load(sId)
if ok && session != nil {
session.(base.IRPCXServiceSession).Done()
this.serverList.Delete(sId)
}
this.lock.Unlock()
event.TriggerEvent(core.Event_LoseService, sId) //触发发现新的服务事件
}
```
### lego 微服务构架 服务通信 rpcx
- 服务启动时启动rpcx服务 提供rpcx服务监听
```go
//rpcx 系统接口定义
ISys interface {
IRPCXServer
NewRpcClient(addr, sId string) (clent IRPCXClient, err error) //创建rpc客户端对象
}
IRPCXServer interface {
Start() (err error) //启动rpcx服务
Stop() (err error) //停止rpcx服务
Register(rcvr interface{}) (err error) //注册rpc服务接口对象
RegisterFunction(fn interface{}) (err error) //注册rpc服务接口
RegisterFunctionName(name string, fn interface{}) (err error) //注册rpc服务接口
UnregisterAll() (err error) //注销全部rpc服务接口
}
//rpc客户端对象接口定义
IRPCXClient interface {
Stop() (err error)
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *client.Call) (*client.Call, error)
}
//lego/sys/rpcx/service.go
/*RPC 服务对象*/
type Service struct {
server *server.Server
options Options
}
func (this *Service) Start() (err error) {
go this.server.Serve("tcp", fmt.Sprintf(":%d", this.options.Port))
return
}
func (this *Service) Stop() (err error) {
err = this.server.Close()
return
}
func (this *Service) Register(rcvr interface{}) (err error) {
err = this.server.RegisterName(this.options.ServiceId, rcvr, "")
return
}
func (this *Service) RegisterFunction(fn interface{}) (err error) {
err = this.server.RegisterFunction(this.options.ServiceId, fn, "")
return
}
func (this *Service) RegisterFunctionName(name string, fn interface{}) (err error) {
err = this.server.RegisterFunctionName(this.options.ServiceId, name, fn, "")
return
}
func (this *Service) UnregisterAll() (err error) {
err = this.server.UnregisterAll()
return
}
```
- 发现新的服务节点信息是 创建服务会话对象 lego/base/rpcx/servicesession.go
```go
//注册服务会话 当有新的服务加入时 lego/base/rpcx/service.go
func (this *RPCXService) FindServiceHandlefunc(node registry.ServiceNode) {
this.lock.Lock()
if _, ok := this.serverList.Load(node.Id); !ok {
if s, err := NewServiceSession(&node); err != nil {
log.Errorf("创建服务会话失败【%s】 err:%v", node.Id, err)
} else {
this.serverList.Store(node.Id, s)
}
}
this.lock.Unlock()
if this.IsInClustered {
event.TriggerEvent(core.Event_FindNewService, node) //触发发现新的服务事件
} else {
if node.Id == this.opts.Setting.Id { //发现自己 加入集群成功
this.IsInClustered = true
event.TriggerEvent(core.Event_RegistryStart)
}
}
}
//创建服务会话 lego/base/rpcx/servicesession.go
func NewServiceSession(node *registry.ServiceNode) (ss base.IRPCXServiceSession, err error) {
session := new(ServiceSession)
session.node = node
session.client, err = rpcx.NewRpcClient(fmt.Sprintf("%s:%d", node.IP, node.Port), node.Id)
ss = session
return
}
```
- service对象注册rpc服务接口 和调用其他服务的rpc接口 lego/base/rpcx/service.go
```go
//注册服务对象
func (this *RPCXService) Register(rcvr interface{}) (err error) {
err = rpcx.Register(rcvr)
return
}
//注册服务方法
func (this *RPCXService) RegisterFunction(fn interface{}) (err error) {
err = rpcx.RegisterFunction(fn)
return
}
//注册服务方法 自定义方法名称
func (this *RPCXService) RegisterFunctionName(name string, fn interface{}) (err error) {
err = rpcx.RegisterFunctionName(name, fn)
return
}
//同步 执行目标远程服务方法
func (this *RPCXService) RpcCallById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
defer lego.Recover(fmt.Sprintf("RpcCallById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
this.lock.RLock()
defer this.lock.RUnlock()
ss, ok := this.serverList.Load(sId)
if !ok {
if node, err := registry.GetServiceById(sId); err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
return fmt.Errorf("No Found " + sId)
} else {
ss, err = NewServiceSession(node)
if err != nil {
return fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
} else {
this.serverList.Store(node.Id, ss)
}
}
}
err = ss.(base.IRPCXServiceSession).Call(ctx, serviceMethod, args, reply)
return
}
//异步 执行目标远程服务方法
func (this *RPCXService) RpcGoById(sId string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
defer lego.Recover(fmt.Sprintf("RpcGoById sId:%s rkey:%v arg %v", sId, serviceMethod, args))
this.lock.RLock()
defer this.lock.RUnlock()
ss, ok := this.serverList.Load(sId)
if !ok {
if node, err := registry.GetServiceById(sId); err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sId, err)
return nil, fmt.Errorf("No Found " + sId)
} else {
ss, err = NewServiceSession(node)
if err != nil {
return nil, fmt.Errorf(fmt.Sprintf("创建服务会话失败【%s】 err:%v", sId, err))
} else {
this.serverList.Store(node.Id, ss)
}
}
}
call, err = ss.(base.IRPCXServiceSession).Go(ctx, serviceMethod, args, reply)
return
}
/*
DefauleRpcRouteRules 此方法采用的是默认负载方式 版本信息和权重信息排序 找到最佳服务节点
新版本正在偶合rpcx系统中的 selector 设计 支持 随机 轮询 权重 和 网络质量 等 服务策略
*/
func (this *RPCXService) RpcCallByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (err error) {
defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
this.lock.RLock()
defer this.lock.RUnlock()
ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
if err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
return err
}
err = ss.Call(ctx, serviceMethod, args, reply)
return
}
func (this *RPCXService) RpcGoByType(sType string, serviceMethod string, ctx context.Context, args interface{}, reply interface{}) (call *client.Call, err error) {
defer lego.Recover(fmt.Sprintf("RpcCallByType sType:%s rkey:%s arg %v", sType, serviceMethod, args))
this.lock.RLock()
defer this.lock.RUnlock()
ss, err := this.rpcxService.DefauleRpcRouteRules(sType, core.AutoIp)
if err != nil {
log.Errorf("未找到目标服务【%s】节点 err:%v", sType, err)
return nil, err
}
call, err = ss.Go(ctx, serviceMethod, args, reply)
return
}
```