上传时间轮系统 处理延时任务

This commit is contained in:
liwei1dao 2022-09-07 11:05:04 +08:00
parent 05ec2a1f9f
commit 57153d13cf
4 changed files with 27 additions and 14 deletions

View File

@ -3,16 +3,17 @@ package timewheel
import ( import (
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/utils/mapstructure" "go_dreamfactory/lego/utils/mapstructure"
"time"
) )
type Option func(*Options) type Option func(*Options)
type Options struct { type Options struct {
Tick int //单位毫秒 Tick time.Duration //不小于 10毫秒
BucketsNum int BucketsNum int
IsSyncPool bool IsSyncPool bool
} }
func SetTick(v int) Option { func SetTick(v time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.Tick = v o.Tick = v
} }
@ -32,7 +33,7 @@ func SetIsSyncPool(v bool) Option {
func newOptions(config map[string]interface{}, opts ...Option) Options { func newOptions(config map[string]interface{}, opts ...Option) Options {
options := Options{ options := Options{
Tick: 1000, Tick: time.Second,
BucketsNum: 1, BucketsNum: 1,
IsSyncPool: true, IsSyncPool: true,
} }
@ -42,9 +43,9 @@ func newOptions(config map[string]interface{}, opts ...Option) Options {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if options.Tick < 100 { if options.Tick < 100*time.Millisecond {
log.Errorf("创建时间轮参数异常 Tick 必须大于 100 ms ") log.Errorf("创建时间轮参数异常 Tick 必须大于 100 ms ")
options.Tick = 100 options.Tick = 100 * time.Millisecond
} }
if options.BucketsNum < 0 { if options.BucketsNum < 0 {
log.Errorf("创建时间轮参数异常 BucketsNum 必须大于 0 ") log.Errorf("创建时间轮参数异常 BucketsNum 必须大于 0 ")
@ -62,9 +63,9 @@ func newOptionsByOption(opts ...Option) Options {
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if options.Tick < 100 { if options.Tick < 100*time.Millisecond {
log.Warnf("创建时间轮参数异常 Tick 必须大于 100 ms ") log.Warnf("创建时间轮参数异常 Tick 必须大于 100 ms ")
options.Tick = 100 options.Tick = 100 * time.Millisecond
} }
if options.BucketsNum < 0 { if options.BucketsNum < 0 {
log.Warnf("创建时间轮参数异常 BucketsNum 必须大于 0 ") log.Warnf("创建时间轮参数异常 BucketsNum 必须大于 0 ")

View File

@ -11,7 +11,7 @@ import (
func newsys(options Options) (sys *TimeWheel, err error) { func newsys(options Options) (sys *TimeWheel, err error) {
sys = &TimeWheel{ sys = &TimeWheel{
// tick // tick
tick: time.Millisecond * time.Duration(options.Tick), tick: options.Tick,
tickQueue: make(chan time.Time, 10), tickQueue: make(chan time.Time, 10),
// store // store
@ -194,7 +194,7 @@ func (this *TimeWheel) handleTick() {
this.putCircle(task, modeIsCircle) this.putCircle(task, modeIsCircle)
continue continue
} }
// gc // gc
this.collectTask(task) this.collectTask(task)
} }

View File

@ -4,6 +4,7 @@ import (
"go_dreamfactory/comm" "go_dreamfactory/comm"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/mgo" "go_dreamfactory/lego/sys/mgo"
"go_dreamfactory/lego/sys/timewheel"
"go_dreamfactory/modules" "go_dreamfactory/modules"
"go_dreamfactory/pb" "go_dreamfactory/pb"
cfg "go_dreamfactory/sys/configure/structs" cfg "go_dreamfactory/sys/configure/structs"
@ -80,6 +81,16 @@ func (this *modelDreamComp) noticeuserfriend(stag, uid string, chat *pb.DBChat)
} }
chat.Id = primitive.NewObjectID().Hex() chat.Id = primitive.NewObjectID().Hex()
chat.Channel = pb.ChatChannel_World chat.Channel = pb.ChatChannel_World
code = this.module.chat.SendWorldChat(stag, chat) // code = this.module.chat.SendWorldChat(stag, chat)
this.delaynoticeWorld(stag, chat)
return return
} }
//延迟推送到 世界聊天频道
func (this *modelDreamComp) delaynoticeWorld(stag string, chat *pb.DBChat) {
timewheel.Add(time.Minute*15, func(t *timewheel.Task, i ...interface{}) {
_stag := i[0].(string)
_chat := i[0].(*pb.DBChat)
this.module.chat.SendWorldChat(_stag, _chat)
}, stag, chat)
}

View File

@ -28,12 +28,13 @@ import (
"go_dreamfactory/modules/viking" "go_dreamfactory/modules/viking"
"go_dreamfactory/services" "go_dreamfactory/services"
"go_dreamfactory/sys/db" "go_dreamfactory/sys/db"
"time"
"go_dreamfactory/lego" "go_dreamfactory/lego"
"go_dreamfactory/lego/base/rpcx" "go_dreamfactory/lego/base/rpcx"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/cron"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/sys/timewheel"
) )
/* /*
@ -96,10 +97,10 @@ type Service struct {
func (this *Service) InitSys() { func (this *Service) InitSys() {
this.ServiceBase.InitSys() this.ServiceBase.InitSys()
//定时系统 //定时系统
if err := cron.OnInit(nil); err != nil { if err := timewheel.OnInit(nil, timewheel.SetTick(time.Minute)); err != nil {
panic(fmt.Sprintf("init sys.cron err: %s", err.Error())) panic(fmt.Sprintf("init sys.timewheel err: %s", err.Error()))
} else { } else {
log.Infof("init sys.cron success!") log.Infof("init sys.timewheel success!")
} }
//存储系统 //存储系统
if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil { if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil {