梦工厂服务端项目
Go to file
2022-06-08 17:57:02 +08:00
bin/json Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-07 20:22:47 +08:00
cmd 上传机器人代码 2022-06-08 17:57:02 +08:00
comm 上传机器人代码 2022-06-08 17:57:02 +08:00
lego 移除不需要的系统 2022-06-08 09:17:03 +08:00
modules 定义用户创建角色事件 2022-06-08 15:53:58 +08:00
pb 还原误删的文件 2022-06-07 21:00:41 +08:00
services 注册好友模块 2022-06-08 14:49:27 +08:00
sys Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-08 14:35:05 +08:00
utils Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-07 20:22:47 +08:00
.gitignore Merge branch 'master' of http://git.legu.cc/liwei_3d/go_dreamfactory 2022-06-01 15:55:45 +08:00
go_dreamfactory.rar 上传测试代码 2022-06-07 19:50:08 +08:00
go.mod Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-07 20:22:47 +08:00
go.sum Merge branch 'dev' of http://git.legu.cc/liwei_3d/go_dreamfactory into liwei 2022-06-07 20:22:47 +08:00
linux-build.bat 上传代码 2022-06-01 14:52:33 +08:00
pb_2.7.py 好友协议 2022-06-07 19:45:52 +08:00
py_consul.py 上传配置以及pyhont 工具脚本 2022-05-31 16:59:28 +08:00
README.md 补充项目说明文档 2022-06-08 15:13:22 +08:00

go_dreamfactory

梦工厂服务端项目

项目构架说明

.vscode                     #vscode 编辑器环境配置项 包含服务调试运行配置
bin                         #服务运行和发布环境 包含服务的执行文件以及相关依赖配置
    conf
        gateway_1.yaml      #服务配置 网关服务的配置依赖文件
        worker_1.yaml       #服务配置 工作服务的配置依赖文件
    json                    #项目策划配置文件
    log                     #服务运行输出日志文件目录
cmd                         #命令行工具集
comm                        #项目通用常量和类型定义
    core                        #模块名以及rpc方法名等等的定义
    usersession                 #用户的会话对象实现 实现跨服与用户通信
lego                        #lego 微服务框架包
    base                        #基础服务容器的定义
    core                        #lego服务和模块设计相关
    sys                         #一些通用的系统工具集合
    utils                       #一些公用的方法工具集合
modules                     #项目业务功能模块集合
    friend                      #好友模块 提供好像系统相关业务功能 在worker服务下运行
    gateway                     #网关模块 提供网关服务以及用户连接对象管理功能 在gateway服务容器下运行
    mail                        #邮件模块 提供邮箱相关业务功能 在worker服务下运行
    pack                        #背包模块 提供用户背包管理相关业务功能 在worker服务下运行
    user                        #用户模块 提供登录祖册相关业务功能 在worker服务下运行
    web                         #web模块 提供一些http服务支持 在web服务下运行
    core.go                     #一些模块集合内通用的接口和类型定义
    gate_comp.go                #模块网关组件 用户注册接收用户的消息 业务模块自定义api组件继承此组件即可
    modulebase.go               #基础模块 业务模块都应继承此模块 内部封装有一些通用的接口 方便业务开发
pb                          #服务协议以及数据类型定义
    proto                       #proto的定义文件目录
services                    #服务定义目录
    gateway                     #网关服务   定义网关服务容器 只运行网关服务模块
    web                         #web服务    提供一些http服务功能 前期测试和后期运维可能需要
    worker                      #工作服务   游戏的业务模块都会在这个服务容器中运行
    servicebase.go              #基础服务容器定义 可以实现服务的底层通用接口的实现
    comp_gateroute.go           #网关服务组件 用于接收从网关服务发出的用户协议并分发给本服务内各个模块处理
sys                         #公用系统工具集
    cache                       #缓存管理系统 封装游戏类缓存数据的处理接口
    db                          #存储管理系统 封装游戏mgo数据库相关数据处理接口
    configure                   #配置管理中心 负责加载更新管理项目中使用到的各类配置文件数据
utils                       #公用方法工具集

