package db

import (
	"context"
	"fmt"
	"go_dreamfactory/comm"
	"go_dreamfactory/lego/core"
	"go_dreamfactory/lego/sys/log"
	"go_dreamfactory/lego/sys/mgo"
	lgredis "go_dreamfactory/lego/sys/redis"
	"go_dreamfactory/lego/sys/redis/pipe"
	"go_dreamfactory/lego/utils/codec"
	"go_dreamfactory/lego/utils/codec/codecore"
	"go_dreamfactory/lego/utils/codec/json"
	"reflect"
	"runtime"
	"time"
	"unsafe"

	"github.com/go-redis/redis/v8"
	"github.com/modern-go/reflect2"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
)

// defconf is the codec configuration used for every struct <-> hash-field
// conversion performed in this file.
var defconf = &codecore.Config{
	SortMapKeys:           true,
	IndentionStep:         1,
	OnlyTaggedField:       false,
	DisallowUnknownFields: false,
	CaseSensitive:         false,
	TagKey:                "json",
}

const (
	// DB_ModelTable is the table that receives the operation-log entries
	// written by the *ModelLogs methods.
	DB_ModelTable core.SqlTable = "model_log"
)

// NewDBModel builds a model for tableName on service sid whose cached entries
// are refreshed with a 15 minute expiry.
func NewDBModel(sid string, tableName string, conn *DBConn) *DBModel {
	return &DBModel{
		ServiceId: sid,
		TableName: tableName,
		Expired:   time.Minute * 15,
		conn:      conn,
		Redis:     conn.Redis,
		DB:        conn.Mgo,
	}
}

// NewDBModelNoExpired builds a model like NewDBModel but with Expired set to
// zero, which disables every expiry refresh done by the methods below.
func NewDBModelNoExpired(sid string, tableName string, conn *DBConn) *DBModel {
	model := NewDBModel(sid, tableName, conn)
	model.Expired = 0
	return model
}

// DBModel is the data-access object for one table: reads go to Redis first
// and fall back to MongoDB, writes go to Redis and optionally to the
// operation log.
type DBModel struct {
	ServiceId string
	TableName string
	Expired   time.Duration // expiry applied to cached keys; <= 0 disables refreshing
	conn      *DBConn       // data source
	Redis     lgredis.ISys
	DB        mgo.ISys
}

// ukey returns the Redis key of the record (or list index) owned by uid.
// An empty uid maps to the shared "member" key of the table.
func (this *DBModel) ukey(uid string) string {
	if uid == "" {
		return fmt.Sprintf("%s-%s:member", this.ServiceId, this.TableName)
	}
	return fmt.Sprintf("%s-%s:%s", this.ServiceId, this.TableName, uid)
}

// ukeylist returns the Redis key of list element id owned by uid. With an
// empty uid the key is built from id alone.
func (this *DBModel) ukeylist(uid string, id string) string {
	if uid == "" {
		return fmt.Sprintf("%s-%s:%s", this.ServiceId, this.TableName, id)
	}
	return fmt.Sprintf("%s-%s:%s-%s", this.ServiceId, this.TableName, uid, id)
}

// recordModelLog inserts one operation-log entry into DB_ModelTable.
// d holds the entry payload in order: D[0] is the table name, the rest are
// the operation arguments.
func (this *DBModel) recordModelLog(act string, uID string, d ...interface{}) (err error) {
	entry := &comm.Autogenerated{
		ID:  primitive.NewObjectID().Hex(),
		UID: uID,
		Act: act,
	}
	for _, v := range d {
		entry.D = append(entry.D, v)
	}
	if _, err = this.DB.InsertOne(DB_ModelTable, entry); err != nil {
		log.Errorf("table:%s insert model db err %v", this.TableName, err)
	}
	return err
}

// refreshListExpired renews the expiry of uid's list index together with the
// element keys stored as values of keys. It does nothing when Expired <= 0.
func (this *DBModel) refreshListExpired(uid string, keys map[string]string) {
	if this.Expired <= 0 {
		return
	}
	childs := make(map[string]struct{}, len(keys))
	for _, v := range keys {
		childs[v] = struct{}{}
	}
	this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
}

// InsertModelLogs writes an "insert" operation-log entry: D[0] is table and
// D[1] is target.
func (this *DBModel) InsertModelLogs(table string, uID string, target interface{}) (err error) {
	return this.recordModelLog(string(comm.LogHandleType_Insert), uID, table, target)
}

