补充代码注释

This commit is contained in:
liwei1dao 2022-06-08 19:10:43 +08:00
parent de4b3e54ff
commit 0a9e855257
23 changed files with 148 additions and 63 deletions

View File

@ -11,6 +11,10 @@ import (
"google.golang.org/protobuf/proto"
)
/*
用户会话对象 跨服操作用户的对象
*/
func NewUserSession(service base.IRPCXService, ip, sessionId, gatewayServiceId string, uid string) IUserSession {
return &UserSession{
IP: ip,
@ -29,24 +33,32 @@ type UserSession struct {
service base.IRPCXService
}
//获取用户的会话id
func (this *UserSession) GetSessionId() string {
return this.SessionId
}
//获取用户的uid
func (this *UserSession) GetUserId() string {
return this.UserId
}
//获取用户的远程ip地址
func (this *UserSession) GetIP() string {
return this.IP
}
//用户当先所在网关服务
func (this *UserSession) GetGatewayServiceId() string {
return this.GatewayServiceId
}
//是否登录
func (this *UserSession) IsLogin() bool {
return this.UserId != ""
}
//绑定uid 登录后操作
func (this *UserSession) Build(uid string) (err error) {
reply := &pb.RPCMessageReply{}
if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentBuild), context.Background(), &pb.AgentBuildReq{
@ -58,6 +70,7 @@ func (this *UserSession) Build(uid string) (err error) {
return
}
//解绑uid 注销和切换账号是处理
func (this *UserSession) UnBuild(ServiceMethod string, msg proto.Message) (err error) {
reply := &pb.RPCMessageReply{}
if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentUnBuild), context.Background(), &pb.AgentUnBuildReq{
@ -68,6 +81,7 @@ func (this *UserSession) UnBuild(ServiceMethod string, msg proto.Message) (err e
return
}
//向用户发送消息
func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error) {
reply := &pb.RPCMessageReply{}
data, _ := proto.Marshal(msg)
@ -83,6 +97,8 @@ func (this *UserSession) SendMsg(mainType, subType string, code pb.ErrorCode, ms
}
return
}
//关闭用户连接对象
func (this *UserSession) Close() (err error) {
reply := &pb.RPCMessageReply{}
if err := this.service.RpcCallById(this.GatewayServiceId, string(Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentCloseeReq{
@ -93,6 +109,7 @@ func (this *UserSession) Close() (err error) {
return
}
//打印日志需要
func (this *UserSession) ToString() string {
return fmt.Sprintf("SessionId:%s UserId:%s GatewayServiceId:%s", this.SessionId, this.UserId, this.GatewayServiceId)
}

View File

@ -1,17 +1,19 @@
package modules
import (
"go_dreamfactory/pb"
"go_dreamfactory/lego/core"
"go_dreamfactory/pb"
"google.golang.org/protobuf/proto"
)
type (
//业务模块基类接口 定义所有业务模块都可以使用的接口
IModule interface {
core.IModule
///向指定用户发送消息
SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error)
///向多个用户发送消息
SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.Cache_UserData) (err error)
}
)

View File

@ -14,6 +14,10 @@ import (
"go_dreamfactory/lego/core/cbase"
)
/*
模块网关组件的基类实现 模块接收用户的消息请求都需要通过装备继承此组件的api组件来实现
*/
var typeOfContext = reflect.TypeOf((*context.Context)(nil)).Elem()
var typeOfSession = reflect.TypeOf((*comm.IUserSession)(nil)).Elem()
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
@ -23,20 +27,21 @@ var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
*/
type MComp_GateComp struct {
cbase.ModuleCompBase
service base.IRPCXService
module core.IModule
comp core.IModuleComp
service base.IRPCXService //rpc服务对象
module core.IModule //当前业务模块
comp core.IModuleComp //网关组件自己
}
//组件初始化接口
func (this *MComp_GateComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options)
this.service = service.(base.IRPCXService)
this.module = module
this.comp = comp
return
}
//组件启动接口启动时将自己接收用户消息的处理函数注册到services/comp_gateroute.go 对象中
func (this *MComp_GateComp) Start() (err error) {
if err = this.ModuleCompBase.Start(); err != nil {
return
@ -50,6 +55,7 @@ func (this *MComp_GateComp) Start() (err error) {
return
}
//反射注册相关接口道services/comp_gateroute.go 对象中
func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)

View File

@ -15,6 +15,11 @@ import (
"google.golang.org/protobuf/proto"
)
/*
用户代理对象
封装用户socket 对象 处理用户的消息读取 写入 关闭等操作
*/
func newAgent(gateway IGateway, conn *websocket.Conn) *Agent {
agent := &Agent{
gateway: gateway,

View File

@ -9,6 +9,9 @@ import (
"go_dreamfactory/lego/core/cbase"
)
/*
用户代理对象管理组件
*/
type AgentMgr_Comp struct {
cbase.ModuleCompBase
agents *sync.Map
@ -19,9 +22,13 @@ func (this *AgentMgr_Comp) Init(service core.IService, module core.IModule, comp
this.agents = new(sync.Map)
return
}
//加入新的用户
func (this *AgentMgr_Comp) Connect(a IAgent) {
this.agents.Store(a.SessionId(), a)
}
//移除断开的用户
func (this *AgentMgr_Comp) DisConnect(a IAgent) {
this.agents.Delete(a.SessionId())
}

View File

@ -8,6 +8,7 @@ import (
)
type (
//用户代理对象接口定义
IAgent interface {
SessionId() string
IP() string
@ -17,6 +18,7 @@ type (
WriteMsg(msg *pb.UserMessage) (err error)
Close() //主动关闭接口
}
//网关模块 接口定义
IGateway interface {
core.IModule
Service() base.IRPCXService

View File

@ -9,6 +9,11 @@ import (
"go_dreamfactory/lego/sys/log"
)
/*
模块名:gateway
描述:提供客户端网关路由服务 管理用户socket对象 以及分发用户消息
开发:李伟
*/
func NewModule() core.IModule {
m := new(Gateway)
return m
@ -17,39 +22,51 @@ func NewModule() core.IModule {
type Gateway struct {
modules.ModuleBase
service base.IRPCXService
wsservice_comp *WSService_Comp
agentmgr_comp *AgentMgr_Comp
wsservice_comp *WSService_Comp //websocket 服务组件 提供websocket服务监听
agentmgr_comp *AgentMgr_Comp //用户代理对象管理组件 管理用户socekt对象
}
//模块名
func (this *Gateway) GetType() core.M_Modules {
return comm.SM_GateModule
}
//模块自定义参数
func (this *Gateway) NewOptions() (options core.IModuleOptions) {
return new(Options)
}
//提供服务对象获取接口
func (this *Gateway) Service() base.IRPCXService {
return this.service
}
//模块初始化函数
func (this *Gateway) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)
this.service = service.(base.IRPCXService)
return
}
//模块启动函数 注册rpc服务接口提供用户相关的rpc接口服务
func (this *Gateway) Start() (err error) {
//注册用户绑定uid接口 登录成功后触发
this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentBuild), this.agentmgr_comp.Build)
//注册用户解绑uid接口 登出或则切换账号是触发
this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentUnBuild), this.agentmgr_comp.UnBuild)
//向用户发送消息接口
this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentSendMsg), this.agentmgr_comp.SendMsgToAgent)
//向多个用户对象发送消息接口
this.service.RegisterFunctionName(string(comm.Rpc_GatewaySendBatchMsg), this.agentmgr_comp.SendMsgToAgents)
//向所有用户发送消息接口
this.service.RegisterFunctionName(string(comm.Rpc_GatewaySendRadioMsg), this.agentmgr_comp.SendMsgToAllAgent)
//关闭用户socket连接接口
this.service.RegisterFunctionName(string(comm.Rpc_GatewayAgentClose), this.agentmgr_comp.CloseAgent)
err = this.ModuleBase.Start()
return
}
//装备组件
func (this *Gateway) OnInstallComp() {
this.ModuleBase.OnInstallComp()
this.agentmgr_comp = this.RegisterComp(new(AgentMgr_Comp)).(*AgentMgr_Comp)
@ -62,7 +79,7 @@ func (this *Gateway) Connect(a IAgent) {
this.agentmgr_comp.Connect(a)
}
//有新的连接对象进入
//有用户断开连接
func (this *Gateway) DisConnect(a IAgent) {
log.Debugf("[Module.Gateway] have disConnect:Ip[%s] SessionId:[%s] uid:[%s]", a.IP(), a.SessionId(), a.UserId())
this.agentmgr_comp.DisConnect(a)

View File

@ -4,12 +4,17 @@ import (
"go_dreamfactory/lego/utils/mapstructure"
)
/*
网关模块 参数定义
*/
type (
Options struct {
ListenPort int
ListenPort int //websocket 监听端口
}
)
//序列化参数对象
func (this *Options) LoadConfig(settings map[string]interface{}) (err error) {
if settings != nil {
err = mapstructure.Decode(settings, this)

View File

@ -13,17 +13,22 @@ import (
"google.golang.org/protobuf/proto"
)
/*
基础业务模块实现 封装一些通用的接口提供给业务模块使用
*/
type ModuleBase struct {
cbase.ModuleBase
service base.IRPCXService
}
//模块初始化接口
func (this *ModuleBase) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)
this.service = service.(base.IRPCXService)
return
}
//向指定用户发送消息
func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error) {
reply := &pb.RPCMessageReply{}
data, _ := proto.Marshal(msg)
@ -38,6 +43,7 @@ func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Messag
return
}
//向多个用户发送消息
func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.Cache_UserData) (err error) {
var (
gateways map[string][]string = make(map[string][]string)

View File

@ -11,22 +11,22 @@ import (
"go_dreamfactory/lego/sys/log"
)
const (
QueryUserPackReq = "pack.queryuserpackreq"
QueryUserPackResp = "pack.queryuserpackresp"
UseItemReq = "pack.useitemreq"
UseItemResp = "pack.useitemresp"
SellItemReq = "pack.sellitemreq"
SellItemResp = "pack.sellitemresp"
const ( //消息回复的头名称
QueryUserPackResp = "queryuserpackresp"
UseItemResp = "useitemresp"
SellItemResp = "sellitemresp"
)
/*
背包 处理用户的请求组件 必须继承 modules.MComp_GateComp
*/
type Api_Comp struct {
modules.MComp_GateComp
service core.IService
module *Pack
mail comm.Imail
}
//组件初始化接口
func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.MComp_GateComp.Init(service, module, comp, options)
this.service = service
@ -34,17 +34,6 @@ func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core
return
}
func (this *Api_Comp) Start() (err error) {
err = this.MComp_GateComp.Start()
var module core.IModule
if module, err = this.service.GetModule(comm.SM_MailModule); err != nil {
return
}
this.mail = module.(comm.Imail)
return
}
///查询用户背包数据
func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
var (

View File

@ -20,6 +20,7 @@ type Configure_Comp struct {
cbase.ModuleCompBase
}
//组件初始化接口
func (this *Configure_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options)
configure.RegisterConfigure(game_item, cfg.NewGame_item)

View File

@ -22,20 +22,23 @@ func NewModule() core.IModule {
type Pack struct {
modules.ModuleBase
api_comp *Api_Comp
configure_comp *Configure_Comp
api_comp *Api_Comp //背包模块 协议处理组件
configure_comp *Configure_Comp //背包模块 配置相关接口封装组件
}
//模块名称
func (this *Pack) GetType() core.M_Modules {
return comm.SM_PackModule
}
//模块初始化接口 注册用户创建角色事件
func (this *Pack) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options)
event.RegisterGO(comm.Event_CreateUser, this.event_CreateUser)
return
}
//装备组件
func (this *Pack) OnInstallComp() {
this.ModuleBase.OnInstallComp()
this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)

View File

@ -12,13 +12,17 @@ import (
"go_dreamfactory/lego/sys/log"
)
/*
web api 服务组件
*/
type Api_Comp struct {
cbase.ModuleCompBase
options *Options
module *Web
gin gin.ISys
options *Options //模块参数
module *Web //当前模块对象
gin gin.ISys //gin 框架 web的热门框架
}
//组件初始化接口 启动web服务 并注册api
func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
err = this.ModuleCompBase.Init(service, module, comp, options)
this.options = options.(*Options)

View File

@ -3,11 +3,15 @@ package web
import (
"go_dreamfactory/comm"
"go_dreamfactory/modules"
cfg "go_dreamfactory/sys/configure/structs"
"go_dreamfactory/lego/core"
)
/*
模块名:web
描述:为服务集群提供一些http服务接口 方便测试和后期运维实现
开发:李伟
*/
func NewModule() core.IModule {
m := new(Web)
return m
@ -16,14 +20,15 @@ func NewModule() core.IModule {
type Web struct {
modules.ModuleBase
options *Options
table *cfg.TbItem
api_comp *Api_Comp
api_comp *Api_Comp //提供weba pi服务的组件
}
//模块名
func (this *Web) GetType() core.M_Modules {
return comm.SM_WebModule
}
//模块自定义参数
func (this *Web) NewOptions() (options core.IModuleOptions) {
return new(Options)
}
@ -34,21 +39,6 @@ func (this *Web) Init(service core.IService, module core.IModule, options core.I
return
}
func (this *Web) Start() (err error) {
err = this.ModuleBase.Start()
// var (
// data interface{}
// )
// if err = configure.RegisterConfigure("tbitem.json", cfg.NewTbItem); err != nil {
// return
// }
// if data, err = configure.GetConfigure("tbitem.json"); err != nil {
// return
// }
// this.table = data.(*cfg.TbItem)
return
}
func (this *Web) OnInstallComp() {
this.ModuleBase.OnInstallComp()
this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)

View File

@ -15,28 +15,36 @@ import (
"google.golang.org/protobuf/proto"
)
/*
服务网关组件 用于接收网关服务发送过来的消息
*/
func NewGateRouteComp() comm.ISC_GateRouteComp {
comp := new(SComp_GateRouteComp)
return comp
}
//用户协议处理函数注册的反射对象
type msghandle struct {
rcvr reflect.Value
msgType reflect.Type
fn reflect.Method
}
//服务网关组件
type SComp_GateRouteComp struct {
cbase.ServiceCompBase
service base.IRPCXService
mrlock sync.RWMutex
msghandles map[string]*msghandle
service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口
mrlock sync.RWMutex //msghandles 对象的锁
msghandles map[string]*msghandle //处理函数的管理对象
}
//设置服务组件名称 方便业务模块中获取此组件对象
func (this *SComp_GateRouteComp) GetName() core.S_Comps {
return comm.SC_ServiceGateRouteComp
}
//组件初始化函数
func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceComp, options core.ICompOptions) (err error) {
err = this.ServiceCompBase.Init(service, comp, options)
this.service = service.(base.IRPCXService)
@ -44,13 +52,14 @@ func (this *SComp_GateRouteComp) Init(service core.IService, comp core.IServiceC
return err
}
//组件启动时注册rpc服务监听
func (this *SComp_GateRouteComp) Start() (err error) {
this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
err = this.ServiceCompBase.Start()
return
}
//注册路由
//业务模块注册用户消息处理路由
func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.Value, msg reflect.Type, fn reflect.Method) {
log.Debugf("注册用户路由【%s】", methodName)
this.mrlock.RLock()
@ -69,6 +78,7 @@ func (this *SComp_GateRouteComp) RegisterRoute(methodName string, comp reflect.V
this.mrlock.Unlock()
}
//Rpc_GatewayRoute服务接口的接收函数
func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%s MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
this.mrlock.RLock()

View File

@ -10,6 +10,10 @@ import (
"go_dreamfactory/lego/core"
)
/*
服务类型:gateway
服务描述:网关服务提供客户端连接的服务管理用户的socket对象以及用户消息的分发只包含gateway业务模块
*/
var (
conf = flag.String("conf", "./conf/gateway_1.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)

View File

@ -8,12 +8,15 @@ import (
"go_dreamfactory/lego/sys/log"
)
//梦工厂基础服务对象
type ServiceBase struct {
rpcx.RPCXService
}
//初始化相关系统
func (this *ServiceBase) InitSys() {
this.RPCXService.InitSys()
//初始化配置中心系统 每个服务都会用到的就在这个初始化就好
if err := configure.OnInit(this.GetSettings().Sys["configure"]); err != nil {
panic(fmt.Sprintf("init sys.configure err: %s", err.Error()))
} else {

View File

@ -14,6 +14,10 @@ import (
"go_dreamfactory/lego/sys/log"
)
/*
服务类型:web
服务描述:提供一些http服务方便后期维护和开发过程中的测试,只包含 web 业务模块
*/
var (
conf = flag.String("conf", "./conf/web_1.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)
@ -25,7 +29,6 @@ func main() {
rpcx.SetVersion("1.0.0.0"),
)
s.OnInstallComp( //装备组件
services.NewGateRouteComp(),
)
lego.Run(s, //运行模块
web.NewModule(),

View File

@ -17,10 +17,15 @@ import (
"go_dreamfactory/lego/sys/log"
)
/*
服务类型:worker
服务描述:处理梦工厂的具体业务需求包含 user,pack,mail,friend...功能业务模块
*/
var (
conf = flag.String("conf", "./conf/worker.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)
/*服务启动的入口函数*/
func main() {
flag.Parse()
s := NewService(
@ -28,10 +33,9 @@ func main() {
rpcx.SetVersion("1.0.0.0"),
)
s.OnInstallComp( //装备组件
services.NewGateRouteComp(),
services.NewGateRouteComp(), //此服务需要接受用户的消息 需要装备网关组件
)
lego.Run(s, //运行模块
// web.NewModule(),
user.NewModule(),
pack.NewModule(),
mail.NewModule(),
@ -46,17 +50,21 @@ func NewService(ops ...rpcx.Option) core.IService {
return s
}
//worker 的服务对象定义
type Service struct {
services.ServiceBase
}
//初始化worker需要的一些系统工具
func (this *Service) InitSys() {
this.ServiceBase.InitSys()
//缓存系统
if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil {
panic(fmt.Sprintf("init sys.cache err: %s", err.Error()))
} else {
log.Infof("init sys.cache success!")
}
//存储系统
if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {
panic(fmt.Sprintf("init sys.db err: %s", err.Error()))
} else {

7
sys/cache/core.go vendored
View File

@ -6,9 +6,10 @@ redis 缓存数据管理系统
type (
ISys interface {
IUser //户模块的相关缓存接口
IPack //背包模块的线管缓存接口
IMail //邮件相关的缓存接口
IUser //户模块的相关缓存接口
IPack //背包模块的线管缓存接口
IMail //邮件相关的缓存接口
IFriend //好友相关的缓存接口
}
)

View File

@ -8,6 +8,7 @@ import (
"testing"
)
//测试环境下初始化db和cache 系统
func TestMain(m *testing.M) {
if err := db.OnInit(nil, db.Set_MongodbUrl("mongodb://admin:123456@10.0.0.9:27018"), db.Set_MongodbDatabase("dreamfactory")); err != nil {
fmt.Printf("err:%v\n", err)

2
sys/cache/pack.go vendored
View File

@ -11,7 +11,7 @@ import (
)
const ( //Redis
Redis_PackCache string = "pack:%s"
Redis_PackCache string = "pack:%s" //背包缓存数据存放key
)
const (

View File

@ -6,6 +6,7 @@ import (
"testing"
)
//测试用户背包 添加物品
func Test_Pack_AddItemToUserPack(t *testing.T) {
err := cache.Defsys.Pack_AddItemToUserPack("liwei1dao", 1001, 100)
fmt.Printf("Pack_AddItemToUserPack err:%v\n", err)