如何快速开发模块

  • 定义模块类型名称 在comm/core.go 中定义定义自己的业务模块名称
    const (
        SM_GateModule   core.M_Modules = "gateway" //gate模块 	网关服务模块
        SM_WebModule    core.M_Modules = "web"     //web模块
        SM_UserModule   core.M_Modules = "user"    //用户模块
        SM_PackModule   core.M_Modules = "pack"    //背包模块
        SM_MailModule   core.M_Modules = "mail"    //邮件模块
        SM_FriendModule core.M_Modules = "friend"  //好友模块
        //定义自己的模块名称
    )
    
  • 创建自己的模块目录 在modules/ 目录下创建自己的模块目录
  • 定义自己的模块对象 在modules/自己的模块目录 创建module.go 文件
    package pack    //修改成自己的包名
    
    import (
        "go_dreamfactory/comm"
        "go_dreamfactory/modules"
    
        "go_dreamfactory/lego/core"
    )
    
    /*
    模块名:Pack
    描述:背包系统模块
    开发:李伟
    */
    func NewModule() core.IModule {
        m := new(Pack)
        return m
    }
    
    //换成自己的模块对象
    type Pack struct {
        modules.ModuleBase
    }
    
    func (this *Pack) GetType() core.M_Modules {
        return comm.SM_PackModule               //换成自己的模块名
    }
    
    //组装组件的地方
    func (this *Pack) OnInstallComp() {
        this.ModuleBase.OnInstallComp()
    }
    
    
  • 定义自己的用户请求处理组件 在modules/自己的模块目录 创建api_comp.go 文件 名字根据自己的情况定义,
    //api_comp.go
    //必须继承  modules.MComp_GateComp
    type Api_Comp struct {
        modules.MComp_GateComp
    }
    
    //定义具体的消息处理函数
    func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
    
    }
    
    
    
    //将组件装备到模块上
    //module.go
    func (this *Pack) OnInstallComp() {
        this.ModuleBase.OnInstallComp()
        this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
    }
    
  • 定义自己的协议 在pb/proto 下定义自己的proto协议文件并将文件加入到pb_2.7.py编译工具中
        buildProto('./pb/proto','./pb','comm')
        buildProto('./pb/proto','./pb','errorcode')
        buildProto('./pb/proto','./pb','user_db')
        buildProto('./pb/proto','./pb','user_msg')
        buildProto('./pb/proto','./pb','pack_db')
        buildProto('./pb/proto','./pb','pack_msg')
        //加入知己的协议文件
    
  • 将模块装备到worker服务容器中 services/worker/main.go
    func main() {
        flag.Parse()
        s := NewService(
            rpcx.SetConfPath(*conf),
            rpcx.SetVersion("1.0.0.0"),
        )
        s.OnInstallComp( //装备组件
            services.NewGateRouteComp(),
        )
        lego.Run(s, //运行模块
            // web.NewModule(),
            user.NewModule(),
            pack.NewModule(),
            mail.NewModule(),
            //在这里吧自己的模块注册进去
        )
    
    }
    

服务容器的定义说明

  • service 容器定义 项目可更具情况组合拆分业务到具体的服务中
var (
	conf = flag.String("conf", "./conf/worker.yaml", "获取需要启动的服务配置文件") //启动服务的Id
)

func main() {
	flag.Parse()
	s := NewService(
		rpcx.SetConfPath(*conf),            //指定配置文件路径
		rpcx.SetVersion("1.0.0.0"),         //设置服务版本号
	)
    //每一个服务都只是一个进程容器,服务提供的功能业务取决服务绑定了哪些组件和模块
	s.OnInstallComp( //装备组件
		services.NewGateRouteComp(),                //装备接收用户消息的组件     
	)
	lego.Run(s, //装备模块
		user.NewModule(),                           //装备用户模块          
		pack.NewModule(),                           //装备背包模块          
		mail.NewModule(),                           //装备邮件模块            
	)

}

func NewService(ops ...rpcx.Option) core.IService {
	s := new(Service)
	s.Configure(ops...)
	return s
}

//继承基础服务对象 方便后期通用接口的扩展
type Service struct {
	services.ServiceBase
}

//初始化服务中需要用到的系统工具
func (this *Service) InitSys() {
	this.ServiceBase.InitSys()
    //初始化缓存系统
	if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil {
		panic(fmt.Sprintf("init sys.cache err: %s", err.Error()))
	} else {
		log.Infof("init sys.cache success!")
	}
    //初始化存储系统
	if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {
		panic(fmt.Sprintf("init sys.db err: %s", err.Error()))
	} else {
		log.Infof("init sys.db success!")
	}
}

模块容器的定义说明

pack/module.go 每个模块目录必须有一个模块对象 一般将相同类型的业务分为一个模块中实现, 模块也只是一个容器,模块中具体实现功能的单元为模块组件对象,模块启动是装备具体的业务组件集合实现相关的业务功能

func NewModule() core.IModule {
	m := new(Pack)
	return m
}

