From 9fd21926aa003acf791dbf7d2e55d61ebabeab00 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Fri, 16 Sep 2022 14:15:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E7=BD=91=E5=85=B3=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E4=B8=8E=20=20Service=5FMainte=20=20=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AE=80=E7=BB=83=E7=95=8C=E9=9D=A2=EF=BC=8C=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0Service=5FMainte=20=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?=E4=B8=BB=E5=8A=A8=E6=8E=A8=E9=80=81=E7=94=A8=E6=88=B7=E6=B6=88?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comm/const.go | 3 ++ lego/sys/cron/core.go | 11 +++-- lego/sys/cron/cron.go | 12 +++-- lego/sys/rpcx/client.go | 27 ---------- modules/gateway/module.go | 11 ++++- modules/timer/chat.go | 100 ++++++++++++++++++++++++++++++++++++++ modules/timer/forum.go | 2 +- modules/timer/module.go | 82 ++++++++++++++++++++++++++++++- 8 files changed, 208 insertions(+), 40 deletions(-) create mode 100644 modules/timer/chat.go diff --git a/comm/const.go b/comm/const.go index eee2c0575..431b12222 100644 --- a/comm/const.go +++ b/comm/const.go @@ -338,6 +338,9 @@ const ( type ChatSystemType int8 const ( + ///定时公告 + TimedNotice ChatSystemType = 1 + ///用户登录通告 UserLoginNotice ChatSystemType = 2 ///装备强化公告 EquipmentUpgradeNotice ChatSystemType = 3 diff --git a/lego/sys/cron/core.go b/lego/sys/cron/core.go index bcbaca5b0..8e10b05fc 100644 --- a/lego/sys/cron/core.go +++ b/lego/sys/cron/core.go @@ -5,11 +5,12 @@ import ( ) type ( - ISys interface { + EntryID tcron.EntryID + ISys interface { Start() Stop() - AddFunc(spec string, cmd func()) (tcron.EntryID, error) - Remove(id tcron.EntryID) + AddFunc(spec string, cmd func()) (EntryID, error) + Remove(id EntryID) } ) @@ -39,10 +40,10 @@ func Stop() { defsys.Stop() } -func AddFunc(spec string, cmd func()) (tcron.EntryID, error) { +func AddFunc(spec string, cmd func()) (EntryID, error) { return defsys.AddFunc(spec, cmd) } -func Remove(id tcron.EntryID) { +func Remove(id EntryID) { defsys.Remove(id) } diff --git a/lego/sys/cron/cron.go b/lego/sys/cron/cron.go index 07346c9a5..190205835 100644 --- a/lego/sys/cron/cron.go +++ b/lego/sys/cron/cron.go @@ -23,10 +23,14 @@ func (this *Cron) Stop() { this.cron.Stop() } -func (this *Cron) AddFunc(spec string, cmd func()) (tcron.EntryID, error) { - return this.cron.AddFunc(spec, cmd) +func (this *Cron) AddFunc(spec string, cmd func()) (id EntryID, err error) { + var eid tcron.EntryID + if eid, err = this.cron.AddFunc(spec, cmd); err != nil { + id = EntryID(eid) + } + return } -func (this *Cron) Remove(id tcron.EntryID) { - this.cron.Remove(id) +func (this *Cron) Remove(id EntryID) { + this.cron.Remove(tcron.EntryID(id)) } diff --git a/lego/sys/rpcx/client.go b/lego/sys/rpcx/client.go index d7456e734..d54a5c157 100644 --- a/lego/sys/rpcx/client.go +++ b/lego/sys/rpcx/client.go @@ -277,33 +277,6 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) { } else { this.options.Log.Debugf("UpdateServer addr:%s ", v.ServiceAddr) } - - // this.connsMapMu.RLock() - // _, ok := this.conns[v.ServiceAddr] - // this.connsMapMu.RUnlock() - // if !ok { - // this.connectMapMu.RLock() - // _, ok := this.connecting[v.ServiceAddr] - // this.connectMapMu.RUnlock() - // if !ok { - // this.connectMapMu.Lock() - // this.connecting[v.ServiceAddr] = struct{}{} - // this.connectMapMu.Unlock() - // if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{ - // ServiceTag: this.options.ServiceTag, - // ServiceId: this.options.ServiceId, - // ServiceType: this.options.ServiceType, - // ServiceAddr: this.options.ServiceAddr}, - // &ServiceNode{}); err != nil { - // this.options.Log.Errorf("ShakeHands new node addr:%s err:%v", v.ServiceAddr, err) - // this.connectMapMu.Lock() - // delete(this.connecting, v.ServiceAddr) - // this.connectMapMu.Unlock() - // } else { - // this.options.Log.Debugf("UpdateServer addr:%s ", v.ServiceAddr) - // } - // } - // } } } diff --git a/modules/gateway/module.go b/modules/gateway/module.go index 906cffeb2..5b3c9dda5 100644 --- a/modules/gateway/module.go +++ b/modules/gateway/module.go @@ -1,12 +1,15 @@ package gateway import ( + "context" + "fmt" "go_dreamfactory/comm" "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/log" + "go_dreamfactory/lego/sys/rpcx" ) /* @@ -77,8 +80,14 @@ func (this *Gateway) Start() (err error) { this.service.RegisterFunctionName(name, fn) } err = this.ModuleBase.Start() + //没有建立客户端 主动发起握手 + this.service.RpcGo(context.Background(), comm.Service_Mainte, rpcx.RpcX_ShakeHands, &rpcx.ServiceNode{ + ServiceTag: this.service.GetTag(), + ServiceId: this.service.GetId(), + ServiceType: this.service.GetType(), + ServiceAddr: fmt.Sprintf("tcp@%s:%d", this.service.GetIp(), this.service.GetPort())}, + nil) return - } // OnInstallComp 装备组件 diff --git a/modules/timer/chat.go b/modules/timer/chat.go new file mode 100644 index 000000000..81452a250 --- /dev/null +++ b/modules/timer/chat.go @@ -0,0 +1,100 @@ +package timer + +import ( + "context" + "fmt" + "go_dreamfactory/comm" + "go_dreamfactory/modules" + "go_dreamfactory/pb" + "go_dreamfactory/sys/configure" + cfg "go_dreamfactory/sys/configure/structs" + "time" + + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/sys/cron" + + "google.golang.org/protobuf/types/known/anypb" +) + +var ( + game_chatsystem = "game_chatsystem.json" +) + +/* +聊天系统 推送系统公告 +*/ +type ChatComp struct { + modules.MCompConfigure + service core.IService + module *Timer + takes []cron.EntryID +} + +//组件初始化接口 +func (this *ChatComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { + this.MCompConfigure.Init(service, module, comp, options) + this.service = service + this.module = module.(*Timer) + this.takes = make([]cron.EntryID, 0) + return +} + +func (this *ChatComp) Start() (err error) { + err = this.MCompConfigure.Start() + configure.RegisterConfigure(game_chatsystem, cfg.NewGameChatSystem, func() { + for _, v := range this.takes { //移除前面的定时任务 + cron.Remove(v) + } + + if v, err := this.GetConfigure(game_chatsystem); err != nil { + this.module.Errorf("err:%v", err) + return + } else { + var id cron.EntryID + this.takes = make([]cron.EntryID, 0) + for _, v := range v.(*cfg.GameChatSystem).GetDataList() { + if v.Type == int32(comm.TimedNotice) { //处理定时任务 + weekStr := "" + for _, w := range v.Week { + weekStr += fmt.Sprintf("%d,", w) + } + if len(weekStr) > 0 && len(weekStr) < 7 { + weekStr = weekStr[0 : len(weekStr)-1] + } else { + weekStr = "*" + } + cronStr := fmt.Sprintf("* %d %d * * %s", v.TimeM, v.TimeH, weekStr) + if id, err = cron.AddFunc(cronStr, this.chatNoticen(v)); err != nil { + this.module.Errorf("cron.AddFunc:%s err:%v", cronStr, err) + continue + } + this.takes = append(this.takes, id) + } + } + //测试代码 + // if id, err = cron.AddFunc("0 */1 * * * ?", this.chatNoticen(&cfg.GameChatSystemData{Text: "测试公告系统"})); err != nil { + // this.module.Errorf("cron.AddFunc:%s err:%v", "0 */1 * * * ?", err) + // } + } + }) + return +} + +func (this *ChatComp) chatNoticen(n *cfg.GameChatSystemData) func() { + return func() { + msg := &pb.DBChat{ + Channel: pb.ChatChannel_System, + Stag: this.service.GetTag(), + Ctime: time.Now().Unix(), + Content: n.Text, + } + data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg}) + if err := this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{ + MainType: string(comm.ModuleChat), + SubType: "message", + Data: data, + }, nil); err != nil { + this.module.Errorf("err:%v", err) + } + } +} diff --git a/modules/timer/forum.go b/modules/timer/forum.go index 83969e08d..e6be0086a 100644 --- a/modules/timer/forum.go +++ b/modules/timer/forum.go @@ -24,7 +24,7 @@ func (this *forumComp) Init(service core.IService, module core.IModule, comp cor return } -func (this *forumComp) Start(timeSec int32) (err error) { +func (this *forumComp) Start() (err error) { err = this.MCompModel.Start() //cron.AddFunc("*/5 * * * * ?", this.Timer) //每五秒执行一次 return diff --git a/modules/timer/module.go b/modules/timer/module.go index 6f9040126..92c34bc2b 100644 --- a/modules/timer/module.go +++ b/modules/timer/module.go @@ -5,7 +5,7 @@ import ( "go_dreamfactory/lego/base" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" - "go_dreamfactory/modules" + "go_dreamfactory/lego/sys/log" ) /* @@ -21,11 +21,12 @@ func NewModule() core.IModule { type Timer struct { cbase.ModuleBase - modules.MCompModel + options *Options service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口 rank *PagodaRank rank2 *VikingRank rank3 *HuntingRank + chat *ChatComp //俩天系统定时任务 } //模块名 @@ -42,6 +43,7 @@ func (this *Timer) NewOptions() (options core.IModuleOptions) { func (this *Timer) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { err = this.ModuleBase.Init(service, module, options) this.service = service.(base.IRPCXService) + this.options = options.(*Options) return } @@ -56,4 +58,80 @@ func (this *Timer) OnInstallComp() { this.rank = this.RegisterComp(new(PagodaRank)).(*PagodaRank) this.rank2 = this.RegisterComp(new(VikingRank)).(*VikingRank) this.rank3 = this.RegisterComp(new(HuntingRank)).(*HuntingRank) + this.chat = this.RegisterComp(new(ChatComp)).(*ChatComp) +} + +//日志 +func (this *Timer) Enabled(lvl log.Loglevel) bool { + return this.options.GetLog().Enabled(lvl) +} +func (this *Timer) SetName(name string) { + this.options.GetLog().SetName(name) +} + +//日志接口 +func (this *Timer) Debug(msg string, args ...log.Field) { + this.options.GetLog().Debug(msg, args...) +} +func (this *Timer) Info(msg string, args ...log.Field) { + this.options.GetLog().Info(msg, args...) +} +func (this *Timer) Print(msg string, args ...log.Field) { + this.options.GetLog().Print(msg, args...) +} +func (this *Timer) Warn(msg string, args ...log.Field) { + this.options.GetLog().Warn(msg, args...) +} +func (this *Timer) Error(msg string, args ...log.Field) { + this.options.GetLog().Error(msg, args...) +} +func (this *Timer) Panic(msg string, args ...log.Field) { + this.options.GetLog().Panic(msg, args...) +} +func (this *Timer) Fatal(msg string, args ...log.Field) { + this.options.GetLog().Fatal(msg, args...) +} + +func (this *Timer) Debugf(format string, args ...interface{}) { + this.options.GetLog().Debugf(format, args...) +} +func (this *Timer) Infof(format string, args ...interface{}) { + this.options.GetLog().Infof(format, args...) +} +func (this *Timer) Printf(format string, args ...interface{}) { + this.options.GetLog().Printf(format, args...) +} +func (this *Timer) Warnf(format string, args ...interface{}) { + this.options.GetLog().Warnf(format, args...) +} +func (this *Timer) Errorf(format string, args ...interface{}) { + this.options.GetLog().Errorf(format, args...) +} +func (this *Timer) Fatalf(format string, args ...interface{}) { + this.options.GetLog().Fatalf(format, args...) +} +func (this *Timer) Panicf(format string, args ...interface{}) { + this.options.GetLog().Panicf(format, args...) +} + +func (this *Timer) Debugln(args ...interface{}) { + this.options.GetLog().Debugln(args...) +} +func (this *Timer) Infoln(args ...interface{}) { + this.options.GetLog().Infoln(args...) +} +func (this *Timer) Println(args ...interface{}) { + this.options.GetLog().Println(args...) +} +func (this *Timer) Warnln(args ...interface{}) { + this.options.GetLog().Warnln(args...) +} +func (this *Timer) Errorln(args ...interface{}) { + this.options.GetLog().Errorln(args...) +} +func (this *Timer) Fatalln(args ...interface{}) { + this.options.GetLog().Fatalln(args...) +} +func (this *Timer) Panicln(args ...interface{}) { + this.options.GetLog().Panicln(args...) }