This commit is contained in:
meixiongfeng 2023-07-13 11:49:52 +08:00
commit 5d302312b2
3 changed files with 21 additions and 23 deletions

View File

@ -7,7 +7,7 @@ import (
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
) )
//启动服务 // 启动服务
func Run(service core.IService, mod ...core.IModule) { func Run(service core.IService, mod ...core.IModule) {
cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数 cpuNum := runtime.NumCPU() //获得当前设备的cpu核心数
runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量 runtime.GOMAXPROCS(cpuNum) //设置需要用到的cpu数量
@ -27,7 +27,7 @@ func Run(service core.IService, mod ...core.IModule) {
log.Infof("服务【%s】关闭成功", service.GetId()) log.Infof("服务【%s】关闭成功", service.GetId())
} }
//错误采集 // 错误采集
func Recover(tag string) { func Recover(tag string) {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 1024) buf := make([]byte, 1024)

View File

@ -2,8 +2,8 @@ package timewheel
import ( import (
"context" "context"
"go_dreamfactory/lego"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -88,7 +88,7 @@ func (t *Task) Reset() {
t.circle = false t.circle = false
} }
//启动时间轮 // 启动时间轮
func (this *TimeWheel) Start() { func (this *TimeWheel) Start() {
// onlye once start // onlye once start
this.onceStart.Do( this.onceStart.Do(
@ -114,12 +114,12 @@ func (this *TimeWheel) Remove(task *Task) error {
return nil return nil
} }
//停止时间轮 // 停止时间轮
func (this *TimeWheel) Stop() { func (this *TimeWheel) Stop() {
this.stopC <- struct{}{} this.stopC <- struct{}{}
} }
//此处写法 为监控时间轮是否正常执行 // 此处写法 为监控时间轮是否正常执行
func (this *TimeWheel) tickGenerator() { func (this *TimeWheel) tickGenerator() {
if this.tickQueue == nil { if this.tickQueue == nil {
return return
@ -137,7 +137,7 @@ func (this *TimeWheel) tickGenerator() {
} }
} }
//调度器 // 调度器
func (this *TimeWheel) schduler() { func (this *TimeWheel) schduler() {
queue := this.ticker.C queue := this.ticker.C
if this.tickQueue != nil { if this.tickQueue != nil {
@ -160,7 +160,7 @@ func (this *TimeWheel) schduler() {
} }
} }
//清理 // 清理
func (this *TimeWheel) collectTask(task *Task) { func (this *TimeWheel) collectTask(task *Task) {
index := this.bucketIndexes[task.id] index := this.bucketIndexes[task.id]
delete(this.bucketIndexes, task.id) delete(this.bucketIndexes, task.id)
@ -185,10 +185,10 @@ func (this *TimeWheel) handleTick() {
} }
if task.async { if task.async {
go func(task *Task) { go func(_task *Task) {
go this.calltask(task, task.args...) this.calltask(_task, _task.args...)
this.collectTask(_task)
}(task) }(task)
} else { } else {
// optimize gopool // optimize gopool
this.calltask(task, task.args...) this.calltask(task, task.args...)
@ -200,9 +200,9 @@ func (this *TimeWheel) handleTick() {
this.putCircle(task, modeIsCircle) this.putCircle(task, modeIsCircle)
continue continue
} }
if !task.async { //异步模式下不能清理
// gc this.collectTask(task)
this.collectTask(task) }
} }
if this.currentIndex == this.bucketsNum-1 { if this.currentIndex == this.bucketsNum-1 {
@ -213,15 +213,13 @@ func (this *TimeWheel) handleTick() {
this.currentIndex++ this.currentIndex++
} }
//执行时间轮事件 捕捉异常错误 防止程序崩溃 // 执行时间轮事件 捕捉异常错误 防止程序崩溃
func (this *TimeWheel) calltask(task *Task, args ...interface{}) { func (this *TimeWheel) calltask(task *Task, args ...interface{}) {
defer func() { //程序异常 收集异常信息传递给前端显示 defer lego.Recover("TimeWheel")
if r := recover(); r != nil { if task.callback == nil {
buf := make([]byte, 4096) log.Error("sys.timeWheel task callback err!", log.Field{Key: "task", Value: task})
l := runtime.Stack(buf, false) return
log.Errorf("timewheel err:%s", string(buf[0:l])) }
}
}()
task.callback(task, task.args...) task.callback(task, task.args...)
} }

View File

@ -153,7 +153,7 @@ func (this *apiComp) Agree(session comm.IUserSession, req *pb.FriendAgreeReq) (e
return return
} }
event.TriggerEvent(comm.EventFriendChange, uid, len(self.FriendIds)) event.TriggerEvent(comm.EventFriendChange, uid, int32(len(self.FriendIds)))
} }
// 拥有xx个好友 // 拥有xx个好友