diff --git a/lego/sys/timewheel/options.go b/lego/sys/timewheel/options.go index c8ec3e146..aae627cd7 100644 --- a/lego/sys/timewheel/options.go +++ b/lego/sys/timewheel/options.go @@ -10,7 +10,6 @@ type Option func(*Options) type Options struct { Tick time.Duration //不小于 10毫秒 BucketsNum int - IsSyncPool bool } func SetTick(v time.Duration) Option { @@ -25,17 +24,10 @@ func SetBucketsNum(v int) Option { } } -func SetIsSyncPool(v bool) Option { - return func(o *Options) { - o.IsSyncPool = v - } -} - func newOptions(config map[string]interface{}, opts ...Option) Options { options := Options{ Tick: time.Second, BucketsNum: 1024, - IsSyncPool: true, } if config != nil { mapstructure.Decode(config, &options) @@ -58,7 +50,6 @@ func newOptionsByOption(opts ...Option) Options { options := Options{ Tick: 1000, BucketsNum: 1024, - IsSyncPool: true, } for _, o := range opts { o(&options) diff --git a/lego/sys/timewheel/timewheel.go b/lego/sys/timewheel/timewheel.go index 6f327d3a4..8381f45f3 100644 --- a/lego/sys/timewheel/timewheel.go +++ b/lego/sys/timewheel/timewheel.go @@ -2,8 +2,10 @@ package timewheel import ( "context" + "fmt" "go_dreamfactory/lego" "go_dreamfactory/lego/sys/log" + "runtime" "sync" "sync/atomic" "time" @@ -23,10 +25,9 @@ func newsys(options Options) (sys *TimeWheel, err error) { currentIndex: 0, // signal - addC: make(chan *Task, 1024*5), - removeC: make(chan *Task, 1024*2), - stopC: make(chan struct{}), - syncPool: options.IsSyncPool, + addC: make(chan *Task, 1024*5), + removeC: make(chan *Task, 1024*2), + stopC: make(chan struct{}), } for i := 0; i < options.BucketsNum; i++ { @@ -56,9 +57,9 @@ type ( round int args []interface{} callback func(*Task, ...interface{}) - async bool - stop bool - circle bool + async bool //异步执行 + stop bool //是否停止 + circle bool //是否循环 } TimeWheel struct { randomID int64 @@ -74,7 +75,6 @@ type ( removeC chan *Task stopC chan struct{} exited bool - syncPool bool } ) @@ -162,20 +162,22 @@ func (this *TimeWheel) schduler() { // 清理 func (this *TimeWheel) collectTask(task *Task) { - index := this.bucketIndexes[task.id] - delete(this.bucketIndexes, task.id) - delete(this.buckets[index], task.id) - - if this.syncPool && !task.circle { - defaultTaskPool.put(task) + if index, ok := this.bucketIndexes[task.id]; ok { + delete(this.bucketIndexes, task.id) + delete(this.buckets[index], task.id) } } +func (this *TimeWheel) recoverTask(task *Task) { + defaultTaskPool.put(task) +} + func (this *TimeWheel) handleTick() { bucket := this.buckets[this.currentIndex] for k, task := range bucket { if task.stop { this.collectTask(task) + this.recoverTask(task) continue } @@ -183,25 +185,32 @@ func (this *TimeWheel) handleTick() { bucket[k].round-- continue } - + this.collectTask(task) if task.async { go func(_task *Task) { + defer func() { //程序异常 收集异常信息传递给前端显示 + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + err := fmt.Errorf("%v: %s", r, buf[:l]) + log.Errorf("[timewheel] calltask err:%s", err.Error()) + } + }() this.calltask(_task, _task.args...) - this.collectTask(_task) + if _task.circle { + this.putCircle(_task, true) + } else { + this.recoverTask(_task) + } }(task) } else { - // optimize gopool this.calltask(task, task.args...) - } - - // circle - if task.circle { - this.collectTask(task) - this.putCircle(task, modeIsCircle) - continue - } - if !task.async { //异步模式下不能清理 - this.collectTask(task) + //循环执行 + if task.circle { + this.putCircle(task, true) + } else { + this.recoverTask(task) + } } } @@ -231,12 +240,7 @@ func (this *TimeWheel) addAny(delay time.Duration, circle, async bool, callback id := this.genUniqueID() var task *Task - if this.syncPool { - task = defaultTaskPool.get() - } else { - task = new(Task) - } - + task = defaultTaskPool.get() task.delay = delay task.id = id task.args = agr @@ -286,6 +290,7 @@ func (this *TimeWheel) calculateIndex(delay time.Duration) (index int) { func (this *TimeWheel) remove(task *Task) { this.collectTask(task) + this.recoverTask(task) } func (this *TimeWheel) NewTimer(delay time.Duration) *Timer {