//模块结构对象 继承基础模块对象
type Pack struct {
	modules.ModuleBase
	api_comp       *Api_Comp
	configure_comp *Configure_Comp
}

//模块类型确定 模块名称
func (this *Pack) GetType() core.M_Modules {
	return comm.SM_PackModule
}
//服务启动相关模块时调用的接口
func (this *Pack) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
	err = this.ModuleBase.Init(service, module, options)	
	return
}
//模块启动时装备业务组件 可更具自己的业务划分 自己定义业务组件
func (this *Pack) OnInstallComp() {
	this.ModuleBase.OnInstallComp()
    //用户背包相关协议处理组件
	this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
    //背包系统的相关配置管理组件             
	this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp)
}

pack/api_comp.go 背包用户api组件 用于接收处理用具背包请求协议

//Api 组件继承于模块网关组件 modules.MComp_GateComp 内部通过反射机制将当前组件内处理协议的接口注册到 comp_gateroute.go 网关服务组件中之后服务组件就可以把接收到的用户消息分发给当前api组件
type Api_Comp struct {
	modules.MComp_GateComp
	module *Pack
}
//每个组件都可以通过组件初始化接口拿到 当前服务容器对象以及所属的模块对象
func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	this.MComp_GateComp.Init(service, module, comp, options)
	this.module = module.(*Pack)
	return
}

///查询用户背包数据 处理用处的消息接口 统一 func(ctx context.Context, session comm.IUserSession, req 消息结构对象)(err error)/////ctx 上下文控制器 可以管理方法调用中生成的多个协程 以及传递一些元数据
//session 用户的会话对象 可以使用此对象给用户发送消息以及关闭用户的连接对象
//req 协议处理函数接收到具体消息数据对象
func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
	var (
		code  pb.ErrorCode
		pack  *pb.DB_UserPackData
		grids []*pb.DB_GridData
	)
	defer func() {
		session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids})
	}()
	if !session.IsLogin() {
		code = pb.ErrorCode_NoLogin
		return
	}
    //通过缓存读取到用户的背包信息
	if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil {
		log.Errorf("QueryUserPackReq err:%v", err)
		code = pb.ErrorCode_CacheReadError
		return
	} else {
		grids = this.module.configure_comp.GetPackItemByType(pack, req.IType)
	}
	return
}

