启动dbservice 先检验日志表是否有数据没写完

This commit is contained in:
meixiongfeng 2022-06-21 14:20:56 +08:00
parent aa426a1328
commit 500753f1af
2 changed files with 24 additions and 10 deletions

View File

@ -57,6 +57,10 @@ const (
Event_UserOffline core.Event_Key = "Event_UserOffline" //用户离线事件 Event_UserOffline core.Event_Key = "Event_UserOffline" //用户离线事件
) )
const (
DBService_Status string = "DBService_status"
)
// 服务网关组件接口定义 // 服务网关组件接口定义
type ISC_GateRouteComp interface { type ISC_GateRouteComp interface {
core.IServiceComp core.IServiceComp

View File

@ -15,19 +15,26 @@ import (
type DB_Comp struct { type DB_Comp struct {
modules.Model_Comp modules.Model_Comp
task chan string task chan string
_data *mongo.Cursor isInit bool
} }
func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.Model_Comp.Init(service, module, comp, options) this.Model_Comp.Init(service, module, comp, options)
this.task = make(chan string, TaskMaxNum) this.task = make(chan string, TaskMaxNum)
this._data = new(mongo.Cursor)
return return
} }
func (this *DB_Comp) Start() (err error) { func (this *DB_Comp) Start() (err error) {
err = this.Model_Comp.Start() err = this.Model_Comp.Start()
model_count := this.Model_TotalCount()
if model_count > 0 { //1000
this.Redis.Set(comm.DBService_Status, true, -1)
this.isInit = false
} else {
this.isInit = true
}
go this.run() go this.run()
return return
} }
@ -40,6 +47,9 @@ func (this *DB_Comp) run() {
case <-time.After(time.Second * 2): case <-time.After(time.Second * 2):
this.Model_UpdateDBByLog("") this.Model_UpdateDBByLog("")
} }
if !this.isInit && this.Model_TotalCount() <= 0 {
this.Redis.Delete(comm.DBService_Status)
}
} }
} }
@ -48,24 +58,24 @@ func (this *DB_Comp) PushUserTask(uid string) {
} }
func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) {
var _data *mongo.Cursor
if uid == "" { if uid == "" {
this._data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) _data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
if err != nil { if err != nil {
return err return err
} }
} else { } else {
this._data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) _data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find())
if err != nil { if err != nil {
return err return err
} }
} }
_delID := make([]string, 0) // 处理完成要删除的id _delID := make([]string, 0) // 处理完成要删除的id
for this._data.Next(context.TODO()) { // 处理删除逻辑 for _data.Next(context.TODO()) { // 处理删除逻辑
data := &comm.Autogenerated{} data := &comm.Autogenerated{}
if err = this._data.Decode(data); err != nil { if err = _data.Decode(data); err != nil {
log.Errorf("Decode Data err : %v", err) log.Errorf("Decode Data err : %v", err)
continue continue
} }
@ -178,7 +188,7 @@ func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) {
// 查询 当前日志列表还有没有处理完条数 // 查询 当前日志列表还有没有处理完条数
func (this *DB_Comp) Model_TotalCount() int { func (this *DB_Comp) Model_TotalCount() int {
_data, err := this.DB.Find("model_log", bson.M{}) _data, err := this.DB.Find("DB_ModelTable", bson.M{})
if err == nil { if err == nil {
return _data.RemainingBatchLength() return _data.RemainingBatchLength()
} }