diff --git a/modules/model/api.go b/modules/dbservice/api.go similarity index 85% rename from modules/model/api.go rename to modules/dbservice/api.go index 1a72633b9..b0f57528b 100644 --- a/modules/model/api.go +++ b/modules/dbservice/api.go @@ -1,4 +1,4 @@ -package model +package dbservice import ( "go_dreamfactory/modules" @@ -9,13 +9,13 @@ import ( type Api_Comp struct { modules.MComp_GateComp service core.IService - module *Model + module *DBService } func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.MComp_GateComp.Init(service, module, comp, options) this.service = service - this.module = module.(*Model) + this.module = module.(*DBService) return } diff --git a/modules/model/configure_comp.go b/modules/dbservice/configure_comp.go similarity index 95% rename from modules/model/configure_comp.go rename to modules/dbservice/configure_comp.go index 5d46dd968..8a01baa1b 100644 --- a/modules/model/configure_comp.go +++ b/modules/dbservice/configure_comp.go @@ -1,4 +1,4 @@ -package model +package dbservice import ( "go_dreamfactory/lego/core" diff --git a/modules/model/db_comp.go b/modules/dbservice/db_comp.go similarity index 62% rename from modules/model/db_comp.go rename to modules/dbservice/db_comp.go index e45618ce9..077a6e09f 100644 --- a/modules/model/db_comp.go +++ b/modules/dbservice/db_comp.go @@ -1,4 +1,4 @@ -package model +package dbservice import ( "context" @@ -14,6 +14,11 @@ import ( const ( WriteMaxNum uint32 = 1000 //一次性最处理条数 + ErrorMaxNum uint32 = 5 // 数据库操作最大失败次数 +) + +var ( + ErrorLogCount = make(map[string]uint32, 0) ) type DB_Comp struct { @@ -32,7 +37,7 @@ type QueryStruct struct { } const ( - DB_ModelTable core.SqlTable = "model" + DB_ModelTable core.SqlTable = "model_log" ) type IModel interface { @@ -53,13 +58,12 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { return err } } + _delID := make([]string, 0) // 处理完成要删除的id for _data.Next(context.TODO()) { // 处理删除逻辑 data := &comm.Autogenerated{} - if err = _data.Decode(data); err == nil { - _delID = append(_delID, data.ID) - } else { + if err = _data.Decode(data); err != nil { log.Errorf("Decode Data err : %v", err) continue } @@ -67,7 +71,7 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { if data.Act == string(comm.LogHandleType_Insert) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) - break + continue } _obj := bson.M{} for _, v := range data.D[1].(bson.D) { @@ -78,29 +82,49 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { _, err := this.DB.InsertOne(core.SqlTable(_key), _obj) if err != nil { log.Errorf("insert %s db err:%v", (core.SqlTable(_key)), err) + ErrorLogCount[data.ID]++ + if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 + log.Errorf("insert db err max num %s db err:%v", data.ID, err) + _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) + if err != nil { + log.Errorf("insert %s db err:%v", data.ID, err) + } + } + continue } // _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) // if err != nil { - // log.Errorf("insert %s db err:%v", data.ID, err) + // log.Errorf("delete %s db err:%v", data.ID, err) // } } else if data.Act == string(comm.LogHandleType_Delete) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) - break + continue } + _key := data.D[0].(string) _objKey := make([]string, 0) for _, v := range data.D[1].(bson.D) { _objKey = append(_objKey, v.Value.(string)) } - _, err = this.DB.DeleteMany(data.D[0].(core.SqlTable), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete()) + _, err = this.DB.DeleteMany(core.SqlTable(_key), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete()) if err != nil { - log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) + log.Errorf("delete %s db err:%v", core.SqlTable(_key), err) + ErrorLogCount[data.ID]++ + if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 + log.Errorf("del db err max num %s db err:%v", data.ID, err) + _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) + if err != nil { + log.Errorf("insert %s db err:%v", data.ID, err) + } + } + continue } } else { // update if len(data.D) < 3 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) - break + continue } + _key := data.D[0].(string) where := data.D[1].(bson.D) _obj := &QueryStruct{} for _, v := range where { @@ -110,8 +134,21 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { for _, v := range query { _obj.Query[v.Key] = v } - this.DB.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) + _, err := this.DB.UpdateMany(core.SqlTable(_key), _obj.Selector, _obj.Query) + if err != nil { + log.Errorf("Update %s db err:%v", core.SqlTable(_key), err) + ErrorLogCount[data.ID]++ + if ErrorLogCount[data.ID] >= ErrorMaxNum { // 超过一定次数写失败了那就删除吧 + log.Errorf("update db err max num %s db err:%v", data.ID, err) + _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) + if err != nil { + log.Errorf("insert %s db err:%v", data.ID, err) + } + } + continue + } } + _delID = append(_delID, data.ID) // 都操作都成功了记录要删除的key } // 批量删除已处理的数据 _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) diff --git a/modules/model/dbservice_comp.go b/modules/dbservice/dbservice_comp.go similarity index 91% rename from modules/model/dbservice_comp.go rename to modules/dbservice/dbservice_comp.go index 1e9a27ac3..cbe688147 100644 --- a/modules/model/dbservice_comp.go +++ b/modules/dbservice/dbservice_comp.go @@ -1,4 +1,4 @@ -package model +package dbservice import ( "go_dreamfactory/lego/core" @@ -9,12 +9,12 @@ import ( type DBService_Comp struct { cbase.ModuleCompBase task chan string - module *Model + module *DBService } func (this *DBService_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.ModuleCompBase.Init(service, module, comp, options) - this.module = module.(*Model) + this.module = module.(*DBService) return } diff --git a/modules/dbservice/mail_test.go b/modules/dbservice/mail_test.go new file mode 100644 index 000000000..c8dc3b648 --- /dev/null +++ b/modules/dbservice/mail_test.go @@ -0,0 +1,48 @@ +package dbservice + +import ( + "go_dreamfactory/comm" + "go_dreamfactory/lego/sys/log" + "go_dreamfactory/pb" + "os" + "testing" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" +) + +var module = new(DBService) + +func TestMain(m *testing.M) { + for i := 0; i < 50000; i++ { + //go func() { + _mail := &pb.DB_MailData{ + ObjId: primitive.NewObjectID().Hex(), + UserId: "uid123", + Title: "系统邮件", + + Contex: "恭喜获得专属礼包一份", + CreateTime: uint64(time.Now().Unix()), + DueTime: uint64(time.Now().Unix()) + 30*24*3600, + Check: false, + Reward: false, + } + //db.InsertModelLogs("mail", "uid123", _mail) + //InsertModelLogs("mail", "uid123", _mail) + data := &comm.Autogenerated{ + ID: primitive.NewObjectID().Hex(), + UID: "uid123", + Act: string(comm.LogHandleType_Insert), + } + data.D = append(data.D, "mail") // D[0] + data.D = append(data.D, _mail) // D[1] + + _, err1 := module.db_comp.DB.InsertOne("model_log", data) + if err1 != nil { + log.Errorf("insert model db err %v", err1) + } + //}() + } + time.Sleep(time.Second * 10) + defer os.Exit(m.Run()) +} diff --git a/modules/model/module.go b/modules/dbservice/module.go similarity index 71% rename from modules/model/module.go rename to modules/dbservice/module.go index 2842df047..7988738b1 100644 --- a/modules/model/module.go +++ b/modules/dbservice/module.go @@ -1,4 +1,4 @@ -package model +package dbservice import ( "go_dreamfactory/comm" @@ -7,11 +7,11 @@ import ( ) func NewModule() core.IModule { - m := new(Model) + m := new(DBService) return m } -type Model struct { +type DBService struct { modules.ModuleBase api_comp *Api_Comp db_comp *DB_Comp @@ -19,17 +19,17 @@ type Model struct { configure_comp *Configure_Comp } -func (this *Model) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { +func (this *DBService) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { err = this.ModuleBase.Init(service, module, options) return } -func (this *Model) GetType() core.M_Modules { +func (this *DBService) GetType() core.M_Modules { return comm.SM_LogModelModule } -func (this *Model) OnInstallComp() { +func (this *DBService) OnInstallComp() { this.ModuleBase.OnInstallComp() this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp) diff --git a/modules/model/mail_test.go b/modules/model/mail_test.go deleted file mode 100644 index a16685f61..000000000 --- a/modules/model/mail_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package model - -import ( - "go_dreamfactory/lego/sys/log" - "go_dreamfactory/pb" - "testing" - "time" -) - -func TestCreatemoudles(t *testing.T) { - _mail := &pb.DB_MailData{ - - UserId: "uid123", - Title: "系统邮件", - - Contex: "恭喜获得专属礼包一份", - CreateTime: uint64(time.Now().Unix()), - DueTime: uint64(time.Now().Unix()) + 30*24*3600, - Check: false, - Reward: false, - } - // obj.InsertModelLogs("mail", "uid123", _mail) - - log.Debugf("insert : %v", _mail) -} diff --git a/modules/model_comp.go b/modules/model_comp.go index 028b137cb..5a4618b5e 100644 --- a/modules/model_comp.go +++ b/modules/model_comp.go @@ -13,10 +13,6 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) -const ( - DB_ModelTable core.SqlTable = "model" -) - /* 基础组件 缓存组件 读写缓存数据 DB组件也封装进来 @@ -27,6 +23,10 @@ type Model_Comp struct { DB mgo.ISys } +const ( + DB_ModelTable core.SqlTable = "model_log" +) + //组件初始化接口 func (this *Model_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.ModuleCompBase.Init(service, module, comp, options) diff --git a/services/worker/main.go b/services/worker/main.go index a667dfb03..cca649e65 100644 --- a/services/worker/main.go +++ b/services/worker/main.go @@ -3,9 +3,9 @@ package main import ( "flag" "fmt" + "go_dreamfactory/modules/dbservice" "go_dreamfactory/modules/friend" "go_dreamfactory/modules/mail" - "go_dreamfactory/modules/model" "go_dreamfactory/modules/pack" "go_dreamfactory/modules/user" "go_dreamfactory/services" @@ -41,7 +41,7 @@ func main() { pack.NewModule(), mail.NewModule(), friend.NewModule(), - model.NewModule(), + dbservice.NewModule(), ) } diff --git a/sys/cache/init_test.go b/sys/cache/init_test.go index 45d150459..6e36c6742 100644 --- a/sys/cache/init_test.go +++ b/sys/cache/init_test.go @@ -47,7 +47,7 @@ func TestMain(m *testing.M) { data.D = append(data.D, "mail") // D[0] data.D = append(data.D, _mail) // D[1] - _, err1 := db.Defsys.Mgo().InsertOne("model", data) + _, err1 := db.Defsys.Mgo().InsertOne("model_log", data) if err1 != nil { log.Errorf("insert model db err %v", err1) }