// InsertManyModelLogs writes one "insert" operation-log entry per element of
// datas, using the map key as the entry UID. D[0] is table and D[1] is a
// one-element slice wrapping the value. An empty datas is a no-op.
func (this *DBModel) InsertManyModelLogs(table string, datas map[string]interface{}) (err error) {
	if len(datas) == 0 {
		// Nothing to log; avoids handing an empty document list to InsertMany.
		return nil
	}
	entries := make([]interface{}, 0, len(datas))
	for k, v := range datas {
		entry := &comm.Autogenerated{
			ID:  primitive.NewObjectID().Hex(),
			UID: k,
			Act: string(comm.LogHandleType_Insert),
		}
		entry.D = append(entry.D, table)            // D[0]
		entry.D = append(entry.D, []interface{}{v}) // D[1]
		entries = append(entries, entry)
	}
	if _, err = this.DB.InsertMany(DB_ModelTable, entries); err != nil {
		log.Errorf("table:%s insert model db err %v", this.TableName, err)
	}
	return err
}

// DeleteModelLogs writes a "delete" operation-log entry: D[0] is table and
// D[1] is the where condition.
func (this *DBModel) DeleteModelLogs(table string, uID string, where interface{}) (err error) {
	return this.recordModelLog(string(comm.LogHandleType_Delete), uID, table, where)
}

// UpdateModelLogs writes an "update" operation-log entry: D[0] is table,
// D[1] is the where condition and D[2] is target.
func (this *DBModel) UpdateModelLogs(table string, uID string, where bson.M, target interface{}) (err error) {
	return this.recordModelLog(string(comm.LogHandleType_Update), uID, table, where, target)
}

// NewRedisMutex creates a Redis mutex on key whose expiry is outtime.
func (this *DBModel) NewRedisMutex(key string, outtime int) (result *lgredis.RedisMutex, err error) {
	result, err = this.Redis.NewRedisMutex(key, lgredis.SetExpiry(outtime))
	return
}

// Add stores data as the record of uid in Redis, optionally writes an insert
// log entry (option IsMgoLog) and refreshes the key expiry. A log failure is
// returned after the expiry has been refreshed.
func (this *DBModel) Add(uid string, data interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// Adds stores several records in one pipeline; the keys of data are the
// owning uids. Logging and expiry refresh behave as in Add.
func (this *DBModel) Adds(data map[string]interface{}, opt ...DBOption) (err error) {
	rpipe := this.Redis.RedisPipe(context.TODO())
	for k, v := range data {
		rpipe.HMSet(this.ukey(k), v)
	}
	if _, err = rpipe.Exec(); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.InsertManyModelLogs(this.TableName, data)
	}
	if this.Expired > 0 {
		for k := range data {
			this.conn.UpDateModelExpired(this.ukey(k), nil, this.Expired)
		}
	}
	return
}

// AddList stores data as list element id of uid and registers the element
// key in uid's list index.
func (this *DBModel) AddList(uid string, id string, data interface{}, opt ...DBOption) (err error) {
	key := this.ukeylist(uid, id)
	if err = this.Redis.HMSet(key, data); err != nil {
		return
	}
	if err = this.Redis.HSet(this.ukey(uid), id, key); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// AddLists stores several list elements of uid in one pipeline. data must be
// a map whose keys are strings (the element ids) and whose values are the
// element objects; anything else is rejected with an error.
func (this *DBModel) AddLists(uid string, data interface{}, opt ...DBOption) (err error) {
	vof := reflect.ValueOf(data)
	if !vof.IsValid() {
		return fmt.Errorf("Model_Comp: AddLists(nil)")
	}
	if vof.Kind() != reflect.Map {
		return fmt.Errorf("Model_Comp: AddLists(non-map %T)", data)
	}
	if vof.Type().Key().Kind() != reflect.String {
		// Reject instead of panicking when the key is read as a string below.
		return fmt.Errorf("Model_Comp: AddLists(non-string key %T)", data)
	}
	ids := vof.MapKeys()
	listskeys := make(map[string]string, len(ids))
	lists := make([]interface{}, 0, len(ids))
	rpipe := this.Redis.RedisPipe(context.TODO())
	for _, k := range ids {
		id := k.String()
		value := vof.MapIndex(k).Interface()
		key := this.ukeylist(uid, id)
		rpipe.HMSet(key, value)
		listskeys[id] = key
		lists = append(lists, value)
	}
	rpipe.HMSetForMap(this.ukey(uid), listskeys)
	if _, err = rpipe.Exec(); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.InsertModelLogs(this.TableName, uid, lists)
	}
	this.refreshListExpired(uid, listskeys)
	return
}

