go_dreamfactory/modules/timer/activity.go
2023-09-28 11:18:29 +08:00

176 lines
3.8 KiB
Go

package timer
import (
"context"
"go_dreamfactory/comm"
"go_dreamfactory/lego/base"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/timewheel"
"go_dreamfactory/modules"
"go_dreamfactory/pb"
"go_dreamfactory/sys/configure"
"go_dreamfactory/sys/db"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
)
// Activity is the timer component that caches activity (huodong) records
// and notifies the worker service when an activity starts or ends.
//
// Deprecated: this component is deprecated.
type Activity struct {
	cbase.ModuleBase
	modules.MCompModel
	service     base.IRPCXService            // RPC service used to call the worker service
	module      *Timer                       // owning Timer module; used for logging
	cTimerObj   *timewheel.Task              // not referenced by the methods visible in this file
	closeSignal chan struct{}                // tells the polling goroutine started by Start to exit
	hlock       sync.RWMutex                 // guards activity
	activity    map[pb.HdType]*pb.DBHuodong  // cached activity records keyed by their Itype
}
// Init is the component initialisation hook.
//
// It binds the component to the activity table, initialises the embedded
// model component, stores the owning Timer module and the RPC service, and
// creates the shutdown channel used by Start and Stop.
//
// module must be a *Timer and service must implement base.IRPCXService;
// otherwise the type assertions panic. The error returned by the embedded
// MCompModel.Init is propagated to the caller.
func (this *Activity) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
	this.TableName = comm.TableHdInfo
	// Create the channel up front: a nil channel would make the send in Stop
	// and the receive in Start's goroutine block forever. A buffer of one
	// lets Stop return even when no goroutine is currently receiving.
	this.closeSignal = make(chan struct{}, 1)
	if err = this.MCompModel.Init(service, module, comp, options); err != nil {
		return
	}
	this.module = module.(*Timer)
	this.service = service.(base.IRPCXService)
	return
}
// Stop asks the polling goroutine launched by Start to exit.
//
// The signal is sent without blocking. Start returns before launching the
// goroutine when db.IsCross() is true, so a blocking send could hang
// shutdown forever because nothing would ever receive it; the same holds
// when the channel is nil or Stop is called more than once. Stop always
// returns a nil error.
func (this *Activity) Stop() (err error) {
	select {
	case this.closeSignal <- struct{}{}:
	default:
		// Nobody can take the signal right now (no receiver, nil channel, or
		// a signal is already pending); do not block the caller.
	}
	return
}
// Start launches the activity polling loop.
//
// Nothing is started when db.IsCross() reports true. Otherwise the embedded
// model component is started (its error is returned as-is), every activity
// record is loaded into the cache, and a goroutine calls CheckActivityData
// once per second until a value arrives on closeSignal.
func (this *Activity) Start() (err error) {
	if db.IsCross() {
		return
	}
	if err = this.MCompModel.Start(); err != nil {
		return
	}

	ticker := time.NewTicker(time.Second * 1)
	this.LoadActivityData("")

	go func() {
		// Release the ticker once the loop is left.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				this.CheckActivityData()
			case <-this.closeSignal:
				return
			}
		}
	}()

	//cron.AddFunc("0 0 0 ? * MON", this.TimerSeason)
	return
}
// LoadActivityData refreshes the in-memory activity cache from the database.
//
// With an empty id every document of the component's table is read and the
// cache is replaced by the result. With a non-empty id only the document
// whose _id equals id is read and stored under its Itype; the worker service
// is then notified by RPC if that activity is currently running
// ("starActivity") or has already ended ("endActivity").
//
// Database and RPC failures are logged through the owning module; the
// function has no return value.
func (this *Activity) LoadActivityData(id string) {
	ctx := context.Background()

	if id == "" { // load every activity
		cursor, err := this.DB.Find(core.SqlTable(this.TableName), bson.M{})
		if err != nil {
			this.module.Errorf("err:%v", err)
			return
		}
		// Always release the server-side cursor.
		defer cursor.Close(ctx)

		// Build the new map without holding the lock so readers are not
		// blocked while the cursor is being iterated.
		loaded := make(map[pb.HdType]*pb.DBHuodong)
		for cursor.Next(ctx) {
			hd := &pb.DBHuodong{}
			if err = cursor.Decode(hd); err != nil {
				this.module.Errorf("err:%v", err)
				continue
			}
			loaded[hd.Itype] = hd
		}
		if err = cursor.Err(); err != nil {
			this.module.Errorf("err:%v", err)
		}

		this.hlock.Lock()
		this.activity = loaded
		this.hlock.Unlock()
		return
	}

	// Load a single activity by id.
	hd := &pb.DBHuodong{}
	if err := this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"_id": id}).Decode(hd); err != nil {
		// Without this check a missing document would leave nothing to read
		// Itype from and the code below would dereference a nil pointer.
		this.module.Errorf("err:%v", err)
		return
	}

	this.hlock.Lock()
	if this.activity == nil {
		this.activity = make(map[pb.HdType]*pb.DBHuodong)
	}
	this.activity[hd.Itype] = hd
	this.hlock.Unlock()

	// RPC notification. Read the clock once so both comparisons use the same
	// instant.
	now := configure.Now().Unix()
	var method, action string
	switch {
	case now > hd.Etime:
		method, action = string(comm.Rpc_ActivityOver), "endActivity"
	case now >= hd.Stime:
		method, action = string(comm.Rpc_ActivityStar), "starActivity"
	default:
		// Not started yet: nothing to announce.
		return
	}
	if err := this.service.RpcCall(
		ctx,
		comm.Service_Worker,
		method,
		pb.RPCGeneralReqB1{
			Param1: action,
			Param2: []string{hd.Id},
		},
		nil,
	); err != nil {
		this.module.Errorln(err)
	}
}
// CheckActivityData scans the cached activities and notifies the worker
// service by RPC about those whose end time or start time equals the current
// Unix second. Ended activities are reported first ("endActivity"), then
// started ones ("starActivity"). RPC failures are logged through the owning
// module.
//
// NOTE(review): matching is by exact second, so an activity is only reported
// if this function runs during that very second; a delayed or skipped tick
// would miss it. Confirm whether that is acceptable.
func (this *Activity) CheckActivityData() {
	var (
		ended   []string // ids of activities that end now
		started []string // ids of activities that start now
	)

	// Read the clock once so every record is compared against the same
	// second, even if the second rolls over during the scan.
	now := configure.Now().Unix()

	this.hlock.RLock()
	for _, hd := range this.activity {
		if hd.Etime == now { // an activity ends
			ended = append(ended, hd.Id)
		}
		if hd.Stime == now { // an activity starts
			started = append(started, hd.Id)
		}
	}
	this.hlock.RUnlock()

	if len(ended) > 0 {
		if err := this.service.RpcCall(
			context.Background(),
			comm.Service_Worker,
			string(comm.Rpc_ActivityOver),
			pb.RPCGeneralReqB1{
				Param1: "endActivity",
				Param2: ended,
			},
			nil,
		); err != nil {
			this.module.Errorln(err)
		}
	}
	if len(started) > 0 {
		if err := this.service.RpcCall(
			context.Background(),
			comm.Service_Worker,
			string(comm.Rpc_ActivityStar),
			pb.RPCGeneralReqB1{
				Param1: "starActivity",
				Param2: started,
			},
			nil,
		); err != nil {
			this.module.Errorln(err)
		}
	}
}