优化db数据成 过期系统 跨服兼容机制
This commit is contained in:
parent
566f6855ce
commit
3675b5ac52
@ -7,7 +7,9 @@ import (
|
|||||||
"go_dreamfactory/modules"
|
"go_dreamfactory/modules"
|
||||||
"go_dreamfactory/pb"
|
"go_dreamfactory/pb"
|
||||||
cfg "go_dreamfactory/sys/configure/structs"
|
cfg "go_dreamfactory/sys/configure/structs"
|
||||||
|
"go_dreamfactory/sys/db"
|
||||||
"math"
|
"math"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
///星座图 数据组件
|
///星座图 数据组件
|
||||||
@ -36,25 +38,53 @@ func (this *modelHoroscope) queryInfo(uId string) (result *pb.DBHoroscope, err e
|
|||||||
|
|
||||||
///保存用户竞技场信息
|
///保存用户竞技场信息
|
||||||
func (this *modelHoroscope) updateInfo(info *pb.DBHoroscope) (err error) {
|
func (this *modelHoroscope) updateInfo(info *pb.DBHoroscope) (err error) {
|
||||||
|
var (
|
||||||
|
model *db.DBModel
|
||||||
|
)
|
||||||
err = this.Change(info.Uid, map[string]interface{}{
|
err = this.Change(info.Uid, map[string]interface{}{
|
||||||
"nodes": info.Nodes,
|
"nodes": info.Nodes,
|
||||||
})
|
})
|
||||||
|
if model, err = this.module.GetDBNoduleByUid(info.Uid, this.TableName, time.Hour); err != nil {
|
||||||
|
this.module.Errorln(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if heros := this.module.ModuleHero.GetHeroListByUse(info.Uid); heros != nil && len(heros) > 0 {
|
||||||
|
for _, v := range heros {
|
||||||
|
if err = this.compute(info, v); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = model.ChangeList(info.Uid, v.Id, map[string]interface{}{
|
||||||
|
"horoscopeProperty": v.HoroscopeProperty,
|
||||||
|
}); err != nil {
|
||||||
|
this.module.Errorln(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//计算英雄属性
|
//计算英雄属性
|
||||||
func (this *modelHoroscope) computeHeroNumeric(hero *pb.DBHero) (err error) {
|
func (this *modelHoroscope) computeHeroNumeric(hero *pb.DBHero) (err error) {
|
||||||
var (
|
var (
|
||||||
info *pb.DBHoroscope
|
info *pb.DBHoroscope
|
||||||
node *cfg.GameHoroscopeData
|
|
||||||
heroconf *cfg.GameHeroData
|
|
||||||
)
|
)
|
||||||
if hero.IsOverlying {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if info, err = this.queryInfo(hero.Uid); err != nil {
|
if info, err = this.queryInfo(hero.Uid); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
err = this.compute(info, hero)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//计算属性
|
||||||
|
func (this *modelHoroscope) compute(info *pb.DBHoroscope, hero *pb.DBHero) (err error) {
|
||||||
|
if hero.IsOverlying {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
heroconf *cfg.GameHeroData
|
||||||
|
node *cfg.GameHoroscopeData
|
||||||
|
)
|
||||||
hero.HoroscopeProperty = make(map[string]int32)
|
hero.HoroscopeProperty = make(map[string]int32)
|
||||||
if heroconf, err = this.module.configure.getHeroConfig(hero.HeroID); err != nil {
|
if heroconf, err = this.module.configure.getHeroConfig(hero.HeroID); err != nil {
|
||||||
this.module.Errorln(err)
|
this.module.Errorln(err)
|
||||||
|
@ -18,8 +18,6 @@ type (
|
|||||||
ServerDBConn(stage string) (conn *DBConn, err error)
|
ServerDBConn(stage string) (conn *DBConn, err error)
|
||||||
///获取区服列表标签
|
///获取区服列表标签
|
||||||
GetServerTags() []string
|
GetServerTags() []string
|
||||||
//更新数据过期
|
|
||||||
UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration)
|
|
||||||
}
|
}
|
||||||
//过期数据
|
//过期数据
|
||||||
ModelDataExpired struct {
|
ModelDataExpired struct {
|
||||||
@ -75,8 +73,3 @@ func ServerDBConn(stage string) (conn *DBConn, err error) {
|
|||||||
func GetServerTags() []string {
|
func GetServerTags() []string {
|
||||||
return defsys.GetServerTags()
|
return defsys.GetServerTags()
|
||||||
}
|
}
|
||||||
|
|
||||||
//更新数据过期
|
|
||||||
func UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
|
|
||||||
defsys.UpDateModelExpired(key, childs, expired)
|
|
||||||
}
|
|
||||||
|
15
sys/db/db.go
15
sys/db/db.go
@ -8,18 +8,14 @@ import (
|
|||||||
"go_dreamfactory/lego/utils/codec/json"
|
"go_dreamfactory/lego/utils/codec/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func newSys(options *Options) (sys *DB, err error) {
|
func newSys(options *Options) (sys *DB, err error) {
|
||||||
sys = &DB{
|
sys = &DB{
|
||||||
options: options,
|
options: options,
|
||||||
servers: make(map[string]*DBConn),
|
servers: make(map[string]*DBConn),
|
||||||
data: make(map[string]*ModelDataExpired),
|
|
||||||
}
|
|
||||||
if err = sys.init(); err != nil {
|
|
||||||
go sys.run()
|
|
||||||
}
|
}
|
||||||
|
sys.init()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,12 +24,10 @@ type DB struct {
|
|||||||
local *DBConn
|
local *DBConn
|
||||||
cross *DBConn
|
cross *DBConn
|
||||||
servers map[string]*DBConn
|
servers map[string]*DBConn
|
||||||
mu sync.RWMutex
|
|
||||||
data map[string]*ModelDataExpired //过期数据
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *DB) init() (err error) {
|
func (this *DB) init() (err error) {
|
||||||
if this.local, err = newDBConn(DBConfig{
|
if this.local, err = newDBConn(this.options.Log, DBConfig{
|
||||||
RedisIsCluster: this.options.RedisIsCluster,
|
RedisIsCluster: this.options.RedisIsCluster,
|
||||||
RedisAddr: this.options.RedisAddr,
|
RedisAddr: this.options.RedisAddr,
|
||||||
RedisPassword: this.options.RedisPassword,
|
RedisPassword: this.options.RedisPassword,
|
||||||
@ -69,7 +63,7 @@ func (this *DB) readercrossconf(path string) (err error) {
|
|||||||
return
|
return
|
||||||
} else {
|
} else {
|
||||||
if !this.options.IsCross {
|
if !this.options.IsCross {
|
||||||
if this.cross, err = newDBConn(DBConfig{
|
if this.cross, err = newDBConn(this.options.Log, DBConfig{
|
||||||
RedisIsCluster: cf.LoaclDB.RedisIsCluster,
|
RedisIsCluster: cf.LoaclDB.RedisIsCluster,
|
||||||
RedisAddr: cf.LoaclDB.RedisAddr,
|
RedisAddr: cf.LoaclDB.RedisAddr,
|
||||||
RedisPassword: cf.LoaclDB.RedisPassword,
|
RedisPassword: cf.LoaclDB.RedisPassword,
|
||||||
@ -82,7 +76,7 @@ func (this *DB) readercrossconf(path string) (err error) {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for k, v := range cf.ServiceList {
|
for k, v := range cf.ServiceList {
|
||||||
if this.servers[k], err = newDBConn(DBConfig{
|
if this.servers[k], err = newDBConn(this.options.Log, DBConfig{
|
||||||
RedisIsCluster: v.RedisIsCluster,
|
RedisIsCluster: v.RedisIsCluster,
|
||||||
RedisAddr: v.RedisAddr,
|
RedisAddr: v.RedisAddr,
|
||||||
RedisPassword: v.RedisPassword,
|
RedisPassword: v.RedisPassword,
|
||||||
@ -121,7 +115,6 @@ func (this *DB) Cross() (conn *DBConn, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (this *DB) ServerDBConn(stage string) (conn *DBConn, err error) {
|
func (this *DB) ServerDBConn(stage string) (conn *DBConn, err error) {
|
||||||
ok := false
|
ok := false
|
||||||
conn, ok = this.servers[stage]
|
conn, ok = this.servers[stage]
|
||||||
|
915
sys/db/dbconn.go
915
sys/db/dbconn.go
@ -2,43 +2,17 @@ package db
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"go_dreamfactory/comm"
|
|
||||||
"go_dreamfactory/lego/core"
|
|
||||||
"go_dreamfactory/lego/sys/log"
|
"go_dreamfactory/lego/sys/log"
|
||||||
"go_dreamfactory/lego/sys/mgo"
|
"go_dreamfactory/lego/sys/mgo"
|
||||||
lgredis "go_dreamfactory/lego/sys/redis"
|
lgredis "go_dreamfactory/lego/sys/redis"
|
||||||
"go_dreamfactory/lego/sys/redis/pipe"
|
"sync"
|
||||||
"go_dreamfactory/lego/utils/codec"
|
|
||||||
"go_dreamfactory/lego/utils/codec/codecore"
|
|
||||||
"go_dreamfactory/lego/utils/codec/json"
|
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
"unsafe"
|
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
|
|
||||||
"github.com/modern-go/reflect2"
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var defconf = &codecore.Config{
|
func newDBConn(lg log.ILogger, conf DBConfig) (conn *DBConn, err error) {
|
||||||
SortMapKeys: true,
|
conn = &DBConn{
|
||||||
IndentionStep: 1,
|
log: lg,
|
||||||
OnlyTaggedField: false,
|
}
|
||||||
DisallowUnknownFields: false,
|
|
||||||
CaseSensitive: false,
|
|
||||||
TagKey: "json",
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
DB_ModelTable core.SqlTable = "model_log"
|
|
||||||
)
|
|
||||||
|
|
||||||
func newDBConn(conf DBConfig) (conn *DBConn, err error) {
|
|
||||||
conn = &DBConn{}
|
|
||||||
if conf.RedisIsCluster {
|
if conf.RedisIsCluster {
|
||||||
conn.Redis, err = lgredis.NewSys(
|
conn.Redis, err = lgredis.NewSys(
|
||||||
lgredis.SetRedisType(lgredis.Redis_Cluster),
|
lgredis.SetRedisType(lgredis.Redis_Cluster),
|
||||||
@ -53,864 +27,91 @@ func newDBConn(conf DBConfig) (conn *DBConn, err error) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err.Error(), log.Field{"config", conf})
|
lg.Error(err.Error(), log.Field{Key: "config", Value: conf})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if conn.Mgo, err = mgo.NewSys(
|
if conn.Mgo, err = mgo.NewSys(
|
||||||
mgo.SetMongodbUrl(conf.MongodbUrl),
|
mgo.SetMongodbUrl(conf.MongodbUrl),
|
||||||
mgo.SetMongodbDatabase(conf.MongodbDatabase),
|
mgo.SetMongodbDatabase(conf.MongodbDatabase),
|
||||||
); err != nil {
|
); err != nil {
|
||||||
log.Error(err.Error(), log.Field{"config", conf})
|
lg.Error(err.Error(), log.Field{Key: "config", Value: conf})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
go conn.run()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type DBConn struct {
|
type DBConn struct {
|
||||||
|
log log.ILogger
|
||||||
Redis lgredis.ISys
|
Redis lgredis.ISys
|
||||||
Mgo mgo.ISys
|
Mgo mgo.ISys
|
||||||
|
mu sync.RWMutex
|
||||||
|
data map[string]*ModelDataExpired //数据自动过期
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDBModel(tableName string, expired time.Duration, conn *DBConn) *DBModel {
|
//更新数据模块过期
|
||||||
return &DBModel{
|
func (this *DBConn) UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
|
||||||
TableName: tableName,
|
this.mu.RLock()
|
||||||
Expired: expired,
|
exp, ok := this.data[key]
|
||||||
Redis: conn.Redis,
|
this.mu.RUnlock()
|
||||||
DB: conn.Mgo,
|
if ok {
|
||||||
}
|
if childs != nil {
|
||||||
}
|
if exp.keys == nil {
|
||||||
|
exp.keys = make(map[string]struct{})
|
||||||
//DB模型
|
|
||||||
type DBModel struct {
|
|
||||||
TableName string
|
|
||||||
Expired time.Duration //过期时间
|
|
||||||
Redis lgredis.ISys
|
|
||||||
DB mgo.ISys
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *DBModel) ukey(uid string) string {
|
|
||||||
if uid == "" {
|
|
||||||
return fmt.Sprintf("%s:member", this.TableName)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s:%s", this.TableName, uid)
|
|
||||||
}
|
|
||||||
func (this *DBModel) ukeylist(uid string, id string) string {
|
|
||||||
if uid == "" {
|
|
||||||
return fmt.Sprintf("%s:%s", this.TableName, id)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("%s:%s-%s", this.TableName, uid, id)
|
|
||||||
}
|
|
||||||
func (this *DBModel) InsertModelLogs(table string, uID string, target interface{}) (err error) {
|
|
||||||
|
|
||||||
data := &comm.Autogenerated{
|
|
||||||
ID: primitive.NewObjectID().Hex(),
|
|
||||||
UID: uID,
|
|
||||||
Act: string(comm.LogHandleType_Insert),
|
|
||||||
}
|
|
||||||
data.D = append(data.D, table) // D[0]
|
|
||||||
data.D = append(data.D, target) // D[1]
|
|
||||||
|
|
||||||
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("insert model db err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
func (this *DBModel) DeleteModelLogs(table string, uID string, where interface{}) (err error) {
|
|
||||||
|
|
||||||
data := &comm.Autogenerated{
|
|
||||||
ID: primitive.NewObjectID().Hex(),
|
|
||||||
UID: uID,
|
|
||||||
Act: string(comm.LogHandleType_Delete),
|
|
||||||
}
|
|
||||||
|
|
||||||
data.D = append(data.D, table) // D[0]
|
|
||||||
data.D = append(data.D, where) // D[1]
|
|
||||||
|
|
||||||
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("insert model db err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *DBModel) UpdateModelLogs(table string, uID string, where bson.M, target interface{}) (err error) {
|
|
||||||
|
|
||||||
data := &comm.Autogenerated{
|
|
||||||
ID: primitive.NewObjectID().Hex(),
|
|
||||||
UID: uID,
|
|
||||||
Act: string(comm.LogHandleType_Update),
|
|
||||||
}
|
|
||||||
data.D = append(data.D, table) // D[0]
|
|
||||||
data.D = append(data.D, where) // D[1]
|
|
||||||
data.D = append(data.D, target) // D[2]
|
|
||||||
|
|
||||||
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("insert model db err %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
///创建锁对象
|
|
||||||
func (this *DBModel) NewRedisMutex(key string, outtime int) (result *lgredis.RedisMutex, err error) {
|
|
||||||
result, err = this.Redis.NewRedisMutex(key, lgredis.SetExpiry(outtime))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//添加新的数据
|
|
||||||
func (this *DBModel) Add(uid string, data interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel Add", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
|
||||||
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
// err = this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//添加新的数据到列表
|
|
||||||
func (this *DBModel) AddList(uid string, id string, data interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel AddList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "_id", Value: id}, log.Field{Key: "data", Value: data})
|
|
||||||
key := this.ukeylist(uid, id)
|
|
||||||
if err = this.Redis.HMSet(key, data); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = this.Redis.HSet(this.ukey(uid), id, key); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
// err = this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//添加新的多个数据到列表 data map[string]type
|
|
||||||
func (this *DBModel) AddLists(uid string, data interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel AddLists", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
|
||||||
vof := reflect.ValueOf(data)
|
|
||||||
if !vof.IsValid() {
|
|
||||||
return fmt.Errorf("Model_Comp: AddLists(nil)")
|
|
||||||
}
|
|
||||||
if vof.Kind() != reflect.Map {
|
|
||||||
return fmt.Errorf("Model_Comp: AddLists(non-pointer %T)", data)
|
|
||||||
}
|
|
||||||
listskeys := make(map[string]string)
|
|
||||||
keys := vof.MapKeys()
|
|
||||||
lists := make([]interface{}, 0, len(keys))
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
for _, k := range keys {
|
|
||||||
value := vof.MapIndex(k)
|
|
||||||
keydata := k.Interface().(string)
|
|
||||||
valuedata := value.Interface()
|
|
||||||
key := this.ukeylist(uid, keydata)
|
|
||||||
pipe.HMSet(key, valuedata)
|
|
||||||
listskeys[keydata] = key
|
|
||||||
lists = append(lists, valuedata)
|
|
||||||
}
|
|
||||||
pipe.HMSetForMap(this.ukey(uid), listskeys)
|
|
||||||
if _, err = pipe.Exec(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.InsertModelLogs(this.TableName, uid, lists)
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
childs := make(map[string]struct{}, len(listskeys))
|
|
||||||
for _, v := range listskeys {
|
|
||||||
childs[v] = struct{}{}
|
|
||||||
}
|
|
||||||
UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
|
||||||
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//添加队列
|
|
||||||
func (this *DBModel) AddQueues(key string, uplimit int64, data interface{}) (outkey []string, err error) {
|
|
||||||
//defer log.Debug("DBModel AddQueues", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "key", Value: key}, log.Field{Key: "data", Value: data})
|
|
||||||
vof := reflect.ValueOf(data)
|
|
||||||
if !vof.IsValid() {
|
|
||||||
err = fmt.Errorf("Model_Comp: AddLists(nil)")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if vof.Kind() != reflect.Map {
|
|
||||||
err = fmt.Errorf("Model_Comp: AddLists(non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
keys := make([]string, 0)
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
for _, k := range vof.MapKeys() {
|
|
||||||
value := vof.MapIndex(k)
|
|
||||||
tkey := k.Interface().(string)
|
|
||||||
valuedata := value.Interface()
|
|
||||||
pipe.HMSet(tkey, valuedata)
|
|
||||||
keys = append(keys, tkey)
|
|
||||||
}
|
|
||||||
pipe.RPushForStringSlice(key, keys...)
|
|
||||||
lcmd := pipe.Llen(key)
|
|
||||||
|
|
||||||
if _, err = pipe.Exec(); err == nil {
|
|
||||||
if lcmd.Val() > uplimit*3 { //操作3倍上限移除多余数据
|
|
||||||
off := uplimit - lcmd.Val()
|
|
||||||
if outkey, err = this.Redis.LRangeToStringSlice(key, 0, int(off-1)).Result(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
pipe.Ltrim(key, int(off), -1)
|
for k, _ := range childs {
|
||||||
for _, v := range outkey {
|
exp.keys[k] = struct{}{}
|
||||||
pipe.Delete(v)
|
|
||||||
}
|
|
||||||
_, err = pipe.Exec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//修改数据多个字段 uid 作为主键
|
|
||||||
func (this *DBModel) Change(uid string, data map[string]interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel Change", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
|
||||||
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.UpdateModelLogs(this.TableName, uid, bson.M{"uid": uid}, data)
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//修改数据多个字段 uid 作为主键
|
|
||||||
func (this *DBModel) ChangeList(uid string, _id string, data map[string]interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel ChangeList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "_id", Value: _id}, log.Field{Key: "data", Value: data})
|
|
||||||
if err = this.Redis.HMSet(this.ukeylist(uid, _id), data); err != nil {
|
|
||||||
log.Error("DBModel ChangeList", log.Field{Key: "err", Value: err})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.UpdateModelLogs(this.TableName, uid, bson.M{"_id": _id, "uid": uid}, data)
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取全部数据
|
|
||||||
func (this *DBModel) Get(uid string, data interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel Get", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
|
||||||
if err = this.Redis.HGetAll(this.ukey(uid), data); err != nil && err != lgredis.RedisNil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err == lgredis.RedisNil {
|
|
||||||
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = this.Redis.HMSet(this.ukey(uid), data)
|
|
||||||
}
|
|
||||||
// option := newDBOption(opt...)
|
|
||||||
if this.Expired > 0 {
|
|
||||||
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取多个数据对象
|
|
||||||
func (this *DBModel) Gets(ids []string, data interface{}, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel GetListObjs", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}, log.Field{Key: "data", Value: data})
|
|
||||||
var (
|
|
||||||
dtype reflect2.Type
|
|
||||||
dkind reflect.Kind
|
|
||||||
sType reflect2.Type
|
|
||||||
sliceType *reflect2.UnsafeSliceType
|
|
||||||
sliceelemType reflect2.Type
|
|
||||||
decoder codecore.IDecoderMapJson
|
|
||||||
encoder codecore.IEncoderMapJson
|
|
||||||
dptr unsafe.Pointer
|
|
||||||
elemPtr unsafe.Pointer
|
|
||||||
n int
|
|
||||||
ok bool
|
|
||||||
keys map[string]string = make(map[string]string)
|
|
||||||
tempdata map[string]string
|
|
||||||
onfound []string = make([]string, 0, len(ids))
|
|
||||||
pipe *pipe.RedisPipe = this.Redis.RedisPipe(context.TODO())
|
|
||||||
result []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids))
|
|
||||||
c *mongo.Cursor
|
|
||||||
)
|
|
||||||
dptr = reflect2.PtrOf(data)
|
|
||||||
dtype = reflect2.TypeOf(data)
|
|
||||||
dkind = dtype.Kind()
|
|
||||||
if dkind != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
if sType.Kind() != reflect.Slice {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceType = sType.(*reflect2.UnsafeSliceType)
|
|
||||||
sliceelemType = sliceType.Elem()
|
|
||||||
if sliceelemType.Kind() != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
for i, v := range ids {
|
|
||||||
result[i] = pipe.HGetAllToMapString(this.ukey(v))
|
|
||||||
}
|
|
||||||
if _, err = pipe.Exec(); err == nil {
|
|
||||||
for i, v := range result {
|
|
||||||
if tempdata, err = v.Result(); err == nil && len(tempdata) > 0 {
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
|
||||||
log.Errorf("err:%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
} else {
|
|
||||||
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
|
||||||
}
|
|
||||||
n++
|
|
||||||
} else {
|
|
||||||
onfound = append(onfound, ids[i])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
exp.expired = time.Now().Add(expired)
|
||||||
} else {
|
} else {
|
||||||
onfound = ids
|
exp = &ModelDataExpired{
|
||||||
}
|
key: key,
|
||||||
|
keys: childs,
|
||||||
if len(onfound) > 0 {
|
expired: time.Now().Add(expired),
|
||||||
if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"_id": bson.M{"$in": onfound}}); err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
for c.Next(context.Background()) {
|
|
||||||
_id := c.Current.Lookup("_id").StringValue()
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
}
|
|
||||||
elem := sliceType.GetIndex(data, n)
|
|
||||||
if err = c.Decode(elem); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := this.ukey(_id)
|
|
||||||
pipe.HMSetForMap(key, tempdata)
|
|
||||||
keys[_id] = key
|
|
||||||
n++
|
|
||||||
}
|
|
||||||
if len(keys) > 0 {
|
|
||||||
_, err = pipe.Exec()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
this.mu.Lock()
|
||||||
|
this.data[key] = exp
|
||||||
|
this.mu.Unlock()
|
||||||
}
|
}
|
||||||
if this.Expired > 0 {
|
|
||||||
for _, v := range ids {
|
|
||||||
UpDateModelExpired(this.ukey(v), nil, this.Expired)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取列表数据 注意 data 必须是 切片的指针 *[]type
|
//定时清理过期数据
|
||||||
func (this *DBModel) GetList(uid string, data interface{}) (err error) {
|
func (this *DBConn) run() {
|
||||||
//defer log.Debug("DBModel GetList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
timer := time.NewTicker(time.Minute * 1)
|
||||||
var (
|
defer timer.Stop()
|
||||||
dtype reflect2.Type
|
for {
|
||||||
dkind reflect.Kind
|
select {
|
||||||
sType reflect2.Type
|
case <-timer.C:
|
||||||
sliceType *reflect2.UnsafeSliceType
|
this.scanning()
|
||||||
sliceelemType reflect2.Type
|
break
|
||||||
decoder codecore.IDecoderMapJson
|
|
||||||
encoder codecore.IEncoderMapJson
|
|
||||||
dptr unsafe.Pointer
|
|
||||||
elemPtr unsafe.Pointer
|
|
||||||
n int
|
|
||||||
ok bool
|
|
||||||
keys map[string]string
|
|
||||||
tempdata map[string]string
|
|
||||||
result []*redis.StringStringMapCmd
|
|
||||||
c *mongo.Cursor
|
|
||||||
)
|
|
||||||
keys = make(map[string]string)
|
|
||||||
dptr = reflect2.PtrOf(data)
|
|
||||||
dtype = reflect2.TypeOf(data)
|
|
||||||
dkind = dtype.Kind()
|
|
||||||
if dkind != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
if sType.Kind() != reflect.Slice {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceType = sType.(*reflect2.UnsafeSliceType)
|
|
||||||
sliceelemType = sliceType.Elem()
|
|
||||||
if sliceelemType.Kind() != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
|
|
||||||
result = make([]*redis.StringStringMapCmd, 0)
|
|
||||||
for _, v := range keys {
|
|
||||||
cmd := pipe.HGetAllToMapString(v)
|
|
||||||
result = append(result, cmd)
|
|
||||||
}
|
|
||||||
pipe.Exec()
|
|
||||||
for _, v := range result {
|
|
||||||
tempdata, err = v.Result()
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
|
||||||
log.Errorf("err:%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
} else {
|
|
||||||
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
|
||||||
}
|
|
||||||
n++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == lgredis.RedisNil {
|
|
||||||
var f = bson.M{}
|
|
||||||
if uid != "" {
|
|
||||||
f = bson.M{"uid": uid}
|
|
||||||
}
|
|
||||||
//query from mgo
|
|
||||||
if c, err = this.DB.Find(core.SqlTable(this.TableName), f); err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
n = 0
|
|
||||||
for c.Next(context.Background()) {
|
|
||||||
_id := c.Current.Lookup("_id").StringValue()
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
}
|
|
||||||
elem := sliceType.GetIndex(data, n)
|
|
||||||
if err = c.Decode(elem); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
key := this.ukeylist(uid, _id)
|
|
||||||
pipe.HMSetForMap(key, tempdata)
|
|
||||||
keys[_id] = key
|
|
||||||
n++
|
|
||||||
}
|
|
||||||
if len(keys) > 0 {
|
|
||||||
pipe.HMSetForMap(this.ukey(uid), keys)
|
|
||||||
_, err = pipe.Exec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
childs := make(map[string]struct{}, len(keys))
|
|
||||||
for _, v := range keys {
|
|
||||||
childs[v] = struct{}{}
|
|
||||||
}
|
|
||||||
UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
|
||||||
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//获取队列数据
|
//扫描过期
|
||||||
func (this *DBModel) GetQueues(key string, count int, data interface{}) (err error) {
|
func (this *DBConn) scanning() {
|
||||||
//defer log.Debug("DBModel GetQueues", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "key", Value: key}, log.Field{Key: "data", Value: data})
|
now := time.Now()
|
||||||
var (
|
this.mu.Lock()
|
||||||
dtype reflect2.Type
|
temp := make([]*ModelDataExpired, 0, len(this.data))
|
||||||
dkind reflect.Kind
|
for k, v := range this.data {
|
||||||
sType reflect2.Type
|
if v.expired.Before(now) { //过期
|
||||||
sliceType *reflect2.UnsafeSliceType
|
temp = append(temp, v)
|
||||||
sliceelemType reflect2.Type
|
delete(this.data, k)
|
||||||
dptr unsafe.Pointer
|
|
||||||
elemPtr unsafe.Pointer
|
|
||||||
decoder codecore.IDecoderMapJson
|
|
||||||
ok bool
|
|
||||||
n int
|
|
||||||
keys = make([]string, 0)
|
|
||||||
result = make([]*redis.StringStringMapCmd, 0)
|
|
||||||
tempdata map[string]string
|
|
||||||
)
|
|
||||||
|
|
||||||
dptr = reflect2.PtrOf(data)
|
|
||||||
dtype = reflect2.TypeOf(data)
|
|
||||||
dkind = dtype.Kind()
|
|
||||||
if dkind != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
if sType.Kind() != reflect.Slice {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceType = sType.(*reflect2.UnsafeSliceType)
|
|
||||||
sliceelemType = sliceType.Elem()
|
|
||||||
if sliceelemType.Kind() != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
if keys, err = this.Redis.LRangeToStringSlice(key, -1*count, -1).Result(); err == nil {
|
|
||||||
result = make([]*redis.StringStringMapCmd, 0)
|
|
||||||
for _, v := range keys {
|
|
||||||
cmd := pipe.HGetAllToMapString(v)
|
|
||||||
result = append(result, cmd)
|
|
||||||
}
|
|
||||||
pipe.Exec()
|
|
||||||
for _, v := range result {
|
|
||||||
tempdata, err = v.Result()
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
|
||||||
log.Errorf("err:%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
} else {
|
|
||||||
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
|
||||||
}
|
|
||||||
n++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
this.mu.Unlock()
|
||||||
}
|
ctx := context.Background()
|
||||||
|
pipe := this.Redis.Pipeline()
|
||||||
//读取单个数据中 多个字段数据
|
for _, v := range temp {
|
||||||
func (this *DBModel) GetFields(uid string, data interface{}, fields ...string) (err error) {
|
pipe.Del(ctx, v.key)
|
||||||
//defer log.Debug("DBModel GetFields", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "data", Value: data})
|
if v.keys != nil {
|
||||||
if err = this.Redis.HMGet(this.ukey(uid), data, fields...); err != nil && err != lgredis.RedisNil {
|
for k1, _ := range v.keys {
|
||||||
return
|
pipe.Del(ctx, k1)
|
||||||
}
|
|
||||||
if err == lgredis.RedisNil {
|
|
||||||
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取List列表中单个数据中 多个字段数据
|
|
||||||
func (this *DBModel) GetListFields(uid string, id string, data interface{}, fields ...string) (err error) {
|
|
||||||
//defer log.Debug("DBModel GetListFields", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "id", Value: id}, log.Field{Key: "data", Value: data})
|
|
||||||
var (
|
|
||||||
keys map[string]string
|
|
||||||
tempdata map[string]string
|
|
||||||
)
|
|
||||||
if err = this.Redis.HMGet(this.ukeylist(uid, id), data, fields...); err != nil && err != lgredis.RedisNil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err == lgredis.RedisNil {
|
|
||||||
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
key := this.ukeylist(uid, id)
|
|
||||||
pipe.HMSetForMap(key, tempdata)
|
|
||||||
keys[id] = key
|
|
||||||
pipe.HMSetForMap(this.ukey(uid), keys)
|
|
||||||
if _, err = pipe.Exec(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if this.Expired > 0 {
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
childs := make(map[string]struct{}, len(keys))
|
this.log.Errorln(err)
|
||||||
for _, v := range keys {
|
|
||||||
childs[v] = struct{}{}
|
|
||||||
}
|
|
||||||
UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取列表数据中单个数据
|
|
||||||
func (this *DBModel) GetListObj(uid string, id string, data interface{}) (err error) {
|
|
||||||
//defer log.Debug("DBModel GetListObj", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "id", Value: id}, log.Field{Key: "data", Value: data})
|
|
||||||
var (
|
|
||||||
keys map[string]string
|
|
||||||
)
|
|
||||||
keys = make(map[string]string)
|
|
||||||
if err = this.Redis.HGetAll(this.ukeylist(uid, id), data); err != nil && err != lgredis.RedisNil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err == lgredis.RedisNil {
|
|
||||||
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
key := this.ukeylist(uid, id)
|
|
||||||
pipe.HMSet(key, data)
|
|
||||||
keys[id] = key
|
|
||||||
pipe.HMSetForMap(this.ukey(uid), keys)
|
|
||||||
if _, err = pipe.Exec(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
childs := make(map[string]struct{}, len(keys))
|
|
||||||
for _, v := range keys {
|
|
||||||
childs[v] = struct{}{}
|
|
||||||
}
|
|
||||||
UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//读取列表数据中单个数据
|
|
||||||
func (this *DBModel) GetListObjs(uid string, ids []string, data interface{}) (err error) {
|
|
||||||
//defer log.Debug("DBModel GetListObjs", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}, log.Field{Key: "data", Value: data})
|
|
||||||
var (
|
|
||||||
dtype reflect2.Type
|
|
||||||
dkind reflect.Kind
|
|
||||||
sType reflect2.Type
|
|
||||||
sliceType *reflect2.UnsafeSliceType
|
|
||||||
sliceelemType reflect2.Type
|
|
||||||
decoder codecore.IDecoderMapJson
|
|
||||||
encoder codecore.IEncoderMapJson
|
|
||||||
dptr unsafe.Pointer
|
|
||||||
elemPtr unsafe.Pointer
|
|
||||||
n int
|
|
||||||
ok bool
|
|
||||||
keys map[string]string = make(map[string]string)
|
|
||||||
tempdata map[string]string
|
|
||||||
onfound []string = make([]string, 0, len(ids))
|
|
||||||
pipe *pipe.RedisPipe = this.Redis.RedisPipe(context.TODO())
|
|
||||||
result []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids))
|
|
||||||
c *mongo.Cursor
|
|
||||||
)
|
|
||||||
dptr = reflect2.PtrOf(data)
|
|
||||||
dtype = reflect2.TypeOf(data)
|
|
||||||
dkind = dtype.Kind()
|
|
||||||
if dkind != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
if sType.Kind() != reflect.Slice {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceType = sType.(*reflect2.UnsafeSliceType)
|
|
||||||
sliceelemType = sliceType.Elem()
|
|
||||||
if sliceelemType.Kind() != reflect.Ptr {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
|
||||||
for i, v := range ids {
|
|
||||||
result[i] = pipe.HGetAllToMapString(this.ukeylist(uid, v))
|
|
||||||
}
|
|
||||||
if _, err = pipe.Exec(); err == nil {
|
|
||||||
for i, v := range result {
|
|
||||||
if tempdata, err = v.Result(); err == nil {
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
|
||||||
log.Errorf("err:%v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
} else {
|
|
||||||
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
|
||||||
}
|
|
||||||
n++
|
|
||||||
} else {
|
|
||||||
onfound = append(onfound, ids[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
onfound = ids
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(onfound) > 0 {
|
|
||||||
if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"_id": bson.M{"$in": onfound}}); err != nil {
|
|
||||||
return err
|
|
||||||
} else {
|
|
||||||
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
|
||||||
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
for c.Next(context.Background()) {
|
|
||||||
_id := c.Current.Lookup("_id").StringValue()
|
|
||||||
sliceType.UnsafeGrow(dptr, n+1)
|
|
||||||
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
|
||||||
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
|
||||||
newPtr := sliceelemType.UnsafeNew()
|
|
||||||
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
|
||||||
}
|
|
||||||
elem := sliceType.GetIndex(data, n)
|
|
||||||
if err = c.Decode(elem); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for i, v := range onfound {
|
|
||||||
if v == _id {
|
|
||||||
onfound = append(onfound[0:i], onfound[i+1:]...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
key := this.ukeylist(uid, _id)
|
|
||||||
pipe.HMSetForMap(key, tempdata)
|
|
||||||
keys[_id] = key
|
|
||||||
n++
|
|
||||||
}
|
|
||||||
if len(keys) > 0 {
|
|
||||||
pipe.HMSetForMap(this.ukey(uid), keys)
|
|
||||||
_, err = pipe.Exec()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(onfound) > 0 {
|
|
||||||
err = fmt.Errorf("onfound:%v data!", onfound)
|
|
||||||
}
|
|
||||||
if this.Expired > 0 {
|
|
||||||
childs := make(map[string]struct{}, len(keys))
|
|
||||||
for _, v := range keys {
|
|
||||||
childs[v] = struct{}{}
|
|
||||||
}
|
|
||||||
UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//删除目标数据
|
|
||||||
func (this *DBModel) Del(id string, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
|
||||||
err = this.Redis.Delete(this.ukey(id))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.DeleteModelLogs(this.TableName, "", bson.M{"_id": id})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//删除用户数据
|
|
||||||
func (this *DBModel) DelByUId(uid string, opt ...DBOption) (err error) {
|
|
||||||
//defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
|
||||||
err = this.Redis.Delete(this.ukey(uid))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
option := newDBOption(opt...)
|
|
||||||
if option.IsMgoLog {
|
|
||||||
err = this.DeleteModelLogs(this.TableName, uid, bson.M{"uid": uid})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//删除多条数据
|
|
||||||
func (this *DBModel) DelListlds(uid string, ids ...string) (err error) {
|
|
||||||
//defer log.Debug("DBModel DelListlds", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids})
|
|
||||||
listkey := this.ukey(uid)
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
for _, v := range ids {
|
|
||||||
key := this.ukeylist(uid, v)
|
|
||||||
pipe.Delete(key)
|
|
||||||
}
|
|
||||||
pipe.HDel(listkey, ids...)
|
|
||||||
if _, err = pipe.Exec(); err == nil {
|
|
||||||
err = this.DeleteModelLogs(this.TableName, uid, bson.M{"_id": bson.M{"$in": ids}})
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
//批量删除数据
|
|
||||||
func (this *DBModel) BatchDelLists(uid string) (err error) {
|
|
||||||
//defer log.Debug("DBModel BatchDelLists", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
|
||||||
var keys map[string]string
|
|
||||||
pipe := this.Redis.RedisPipe(context.TODO())
|
|
||||||
if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
|
|
||||||
for _, v := range keys {
|
|
||||||
pipe.Delete(v)
|
|
||||||
}
|
|
||||||
pipe.Delete(this.ukey(uid))
|
|
||||||
pipe.Exec()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
884
sys/db/dbmodel.go
Normal file
884
sys/db/dbmodel.go
Normal file
@ -0,0 +1,884 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"go_dreamfactory/comm"
|
||||||
|
"go_dreamfactory/lego/core"
|
||||||
|
"go_dreamfactory/lego/sys/log"
|
||||||
|
"go_dreamfactory/lego/sys/mgo"
|
||||||
|
lgredis "go_dreamfactory/lego/sys/redis"
|
||||||
|
"go_dreamfactory/lego/sys/redis/pipe"
|
||||||
|
"go_dreamfactory/lego/utils/codec"
|
||||||
|
"go_dreamfactory/lego/utils/codec/codecore"
|
||||||
|
"go_dreamfactory/lego/utils/codec/json"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
|
||||||
|
"github.com/modern-go/reflect2"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
)
|
||||||
|
|
||||||
|
var defconf = &codecore.Config{
|
||||||
|
SortMapKeys: true,
|
||||||
|
IndentionStep: 1,
|
||||||
|
OnlyTaggedField: false,
|
||||||
|
DisallowUnknownFields: false,
|
||||||
|
CaseSensitive: false,
|
||||||
|
TagKey: "json",
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
DB_ModelTable core.SqlTable = "model_log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewDBModel(tableName string, expired time.Duration, conn *DBConn) *DBModel {
|
||||||
|
return &DBModel{
|
||||||
|
TableName: tableName,
|
||||||
|
Expired: expired,
|
||||||
|
conn: conn,
|
||||||
|
Redis: conn.Redis,
|
||||||
|
DB: conn.Mgo,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//DB模型
|
||||||
|
type DBModel struct {
|
||||||
|
TableName string
|
||||||
|
Expired time.Duration //过期时间
|
||||||
|
conn *DBConn //数据源
|
||||||
|
Redis lgredis.ISys
|
||||||
|
DB mgo.ISys
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *DBModel) ukey(uid string) string {
|
||||||
|
if uid == "" {
|
||||||
|
return fmt.Sprintf("%s:member", this.TableName)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s:%s", this.TableName, uid)
|
||||||
|
}
|
||||||
|
func (this *DBModel) ukeylist(uid string, id string) string {
|
||||||
|
if uid == "" {
|
||||||
|
return fmt.Sprintf("%s:%s", this.TableName, id)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s:%s-%s", this.TableName, uid, id)
|
||||||
|
}
|
||||||
|
func (this *DBModel) InsertModelLogs(table string, uID string, target interface{}) (err error) {
|
||||||
|
|
||||||
|
data := &comm.Autogenerated{
|
||||||
|
ID: primitive.NewObjectID().Hex(),
|
||||||
|
UID: uID,
|
||||||
|
Act: string(comm.LogHandleType_Insert),
|
||||||
|
}
|
||||||
|
data.D = append(data.D, table) // D[0]
|
||||||
|
data.D = append(data.D, target) // D[1]
|
||||||
|
|
||||||
|
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("insert model db err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
func (this *DBModel) DeleteModelLogs(table string, uID string, where interface{}) (err error) {
|
||||||
|
|
||||||
|
data := &comm.Autogenerated{
|
||||||
|
ID: primitive.NewObjectID().Hex(),
|
||||||
|
UID: uID,
|
||||||
|
Act: string(comm.LogHandleType_Delete),
|
||||||
|
}
|
||||||
|
|
||||||
|
data.D = append(data.D, table) // D[0]
|
||||||
|
data.D = append(data.D, where) // D[1]
|
||||||
|
|
||||||
|
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("insert model db err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *DBModel) UpdateModelLogs(table string, uID string, where bson.M, target interface{}) (err error) {
|
||||||
|
|
||||||
|
data := &comm.Autogenerated{
|
||||||
|
ID: primitive.NewObjectID().Hex(),
|
||||||
|
UID: uID,
|
||||||
|
Act: string(comm.LogHandleType_Update),
|
||||||
|
}
|
||||||
|
data.D = append(data.D, table) // D[0]
|
||||||
|
data.D = append(data.D, where) // D[1]
|
||||||
|
data.D = append(data.D, target) // D[2]
|
||||||
|
|
||||||
|
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("insert model db err %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
///创建锁对象
|
||||||
|
func (this *DBModel) NewRedisMutex(key string, outtime int) (result *lgredis.RedisMutex, err error) {
|
||||||
|
result, err = this.Redis.NewRedisMutex(key, lgredis.SetExpiry(outtime))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//添加新的数据
|
||||||
|
func (this *DBModel) Add(uid string, data interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel Add", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
||||||
|
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
// err = this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//添加新的数据到列表
|
||||||
|
func (this *DBModel) AddList(uid string, id string, data interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel AddList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "_id", Value: id}, log.Field{Key: "data", Value: data})
|
||||||
|
key := this.ukeylist(uid, id)
|
||||||
|
if err = this.Redis.HMSet(key, data); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = this.Redis.HSet(this.ukey(uid), id, key); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.InsertModelLogs(this.TableName, uid, []interface{}{data})
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
// err = this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//添加新的多个数据到列表 data map[string]type
|
||||||
|
func (this *DBModel) AddLists(uid string, data interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel AddLists", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
||||||
|
vof := reflect.ValueOf(data)
|
||||||
|
if !vof.IsValid() {
|
||||||
|
return fmt.Errorf("Model_Comp: AddLists(nil)")
|
||||||
|
}
|
||||||
|
if vof.Kind() != reflect.Map {
|
||||||
|
return fmt.Errorf("Model_Comp: AddLists(non-pointer %T)", data)
|
||||||
|
}
|
||||||
|
listskeys := make(map[string]string)
|
||||||
|
keys := vof.MapKeys()
|
||||||
|
lists := make([]interface{}, 0, len(keys))
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
for _, k := range keys {
|
||||||
|
value := vof.MapIndex(k)
|
||||||
|
keydata := k.Interface().(string)
|
||||||
|
valuedata := value.Interface()
|
||||||
|
key := this.ukeylist(uid, keydata)
|
||||||
|
pipe.HMSet(key, valuedata)
|
||||||
|
listskeys[keydata] = key
|
||||||
|
lists = append(lists, valuedata)
|
||||||
|
}
|
||||||
|
pipe.HMSetForMap(this.ukey(uid), listskeys)
|
||||||
|
if _, err = pipe.Exec(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.InsertModelLogs(this.TableName, uid, lists)
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
childs := make(map[string]struct{}, len(listskeys))
|
||||||
|
for _, v := range listskeys {
|
||||||
|
childs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
||||||
|
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//添加队列
|
||||||
|
func (this *DBModel) AddQueues(key string, uplimit int64, data interface{}) (outkey []string, err error) {
|
||||||
|
//defer log.Debug("DBModel AddQueues", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "key", Value: key}, log.Field{Key: "data", Value: data})
|
||||||
|
vof := reflect.ValueOf(data)
|
||||||
|
if !vof.IsValid() {
|
||||||
|
err = fmt.Errorf("Model_Comp: AddLists(nil)")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if vof.Kind() != reflect.Map {
|
||||||
|
err = fmt.Errorf("Model_Comp: AddLists(non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
keys := make([]string, 0)
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
for _, k := range vof.MapKeys() {
|
||||||
|
value := vof.MapIndex(k)
|
||||||
|
tkey := k.Interface().(string)
|
||||||
|
valuedata := value.Interface()
|
||||||
|
pipe.HMSet(tkey, valuedata)
|
||||||
|
keys = append(keys, tkey)
|
||||||
|
}
|
||||||
|
pipe.RPushForStringSlice(key, keys...)
|
||||||
|
lcmd := pipe.Llen(key)
|
||||||
|
|
||||||
|
if _, err = pipe.Exec(); err == nil {
|
||||||
|
if lcmd.Val() > uplimit*3 { //操作3倍上限移除多余数据
|
||||||
|
off := uplimit - lcmd.Val()
|
||||||
|
if outkey, err = this.Redis.LRangeToStringSlice(key, 0, int(off-1)).Result(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pipe.Ltrim(key, int(off), -1)
|
||||||
|
for _, v := range outkey {
|
||||||
|
pipe.Delete(v)
|
||||||
|
}
|
||||||
|
_, err = pipe.Exec()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//修改数据多个字段 uid 作为主键
|
||||||
|
func (this *DBModel) Change(uid string, data map[string]interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel Change", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
||||||
|
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.UpdateModelLogs(this.TableName, uid, bson.M{"uid": uid}, data)
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//修改数据多个字段 uid 作为主键
|
||||||
|
func (this *DBModel) ChangeList(uid string, _id string, data map[string]interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel ChangeList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "_id", Value: _id}, log.Field{Key: "data", Value: data})
|
||||||
|
if err = this.Redis.HMSet(this.ukeylist(uid, _id), data); err != nil {
|
||||||
|
log.Error("DBModel ChangeList", log.Field{Key: "err", Value: err})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.UpdateModelLogs(this.TableName, uid, bson.M{"_id": _id, "uid": uid}, data)
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取全部数据
|
||||||
|
func (this *DBModel) Get(uid string, data interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel Get", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
||||||
|
if err = this.Redis.HGetAll(this.ukey(uid), data); err != nil && err != lgredis.RedisNil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == lgredis.RedisNil {
|
||||||
|
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = this.Redis.HMSet(this.ukey(uid), data)
|
||||||
|
}
|
||||||
|
// option := newDBOption(opt...)
|
||||||
|
if this.Expired > 0 {
|
||||||
|
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取多个数据对象
|
||||||
|
func (this *DBModel) Gets(ids []string, data interface{}, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetListObjs", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
dtype reflect2.Type
|
||||||
|
dkind reflect.Kind
|
||||||
|
sType reflect2.Type
|
||||||
|
sliceType *reflect2.UnsafeSliceType
|
||||||
|
sliceelemType reflect2.Type
|
||||||
|
decoder codecore.IDecoderMapJson
|
||||||
|
encoder codecore.IEncoderMapJson
|
||||||
|
dptr unsafe.Pointer
|
||||||
|
elemPtr unsafe.Pointer
|
||||||
|
n int
|
||||||
|
ok bool
|
||||||
|
keys map[string]string = make(map[string]string)
|
||||||
|
tempdata map[string]string
|
||||||
|
onfound []string = make([]string, 0, len(ids))
|
||||||
|
pipe *pipe.RedisPipe = this.Redis.RedisPipe(context.TODO())
|
||||||
|
result []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids))
|
||||||
|
c *mongo.Cursor
|
||||||
|
)
|
||||||
|
dptr = reflect2.PtrOf(data)
|
||||||
|
dtype = reflect2.TypeOf(data)
|
||||||
|
dkind = dtype.Kind()
|
||||||
|
if dkind != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
if sType.Kind() != reflect.Slice {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceType = sType.(*reflect2.UnsafeSliceType)
|
||||||
|
sliceelemType = sliceType.Elem()
|
||||||
|
if sliceelemType.Kind() != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
for i, v := range ids {
|
||||||
|
result[i] = pipe.HGetAllToMapString(this.ukey(v))
|
||||||
|
}
|
||||||
|
if _, err = pipe.Exec(); err == nil {
|
||||||
|
for i, v := range result {
|
||||||
|
if tempdata, err = v.Result(); err == nil && len(tempdata) > 0 {
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
||||||
|
log.Errorf("err:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
} else {
|
||||||
|
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
} else {
|
||||||
|
onfound = append(onfound, ids[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
onfound = ids
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(onfound) > 0 {
|
||||||
|
if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"_id": bson.M{"$in": onfound}}); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
for c.Next(context.Background()) {
|
||||||
|
_id := c.Current.Lookup("_id").StringValue()
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
}
|
||||||
|
elem := sliceType.GetIndex(data, n)
|
||||||
|
if err = c.Decode(elem); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := this.ukey(_id)
|
||||||
|
pipe.HMSetForMap(key, tempdata)
|
||||||
|
keys[_id] = key
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
if len(keys) > 0 {
|
||||||
|
_, err = pipe.Exec()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
for _, v := range ids {
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(v), nil, this.Expired)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//获取列表数据 注意 data 必须是 切片的指针 *[]type
|
||||||
|
func (this *DBModel) GetList(uid string, data interface{}) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetList", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
dtype reflect2.Type
|
||||||
|
dkind reflect.Kind
|
||||||
|
sType reflect2.Type
|
||||||
|
sliceType *reflect2.UnsafeSliceType
|
||||||
|
sliceelemType reflect2.Type
|
||||||
|
decoder codecore.IDecoderMapJson
|
||||||
|
encoder codecore.IEncoderMapJson
|
||||||
|
dptr unsafe.Pointer
|
||||||
|
elemPtr unsafe.Pointer
|
||||||
|
n int
|
||||||
|
ok bool
|
||||||
|
keys map[string]string
|
||||||
|
tempdata map[string]string
|
||||||
|
result []*redis.StringStringMapCmd
|
||||||
|
c *mongo.Cursor
|
||||||
|
)
|
||||||
|
keys = make(map[string]string)
|
||||||
|
dptr = reflect2.PtrOf(data)
|
||||||
|
dtype = reflect2.TypeOf(data)
|
||||||
|
dkind = dtype.Kind()
|
||||||
|
if dkind != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
if sType.Kind() != reflect.Slice {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceType = sType.(*reflect2.UnsafeSliceType)
|
||||||
|
sliceelemType = sliceType.Elem()
|
||||||
|
if sliceelemType.Kind() != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
|
||||||
|
result = make([]*redis.StringStringMapCmd, 0)
|
||||||
|
for _, v := range keys {
|
||||||
|
cmd := pipe.HGetAllToMapString(v)
|
||||||
|
result = append(result, cmd)
|
||||||
|
}
|
||||||
|
pipe.Exec()
|
||||||
|
for _, v := range result {
|
||||||
|
tempdata, err = v.Result()
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
||||||
|
log.Errorf("err:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
} else {
|
||||||
|
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err == lgredis.RedisNil {
|
||||||
|
var f = bson.M{}
|
||||||
|
if uid != "" {
|
||||||
|
f = bson.M{"uid": uid}
|
||||||
|
}
|
||||||
|
//query from mgo
|
||||||
|
if c, err = this.DB.Find(core.SqlTable(this.TableName), f); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n = 0
|
||||||
|
for c.Next(context.Background()) {
|
||||||
|
_id := c.Current.Lookup("_id").StringValue()
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
}
|
||||||
|
elem := sliceType.GetIndex(data, n)
|
||||||
|
if err = c.Decode(elem); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := this.ukeylist(uid, _id)
|
||||||
|
pipe.HMSetForMap(key, tempdata)
|
||||||
|
keys[_id] = key
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
if len(keys) > 0 {
|
||||||
|
pipe.HMSetForMap(this.ukey(uid), keys)
|
||||||
|
_, err = pipe.Exec()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
childs := make(map[string]struct{}, len(keys))
|
||||||
|
for _, v := range keys {
|
||||||
|
childs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
||||||
|
// this.Redis.Expire(this.ukey(uid), option.Expire)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//获取队列数据
|
||||||
|
func (this *DBModel) GetQueues(key string, count int, data interface{}) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetQueues", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "key", Value: key}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
dtype reflect2.Type
|
||||||
|
dkind reflect.Kind
|
||||||
|
sType reflect2.Type
|
||||||
|
sliceType *reflect2.UnsafeSliceType
|
||||||
|
sliceelemType reflect2.Type
|
||||||
|
dptr unsafe.Pointer
|
||||||
|
elemPtr unsafe.Pointer
|
||||||
|
decoder codecore.IDecoderMapJson
|
||||||
|
ok bool
|
||||||
|
n int
|
||||||
|
keys = make([]string, 0)
|
||||||
|
result = make([]*redis.StringStringMapCmd, 0)
|
||||||
|
tempdata map[string]string
|
||||||
|
)
|
||||||
|
|
||||||
|
dptr = reflect2.PtrOf(data)
|
||||||
|
dtype = reflect2.TypeOf(data)
|
||||||
|
dkind = dtype.Kind()
|
||||||
|
if dkind != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
if sType.Kind() != reflect.Slice {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceType = sType.(*reflect2.UnsafeSliceType)
|
||||||
|
sliceelemType = sliceType.Elem()
|
||||||
|
if sliceelemType.Kind() != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
if keys, err = this.Redis.LRangeToStringSlice(key, -1*count, -1).Result(); err == nil {
|
||||||
|
result = make([]*redis.StringStringMapCmd, 0)
|
||||||
|
for _, v := range keys {
|
||||||
|
cmd := pipe.HGetAllToMapString(v)
|
||||||
|
result = append(result, cmd)
|
||||||
|
}
|
||||||
|
pipe.Exec()
|
||||||
|
for _, v := range result {
|
||||||
|
tempdata, err = v.Result()
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
||||||
|
log.Errorf("err:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
} else {
|
||||||
|
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取单个数据中 多个字段数据
|
||||||
|
func (this *DBModel) GetFields(uid string, data interface{}, fields ...string) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetFields", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "data", Value: data})
|
||||||
|
if err = this.Redis.HMGet(this.ukey(uid), data, fields...); err != nil && err != lgredis.RedisNil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == lgredis.RedisNil {
|
||||||
|
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err = this.Redis.HMSet(this.ukey(uid), data); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), nil, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取List列表中单个数据中 多个字段数据
|
||||||
|
func (this *DBModel) GetListFields(uid string, id string, data interface{}, fields ...string) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetListFields", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "id", Value: id}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
keys map[string]string
|
||||||
|
tempdata map[string]string
|
||||||
|
)
|
||||||
|
if err = this.Redis.HMGet(this.ukeylist(uid, id), data, fields...); err != nil && err != lgredis.RedisNil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == lgredis.RedisNil {
|
||||||
|
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
key := this.ukeylist(uid, id)
|
||||||
|
pipe.HMSetForMap(key, tempdata)
|
||||||
|
keys[id] = key
|
||||||
|
pipe.HMSetForMap(this.ukey(uid), keys)
|
||||||
|
if _, err = pipe.Exec(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
childs := make(map[string]struct{}, len(keys))
|
||||||
|
for _, v := range keys {
|
||||||
|
childs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取列表数据中单个数据
|
||||||
|
func (this *DBModel) GetListObj(uid string, id string, data interface{}) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetListObj", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "id", Value: id}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
keys map[string]string
|
||||||
|
)
|
||||||
|
keys = make(map[string]string)
|
||||||
|
if err = this.Redis.HGetAll(this.ukeylist(uid, id), data); err != nil && err != lgredis.RedisNil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == lgredis.RedisNil {
|
||||||
|
if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(data); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
key := this.ukeylist(uid, id)
|
||||||
|
pipe.HMSet(key, data)
|
||||||
|
keys[id] = key
|
||||||
|
pipe.HMSetForMap(this.ukey(uid), keys)
|
||||||
|
if _, err = pipe.Exec(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
childs := make(map[string]struct{}, len(keys))
|
||||||
|
for _, v := range keys {
|
||||||
|
childs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//读取列表数据中单个数据
|
||||||
|
func (this *DBModel) GetListObjs(uid string, ids []string, data interface{}) (err error) {
|
||||||
|
//defer log.Debug("DBModel GetListObjs", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids}, log.Field{Key: "data", Value: data})
|
||||||
|
var (
|
||||||
|
dtype reflect2.Type
|
||||||
|
dkind reflect.Kind
|
||||||
|
sType reflect2.Type
|
||||||
|
sliceType *reflect2.UnsafeSliceType
|
||||||
|
sliceelemType reflect2.Type
|
||||||
|
decoder codecore.IDecoderMapJson
|
||||||
|
encoder codecore.IEncoderMapJson
|
||||||
|
dptr unsafe.Pointer
|
||||||
|
elemPtr unsafe.Pointer
|
||||||
|
n int
|
||||||
|
ok bool
|
||||||
|
keys map[string]string = make(map[string]string)
|
||||||
|
tempdata map[string]string
|
||||||
|
onfound []string = make([]string, 0, len(ids))
|
||||||
|
pipe *pipe.RedisPipe = this.Redis.RedisPipe(context.TODO())
|
||||||
|
result []*redis.StringStringMapCmd = make([]*redis.StringStringMapCmd, len(ids))
|
||||||
|
c *mongo.Cursor
|
||||||
|
)
|
||||||
|
dptr = reflect2.PtrOf(data)
|
||||||
|
dtype = reflect2.TypeOf(data)
|
||||||
|
dkind = dtype.Kind()
|
||||||
|
if dkind != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sType = dtype.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
if sType.Kind() != reflect.Slice {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceType = sType.(*reflect2.UnsafeSliceType)
|
||||||
|
sliceelemType = sliceType.Elem()
|
||||||
|
if sliceelemType.Kind() != reflect.Ptr {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if decoder, ok = codec.DecoderOf(sliceelemType, defconf).(codecore.IDecoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
|
||||||
|
for i, v := range ids {
|
||||||
|
result[i] = pipe.HGetAllToMapString(this.ukeylist(uid, v))
|
||||||
|
}
|
||||||
|
if _, err = pipe.Exec(); err == nil {
|
||||||
|
for i, v := range result {
|
||||||
|
if tempdata, err = v.Result(); err == nil {
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
if err = decoder.DecodeForMapJson(newPtr, json.GetReader([]byte{}), tempdata); err != nil {
|
||||||
|
log.Errorf("err:%v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
} else {
|
||||||
|
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetReader([]byte{}), tempdata)
|
||||||
|
}
|
||||||
|
n++
|
||||||
|
} else {
|
||||||
|
onfound = append(onfound, ids[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
onfound = ids
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(onfound) > 0 {
|
||||||
|
if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"_id": bson.M{"$in": onfound}}); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
if encoder, ok = codec.EncoderOf(sliceelemType, defconf).(codecore.IEncoderMapJson); !ok {
|
||||||
|
err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
for c.Next(context.Background()) {
|
||||||
|
_id := c.Current.Lookup("_id").StringValue()
|
||||||
|
sliceType.UnsafeGrow(dptr, n+1)
|
||||||
|
elemPtr = sliceType.UnsafeGetIndex(dptr, n)
|
||||||
|
if *((*unsafe.Pointer)(elemPtr)) == nil {
|
||||||
|
newPtr := sliceelemType.UnsafeNew()
|
||||||
|
*((*unsafe.Pointer)(elemPtr)) = newPtr
|
||||||
|
}
|
||||||
|
elem := sliceType.GetIndex(data, n)
|
||||||
|
if err = c.Decode(elem); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr)), json.GetWriter()); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i, v := range onfound {
|
||||||
|
if v == _id {
|
||||||
|
onfound = append(onfound[0:i], onfound[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
key := this.ukeylist(uid, _id)
|
||||||
|
pipe.HMSetForMap(key, tempdata)
|
||||||
|
keys[_id] = key
|
||||||
|
n++
|
||||||
|
}
|
||||||
|
if len(keys) > 0 {
|
||||||
|
pipe.HMSetForMap(this.ukey(uid), keys)
|
||||||
|
_, err = pipe.Exec()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(onfound) > 0 {
|
||||||
|
err = fmt.Errorf("onfound:%v data!", onfound)
|
||||||
|
}
|
||||||
|
if this.Expired > 0 {
|
||||||
|
childs := make(map[string]struct{}, len(keys))
|
||||||
|
for _, v := range keys {
|
||||||
|
childs[v] = struct{}{}
|
||||||
|
}
|
||||||
|
this.conn.UpDateModelExpired(this.ukey(uid), childs, this.Expired)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//删除目标数据
|
||||||
|
func (this *DBModel) Del(id string, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
||||||
|
err = this.Redis.Delete(this.ukey(id))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.DeleteModelLogs(this.TableName, "", bson.M{"_id": id})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//删除用户数据
|
||||||
|
func (this *DBModel) DelByUId(uid string, opt ...DBOption) (err error) {
|
||||||
|
//defer log.Debug("DBModel Del", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
||||||
|
err = this.Redis.Delete(this.ukey(uid))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
option := newDBOption(opt...)
|
||||||
|
if option.IsMgoLog {
|
||||||
|
err = this.DeleteModelLogs(this.TableName, uid, bson.M{"uid": uid})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//删除多条数据
|
||||||
|
func (this *DBModel) DelListlds(uid string, ids ...string) (err error) {
|
||||||
|
//defer log.Debug("DBModel DelListlds", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid}, log.Field{Key: "ids", Value: ids})
|
||||||
|
listkey := this.ukey(uid)
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
for _, v := range ids {
|
||||||
|
key := this.ukeylist(uid, v)
|
||||||
|
pipe.Delete(key)
|
||||||
|
}
|
||||||
|
pipe.HDel(listkey, ids...)
|
||||||
|
if _, err = pipe.Exec(); err == nil {
|
||||||
|
err = this.DeleteModelLogs(this.TableName, uid, bson.M{"_id": bson.M{"$in": ids}})
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//批量删除数据
|
||||||
|
func (this *DBModel) BatchDelLists(uid string) (err error) {
|
||||||
|
//defer log.Debug("DBModel BatchDelLists", log.Field{Key: "TableName", Value: this.TableName}, log.Field{Key: "uid", Value: uid})
|
||||||
|
var keys map[string]string
|
||||||
|
pipe := this.Redis.RedisPipe(context.TODO())
|
||||||
|
if keys, err = this.Redis.HGetAllToMapString(this.ukey(uid)); err == nil {
|
||||||
|
for _, v := range keys {
|
||||||
|
pipe.Delete(v)
|
||||||
|
}
|
||||||
|
pipe.Delete(this.ukey(uid))
|
||||||
|
pipe.Exec()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
@ -1,73 +0,0 @@
|
|||||||
package db
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
//更新数据模块过期
|
|
||||||
func (this *DB) UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
|
|
||||||
this.mu.RLock()
|
|
||||||
exp, ok := this.data[key]
|
|
||||||
this.mu.RUnlock()
|
|
||||||
if ok {
|
|
||||||
if childs != nil {
|
|
||||||
if exp.keys == nil {
|
|
||||||
exp.keys = make(map[string]struct{})
|
|
||||||
}
|
|
||||||
for k, _ := range childs {
|
|
||||||
exp.keys[k] = struct{}{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
exp.expired = time.Now().Add(expired)
|
|
||||||
} else {
|
|
||||||
exp = &ModelDataExpired{
|
|
||||||
key: key,
|
|
||||||
keys: childs,
|
|
||||||
expired: time.Now().Add(expired),
|
|
||||||
}
|
|
||||||
this.mu.Lock()
|
|
||||||
this.data[key] = exp
|
|
||||||
this.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//定时清理过期数据
|
|
||||||
func (this *DB) run() {
|
|
||||||
timer := time.NewTicker(time.Minute * 1)
|
|
||||||
defer timer.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-timer.C:
|
|
||||||
this.scanning()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//扫描过期
|
|
||||||
func (this *DB) scanning() {
|
|
||||||
now := time.Now()
|
|
||||||
this.mu.Lock()
|
|
||||||
temp := make([]*ModelDataExpired, 0, len(this.data))
|
|
||||||
for k, v := range this.data {
|
|
||||||
if v.expired.Before(now) { //过期
|
|
||||||
temp = append(temp, v)
|
|
||||||
delete(this.data, k)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.mu.Unlock()
|
|
||||||
ctx := context.Background()
|
|
||||||
pipe := this.local.Redis.Pipeline()
|
|
||||||
for _, v := range temp {
|
|
||||||
pipe.Del(ctx, v.key)
|
|
||||||
if v.keys != nil {
|
|
||||||
for k1, _ := range v.keys {
|
|
||||||
pipe.Del(ctx, k1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if _, err := pipe.Exec(ctx); err != nil {
|
|
||||||
this.options.Log.Errorln(err)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user