go_dreamfactory/modules/timer/activity.go

190 lines
4.7 KiB
Go

package timer
import (
"context"
"go_dreamfactory/comm"
"go_dreamfactory/lego/base"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/timewheel"
"go_dreamfactory/modules"
"go_dreamfactory/pb"
"go_dreamfactory/sys/configure"
"go_dreamfactory/sys/db"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
)
// Activity is the timer-module component that mirrors the activity records of
// the comm.TableHdInfo table in memory, bucketed by their state relative to the
// current time, and notifies the worker service when an activity starts or ends.
//
// NOTE: the original author marked this component as deprecated.
type Activity struct {
cbase.ModuleBase
modules.MCompModel
service base.IRPCXService // RPC service used to notify workers; set in Init
module *Timer // owning timer module; set in Init and used for logging
cTimerObj *timewheel.Task // NOTE(review): not referenced by any method visible in this file
closeSignal chan struct{} // Stop sends on it; the goroutine launched by Start receives from it
hlock sync.RWMutex // guards the three activity maps below
curActivity map[pb.HdType]*pb.DBHuodong // activities currently in progress
delActivity map[pb.HdType]*pb.DBHuodong // activities that have already expired
futureActivity map[pb.HdType]*pb.DBHuodong // activities that have not started yet
}
// Init is the component initialisation hook.
//
// It binds the component to the comm.TableHdInfo table, runs the embedded
// MCompModel initialisation, and caches the concrete *Timer module and the
// RPCX service for later use.
//
// An error from the embedded initialisation is returned to the caller instead
// of being discarded. The type assertions panic if module is not a *Timer or
// service does not implement base.IRPCXService.
func (this *Activity) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	this.TableName = comm.TableHdInfo
	if err = this.MCompModel.Init(service, module, comp, options); err != nil {
		return
	}
	this.module = module.(*Timer)
	this.service = service.(base.IRPCXService)
	return
}
// Stop asks the polling goroutine launched by Start to exit.
//
// When the close channel was never allocated there is nothing to signal and
// Stop returns immediately: a send on a nil channel would block forever and
// hang shutdown. Otherwise the send blocks until the goroutine picks the
// signal up. It always returns a nil error.
func (this *Activity) Stop() (err error) {
	if this.closeSignal == nil {
		return
	}
	this.closeSignal <- struct{}{}
	return
}
// Start loads the activity table into memory and launches a goroutine that
// calls CheckActivityData once per second until a value is received on the
// close channel.
//
// It does nothing when db.IsCross() reports true, and returns the error from
// the embedded MCompModel.Start unchanged if that fails.
func (this *Activity) Start() (err error) {
	if db.IsCross() {
		return
	}
	if err = this.MCompModel.Start(); err != nil {
		return
	}

	// The channel must exist before the goroutine selects on it; receiving
	// from a nil channel never succeeds, which would make the loop unstoppable.
	if this.closeSignal == nil {
		this.closeSignal = make(chan struct{})
	}
	stop := this.closeSignal

	this.LoadActivityData()

	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				this.CheckActivityData()
			}
		}
	}()
	//cron.AddFunc("0 0 0 ? * MON", this.TimerSeason)
	return
}
// ReLoadActivityData re-reads the activity with the given _id from the
// database, notifies the worker service about its current state and moves it
// into the matching in-memory bucket.
//
// The activity is classified against a single time sample:
//   - Etime <= now: ended; NotifyActivityOver is sent and it goes to delActivity.
//   - Stime < now < Etime: running; NotifyActivityStart is sent and it goes to curActivity.
//   - otherwise: not started yet; no notification is sent and it goes to futureActivity.
//
// If the record cannot be read or decoded the error is logged and the
// in-memory state is left untouched.
func (this *Activity) ReLoadActivityData(id string) {
	hd := &pb.DBHuodong{}
	if err := this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(hd); err != nil {
		this.module.Errorf("err:%v", err)
		return
	}

	// Sample the clock once so the notification and the bucket always agree.
	now := configure.Now().Unix()
	ended := hd.Etime <= now
	running := !ended && hd.Stime < now

	// Notify before taking the lock so the RPC does not block map readers.
	switch {
	case ended:
		this.NotifyActivityOver([]string{hd.Id})
	case running:
		this.NotifyActivityStart([]string{hd.Id})
	}

	this.hlock.Lock()
	defer this.hlock.Unlock()

	// The maps are only allocated by a successful LoadActivityData; writing to
	// a nil map panics, so make sure they exist.
	if this.curActivity == nil {
		this.curActivity = make(map[pb.HdType]*pb.DBHuodong)
	}
	if this.delActivity == nil {
		this.delActivity = make(map[pb.HdType]*pb.DBHuodong)
	}
	if this.futureActivity == nil {
		this.futureActivity = make(map[pb.HdType]*pb.DBHuodong)
	}

	// Drop any previous entry for this activity type; delete is a no-op when
	// the key is absent.
	delete(this.curActivity, hd.Itype)
	delete(this.delActivity, hd.Itype)
	delete(this.futureActivity, hd.Itype)

	switch {
	case ended:
		this.delActivity[hd.Itype] = hd
	case running:
		this.curActivity[hd.Itype] = hd
	default:
		this.futureActivity[hd.Itype] = hd
	}
}
// LoadActivityData reads every record of the activity table and rebuilds the
// three in-memory buckets (current, expired, future) from scratch.
//
// Records are classified against a single time sample:
//   - Etime <= now: expired.
//   - Stime < now < Etime: current.
//   - otherwise: future.
//
// A record that fails to decode is logged and skipped. If the query itself
// fails the error is logged and the existing in-memory state is left as is.
func (this *Activity) LoadActivityData() {
	ctx := context.Background()
	c, err := this.DB.Find(core.SqlTable(this.TableName), bson.M{})
	if err != nil {
		this.module.Errorf("err:%v", err)
		return
	}
	// Release the server-side cursor once iteration is done.
	defer c.Close(ctx)

	now := configure.Now().Unix()
	cur := make(map[pb.HdType]*pb.DBHuodong)
	del := make(map[pb.HdType]*pb.DBHuodong)
	future := make(map[pb.HdType]*pb.DBHuodong)

	for c.Next(ctx) {
		hd := &pb.DBHuodong{}
		if err = c.Decode(hd); err != nil {
			this.module.Errorf("err:%v", err)
			continue
		}
		switch {
		case hd.Etime <= now:
			del[hd.Itype] = hd
		case hd.Stime < now:
			cur[hd.Itype] = hd
		default:
			future[hd.Itype] = hd
		}
	}
	// Next returns false on both exhaustion and failure; report the latter.
	if err = c.Err(); err != nil {
		this.module.Errorf("err:%v", err)
	}

	// Build the maps outside the lock and swap them in, so the lock is not
	// held across database I/O.
	this.hlock.Lock()
	this.curActivity = cur
	this.delActivity = del
	this.futureActivity = future
	this.hlock.Unlock()
}
// NotifyActivityOver sends the comm.Rpc_ActivityOver RPC to comm.Service_Worker
// with the ids of the activities that have ended. No reply is requested; a
// failed call is logged through the owning module.
func (this *Activity) NotifyActivityOver(szEnd []string) {
	method := string(comm.Rpc_ActivityOver)
	req := pb.RPCGeneralReqB1{
		Param1: "endActivity",
		Param2: szEnd,
	}
	err := this.service.RpcCall(context.Background(), comm.Service_Worker, method, req, nil)
	if err != nil {
		this.module.Errorln(err)
	}
}
// NotifyActivityStart sends the comm.Rpc_ActivityStart RPC to
// comm.Service_Worker with the ids of the activities that have started. No
// reply is requested; a failed call is logged through the owning module.
func (this *Activity) NotifyActivityStart(szStart []string) {
	method := string(comm.Rpc_ActivityStart)
	req := pb.RPCGeneralReqB1{
		Param1: "startActivity",
		Param2: szStart,
	}
	err := this.service.RpcCall(context.Background(), comm.Service_Worker, method, req, nil)
	if err != nil {
		this.module.Errorln(err)
	}
}
// CheckActivityData advances the in-memory activity state by one tick.
//
// Against a single time sample it moves every current activity whose Etime has
// been reached into delActivity, and every future activity whose Stime has been
// reached into curActivity. It then notifies the worker service about the
// ended and started activity ids, each in one batched call.
//
// The comparisons use <= rather than ==, so a transition is still detected when
// the tick for the exact second was missed.
func (this *Activity) CheckActivityData() {
	var (
		szEnd   []string // ids of activities that just ended
		szStart []string // ids of activities that just started
	)
	now := configure.Now().Unix()

	this.hlock.Lock()
	// Deleting the entry being visited during a range loop is safe in Go.
	for _, v := range this.curActivity {
		if v.Etime <= now { // activity has ended
			szEnd = append(szEnd, v.Id)
			delete(this.curActivity, v.Itype)
			this.delActivity[v.Itype] = v
		}
	}
	for _, v := range this.futureActivity {
		if v.Stime <= now { // activity has started
			szStart = append(szStart, v.Id)
			delete(this.futureActivity, v.Itype)
			this.curActivity[v.Itype] = v
		}
	}
	this.hlock.Unlock()

	// Send the RPCs after releasing the lock so a slow call does not block
	// other users of the maps.
	if len(szEnd) > 0 {
		this.NotifyActivityOver(szEnd)
	}
	if len(szStart) > 0 {
		this.NotifyActivityStart(szStart)
	}
}