// AddQueues stores every value of data under its own map key and pushes
// those keys onto the Redis list key. data must be a map with string keys.
// Once the list holds more than 3*uplimit entries the head of the list is
// trimmed and the trimmed keys are deleted and returned in outkey.
func (this *DBModel) AddQueues(key string, uplimit int64, data interface{}) (outkey []string, err error) {
	vof := reflect.ValueOf(data)
	if !vof.IsValid() {
		err = fmt.Errorf("Model_Comp: AddQueues(nil)")
		return
	}
	if vof.Kind() != reflect.Map {
		err = fmt.Errorf("Model_Comp: AddQueues(non-map %T)", data)
		return
	}
	if vof.Type().Key().Kind() != reflect.String {
		err = fmt.Errorf("Model_Comp: AddQueues(non-string key %T)", data)
		return
	}
	ids := vof.MapKeys()
	keys := make([]string, 0, len(ids))
	rpipe := this.Redis.RedisPipe(context.TODO())
	for _, k := range ids {
		tkey := k.String()
		rpipe.HMSet(tkey, vof.MapIndex(k).Interface())
		keys = append(keys, tkey)
	}
	rpipe.RPushForStringSlice(key, keys...)
	lcmd := rpipe.Llen(key)
	if _, err = rpipe.Exec(); err != nil {
		return
	}
	if lcmd.Val() <= uplimit*3 {
		return
	}
	// off is negative here, so it is an index counted from the tail of the
	// list: [0, off-1] is the head being dropped and [off, -1] is what stays.
	off := uplimit - lcmd.Val()
	if outkey, err = this.Redis.LRangeToStringSlice(key, 0, int(off-1)).Result(); err != nil {
		return
	}
	rpipe.Ltrim(key, int(off), -1)
	for _, v := range outkey {
		rpipe.Delete(v)
	}
	_, err = rpipe.Exec()
	return
}

// Change updates several fields of the record of uid. When option IsMgoLog
// is set an update log entry keyed by uid is written, and a failure of that
// write is returned.
func (this *DBModel) Change(uid string, data map[string]interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.UpdateModelLogs(this.TableName, uid, bson.M{"uid": uid}, data)
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// ChangeById updates several fields of the record stored under id. The
// update log entry, when enabled, is keyed by _id and carries an empty UID.
func (this *DBModel) ChangeById(id string, data map[string]interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HMSet(this.ukey(id), data); err != nil {
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		err = this.UpdateModelLogs(this.TableName, "", bson.M{"_id": id}, data)
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(id), nil, this.Expired)
	}
	return
}

