package dbservice import ( "context" "go_dreamfactory/comm" "go_dreamfactory/lego/core" "go_dreamfactory/lego/sys/log" "go_dreamfactory/modules" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type DB_Comp struct { modules.Model_Comp task chan string _data *mongo.Cursor } func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.Model_Comp.Init(service, module, comp, options) this.task = make(chan string, TaskMaxNum) this._data = new(mongo.Cursor) return } func (this *DB_Comp) Start() (err error) { err = this.Model_Comp.Start() go this.run() return } func (this *DB_Comp) run() { for { select { case v := <-this.task: this.Model_UpdateDBByLog(v) case <-time.After(time.Second * 2): this.Model_UpdateDBByLog("") } } } func (this *DB_Comp) PushUserTask(uid string) { this.task <- uid } func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { if uid == "" { this._data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) if err != nil { return err } } else { this._data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) if err != nil { return err } } _delID := make([]string, 0) // 处理完成要删除的id for this._data.Next(context.TODO()) { // 处理删除逻辑 data := &comm.Autogenerated{} if err = this._data.Decode(data); err != nil { log.Errorf("Decode Data err : %v", err) continue } if data.Act == string(comm.LogHandleType_Insert) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _obj := bson.M{} for _, v := range data.D[1].(bson.D) { _obj[v.Key] = v.Value } _key := data.D[0].(string) _, err := this.DB.InsertOne(core.SqlTable(_key), _obj) if err != nil { log.Errorf("insert %s db err:%v", (core.SqlTable(_key)), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 log.Errorf("insert db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } // _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) // if err != nil { // log.Errorf("delete %s db err:%v", data.ID, err) // } } else if data.Act == string(comm.LogHandleType_Delete) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _key := data.D[0].(string) _objKey := make([]string, 0) for _, v := range data.D[1].(bson.D) { _objKey = append(_objKey, v.Value.(string)) } _, err = this.DB.DeleteMany(core.SqlTable(_key), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete()) if err != nil { log.Errorf("delete %s db err:%v", core.SqlTable(_key), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧 log.Errorf("del db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } } else { // update if len(data.D) < 3 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) continue } _key := data.D[0].(string) where := data.D[1].(bson.D) _obj := &QueryStruct{ Selector: make(map[string]interface{}), Query: make(map[string]interface{}), } for _, v := range where { _obj.Selector[v.Key] = v } query := data.D[2].(bson.D) for _, v := range query { _obj.Query[v.Key] = v } _, err := this.DB.UpdateMany(core.SqlTable(_key), _obj.Selector, _obj.Query) if err != nil { log.Errorf("Update %s db err:%v", core.SqlTable(_key), err) ErrorLogCount[data.ID]++ if ErrorLogCount[data.ID] >= ErrorMaxNum { // 超过一定次数写失败了那就删除吧 log.Errorf("update db err max num %s db err:%v", data.ID, err) _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) if err != nil { log.Errorf("insert %s db err:%v", data.ID, err) } } continue } } _delID = append(_delID, data.ID) // 都操作都成功了记录要删除的key } if len(_delID) > 0 { _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) // 批量删除已处理的数据 if err != nil { log.Errorf("del err %v", err) } } return } func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) { _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } return err }