时间轮优化

This commit is contained in:
liwei1dao 2024-01-10 16:30:30 +08:00
parent 6ca2b346b6
commit d922939c1f
2 changed files with 38 additions and 42 deletions

View File

@ -10,7 +10,6 @@ type Option func(*Options)
type Options struct {
Tick time.Duration //不小于 10毫秒
BucketsNum int
IsSyncPool bool
}
func SetTick(v time.Duration) Option {
@ -25,17 +24,10 @@ func SetBucketsNum(v int) Option {
}
}
func SetIsSyncPool(v bool) Option {
return func(o *Options) {
o.IsSyncPool = v
}
}
func newOptions(config map[string]interface{}, opts ...Option) Options {
options := Options{
Tick: time.Second,
BucketsNum: 1024,
IsSyncPool: true,
}
if config != nil {
mapstructure.Decode(config, &options)
@ -58,7 +50,6 @@ func newOptionsByOption(opts ...Option) Options {
options := Options{
Tick: 1000,
BucketsNum: 1024,
IsSyncPool: true,
}
for _, o := range opts {
o(&options)

View File

@ -2,8 +2,10 @@ package timewheel
import (
"context"
"fmt"
"go_dreamfactory/lego"
"go_dreamfactory/lego/sys/log"
"runtime"
"sync"
"sync/atomic"
"time"
@ -26,7 +28,6 @@ func newsys(options Options) (sys *TimeWheel, err error) {
addC: make(chan *Task, 1024*5),
removeC: make(chan *Task, 1024*2),
stopC: make(chan struct{}),
syncPool: options.IsSyncPool,
}
for i := 0; i < options.BucketsNum; i++ {
@ -56,9 +57,9 @@ type (
round int
args []interface{}
callback func(*Task, ...interface{})
async bool
stop bool
circle bool
async bool //异步执行
stop bool //是否停止
circle bool //是否循环
}
TimeWheel struct {
randomID int64
@ -74,7 +75,6 @@ type (
removeC chan *Task
stopC chan struct{}
exited bool
syncPool bool
}
)
@ -162,20 +162,22 @@ func (this *TimeWheel) schduler() {
// 清理
func (this *TimeWheel) collectTask(task *Task) {
index := this.bucketIndexes[task.id]
if index, ok := this.bucketIndexes[task.id]; ok {
delete(this.bucketIndexes, task.id)
delete(this.buckets[index], task.id)
if this.syncPool && !task.circle {
defaultTaskPool.put(task)
}
}
func (this *TimeWheel) recoverTask(task *Task) {
defaultTaskPool.put(task)
}
func (this *TimeWheel) handleTick() {
bucket := this.buckets[this.currentIndex]
for k, task := range bucket {
if task.stop {
this.collectTask(task)
this.recoverTask(task)
continue
}
@ -183,25 +185,32 @@ func (this *TimeWheel) handleTick() {
bucket[k].round--
continue
}
this.collectTask(task)
if task.async {
go func(_task *Task) {
defer func() { //程序异常 收集异常信息传递给前端显示
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
err := fmt.Errorf("%v: %s", r, buf[:l])
log.Errorf("[timewheel] calltask err:%s", err.Error())
}
}()
this.calltask(_task, _task.args...)
this.collectTask(_task)
if _task.circle {
this.putCircle(_task, true)
} else {
this.recoverTask(_task)
}
}(task)
} else {
// optimize gopool
this.calltask(task, task.args...)
}
// circle
//循环执行
if task.circle {
this.collectTask(task)
this.putCircle(task, modeIsCircle)
continue
this.putCircle(task, true)
} else {
this.recoverTask(task)
}
if !task.async { //异步模式下不能清理
this.collectTask(task)
}
}
@ -231,12 +240,7 @@ func (this *TimeWheel) addAny(delay time.Duration, circle, async bool, callback
id := this.genUniqueID()
var task *Task
if this.syncPool {
task = defaultTaskPool.get()
} else {
task = new(Task)
}
task.delay = delay
task.id = id
task.args = agr
@ -286,6 +290,7 @@ func (this *TimeWheel) calculateIndex(delay time.Duration) (index int) {
func (this *TimeWheel) remove(task *Task) {
this.collectTask(task)
this.recoverTask(task)
}
func (this *TimeWheel) NewTimer(delay time.Duration) *Timer {