// ChangeList updates several fields of list element _id owned by uid.
func (this *DBModel) ChangeList(uid string, _id string, data map[string]interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HMSet(this.ukeylist(uid, _id), data); err != nil {
		log.Error("DBModel ChangeList", log.Field{Key: "err", Value: err.Error()})
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		where := bson.M{"_id": _id}
		if uid != "" {
			where["uid"] = uid
		}
		err = this.UpdateModelLogs(this.TableName, uid, where, data)
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// ChangeLists updates several list elements of uid in one pipeline; the keys
// of datas are element ids and the values are the fields to write. Every log
// entry is attempted and the first log failure is returned.
func (this *DBModel) ChangeLists(uid string, datas map[string]interface{}, opt ...DBOption) (err error) {
	rpipe := this.Redis.RedisPipe(context.TODO())
	for k, v := range datas {
		if err = rpipe.HMSet(this.ukeylist(uid, k), v); err != nil {
			log.Error("DBModel ChangeLists", log.Field{Key: "err", Value: err.Error()})
			return
		}
	}
	if _, err = rpipe.Exec(); err != nil {
		log.Error("DBModel ChangeLists", log.Field{Key: "err", Value: err.Error()})
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		for k, v := range datas {
			where := bson.M{"_id": k}
			if uid != "" {
				where["uid"] = uid
			}
			if logErr := this.UpdateModelLogs(this.TableName, uid, where, v); logErr != nil && err == nil {
				err = logErr
			}
		}
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// BatchChange writes the same fields to the records of every uid in uids in
// one pipeline. Every log entry is attempted and the first log failure is
// returned.
func (this *DBModel) BatchChange(uids []string, datas map[string]interface{}, opt ...DBOption) (err error) {
	rpipe := this.Redis.RedisPipe(context.TODO())
	for _, uid := range uids {
		if err = rpipe.HMSet(this.ukey(uid), datas); err != nil {
			log.Error("DBModel BatchChange", log.Field{Key: "err", Value: err.Error()})
			return
		}
	}
	if _, err = rpipe.Exec(); err != nil {
		log.Error("DBModel BatchChange", log.Field{Key: "err", Value: err.Error()})
		return
	}
	option := newDBOption(opt...)
	if option.IsMgoLog {
		for _, uid := range uids {
			if logErr := this.UpdateModelLogs(this.TableName, uid, bson.M{"uid": uid}, datas); logErr != nil && err == nil {
				err = logErr
			}
		}
	}
	if this.Expired > 0 {
		for _, uid := range uids {
			this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
		}
	}
	return
}

// Get reads the whole record of uid into data. On a Redis miss the record is
// loaded from MongoDB by its uid field and written back to Redis.
func (this *DBModel) Get(uid string, data interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HGetAll(this.ukey(uid), data); err != nil && err != lgredis.RedisNil {
		return
	}
	if err == lgredis.RedisNil {
		if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
			return
		}
		err = this.Redis.HMSet(this.ukey(uid), data)
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// dbModelSlice is the resolved view of a caller-supplied *[]*T destination
// that the multi-record readers append decoded elements to.
type dbModelSlice struct {
	data      interface{}               // the original *[]*T value
	ptr       unsafe.Pointer            // pointer extracted from data
	sliceType *reflect2.UnsafeSliceType // type []*T
	elemType  reflect2.Type             // type T
	decoder   codecore.IDecoderMapJson  // hash-field decoder resolved for *T
}

// newDBModelSlice validates that data is a pointer to a slice of pointers
// whose element type supports map decoding, and returns its resolved view.
func newDBModelSlice(data interface{}) (*dbModelSlice, error) {
	dtype := reflect2.TypeOf(data)
	if dtype == nil || dtype.Kind() != reflect.Ptr {
		return nil, fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
	}
	sType := dtype.(*reflect2.UnsafePtrType).Elem()
	if sType.Kind() != reflect.Slice {
		return nil, fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
	}
	sliceType := sType.(*reflect2.UnsafeSliceType)
	ptrType := sliceType.Elem()
	if ptrType.Kind() != reflect.Ptr {
		return nil, fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
	}
	decoder, ok := codec.DecoderOf(ptrType, defconf).(codecore.IDecoderMapJson)
	if !ok {
		return nil, fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
	}
	return &dbModelSlice{
		data:      data,
		ptr:       reflect2.PtrOf(data),
		sliceType: sliceType,
		elemType:  ptrType.(*reflect2.UnsafePtrType).Elem(),
		decoder:   decoder,
	}, nil
}

// mapEncoder resolves the encoder that turns one element into hash fields.
func (s *dbModelSlice) mapEncoder() (codecore.IEncoderMapJson, error) {
	encoder, ok := codec.EncoderOf(s.elemType, defconf).(codecore.IEncoderMapJson)
	if !ok {
		return nil, fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", s.data)
	}
	return encoder, nil
}

// appendHash grows the slice to n+1 elements and decodes fields into element
// n, reusing the element already present in that slot or allocating one.
func (s *dbModelSlice) appendHash(n int, fields map[string]string) error {
	s.sliceType.UnsafeGrow(s.ptr, n+1)
	slot := (*unsafe.Pointer)(s.sliceType.UnsafeGetIndex(s.ptr, n))
	elem := *slot
	if elem == nil {
		elem = s.elemType.UnsafeNew()
	}
	if err := s.decoder.DecodeForMapJson(elem, json.GetReader([]byte{}), fields); err != nil {
		return err
	}
	// A freshly allocated element is only published once it decoded cleanly.
	*slot = elem
	return nil
}

// appendCursor grows the slice to n+1 elements, decodes the cursor's current
// document into element n and returns that element encoded as hash fields.
func (s *dbModelSlice) appendCursor(encoder codecore.IEncoderMapJson, c *mongo.Cursor, n int) (map[string]string, error) {
	s.sliceType.UnsafeGrow(s.ptr, n+1)
	slot := (*unsafe.Pointer)(s.sliceType.UnsafeGetIndex(s.ptr, n))
	if *slot == nil {
		*slot = s.elemType.UnsafeNew()
	}
	if err := c.Decode(s.sliceType.GetIndex(s.data, n)); err != nil {
		return nil, err
	}
	return encoder.EncodeToMapJson(*slot, json.GetWriter())
}

// loadMany is the shared body of GetByUids and Gets. It reads the records
// cached under ukey(id) for every id, loads the missing ones from MongoDB
// with {field: {$in: missing}}, caches what was loaded and returns the ids
// that were found in neither store. data must be a *[]*T.
func (this *DBModel) loadMany(ids []string, field string, data interface{}) (onfound []string, err error) {
	var (
		dst     *dbModelSlice
		encoder codecore.IEncoderMapJson
		fields  map[string]string
		cursor  *mongo.Cursor
		n       int
	)
	onfound = make([]string, 0, len(ids))
	if dst, err = newDBModelSlice(data); err != nil {
		return
	}
	rpipe := this.Redis.RedisPipe(context.TODO())
	cmds := make([]*redis.StringStringMapCmd, len(ids))
	for i, id := range ids {
		cmds[i] = rpipe.HGetAllToMapString(this.ukey(id))
	}
	if _, err = rpipe.Exec(); err == nil {
		for i, cmd := range cmds {
			if fields, err = cmd.Result(); err == nil && len(fields) > 0 {
				if err = dst.appendHash(n, fields); err != nil {
					log.Errorf("err:%v", err)
					return
				}
				n++
			} else {
				onfound = append(onfound, ids[i])
			}
		}
	} else {
		// Copy rather than alias: onfound is edited in place below and must
		// not rewrite the caller's ids slice.
		onfound = append(onfound, ids...)
	}
	if len(onfound) > 0 {
		if cursor, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{field: bson.M{"$in": onfound}}); err != nil {
			return
		}
		defer cursor.Close(context.Background())
		if encoder, err = dst.mapEncoder(); err != nil {
			return
		}
		wpipe := this.Redis.RedisPipe(context.TODO())
		cached := 0
		for cursor.Next(context.Background()) {
			// NOTE(review): documents are cached and matched back to the
			// requested ids by their _id value, also when field is "uid" —
			// confirm that _id equals uid for the tables read by GetByUids.
			_id := cursor.Current.Lookup("_id").StringValue()
			if fields, err = dst.appendCursor(encoder, cursor, n); err != nil {
				return
			}
			wpipe.HMSetForMap(this.ukey(_id), fields)
			cached++
			n++
			for i, v := range onfound {
				if v == _id {
					onfound = append(onfound[:i], onfound[i+1:]...)
					break
				}
			}
		}
		if err = cursor.Err(); err != nil {
			return
		}
		if cached > 0 {
			_, err = wpipe.Exec()
		}
	}
	if this.Expired > 0 {
		for _, v := range ids {
			this.conn.UpDateModelExpired(this.ukey(v), nil, this.Expired)
		}
	}
	return
}

// GetByUids reads the records of several uids into data, which must be a
// *[]*T. Records missing from Redis are loaded from MongoDB by their uid
// field. onfound lists the uids found in neither store. A panic raised while
// reading is converted into the returned error.
func (this *DBModel) GetByUids(uids []string, data interface{}, opt ...DBOption) (onfound []string, err error) {
	defer func() {
		// Turn a panic into an error carrying the stack so the caller can report it.
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			l := runtime.Stack(buf, false)
			err = fmt.Errorf("%v: %s", r, buf[:l])
			log.Errorf("[DB Gets] TableName:%s ids:%v", this.TableName, uids)
		}
	}()
	onfound, err = this.loadMany(uids, "uid", data)
	return
}

// GetByID reads the whole record stored under id into data. On a Redis miss
// the record is loaded from MongoDB by _id and written back to Redis.
func (this *DBModel) GetByID(id string, data interface{}, opt ...DBOption) (err error) {
	if err = this.Redis.HGetAll(this.ukey(id), data); err != nil && err != lgredis.RedisNil {
		log.Error("HGetAll 错误!", log.Field{Key: "ukey", Value: this.ukey(id)}, log.Field{Key: "err", Value: err.Error()})
		return
	}
	if err == lgredis.RedisNil {
		if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
			return
		}
		err = this.Redis.HMSet(this.ukey(id), data)
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(id), nil, this.Expired)
	}
	return
}

// Gets reads the records stored under several ids into data, which must be a
// *[]*T. Records missing from Redis are loaded from MongoDB by _id. onfound
// lists the ids found in neither store. A panic raised while reading is
// converted into the returned error.
func (this *DBModel) Gets(ids []string, data interface{}, opt ...DBOption) (onfound []string, err error) {
	defer func() {
		// Turn a panic into an error carrying the stack so the caller can report it.
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			l := runtime.Stack(buf, false)
			err = fmt.Errorf("%v: %s", r, buf[:l])
			log.Errorf("[DB Gets] TableName:%s ids:%v", this.TableName, ids)
		}
	}()
	onfound, err = this.loadMany(ids, "_id", data)
	return
}