客户端发送消息到服务器处理的流程介绍

  • 前后端通用协议结构定义 pb/proto/comm.proto
    message UserMessage  {
        string MainType =1;         //消息的主id 默认为具体的业务模块名称
        string SubType = 2;         //消息的副id 默认为具体处理消息的函数名
        ErrorCode Code = 3;         //服务端回应客户端返回的错误码 pb/proto/errorcode.proto 定义
        bytes Data = 4;             //具体的消息体内容
    }
    
  • 网关服务接收用户协议的处理 modules/gateway/agent.go
        //用户代理对象
        type Agent struct {
            gateway     IGateway
            wsConn      *websocket.Conn
            sessionId   string
            uId         string
            writeChan   chan *pb.UserMessage
            closeSignal chan bool
            state       int32 //状态 0 关闭 1 运行 2 关闭中
            wg          sync.WaitGroup
        }
    
        //读取用户发送过来的消息
        func (this *Agent) readLoop() {
            defer this.wg.Done()
            var (
                data []byte
                msg  *pb.UserMessage = &pb.UserMessage{}
                err  error
            )
        locp:
            for {
                if _, data, err = this.wsConn.ReadMessage(); err != nil {
                    log.Errorf("agent:%s uId:%s ReadMessage err:%v", this.sessionId, this.uId, err)
                    go this.Close()
                    break locp
                }
                //读取消息 并序列化到 UserMessage 结构对象
                if err = proto.Unmarshal(data, msg); err != nil {
                    log.Errorf("agent:%s uId:%s Unmarshal err:%v", this.sessionId, this.uId, err)
                    go this.Close()
                    break locp
                } else {
                    //分发处理用户的消息请求
                    this.messageDistribution(msg)
                }
            }
            log.Debugf("agent:%s uId:%s readLoop end!", this.sessionId, this.uId)
        }
    
        //分发用户消息 将用户消息采用负载的方式分到到 worker 类型的服务
        //此处后期会补充设计用户协议分化系统,暂时采用负载分发实现基本功能线
        func (this *Agent) messageDistribution(msg *pb.UserMessage) {
            reply := &pb.RPCMessageReply{}
            log.Debugf("agent:%s uId:%s MessageDistribution msg:%s.%s", this.sessionId, this.uId, msg.MainType, msg.SubType)
            if err := this.gateway.Service().RpcCallByType("worker", string(comm.Rpc_GatewayRoute), context.Background(), &pb.AgentMessage{
                Ip:               this.IP(),
                UserSessionId:    this.sessionId,
                UserId:           this.uId,
                GatewayServiceId: this.gateway.Service().GetId(),
                Method:           fmt.Sprintf("%s.%s", msg.MainType, msg.SubType),
                Message:          msg.Data,
            }, reply); err != nil {
                log.Errorf("agent:%s uId:%s MessageDistribution err:%v", this.sessionId, this.uId, err)
            } else {
                log.Debugf("agent:%s uId:%s MessageDistribution reply:%v", this.sessionId, this.uId, reply)
            }
        }
    
    
  • Worker 服务如何接受用户的消息 services/comp_gateroute.go
    //网关服务组件启动时会注册Rpc方法 接收网关服务那边发过来的用户消息
    func (this *SComp_GateRouteComp) Start() (err error) {
        this.service.RegisterFunctionName(string(comm.Rpc_GatewayRoute), this.ReceiveMsg) //注册网关路由接收接口
        err = this.ServiceCompBase.Start()
        return
    }
    
    //具体接收用户消息的方法 此处消息分发需要当前服务内有模块注册处理消息的接口 没有注册的话 此处会打印错误日志
    //msghandles 为消息处理函数的管理对象
    func (this *SComp_GateRouteComp) ReceiveMsg(ctx context.Context, args *pb.AgentMessage, reply *pb.RPCMessageReply) error {
        log.Debugf("SComp_GateRouteComp ReceiveMsg agent:%s uId:%d MessageDistribution msg:%s", args.UserSessionId, args.UserId, args.Method)
        this.mrlock.RLock()
        msghandle, ok := this.msghandles[args.Method]
        this.mrlock.RUnlock()
        if ok {
            session := comm.NewUserSession(this.service, args.Ip, args.UserSessionId, args.GatewayServiceId, args.UserId)
            msg := reflect.New(msghandle.msgType.Elem()).Interface()
            if err := proto.Unmarshal(args.Message, msg.(proto.Message)); err != nil {
                log.Errorf("UserMessage:%s Unmarshal err:%v", args.Method, err)
                return err
            }
            msghandle.fn.Func.Call([]reflect.Value{msghandle.rcvr, reflect.ValueOf(ctx), reflect.ValueOf(session), reflect.ValueOf(msg)})
        } else {
            reply.Code = pb.ErrorCode_ReqParameterError
            // reply.Msg = pb.GetErrorCodeMsg(pb.ErrorCode_ReqParameterError)
        }
        return nil
    }
    
  • 业务模块 如何接受处理用户的消息
    #模块需要接受处理用户发送过来的消息需要装备一个继承modules/gate_comp.go 的模块网关组件此组件启动是会自动把自身定义的消息处理函数注册到上面提到的服务网关组件services/comp_gateroute.go 中去实现可以参考pack/api_comp.go 的定义
    /*
    模块 网关组件 接收处理用户传递消息
    */
    type MComp_GateComp struct {
        cbase.ModuleCompBase
        service base.IRPCXService
        module  core.IModule
        comp    core.IModuleComp
    }
    //组件启动是会通过反射把当前组件上定义的用户处理函数 统一注册到当前服务容器中的网关组件中
    func (this *MComp_GateComp) Start() (err error) {
        if err = this.ModuleCompBase.Start(); err != nil {
            return
        }
        var comp core.IServiceComp
        //注册远程路由
        if comp, err = this.service.GetComp(comm.SC_ServiceGateRouteComp); err != nil {
            return
        }
        this.suitableMethods(comp.(comm.ISC_GateRouteComp), reflect.TypeOf(this.comp))
        return
    }
    
    //反射注册消息处理函数
    func (this *MComp_GateComp) suitableMethods(scomp comm.ISC_GateRouteComp, typ reflect.Type) {
        for m := 0; m < typ.NumMethod(); m++ {
            method := typ.Method(m)
            mtype := method.Type
            mname := method.Name
            // Method must be exported.
            if method.PkgPath != "" {
                continue
            }
            // Method needs four ins: receiver, context.Context, *args, *reply.
            if mtype.NumIn() != 4 {
                continue
            }
            // First arg must be context.Context
            ctxType := mtype.In(1)
            if !ctxType.Implements(typeOfContext) {
                continue
            }
    
            // Second arg need not be a pointer.
            argType := mtype.In(2)
            if !argType.Implements(typeOfSession) {
                continue
            }
            // Third arg must be a pointer.
            replyType := mtype.In(3)
            if replyType.Kind() != reflect.Ptr {
                continue
            }
            // Reply type must be exported.
            if !this.isExportedOrBuiltinType(replyType) {
                continue
            }
            // Method needs one out.
            if mtype.NumOut() != 1 {
                continue
            }
            // The return type of the method must be error.
            if returnType := mtype.Out(0); returnType != typeOfError {
                continue
            }
            scomp.RegisterRoute(fmt.Sprintf("%s.%s", this.module.GetType(), strings.ToLower(mname)), reflect.ValueOf(this.comp), replyType, method)
        }
    }
    
  • 消息处理函数如何给用户回复消息
    //每一个消息处理函数我们都会传递一个 IUserSession的对象 开发人员可以使用此对象向当前用户回复消息以及关闭连接等操作
    type IUserSession interface {
        GetSessionId() string
        GetUserId() string
        GetIP() string
        GetGatewayServiceId() string
        IsLogin() bool
        Build(uid string) (err error)
        UnBuild(ServiceMethod string, msg proto.Message) (err error)
        SendMsg(mainType, subType string, code pb.ErrorCode, msg proto.Message) (err error)
        Close() (err error)
        ToString() string
    }
    
    
    ///查询用户背包数据 
    func (this *Api_Comp) QueryUserPackReq(ctx context.Context, session comm.IUserSession, req *pb.QueryUserPackReq) (err error) {
        var (
            code  pb.ErrorCode
            pack  *pb.DB_UserPackData
            grids []*pb.DB_GridData
        )
        defer func() {
            session.SendMsg(string(this.module.GetType()), "queryuserpackresp", code, &pb.QueryUserPackResp{Grids: grids})
        }()
        if !session.IsLogin() {
            code = pb.ErrorCode_NoLogin
            return
        }
        if pack, err = cache.Defsys.Pack_QueryUserPack(session.GetUserId()); err != nil {
            log.Errorf("QueryUserPackReq err:%v", err)
            code = pb.ErrorCode_CacheReadError
            return
        } else {
            grids = this.module.configure_comp.GetPackItemByType(pack, req.IType)
        }
        return
    }
    
  • 消息处理函数如何给其他用户发送消息
    modules/modulebase.go 是所有业务模块的基础模块,大家在定义业务模块时注意继承此模块,此模块封装有给指定用户发送消息以及给多个用户同时发送消息的接口后续一些通用的接口都会封装到这一层 方便上层业务模块快速调用,组件可以在Init方法中通过断言的方式获取到模块对象,在调用底层接口即可
    
    //向目标用户发送消息
    func (this *ModuleBase) SendMsgToUser(mainType, subType string, msg proto.Message, user *pb.Cache_UserData) (err error) {
        reply := &pb.RPCMessageReply{}
        data, _ := proto.Marshal(msg)
        if _, err = this.service.RpcGoById(user.GatewayServiceId, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.AgentSendMessageReq{
            UserSessionId: user.SessionId,
            MainType:      mainType,
            SubType:       subType,
            Data:          data,
        }, reply); err != nil {
            log.Errorf("SendMsgToUser%d:%s [%s.%s] err:%v", user.UserData.UserId, user.SessionId, mainType, subType, err)
        }
        return
    }
    //向多个目标用户发送消息
    func (this *ModuleBase) SendMsgToUsers(mainType, subType string, msg proto.Message, user ...*pb.Cache_UserData) (err error) {
        var (
            gateways map[string][]string = make(map[string][]string)
            gateway  []string
            ok       bool
        )
        for _, v := range user {
            if gateway, ok = gateways[v.GatewayServiceId]; !ok {
                gateway = make([]string, 0)
                gateways[v.GatewayServiceId] = gateway
            }
            gateway = append(gateway, v.SessionId)
        }
        reply := &pb.RPCMessageReply{}
        data, _ := proto.Marshal(msg)
        for k, v := range gateways {
            if _, err = this.service.RpcGoById(k, string(comm.Rpc_GatewayAgentSendMsg), context.Background(), &pb.BatchMessageReq{
                UserSessionIds: v,
                MainType:       mainType,
                SubType:        subType,
                Data:           data,
            }, reply); err != nil {
                log.Errorf("SendMsgToUsers:%s->%s.%s err:%v", k, mainType, subType, err)
            }
        }
        return
    }