上传网关服务与 Service_Mainte 服务简练界面,实现Service_Mainte 服务器主动推送用户消息

This commit is contained in:
liwei1dao 2022-09-16 14:15:41 +08:00
parent c56138e9dc
commit 9fd21926aa
8 changed files with 208 additions and 40 deletions

View File

@ -338,6 +338,9 @@ const (
type ChatSystemType int8 type ChatSystemType int8
const ( const (
///定时公告
TimedNotice ChatSystemType = 1
///用户登录通告
UserLoginNotice ChatSystemType = 2 UserLoginNotice ChatSystemType = 2
///装备强化公告 ///装备强化公告
EquipmentUpgradeNotice ChatSystemType = 3 EquipmentUpgradeNotice ChatSystemType = 3

View File

@ -5,11 +5,12 @@ import (
) )
type ( type (
ISys interface { EntryID tcron.EntryID
ISys interface {
Start() Start()
Stop() Stop()
AddFunc(spec string, cmd func()) (tcron.EntryID, error) AddFunc(spec string, cmd func()) (EntryID, error)
Remove(id tcron.EntryID) Remove(id EntryID)
} }
) )
@ -39,10 +40,10 @@ func Stop() {
defsys.Stop() defsys.Stop()
} }
func AddFunc(spec string, cmd func()) (tcron.EntryID, error) { func AddFunc(spec string, cmd func()) (EntryID, error) {
return defsys.AddFunc(spec, cmd) return defsys.AddFunc(spec, cmd)
} }
func Remove(id tcron.EntryID) { func Remove(id EntryID) {
defsys.Remove(id) defsys.Remove(id)
} }

View File

@ -23,10 +23,14 @@ func (this *Cron) Stop() {
this.cron.Stop() this.cron.Stop()
} }
func (this *Cron) AddFunc(spec string, cmd func()) (tcron.EntryID, error) { func (this *Cron) AddFunc(spec string, cmd func()) (id EntryID, err error) {
return this.cron.AddFunc(spec, cmd) var eid tcron.EntryID
if eid, err = this.cron.AddFunc(spec, cmd); err != nil {
id = EntryID(eid)
}
return
} }
func (this *Cron) Remove(id tcron.EntryID) { func (this *Cron) Remove(id EntryID) {
this.cron.Remove(id) this.cron.Remove(tcron.EntryID(id))
} }

View File

@ -277,33 +277,6 @@ func (this *Client) UpdateServer(servers map[string]*ServiceNode) {
} else { } else {
this.options.Log.Debugf("UpdateServer addr:%s ", v.ServiceAddr) this.options.Log.Debugf("UpdateServer addr:%s ", v.ServiceAddr)
} }
// this.connsMapMu.RLock()
// _, ok := this.conns[v.ServiceAddr]
// this.connsMapMu.RUnlock()
// if !ok {
// this.connectMapMu.RLock()
// _, ok := this.connecting[v.ServiceAddr]
// this.connectMapMu.RUnlock()
// if !ok {
// this.connectMapMu.Lock()
// this.connecting[v.ServiceAddr] = struct{}{}
// this.connectMapMu.Unlock()
// if err := this.Call(context.Background(), fmt.Sprintf("%s/%s", v.ServiceType, v.ServiceId), RpcX_ShakeHands, &ServiceNode{
// ServiceTag: this.options.ServiceTag,
// ServiceId: this.options.ServiceId,
// ServiceType: this.options.ServiceType,
// ServiceAddr: this.options.ServiceAddr},
// &ServiceNode{}); err != nil {
// this.options.Log.Errorf("ShakeHands new node addr:%s err:%v", v.ServiceAddr, err)
// this.connectMapMu.Lock()
// delete(this.connecting, v.ServiceAddr)
// this.connectMapMu.Unlock()
// } else {
// this.options.Log.Debugf("UpdateServer addr:%s ", v.ServiceAddr)
// }
// }
// }
} }
} }

View File