// GetList reads every list element of uid into data, which must be a *[]*T.
// When uid's list index is missing from Redis the elements are loaded from
// MongoDB (all documents when uid is empty, otherwise those whose uid field
// matches) and both the elements and the index are cached. A failed element
// read is returned as an error.
func (this *DBModel) GetList(uid string, data interface{}) (err error) {
	defer func() {
		// Turn a panic into an error carrying the stack so the caller can report it.
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			l := runtime.Stack(buf, false)
			err = fmt.Errorf("%v: %s", r, buf[:l])
			log.Errorf("[DB GetList] TableName:%s uid:%s err:%v", this.TableName, uid, err)
		}
	}()
	var (
		dst     *dbModelSlice
		encoder codecore.IEncoderMapJson
		keys    map[string]string
		fields  map[string]string
		cursor  *mongo.Cursor
		n       int
	)
	if dst, err = newDBModelSlice(data); err != nil {
		return
	}
	rpipe := this.Redis.RedisPipe(context.TODO())
	if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
		cmds := make([]*redis.StringStringMapCmd, 0, len(keys))
		for _, v := range keys {
			cmds = append(cmds, rpipe.HGetAllToMapString(v))
		}
		// A pipeline failure is reported by each command's Result below.
		rpipe.Exec()
		for _, cmd := range cmds {
			if fields, err = cmd.Result(); err != nil {
				return
			}
			if err = dst.appendHash(n, fields); err != nil {
				log.Errorf("err:%v", err)
				return
			}
			n++
		}
	}
	if err == lgredis.RedisNil {
		filter := bson.M{}
		if uid != "" {
			filter = bson.M{"uid": uid}
		}
		if cursor, err = this.DB.Find(core.SqlTable(this.TableName), filter); err != nil {
			return err
		}
		defer cursor.Close(context.Background())
		if encoder, err = dst.mapEncoder(); err != nil {
			return
		}
		n = 0
		// Start from a fresh index so the writes below never hit a nil map.
		keys = make(map[string]string)
		for cursor.Next(context.Background()) {
			_id := cursor.Current.Lookup("_id").StringValue()
			if fields, err = dst.appendCursor(encoder, cursor, n); err != nil {
				return
			}
			key := this.ukeylist(uid, _id)
			rpipe.HMSetForMap(key, fields)
			keys[_id] = key
			n++
		}
		if err = cursor.Err(); err != nil {
			return
		}
		if len(keys) > 0 {
			rpipe.HMSetForMap(this.ukey(uid), keys)
			_, err = rpipe.Exec()
		}
	}
	this.refreshListExpired(uid, keys)
	return err
}

