package model

import (
	"context"
	"errors"

	"go_dreamfactory/lego/core"
	"go_dreamfactory/lego/core/cbase"
	"go_dreamfactory/lego/sys/log"
	"go_dreamfactory/lego/sys/mgo"
	"go_dreamfactory/modules"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	// WriteMaxNum is the maximum number of log entries handled in one pass
	// of Model_UpdateDBBylog.
	WriteMaxNum uint32 = 1000
)

// DB_Comp is the module component that stores operation logs in the model
// table and replays them against their target tables.
type DB_Comp struct {
	cbase.ModuleCompBase
	mgo mgo.ISys
}

// Autogenerated is one operation-log entry.
//
// D carries the positional arguments of the operation:
//   - insert: D[0] = target table, D[1] = list of documents to insert
//   - delete: D[0] = target table, D[1] = filter document
//   - update: D[0] = target table, D[1] = filter document, D[2] = update document
//
// NOTE(review): only json tags are declared here; whether the Mongo driver
// honours them when encoding/decoding depends on the registry configured
// inside the mgo system — confirm that "_id"/"uid" really map to these fields.
type Autogenerated struct {
	ID  string `json:"_id"`
	UID string `json:"uid"`
	Act string `json:"act"` // insert update delete
	D   []interface{}
}

// type data struct {
// 	Table     string
// 	Wheremap  map[string]interface{}            // nil for insert; delete carries only the condition
// 	Modifymap map[string]map[string]interface{}
// }

// QueryStruct pairs the filter (Selector) with the update document (Query)
// of an update operation.
type QueryStruct struct {
	Selector bson.M
	Query    bson.M
}

const (
	// DB_ModelTable is the table that holds pending operation logs.
	DB_ModelTable core.SqlTable = "model"
)

// IModel is the operation-log API exposed by DB_Comp.
type IModel interface {
	Model_UpdateDBBylog() (err error)                    // read pending logs and apply them to their target tables
	Model_InsertDBBylog(data *Autogenerated) (err error) // insert one log entry
	Model_UpdateUserDataByUid(uid string) (err error)    // apply the pending logs of one player to the db
}

var (
	// errLogParamLen is returned when a log entry carries too few arguments in D.
	errLogParamLen = errors.New("parameter len err")
	// errLogParamType is returned when an argument in D has an unexpected type.
	errLogParamType = errors.New("parameter type err")
)

// asTable extracts the target table name from a log argument. Values decoded
// from the database arrive as plain strings, so both forms are accepted.
func asTable(v interface{}) (core.SqlTable, bool) {
	if t, ok := v.(core.SqlTable); ok {
		return t, true
	}
	if s, ok := v.(string); ok {
		return core.SqlTable(s), true
	}
	return "", false
}

// asDoc converts a log argument into a bson.M document. It accepts plain
// maps as well as the bson.M / bson.D forms produced by the driver's decoder.
func asDoc(v interface{}) (bson.M, bool) {
	switch t := v.(type) {
	case bson.M:
		return t, true
	case map[string]interface{}:
		return bson.M(t), true
	case bson.D:
		m := make(bson.M, len(t))
		for _, e := range t {
			m[e.Key] = e.Value
		}
		return m, true
	}
	return nil, false
}

// asDocs converts a log argument into a list of documents. It accepts a plain
// slice as well as the bson.A form produced by the driver's decoder.
func asDocs(v interface{}) ([]interface{}, bool) {
	switch t := v.(type) {
	case []interface{}:
		return t, true
	case bson.A:
		return []interface{}(t), true
	}
	return nil, false
}