@ -1,12 +1,15 @@
package gateway package gateway
import ( import (
"context"
"fmt"
"go_dreamfactory/comm" "go_dreamfactory/comm"
"go_dreamfactory/lego/base" "go_dreamfactory/lego/base"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/sys/rpcx"
) )
/* /*
@ -77,8 +80,14 @@ func (this *Gateway) Start() (err error) {
this.service.RegisterFunctionName(name, fn) this.service.RegisterFunctionName(name, fn)
} }
err = this.ModuleBase.Start() err = this.ModuleBase.Start()
//没有建立客户端 主动发起握手
this.service.RpcGo(context.Background(), comm.Service_Mainte, rpcx.RpcX_ShakeHands, &rpcx.ServiceNode{
ServiceTag: this.service.GetTag(),
ServiceId: this.service.GetId(),
ServiceType: this.service.GetType(),
ServiceAddr: fmt.Sprintf("tcp@%s:%d", this.service.GetIp(), this.service.GetPort())},
nil)
return return
} }
// OnInstallComp 装备组件 // OnInstallComp 装备组件

100
modules/timer/chat.go Normal file
View File

@ -0,0 +1,100 @@
package timer
import (
"context"
"fmt"
"go_dreamfactory/comm"
"go_dreamfactory/modules"
"go_dreamfactory/pb"
"go_dreamfactory/sys/configure"
cfg "go_dreamfactory/sys/configure/structs"
"time"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/cron"
"google.golang.org/protobuf/types/known/anypb"
)
var (
game_chatsystem = "game_chatsystem.json"
)
/*
聊天系统 推送系统公告
*/
type ChatComp struct {
modules.MCompConfigure
service core.IService
module *Timer
takes []cron.EntryID
}
//组件初始化接口
func (this *ChatComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.MCompConfigure.Init(service, module, comp, options)
this.service = service
this.module = module.(*Timer)
this.takes = make([]cron.EntryID, 0)
return
}
func (this *ChatComp) Start() (err error) {
err = this.MCompConfigure.Start()
configure.RegisterConfigure(game_chatsystem, cfg.NewGameChatSystem, func() {
for _, v := range this.takes { //移除前面的定时任务
cron.Remove(v)
}
if v, err := this.GetConfigure(game_chatsystem); err != nil {
this.module.Errorf("err:%v", err)
return
} else {
var id cron.EntryID
this.takes = make([]cron.EntryID, 0)
for _, v := range v.(*cfg.GameChatSystem).GetDataList() {
if v.Type == int32(comm.TimedNotice) { //处理定时任务
weekStr := ""
for _, w := range v.Week {
weekStr += fmt.Sprintf("%d,", w)
}
if len(weekStr) > 0 && len(weekStr) < 7 {
weekStr = weekStr[0 : len(weekStr)-1]
} else {
weekStr = "*"
}
cronStr := fmt.Sprintf("* %d %d * * %s", v.TimeM, v.TimeH, weekStr)
if id, err = cron.AddFunc(cronStr, this.chatNoticen(v)); err != nil {
this.module.Errorf("cron.AddFunc:%s err:%v", cronStr, err)
continue
}
this.takes = append(this.takes, id)
}
}
//测试代码
// if id, err = cron.AddFunc("0 */1 * * * ?", this.chatNoticen(&cfg.GameChatSystemData{Text: "测试公告系统"})); err != nil {
// this.module.Errorf("cron.AddFunc:%s err:%v", "0 */1 * * * ?", err)
// }
}
})
return
}
func (this *ChatComp) chatNoticen(n *cfg.GameChatSystemData) func() {
return func() {
msg := &pb.DBChat{
Channel: pb.ChatChannel_System,
Stag: this.service.GetTag(),
Ctime: time.Now().Unix(),
Content: n.Text,
}
data, _ := anypb.New(&pb.ChatMessagePush{Chat: msg})
if err := this.module.service.AcrossClusterBroadcast(context.Background(), msg.Stag, comm.Service_Gateway, string(comm.Rpc_GatewaySendRadioMsg), pb.UserMessage{
MainType: string(comm.ModuleChat),
SubType: "message",
Data: data,
}, nil); err != nil {
this.module.Errorf("err:%v", err)
}
}
}

View File