// RandGetList reads list elements of uid into data, which must be a *[]*T,
// stopping once num elements were requested (at least one is always
// requested). Elements are taken in map iteration order of the list index.
// Only Redis is consulted; a missing index is returned as the Redis error.
func (this *DBModel) RandGetList(uid string, num int32, data interface{}) (err error) {
	defer func() {
		// Turn a panic into an error carrying the stack so the caller can report it.
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			l := runtime.Stack(buf, false)
			err = fmt.Errorf("%v: %s", r, buf[:l])
			log.Errorf("[DB GetList] TableName:%s uid:%s err:%v", this.TableName, uid, err)
		}
	}()
	var (
		dst    *dbModelSlice
		keys   map[string]string
		fields map[string]string
		n      int
	)
	if dst, err = newDBModelSlice(data); err != nil {
		return
	}
	rpipe := this.Redis.RedisPipe(context.TODO())
	if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
		cmds := make([]*redis.StringStringMapCmd, 0)
		for _, v := range keys {
			cmds = append(cmds, rpipe.HGetAllToMapString(v))
			if len(cmds) >= int(num) {
				break
			}
		}
		// A pipeline failure is reported by each command's Result below.
		rpipe.Exec()
		for _, cmd := range cmds {
			if fields, err = cmd.Result(); err != nil {
				return
			}
			if err = dst.appendHash(n, fields); err != nil {
				log.Errorf("err:%v", err)
				return
			}
			n++
		}
	}
	this.refreshListExpired(uid, keys)
	return err
}

// GetQueues reads the objects referenced by the last count entries of the
// Redis list key into data, which must be a *[]*T.
func (this *DBModel) GetQueues(key string, count int, data interface{}) (err error) {
	var (
		dst    *dbModelSlice
		keys   []string
		fields map[string]string
		n      int
	)
	if dst, err = newDBModelSlice(data); err != nil {
		return
	}
	rpipe := this.Redis.RedisPipe(context.TODO())
	if keys, err = this.Redis.LRangeToStringSlice(key, -1*count, -1).Result(); err != nil {
		return
	}
	cmds := make([]*redis.StringStringMapCmd, 0, len(keys))
	for _, v := range keys {
		cmds = append(cmds, rpipe.HGetAllToMapString(v))
	}
	// A pipeline failure is reported by each command's Result below.
	rpipe.Exec()
	for _, cmd := range cmds {
		if fields, err = cmd.Result(); err != nil {
			return
		}
		if err = dst.appendHash(n, fields); err != nil {
			log.Errorf("err:%v", err)
			return
		}
		n++
	}
	return
}

// GetFields reads the named fields of the record of uid into data. On a
// Redis miss the whole record is loaded from MongoDB by its uid field and
// written back to Redis.
func (this *DBModel) GetFields(uid string, data interface{}, fields ...string) (err error) {
	if err = this.Redis.HMGet(this.ukey(uid), data, fields...); err != nil && err != lgredis.RedisNil {
		return
	}
	if err == lgredis.RedisNil {
		if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
			return err
		}
		if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
			return
		}
	}
	if this.Expired > 0 {
		this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
	}
	return
}

// GetListFields reads the named fields of list element id owned by uid into
// data. On a Redis miss the whole element is loaded from MongoDB by _id,
// cached, and registered in uid's list index.
func (this *DBModel) GetListFields(uid string, id string, data interface{}, fields ...string) (err error) {
	keys := make(map[string]string)
	if err = this.Redis.HMGet(this.ukeylist(uid, id), data, fields...); err != nil && err != lgredis.RedisNil {
		return
	}
	if err == lgredis.RedisNil {
		if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
			return err
		}
		rpipe := this.Redis.RedisPipe(context.TODO())
		key := this.ukeylist(uid, id)
		// Cache the document that was just decoded.
		rpipe.HMSet(key, data)
		keys[id] = key
		rpipe.HMSetForMap(this.ukey(uid), keys)
		if _, err = rpipe.Exec(); err != nil {
			return
		}
	}
	this.refreshListExpired(uid, keys)
	return
}