// applyLog validates one log entry and executes it against its target table.
// update performs the write for "update" entries so each caller can pick its
// own flavour (UpdateMany vs FindOneAndUpdate).
//
// It returns a non-nil error only for malformed entries; failures of the
// database operation itself are logged and the entry counts as handled,
// matching the previous behaviour.
func (this *DB_Comp) applyLog(entry *Autogenerated, update func(table core.SqlTable, where, modify bson.M)) error {
	isInsert := entry.Act == string(modules.LogHandleType_Insert)
	isDelete := entry.Act == string(modules.LogHandleType_Delete)

	// insert/delete need table + payload; update needs table + filter + update doc.
	need := 3
	if isInsert || isDelete {
		need = 2
	}
	if len(entry.D) < need {
		log.Errorf("parameter len _id : %s,uid : %s d.len:%v", entry.ID, entry.UID, len(entry.D))
		return errLogParamLen
	}

	table, ok := asTable(entry.D[0])
	if !ok {
		log.Errorf("parameter type _id : %s,uid : %s table:%T", entry.ID, entry.UID, entry.D[0])
		return errLogParamType
	}

	switch {
	case isInsert:
		docs, ok := asDocs(entry.D[1])
		if !ok {
			log.Errorf("parameter type _id : %s,uid : %s docs:%T", entry.ID, entry.UID, entry.D[1])
			return errLogParamType
		}
		if _, err := this.mgo.InsertMany(table, docs); err != nil {
			log.Errorf("insert %s db err:%v", table, err)
		}
	case isDelete:
		where, ok := asDoc(entry.D[1])
		if !ok {
			log.Errorf("parameter type _id : %s,uid : %s where:%T", entry.ID, entry.UID, entry.D[1])
			return errLogParamType
		}
		if _, err := this.mgo.DeleteMany(table, where); err != nil {
			log.Errorf("delete %s db err:%v", table, err)
		}
	default: // update
		where, okWhere := asDoc(entry.D[1])
		modify, okModify := asDoc(entry.D[2])
		if !okWhere || !okModify {
			log.Errorf("parameter type _id : %s,uid : %s where:%T update:%T", entry.ID, entry.UID, entry.D[1], entry.D[2])
			return errLogParamType
		}
		// Both maps are populated up front; writing into the zero-value maps
		// of an empty QueryStruct would panic.
		q := QueryStruct{Selector: where, Query: modify}
		update(table, q.Selector, q.Query)
	}
	return nil
}

// replayLogs loads the log entries matching filter, applies each of them and
// finally removes the handled entries from the log table.
//
// Processing stops at the first malformed entry; that entry is left in the
// log table and its error is returned, but every entry applied before it is
// still purged so it is not replayed on the next call.
func (this *DB_Comp) replayLogs(filter bson.M, findOpt *options.FindOptions, update func(table core.SqlTable, where, modify bson.M)) error {
	cursor, err := this.mgo.Find(DB_ModelTable, filter, findOpt)
	if err != nil {
		return err
	}
	ctx := context.TODO()
	// Release the server-side cursor on every exit path.
	defer cursor.Close(ctx)

	handled := make([]string, 0) // ids of entries that were applied and must be purged
	var failure error
	for cursor.Next(ctx) {
		entry := &Autogenerated{}
		if derr := cursor.Decode(entry); derr != nil {
			log.Errorf("Decode Data err : %v", derr)
			continue
		}
		log.Debugf("======= insert log : %+v =======", entry)
		if failure = this.applyLog(entry, update); failure != nil {
			break
		}
		handled = append(handled, entry.ID)
	}
	if cerr := cursor.Err(); cerr != nil {
		log.Errorf("cursor err : %v", cerr)
	}

	// Batch-delete the entries that have been handled. "$in" is the query
	// operator for id membership ("$inc" is an update operator and is
	// rejected inside a filter).
	if len(handled) > 0 {
		if _, derr := this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": handled}}, options.Delete()); derr != nil {
			log.Errorf("del err %v", derr)
			if failure == nil {
				failure = derr
			}
		}
	}
	return failure
}

// Model_UpdateDBBylog reads up to WriteMaxNum pending log entries, applies
// them to their target tables and deletes the handled entries.
//
// It returns the error of the initial query, of the first malformed entry, or
// of the final purge; failures of individual insert/delete operations are
// only logged.
func (this *DB_Comp) Model_UpdateDBBylog() (err error) {
	findOpt := options.Find().SetLimit(int64(WriteMaxNum))
	return this.replayLogs(bson.M{}, findOpt, func(table core.SqlTable, where, modify bson.M) {
		// NOTE(review): the result of UpdateMany is discarded, as before; its
		// signature is not visible from this file.
		this.mgo.UpdateMany(table, where, modify)
	})
}

// Model_InsertDBBylog stores one log entry in the model table. The insert
// error, if any, is logged and returned.
func (this *DB_Comp) Model_InsertDBBylog(data *Autogenerated) (err error) {
	_, err = this.mgo.InsertOne(DB_ModelTable, data)
	if err != nil {
		log.Errorf("insert model db err %v", err)
	}
	return err
}

// Model_UpdateUserDataByUid applies all pending log entries of the given
// player to their target tables and deletes the handled entries.
//
// Error semantics are the same as for Model_UpdateDBBylog.
//
// NOTE(review): the filter key is "userid" while Autogenerated tags the field
// as "uid" — confirm which key the stored documents actually use.
func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
	return this.replayLogs(bson.M{"userid": uid}, options.Find(), func(table core.SqlTable, where, modify bson.M) {
		// NOTE(review): the result of FindOneAndUpdate is discarded, as
		// before; its signature is not visible from this file.
		this.mgo.FindOneAndUpdate(table, where, modify)
	})
}