@ -24,7 +24,7 @@ func (this *forumComp) Init(service core.IService, module core.IModule, comp cor
return return
} }
func (this *forumComp) Start(timeSec int32) (err error) { func (this *forumComp) Start() (err error) {
err = this.MCompModel.Start() err = this.MCompModel.Start()
//cron.AddFunc("*/5 * * * * ?", this.Timer) //每五秒执行一次 //cron.AddFunc("*/5 * * * * ?", this.Timer) //每五秒执行一次
return return

View File

@ -5,7 +5,7 @@ import (
"go_dreamfactory/lego/base" "go_dreamfactory/lego/base"
"go_dreamfactory/lego/core" "go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/core/cbase"
"go_dreamfactory/modules" "go_dreamfactory/lego/sys/log"
) )
/* /*
@ -21,11 +21,12 @@ func NewModule() core.IModule {
type Timer struct { type Timer struct {
cbase.ModuleBase cbase.ModuleBase
modules.MCompModel options *Options
service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口 service base.IRPCXService //rpc服务对象 通过这个对象可以发布服务和调用其他服务的接口
rank *PagodaRank rank *PagodaRank
rank2 *VikingRank rank2 *VikingRank
rank3 *HuntingRank rank3 *HuntingRank
chat *ChatComp //俩天系统定时任务
} }
//模块名 //模块名
@ -42,6 +43,7 @@ func (this *Timer) NewOptions() (options core.IModuleOptions) {
func (this *Timer) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) { func (this *Timer) Init(service core.IService, module core.IModule, options core.IModuleOptions) (err error) {
err = this.ModuleBase.Init(service, module, options) err = this.ModuleBase.Init(service, module, options)
this.service = service.(base.IRPCXService) this.service = service.(base.IRPCXService)
this.options = options.(*Options)
return return
} }
@ -56,4 +58,80 @@ func (this *Timer) OnInstallComp() {
this.rank = this.RegisterComp(new(PagodaRank)).(*PagodaRank) this.rank = this.RegisterComp(new(PagodaRank)).(*PagodaRank)
this.rank2 = this.RegisterComp(new(VikingRank)).(*VikingRank) this.rank2 = this.RegisterComp(new(VikingRank)).(*VikingRank)
this.rank3 = this.RegisterComp(new(HuntingRank)).(*HuntingRank) this.rank3 = this.RegisterComp(new(HuntingRank)).(*HuntingRank)
this.chat = this.RegisterComp(new(ChatComp)).(*ChatComp)
}
//日志
func (this *Timer) Enabled(lvl log.Loglevel) bool {
return this.options.GetLog().Enabled(lvl)
}
func (this *Timer) SetName(name string) {
this.options.GetLog().SetName(name)
}
//日志接口
func (this *Timer) Debug(msg string, args ...log.Field) {
this.options.GetLog().Debug(msg, args...)
}
func (this *Timer) Info(msg string, args ...log.Field) {
this.options.GetLog().Info(msg, args...)
}
func (this *Timer) Print(msg string, args ...log.Field) {
this.options.GetLog().Print(msg, args...)
}
func (this *Timer) Warn(msg string, args ...log.Field) {
this.options.GetLog().Warn(msg, args...)
}
func (this *Timer) Error(msg string, args ...log.Field) {
this.options.GetLog().Error(msg, args...)
}
func (this *Timer) Panic(msg string, args ...log.Field) {
this.options.GetLog().Panic(msg, args...)
}
func (this *Timer) Fatal(msg string, args ...log.Field) {
this.options.GetLog().Fatal(msg, args...)
}
func (this *Timer) Debugf(format string, args ...interface{}) {
this.options.GetLog().Debugf(format, args...)
}
func (this *Timer) Infof(format string, args ...interface{}) {
this.options.GetLog().Infof(format, args...)
}
func (this *Timer) Printf(format string, args ...interface{}) {
this.options.GetLog().Printf(format, args...)
}
func (this *Timer) Warnf(format string, args ...interface{}) {
this.options.GetLog().Warnf(format, args...)
}
func (this *Timer) Errorf(format string, args ...interface{}) {
this.options.GetLog().Errorf(format, args...)
}
func (this *Timer) Fatalf(format string, args ...interface{}) {
this.options.GetLog().Fatalf(format, args...)
}
func (this *Timer) Panicf(format string, args ...interface{}) {
this.options.GetLog().Panicf(format, args...)
}
func (this *Timer) Debugln(args ...interface{}) {
this.options.GetLog().Debugln(args...)
}
func (this *Timer) Infoln(args ...interface{}) {
this.options.GetLog().Infoln(args...)
}
func (this *Timer) Println(args ...interface{}) {
this.options.GetLog().Println(args...)
}
func (this *Timer) Warnln(args ...interface{}) {
this.options.GetLog().Warnln(args...)
}
func (this *Timer) Errorln(args ...interface{}) {
this.options.GetLog().Errorln(args...)
}
func (this *Timer) Fatalln(args ...interface{}) {
this.options.GetLog().Fatalln(args...)
}
func (this *Timer) Panicln(args ...interface{}) {
this.options.GetLog().Panicln(args...)
} }