package dbservice import ( "context" "go_dreamfactory/comm" "go_dreamfactory/lego/core" "go_dreamfactory/lego/sys/log" "go_dreamfactory/modules" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( WriteMaxNum uint32 = 1000 //一次性最处理条数 ErrorMaxNum uint32 = 5 // 数据库操作最大失败次数 ) var ( ErrorLogCount = make(map[string]uint32, 0) ) type DB_Comp struct { modules.Model_Comp } // type data struct { // Table string // Wheremap map[string]interface{} // 如果是insert 条件就是nil del 只有条件 // Modifymap map[string]map[string]interface{} // } type QueryStruct struct { Selector bson.M Query bson.M } const ( DB_ModelTable core.SqlTable = "model_log" ) type IModel interface { Model_UpdateDBByLog() (err error) // 读取日志并更新对应的表 Model_InsertDBByLog(data *comm.Autogenerated) (err error) // 插入日志 } func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { _data := &mongo.Cursor{} if uid == "" { _data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) if err != nil { return err } } else { _data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) if err != nil { return err } } _delID := make([]string, 0) // 处理完成要删除的id for _data.Next(context.TODO()) { // 处理删除逻辑 data := &comm.Autogenerated{} if err = _data.Decode(data); err != nil { log.Errorf("Decode Data err : %v", err) continue } if data.Act == string(comm.LogHandleType_Insert) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _obj := bson.M{} for _, v := range data.D[1].(bson.D) { _obj[v.Key] = v.Value } _key := data.D[0].(string) _, err := this.DB.InsertOne(core.SqlTable(_key), _obj) if err != nil { log.Errorf("insert %s db err:%v", (core.SqlTable(_key)), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 log.Errorf("insert db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } // _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) // if err != nil { // log.Errorf("delete %s db err:%v", data.ID, err) // } } else if data.Act == string(comm.LogHandleType_Delete) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _key := data.D[0].(string) _objKey := make([]string, 0) for _, v := range data.D[1].(bson.D) { _objKey = append(_objKey, v.Value.(string)) } _, err = this.DB.DeleteMany(core.SqlTable(_key), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete()) if err != nil { log.Errorf("delete %s db err:%v", core.SqlTable(_key), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 log.Errorf("del db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } } else { // update if len(data.D) < 3 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _key := data.D[0].(string) where := data.D[1].(bson.D) _obj := &QueryStruct{} for _, v := range where { _obj.Selector[v.Key] = v } query := data.D[2].(bson.D) for _, v := range query { _obj.Query[v.Key] = v } _, err := this.DB.UpdateMany(core.SqlTable(_key), _obj.Selector, _obj.Query) if err != nil { log.Errorf("Update %s db err:%v", core.SqlTable(_key), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 超过一定次数写失败了那就删除吧 log.Errorf("update db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } } _delID = append(_delID, data.ID) // 都操作都成功了记录要删除的key } // 批量删除已处理的数据 _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) if err != nil { log.Errorf("del err %v", err) } return } func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) { _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } return err }