From bedf7eae844f959aae5c3fd780989f6eaa784ce2 Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Mon, 13 Jun 2022 14:23:03 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=8F=82=E6=95=B0=E6=A0=A1=E9=AA=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sys/db/model.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sys/db/model.go b/sys/db/model.go index a08b3ed08..8ad58c26c 100644 --- a/sys/db/model.go +++ b/sys/db/model.go @@ -56,7 +56,7 @@ func (this *DB) Mail_UpdateDBBylog() (err error) { log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) } } else { // update - if len(data.D) < 2 { + if len(data.D) < 3 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) return errors.New("parameter len err") } From 10f5922475388a7b0879a583700648a3318f983d Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Mon, 13 Jun 2022 17:40:10 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=97=A5=E5=BF=97=E5=86=99=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- comm/core.go | 13 +-- modules/model/api.go | 20 ++++ modules/model/configure_comp.go | 16 +++ modules/model/db_comp.go | 174 ++++++++++++++++++++++++++++++++ modules/model/dbservice_comp.go | 39 +++++++ modules/model/module.go | 48 +++++++++ services/worker/main.go | 2 + sys/db/model.go | 84 --------------- 9 files changed, 307 insertions(+), 91 deletions(-) create mode 100644 modules/model/api.go create mode 100644 modules/model/configure_comp.go create mode 100644 modules/model/db_comp.go create mode 100644 modules/model/dbservice_comp.go create mode 100644 modules/model/module.go delete mode 100644 sys/db/model.go diff --git a/README.md b/README.md index 8d5a2168f..7d85b2e65 100644 --- a/README.md +++ b/README.md @@ -217,7 +217,7 @@ func main() { lego.Run(s, //装备模块 user.NewModule(), //装备用户模块 pack.NewModule(), //装备背包模块 - mail.NewModule(), //装备邮件模块 + mail.NewModule(), //装备邮件模块 ) } diff --git a/comm/core.go b/comm/core.go index d5246fce7..15d07a612 100644 --- a/comm/core.go +++ b/comm/core.go @@ -22,12 +22,13 @@ const ( //模块名定义处 const ( - SM_GateModule core.M_Modules = "gateway" //gate模块 网关服务模块 - SM_WebModule core.M_Modules = "web" //web模块 - SM_UserModule core.M_Modules = "user" //用户模块 - SM_PackModule core.M_Modules = "pack" //背包模块 - SM_MailModule core.M_Modules = "mail" //邮件模块 - SM_FriendModule core.M_Modules = "friend" //好友模块 + SM_GateModule core.M_Modules = "gateway" //gate模块 网关服务模块 + SM_WebModule core.M_Modules = "web" //web模块 + SM_UserModule core.M_Modules = "user" //用户模块 + SM_PackModule core.M_Modules = "pack" //背包模块 + SM_MailModule core.M_Modules = "mail" //邮件模块 + SM_FriendModule core.M_Modules = "friend" //好友模块 + SM_LogModelMueule core.M_Modules = "model" //日志模块 ) //RPC服务接口定义处 diff --git a/modules/model/api.go b/modules/model/api.go new file mode 100644 index 000000000..777cf3ff2 --- /dev/null +++ b/modules/model/api.go @@ -0,0 +1,20 @@ +package model + +import ( + "go_dreamfactory/modules" + + "go_dreamfactory/lego/core" +) + +type Api_Comp struct { + modules.MComp_GateComp + service core.IService + module *Model +} + +func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + this.MComp_GateComp.Init(service, module, comp, options) + this.service = service + this.module = module.(*Model) + return +} diff --git a/modules/model/configure_comp.go b/modules/model/configure_comp.go new file mode 100644 index 000000000..5d46dd968 --- /dev/null +++ b/modules/model/configure_comp.go @@ -0,0 +1,16 @@ +package model + +import ( + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/core/cbase" +) + +// 邮件配置管理组件 +type Configure_Comp struct { + cbase.ModuleCompBase +} + +func (this *Configure_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + this.ModuleCompBase.Init(service, module, comp, options) + return +} diff --git a/modules/model/db_comp.go b/modules/model/db_comp.go new file mode 100644 index 000000000..09c04d01a --- /dev/null +++ b/modules/model/db_comp.go @@ -0,0 +1,174 @@ +package model + +import ( + "context" + "errors" + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/core/cbase" + "go_dreamfactory/lego/sys/log" + "go_dreamfactory/lego/sys/mgo" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" +) + +const ( + WriteMaxNum uint32 = 1000 //一次性最处理条数 +) + +type DB_Comp struct { + cbase.ModuleCompBase + mgo mgo.ISys +} +type Autogenerated struct { + ID string `json:"_id"` + UID string `json:"uid"` + Act string `json:"act"` // insert update delete + D []interface{} +} + +type QueryStruct struct { + Selector bson.M + Query bson.M +} + +const ( + DB_ModelTable core.SqlTable = "model" +) + +type IModel interface { + Model_UpdateDBBylog() (err error) // 读取日志并更新对应的表 + Model_InsertDBBylog(data *Autogenerated) (err error) // 插入日志 + Model_UpdateUserDataByUid(uid string) (err error) // 读取指定玩家信息到db中 +} + +func (this *DB_Comp) Model_UpdateDBBylog() (err error) { + + _delID := make([]string, 0) // 处理完成要删除的id + _data, err := this.mgo.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) + if err != nil { + return err + } + for _data.Next(context.TODO()) { // 处理删除逻辑 + data := &Autogenerated{} + if err = _data.Decode(data); err == nil { + _delID = append(_delID, data.ID) + } else { + log.Errorf("Decode Data err : %v", err) + continue + } + + log.Debugf("======= insert log : %+v =======", data) + if data.Act == "insert" { + if len(data.D) < 2 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + query := data.D[1].([]interface{}) + _, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), query) + if err != nil { + log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err) + } + } else if data.Act == "delete" { + if len(data.D) < 2 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + _, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) + if err != nil { + log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) + } + } else { // update + if len(data.D) < 3 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + where := data.D[1].(map[string]interface{}) + _obj := &QueryStruct{} + for k, v := range where { + _obj.Selector[k] = v + } + query := data.D[2].(map[string]interface{}) + for k, v := range query { + _obj.Query[k] = v + } + this.mgo.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) + } + } + // 批量删除已处理的数据 + _, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete()) + if err != nil { + log.Errorf("del err %v", err) + } + return +} + +func (this *DB_Comp) Model_InsertDBBylog(data *Autogenerated) (err error) { + + _, err = this.mgo.InsertOne(DB_ModelTable, data) + if err != nil { + log.Errorf("insert model db err %v", err) + } + return err +} + +func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) { + + _delID := make([]string, 0) // 处理完成要删除的id + _data, err := this.mgo.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find()) + if err != nil { + return err + } + for _data.Next(context.TODO()) { // 处理删除逻辑 + data := &Autogenerated{} + if err = _data.Decode(data); err == nil { + _delID = append(_delID, data.ID) + } else { + log.Errorf("Decode Data err : %v", err) + continue + } + + log.Debugf("======= insert log : %+v =======", data) + if data.Act == "insert" { + if len(data.D) < 2 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + query := data.D[1].([]interface{}) + _, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), query) + if err != nil { + log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err) + } + } else if data.Act == "delete" { + if len(data.D) < 2 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + _, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) + if err != nil { + log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) + } + } else { // update + if len(data.D) < 3 { // 参数校验 + log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) + return errors.New("parameter len err") + } + where := data.D[1].(map[string]interface{}) + _obj := &QueryStruct{} + for k, v := range where { + _obj.Selector[k] = v + } + query := data.D[2].(map[string]interface{}) + for k, v := range query { + _obj.Query[k] = v + } + this.mgo.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) + } + } + // 批量删除已处理的数据 + _, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete()) + if err != nil { + log.Errorf("del err %v", err) + } + return +} diff --git a/modules/model/dbservice_comp.go b/modules/model/dbservice_comp.go new file mode 100644 index 000000000..fb7ce3e74 --- /dev/null +++ b/modules/model/dbservice_comp.go @@ -0,0 +1,39 @@ +package model + +import ( + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/core/cbase" + "time" +) + +type DBService_Comp struct { + cbase.ModuleCompBase + task chan string + DB_Comp +} + +func (this *DBService_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + this.ModuleCompBase.Init(service, module, comp, options) + + return +} + +func (this *DBService_Comp) Start(err error) { + this.ModuleCompBase.Start() + go this.run() +} + +func (this *DBService_Comp) run() { + for { + select { + case v := <-this.task: + this.Model_UpdateUserDataByUid(v) + case <-time.After(time.Second): + this.Model_UpdateDBBylog() + } + } +} + +func (this *DBService_Comp) PushUserTask(uid string) { + this.task <- uid +} diff --git a/modules/model/module.go b/modules/model/module.go new file mode 100644 index 000000000..f343ce609 --- /dev/null +++ b/modules/model/module.go @@ -0,0 +1,48 @@ +package model + +import ( + "go_dreamfactory/comm" + "go_dreamfactory/lego/core" + "go_dreamfactory/modules" + "time" +) + +func NewModule() core.IModule { + m := new(Model) + return m +} + +type Model struct { + modules.ModuleBase + api_comp *Api_Comp + configure_comp *Configure_Comp +} + +func (this *Model) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { + err = this.ModuleBase.Init(service, module, options) + + return +} + +func (this *Model) Start() (err error) { + err = this.ModuleBase.Start() + //go this.RunWriteDB() + return +} + +func (this *Model) GetType() core.M_Modules { + return comm.SM_LogModelMueule +} + +func (this *Model) OnInstallComp() { + this.ModuleBase.OnInstallComp() + this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) + this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp) +} + +func (this *Model) RunWriteDB() { + for { + time.Sleep(time.Second) + + } +} diff --git a/services/worker/main.go b/services/worker/main.go index 3458ede0f..a667dfb03 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -5,6 +5,7 @@ import ( "fmt" "go_dreamfactory/modules/friend" "go_dreamfactory/modules/mail" + "go_dreamfactory/modules/model" "go_dreamfactory/modules/pack" "go_dreamfactory/modules/user" "go_dreamfactory/services" @@ -40,6 +41,7 @@ func main() { pack.NewModule(), mail.NewModule(), friend.NewModule(), + model.NewModule(), ) } diff --git a/sys/db/model.go b/sys/db/model.go deleted file mode 100644 index 8ad58c26c..000000000 --- a/sys/db/model.go +++ /dev/null @@ -1,84 +0,0 @@ -package db - -import ( - "errors" - "go_dreamfactory/lego/core" - "go_dreamfactory/lego/sys/log" - - "go.mongodb.org/mongo-driver/bson" -) - -type Autogenerated struct { - ID string `json:"_id"` - UID string `json:"uid"` - Act string `json:"act"` // insert update delete - D []interface{} -} - -// type data struct { -// Table string -// Wheremap [map[string]]interface{} // 如果是insert 条件就是nil del 只有条件 -// Modifymap map[string]map[string]interface{} -// insertData []interface{} -// } - -type QueryStruct struct { - Selector bson.M - Query bson.M -} - -const ( - DB_ModelTable core.SqlTable = "model" -) - -type IModel interface { - Model_UpdateDBBylog() (err error) // 读取日志并更新对应的表 - Model_InsertDBBylog(data *Autogenerated) (err error) // 插入日志 - Model_UpdateUserDataByUid(uid string) (err error) // 读取指定玩家信息到db中 -} - -func (this *DB) Mail_UpdateDBBylog() (err error) { - data := &Autogenerated{} - this.mgo.FindOneAndDelete(DB_ModelTable, bson.M{}).Decode(data) // 查找最新的一条并且删除 - - log.Debugf("======= insert log : %+v =======", data) - if data.Act == "insert" { - query := data.D[1].([]interface{}) - - _, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), query) - if err != nil { - log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err) - } - } else if data.Act == "delete" { - - _, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) - if err != nil { - log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) - } - } else { // update - if len(data.D) < 3 { // 参数校验 - log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) - return errors.New("parameter len err") - } - where := data.D[1].(map[string]interface{}) - _obj := &QueryStruct{} - for k, v := range where { - _obj.Selector[k] = v - } - query := data.D[2].(map[string]interface{}) - for k, v := range query { - _obj.Query[k] = v - } - this.mgo.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) - } - return -} - -func (this *DB) Model_InsertDBBylog(data *Autogenerated) (err error) { - - _, err = this.mgo.InsertOne(DB_ModelTable, data) - if err != nil { - log.Errorf("insert model db err %v", err) - } - return err -}