上传数据模块接口补充
This commit is contained in:
parent
e845ed0adc
commit
5b2123bb27
@ -1,5 +1,7 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
type (
|
type (
|
||||||
ISys interface {
|
ISys interface {
|
||||||
//本服数据连接
|
//本服数据连接
|
||||||
@ -8,6 +10,16 @@ type (
|
|||||||
Cross() *DBConn
|
Cross() *DBConn
|
||||||
//跨服列表数据层连接
|
//跨服列表数据层连接
|
||||||
ServerDBConn(stage string) (conn *DBConn)
|
ServerDBConn(stage string) (conn *DBConn)
|
||||||
|
///获取区服列表标签
|
||||||
|
GetServerTags() []string
|
||||||
|
//更新数据过期
|
||||||
|
UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration)
|
||||||
|
}
|
||||||
|
//过期数据
|
||||||
|
ModelDataExpired struct {
|
||||||
|
key string //主key
|
||||||
|
keys map[string]struct{} //数据集合
|
||||||
|
expired time.Time //过期时间
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -42,3 +54,13 @@ func Cross() *DBConn {
|
|||||||
func ServerDBConn(stage string) (conn *DBConn) {
|
func ServerDBConn(stage string) (conn *DBConn) {
|
||||||
return defsys.ServerDBConn(stage)
|
return defsys.ServerDBConn(stage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
///获取区服列表标签
|
||||||
|
func GetServerTags() []string {
|
||||||
|
return defsys.GetServerTags()
|
||||||
|
}
|
||||||
|
|
||||||
|
//更新数据过期
|
||||||
|
func UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
|
||||||
|
defsys.UpDateModelExpired(key, childs, expired)
|
||||||
|
}
|
||||||
|
10
sys/db/db.go
10
sys/db/db.go
@ -2,6 +2,7 @@ package db
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"go_dreamfactory/lego/sys/log"
|
"go_dreamfactory/lego/sys/log"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newSys(options *Options) (sys *DB, err error) {
|
func newSys(options *Options) (sys *DB, err error) {
|
||||||
@ -15,6 +16,8 @@ type DB struct {
|
|||||||
local *DBConn
|
local *DBConn
|
||||||
cross *DBConn
|
cross *DBConn
|
||||||
servers map[string]*DBConn
|
servers map[string]*DBConn
|
||||||
|
mu sync.RWMutex
|
||||||
|
data map[string]*ModelDataExpired //过期数据
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *DB) init() (err error) {
|
func (this *DB) init() (err error) {
|
||||||
@ -59,3 +62,10 @@ func (this *DB) ServerDBConn(stage string) (conn *DBConn) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
func (this *DB) GetServerTags() []string {
|
||||||
|
keys := make([]string, 0)
|
||||||
|
for k, _ := range this.servers {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
66
sys/db/expired.go
Normal file
66
sys/db/expired.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
//更新数据模块过期
|
||||||
|
func (this *DB) UpDateModelExpired(key string, childs map[string]struct{}, expired time.Duration) {
|
||||||
|
this.mu.RLock()
|
||||||
|
exp, ok := this.data[key]
|
||||||
|
this.mu.RUnlock()
|
||||||
|
if ok {
|
||||||
|
exp.keys = childs
|
||||||
|
exp.expired = time.Now().Add(expired)
|
||||||
|
} else {
|
||||||
|
exp = &ModelDataExpired{
|
||||||
|
key: key,
|
||||||
|
keys: childs,
|
||||||
|
expired: time.Now().Add(expired),
|
||||||
|
}
|
||||||
|
this.mu.Lock()
|
||||||
|
this.data[key] = exp
|
||||||
|
this.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//定时清理过期数据
|
||||||
|
func (this *DB) run() {
|
||||||
|
timer := time.NewTicker(time.Minute * 1)
|
||||||
|
defer timer.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
this.scanning()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//扫描过期
|
||||||
|
func (this *DB) scanning() {
|
||||||
|
now := time.Now()
|
||||||
|
this.mu.Lock()
|
||||||
|
temp := make([]*ModelDataExpired, 0, len(this.data))
|
||||||
|
for k, v := range this.data {
|
||||||
|
if v.expired.Before(now) { //过期
|
||||||
|
temp = append(temp, v)
|
||||||
|
delete(this.data, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.mu.Unlock()
|
||||||
|
ctx := context.Background()
|
||||||
|
pipe := this.local.Redis.Pipeline()
|
||||||
|
for _, v := range temp {
|
||||||
|
pipe.Del(ctx, v.key)
|
||||||
|
if v.keys != nil {
|
||||||
|
for k1, _ := range v.keys {
|
||||||
|
pipe.Del(ctx, k1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if _, err := pipe.Exec(ctx); err != nil {
|
||||||
|
this.options.Log.Errorln(err)
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,8 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
"go_dreamfactory/lego/sys/log"
|
||||||
"go_dreamfactory/lego/utils/mapstructure"
|
"go_dreamfactory/lego/utils/mapstructure"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -21,6 +23,8 @@ type Options struct {
|
|||||||
Loacl DBConfig //本服配置
|
Loacl DBConfig //本服配置
|
||||||
Cross DBConfig //跨服配置
|
Cross DBConfig //跨服配置
|
||||||
ServerList map[string]DBConfig //服务列表配置
|
ServerList map[string]DBConfig //服务列表配置
|
||||||
|
Debug bool //日志是否开启
|
||||||
|
Log log.ILogger
|
||||||
}
|
}
|
||||||
|
|
||||||
//设置本服配置
|
//设置本服配置
|
||||||
@ -43,7 +47,17 @@ func SetServerList(v map[string]DBConfig) Option {
|
|||||||
o.ServerList = v
|
o.ServerList = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func SetDebug(v bool) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Debug = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func SetLog(v log.ILogger) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Log = v
|
||||||
|
}
|
||||||
|
}
|
||||||
func newOptions(config map[string]interface{}, opts ...Option) (options *Options, err error) {
|
func newOptions(config map[string]interface{}, opts ...Option) (options *Options, err error) {
|
||||||
options = &Options{
|
options = &Options{
|
||||||
Loacl: DBConfig{},
|
Loacl: DBConfig{},
|
||||||
@ -58,6 +72,9 @@ func newOptions(config map[string]interface{}, opts ...Option) (options *Options
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(options)
|
o(options)
|
||||||
}
|
}
|
||||||
|
if options.Log = log.NewTurnlog(options.Debug, log.Clone("sys.db", 2)); options.Log == nil {
|
||||||
|
err = errors.New("log is nil")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,6 +83,9 @@ func newOptionsByOption(opts ...Option) (options *Options, err error) {
|
|||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(options)
|
o(options)
|
||||||
}
|
}
|
||||||
|
if options.Log = log.NewTurnlog(options.Debug, log.Clone("sys.db", 2)); options.Log == nil {
|
||||||
|
err = errors.New("log is nil")
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user