// GetListObj reads list element id owned by uid into data. On a Redis miss
// the element is loaded from MongoDB by _id, cached, and registered in uid's
// list index.
func (this *DBModel) GetListObj(uid string, id string, data interface{}) (err error) {
	keys := make(map[string]string)
	if err = this.Redis.HGetAll(this.ukeylist(uid, id), data); err != nil && err != lgredis.RedisNil {
		return
	}
	if err == lgredis.RedisNil {
		if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
			return err
		}
		rpipe := this.Redis.RedisPipe(context.TODO())
		key := this.ukeylist(uid, id)
		rpipe.HMSet(key, data)
		keys[id] = key
		rpipe.HMSetForMap(this.ukey(uid), keys)
		if _, err = rpipe.Exec(); err != nil {
			return
		}
	}
	this.refreshListExpired(uid, keys)
	return
}

// Read several elements of a list.
func (this *DBModel) GetListObjs(uid string, ids []string, data interface{}) (err error) {
	defer func() { // on panic, collect the failure details and hand them to the caller
		if r := recover(); r != nil {
			buf := make([]byte, 4096)
			l := runtime.Stack(buf, false)
			err = fmt.Errorf("%v: %s", r, buf[:l])
			log.Errorf("[DB GetListObjs] TableName:%s uid:%s", this.TableName, uid)
		}
	}()
	var (
		dtype         reflect2.Type
		dkind         reflect.Kind
		sType         reflect2.Type
		sliceType     *reflect2.UnsafeSliceType
		sliceelemType reflect2.Type
		decoder       codecore.IDecoderMapJson
		encoder       codecore.IEncoderMapJson
		dptr          unsafe.Pointer
		elemPtr       unsafe.Pointer
		n             int
		ok            bool
		keys          map[string]string           = make(map[string]string)
		tempdata      map[string]string
		onfound       []string                    = make([]string, 0, len(ids))
		pipe          *pipe.RedisPipe             = this.Redis.RedisPipe(context.TODO())
		result        []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids))
		c             *mongo.Cursor
	)
	dptr = reflect2.PtrOf(data)
	dtype = reflect2.TypeOf(data)
	dkind = dtype.Kind()
	if dkind != reflect.Ptr {
		err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
		return
	}
	sType = dtype.(*reflect2.UnsafePtrType).Elem()
	if sType.Kind() != reflect.Slice {
		err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
		return
	}
	sliceType = sType.(*reflect2.UnsafeSliceType)
	sliceelemType = sliceType.Elem()
	if sliceelemType.Kind() != reflect.Ptr {
		err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
		return
	}
	if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
		err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
		return
	}
	sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
	for i, v := range ids {
		result[i] = pipe.HGetAllToMapString(this.ukeylist(uid, v))
	}
	if _, err = pipe.Exec(); err == nil {
		for i, v := range result {
			if tempdata, err = v.Result(); err == nil && len(tempdata) > 0 {
				sliceType.UnsafeGrow(dptr, n+1)
				elemPtr = sliceType.UnsafeGetIndex(dptr, n)
				if *((*unsafe.Pointer)(elemPtr)) == nil {
					newPtr := sliceelemType.UnsafeNew()
					if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
						log.Errorf("err:%v", err)
						return
					}
					*((*unsafe.Pointer)(elemPtr)) = newPtr
				} else {
					decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
				}
				n++
			} else {
				onfound = append(onfound, ids[i])
			}
		}
	} else {
		onfound = ids
	}
	if len(onfound) > 0 {
		if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"_id": bson.M{"$in": onfound}}); err != nil {
			return err
		} else {
			if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
				err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
				return
			}
			pipe := this.Redis.RedisPipe(context.TODO())
			for c.Next(context.Background()) {
				_id := c.Current.Lookup("_id").StringValue()
				sliceType.UnsafeGrow(dptr, n+1)
				elemPtr = sliceType.UnsafeGetIndex(dptr, n)
				if *((*unsafe.Pointer)(elemPtr)) == nil {
					newPtr := sliceelemType.UnsafeNew()
					*((*unsafe.Pointer)(elemPtr)) = newPtr
				}
				elem := sliceType.GetIndex(data, n)
				if err = c.Decode(elem); err != nil {
					return
				}
				if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
					return
				}
				for i, v := range onfound {
					if v == _id {
						onfound = append(onfound[0:i], onfound[i+1:]...)
} } key := this.ukeylist(uid, _id) pipe.HMSetForMap(key, tempdata) keys[_id] = key n++ } if len(keys) > 0 { pipe.HMSetForMap(this.ukey(uid), keys) _, err = pipe.Exec() } } } if len(onfound) > 0 { err = fmt.Errorf("onfound:%v data!", onfound) } if this.Expired > 0 { childs := make(map[string]struct{}, len(keys)) for _, v := range keys { childs[v] = struct{}{} } this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired) } return } // 读取列表数据中多条数据 func (this *DBModel) GetRedisListObjs(uid string, ids []string, data interface{}) (onfound []string, err error) { //defer log.Debug("DBModel GetListObjs", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}, log.Field{Key: "data", Value: data}) defer func() { //程序异常 收集异常信息传递给前端显示 if r := recover(); r != nil { buf := make([]byte, 4096) l := runtime.Stack(buf, false) err = fmt.Errorf("%v: %s", r, buf[:l]) log.Errorf("[DB GetListObjs] TableName:%s uid:%s", this.TableName, uid) } }() var ( dtype reflect2.Type dkind reflect.Kind sType reflect2.Type sliceType *reflect2.UnsafeSliceType sliceelemType reflect2.Type decoder codecore.IDecoderMapJson dptr unsafe.Pointer elemPtr unsafe.Pointer n int ok bool tempdata map[string]string pipe *pipe.RedisPipe = this.Redis.RedisPipe(context.TODO()) result []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids)) ) onfound = make([]string, 0, len(ids)) dptr = reflect2.PtrOf(data) dtype = reflect2.TypeOf(data) dkind = dtype.Kind() if dkind != reflect.Ptr { err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data) return } sType = dtype.(*reflect2.UnsafePtrType).Elem() if sType.Kind() != reflect.Slice { err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data) return } sliceType = sType.(*reflect2.UnsafeSliceType) sliceelemType = sliceType.Elem() if sliceelemType.Kind() != reflect.Ptr { err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data) return } if decoder, ok = 
codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok { err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data) return } sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem() for i, v := range ids { result[i] = pipe.HGetAllToMapString(this.ukeylist(uid, v)) } if _, err = pipe.Exec(); err == nil { for i, v := range result { if tempdata, err = v.Result(); err == nil { sliceType.UnsafeGrow(dptr, n+1) elemPtr = sliceType.UnsafeGetIndex(dptr, n) if *((*unsafe.Pointer)(elemPtr)) == nil { newPtr := sliceelemType.UnsafeNew() if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil { log.Errorf("err:%v", err) return } *((*unsafe.Pointer)(elemPtr)) = newPtr } else { decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata) } n++ } else { onfound = append(onfound, ids[i]) } } } else { onfound = ids } return } // 删除目标数据 func (this *DBModel) Del(id string, opt ...DBOption) (err error) { //defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}) err = this.Redis.Delete(this.ukey(id)) if err != nil { return err } option := newDBOption(opt...) if option.IsMgoLog { err = this.DeleteModelLogs(this.TableName, "", bson.M{"_id": id}) } return nil } // 删除用户数据 func (this *DBModel) DelByUId(uid string, opt ...DBOption) (err error) { //defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}) err = this.Redis.Delete(this.ukey(uid)) if err != nil { return err } option := newDBOption(opt...) 
if option.IsMgoLog { err = this.DeleteModelLogs(this.TableName, uid, bson.M{"uid": uid}) } return nil } // 删除多条数据 func (this *DBModel) DelListlds(uid string, ids []string, opt ...DBOption) (err error) { //defer log.Debug("DBModel DelListlds", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}) listkey := this.ukey(uid) pipe := this.Redis.RedisPipe(context.TODO()) for _, v := range ids { key := this.ukeylist(uid, v) pipe.Delete(key) } pipe.HDel(listkey, ids...) if _, err = pipe.Exec(); err == nil { option := newDBOption(opt...) if option.IsMgoLog { err = this.DeleteModelLogs(this.TableName, uid, bson.M{"_id": bson.M{"$in": ids}}) } } return } // 批量删除数据 func (this *DBModel) BatchDelLists(uid string) (err error) { //defer log.Debug("DBModel BatchDelLists", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}) var keys map[string]string pipe := this.Redis.RedisPipe(context.TODO()) if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil { for _, v := range keys { pipe.Delete(v) } pipe.Delete(this.ukey(uid)) pipe.Exec() } return } // 删除卸载redis func (this *DBModel) RedisDels(uids []string) (err error) { pipe := this.Redis.RedisPipe(context.TODO()) for _, uid := range uids { if err = pipe.Delete(this.ukey(uid)); err != nil { log.Error("DBModel RedisDels", log.Field{Key: "err", Value: err.Error()}) return } } _, err = pipe.Exec() return }