From 500753f1af2cb24c8613838d39c790587810413f Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Tue, 21 Jun 2022 14:20:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=AF=E5=8A=A8dbservice=20=E5=85=88?= =?UTF-8?q?=E6=A3=80=E9=AA=8C=E6=97=A5=E5=BF=97=E8=A1=A8=E6=98=AF=E5=90=A6?= =?UTF-8?q?=E6=9C=89=E6=95=B0=E6=8D=AE=E6=B2=A1=E5=86=99=E5=AE=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comm/core.go | 4 ++++ modules/dbservice/db_comp.go | 30 ++++++++++++++++++++---------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/comm/core.go b/comm/core.go index 03abb8d1d..d8f072ec5 100644 --- a/comm/core.go +++ b/comm/core.go @@ -57,6 +57,10 @@ const ( Event_UserOffline core.Event_Key = "Event_UserOffline" //用户离线事件 ) +const ( + DBService_Status string = "DBService_status" +) + // 服务网关组件接口定义 type ISC_GateRouteComp interface { core.IServiceComp diff --git a/modules/dbservice/db_comp.go b/modules/dbservice/db_comp.go index 8d91af130..24383fe2f 100644 --- a/modules/dbservice/db_comp.go +++ b/modules/dbservice/db_comp.go @@ -15,19 +15,26 @@ import ( type DB_Comp struct { modules.Model_Comp - task chan string - _data *mongo.Cursor + task chan string + isInit bool } func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.Model_Comp.Init(service, module, comp, options) this.task = make(chan string, TaskMaxNum) - this._data = new(mongo.Cursor) + return } func (this *DB_Comp) Start() (err error) { err = this.Model_Comp.Start() + model_count := this.Model_TotalCount() + if model_count > 0 { //1000 + this.Redis.Set(comm.DBService_Status, true, -1) + this.isInit = false + } else { + this.isInit = true + } go this.run() return } @@ -40,6 +47,9 @@ func (this *DB_Comp) run() { case <-time.After(time.Second * 2): this.Model_UpdateDBByLog("") } + if !this.isInit && this.Model_TotalCount() <= 0 { + this.Redis.Delete(comm.DBService_Status) + } } } @@ -48,24 +58,24 @@ func (this *DB_Comp) PushUserTask(uid string) { } func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { - + var _data *mongo.Cursor if uid == "" { - this._data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) + _data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) if err != nil { return err } } else { - this._data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) + _data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) if err != nil { return err } } - _delID := make([]string, 0) // 处理完成要删除的id - for this._data.Next(context.TODO()) { // 处理删除逻辑 + _delID := make([]string, 0) // 处理完成要删除的id + for _data.Next(context.TODO()) { // 处理删除逻辑 data := &comm.Autogenerated{} - if err = this._data.Decode(data); err != nil { + if err = _data.Decode(data); err != nil { log.Errorf("Decode Data err : %v", err) continue } @@ -178,7 +188,7 @@ func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) { // 查询 当前日志列表还有没有处理完条数 func (this *DB_Comp) Model_TotalCount() int { - _data, err := this.DB.Find("model_log", bson.M{}) + _data, err := this.DB.Find("DB_ModelTable", bson.M{}) if err == nil { return _data.RemainingBatchLength() }