go_dreamfactory/sys/db/dbconn.go
2023-01-31 10:44:15 +08:00

123 lines
2.6 KiB
Go

package db
import (
	"context"
	"errors"
	"sync"
	"time"

	"go_dreamfactory/lego/sys/log"
	"go_dreamfactory/lego/sys/mgo"
	lgredis "go_dreamfactory/lego/sys/redis"
)
// newDBConn creates a DBConn from conf.
//
// It opens a Redis system (cluster or single-node, selected by
// conf.RedisIsCluster), then a MongoDB system, and finally starts the
// background goroutine that periodically sweeps expired entries.
//
// On failure the error is logged together with conf and returned; the returned
// conn is only partially initialised in that case and must not be used.
//
// NOTE(review): conf carries RedisPassword and the MongoDB URL and is attached
// to the error log entry as-is; confirm the logger does not print credentials
// in clear text.
func newDBConn(lg log.ILogger, conf DBConfig) (conn *DBConn, err error) {
	conn = &DBConn{
		log:  lg,
		data: make(map[string]*ModelDataExpired),
	}

	switch {
	case conf.RedisIsCluster:
		conn.Redis, err = lgredis.NewSys(
			lgredis.SetRedisType(lgredis.Redis_Cluster),
			lgredis.SetRedis_Cluster_Addr(conf.RedisAddr),
			lgredis.SetRedis_Cluster_Password(conf.RedisPassword),
		)
	case len(conf.RedisAddr) == 0:
		// Single-node mode reads RedisAddr[0]; report a configuration error
		// instead of panicking with an index out of range.
		err = errors.New("redis address is not configured")
	default:
		conn.Redis, err = lgredis.NewSys(
			lgredis.SetRedisType(lgredis.Redis_Single),
			lgredis.SetRedis_Single_Addr(conf.RedisAddr[0]),
			lgredis.SetRedis_Single_Password(conf.RedisPassword),
			lgredis.SetRedis_Single_DB(conf.RedisDB),
		)
	}
	if err != nil {
		lg.Error(err.Error(), log.Field{Key: "config", Value: conf})
		return
	}

	if conn.Mgo, err = mgo.NewSys(
		mgo.SetMongodbUrl(conf.MongodbUrl),
		mgo.SetMongodbDatabase(conf.MongodbDatabase),
	); err != nil {
		lg.Error(err.Error(), log.Field{Key: "config", Value: conf})
		return
	}

	// The sweep loop is started only once both backends are available. Nothing
	// in this function provides a way to stop it.
	go conn.run()
	return
}
// DBConn groups the Redis and MongoDB handles created by newDBConn with an
// in-memory table of keys that are scheduled for automatic expiry.
type DBConn struct {
// log receives errors raised while deleting expired keys.
log log.ILogger
// Redis is the Redis system handle; expired keys are deleted through it.
Redis lgredis.ISys
// Mgo is the MongoDB system handle.
Mgo mgo.ISys
// mu guards data.
mu sync.RWMutex
data map[string]*ModelDataExpired // entries awaiting automatic expiry, indexed by their key
}
// UpDateModelExpired registers key for automatic expiry, or refreshes the
// registration if key is already tracked.
//
// key is the primary key to delete once the entry expires. childs, when
// non-nil, lists additional keys that are deleted together with key; they are
// merged into the set already recorded for key. The map is copied, so the
// caller keeps ownership of childs. expired is the time-to-live, counted from
// the moment of the call.
func (this *DBConn) UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
	deadline := time.Now().Add(expired)

	// The table lock is held across lookup, insert and deadline refresh so
	// that two concurrent calls for a new key cannot overwrite each other's
	// entry, and so that the write to expired is ordered with scanning, which
	// reads it under the same lock.
	this.mu.Lock()
	defer this.mu.Unlock()

	exp, ok := this.data[key]
	if !ok {
		exp = &ModelDataExpired{key: key}
		this.data[key] = exp
	}
	exp.expired = deadline

	if childs == nil {
		return
	}

	// The child-key set has its own lock because scanning iterates it after
	// releasing the table lock.
	exp.mu.Lock()
	if exp.keys == nil {
		exp.keys = make(map[string]struct{}, len(childs))
	}
	for k := range childs {
		exp.keys[k] = struct{}{}
	}
	exp.mu.Unlock()
}
// run periodically cleans up expired data: once per minute it invokes
// scanning. It loops for as long as the ticker keeps delivering ticks and is
// meant to be started in its own goroutine.
func (this *DBConn) run() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		this.scanning()
	}
}
// scanning sweeps the expiry table: every entry whose deadline lies before the
// current time is removed from the table, and its key plus all of its child
// keys are deleted from Redis through a single pipeline. A pipeline error is
// logged and otherwise ignored.
func (this *DBConn) scanning() {
	cutoff := time.Now()

	// Detach the expired entries while holding the table lock; the Redis
	// round trip happens after the lock has been released.
	this.mu.Lock()
	expiredEntries := make([]*ModelDataExpired, 0, len(this.data))
	for name, entry := range this.data {
		if !entry.expired.Before(cutoff) {
			continue
		}
		expiredEntries = append(expiredEntries, entry)
		delete(this.data, name)
	}
	this.mu.Unlock()

	ctx := context.Background()
	pipe := this.Redis.Pipeline()
	for _, entry := range expiredEntries {
		pipe.Del(ctx, entry.key)
		if entry.keys == nil {
			continue
		}
		// Child keys are queued one by one under the entry's read lock.
		entry.mu.RLock()
		for child := range entry.keys {
			pipe.Del(ctx, child)
		}
		entry.mu.RUnlock()
	}

	_, err := pipe.Exec(ctx)
	if err != nil {
		this.log.Errorln(err)
	}
}