From d9a5fbe5bacf52de06fa35d66b0e82173528e495 Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Wed, 15 Jun 2022 18:37:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8A=A5=E9=94=99=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/dbservice/db_comp.go | 32 ++++++++++------- services/dbservice/main.go | 67 ++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 12 deletions(-) create mode 100644 services/dbservice/main.go diff --git a/modules/dbservice/db_comp.go b/modules/dbservice/db_comp.go index 1e8851682..eab23f4cc 100644 --- a/modules/dbservice/db_comp.go +++ b/modules/dbservice/db_comp.go @@ -15,12 +15,14 @@ import ( type DB_Comp struct { modules.Model_Comp - task chan string + task chan string + _data *mongo.Cursor } func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.Model_Comp.Init(service, module, comp, options) this.task = make(chan string, TaskMaxNum) + this._data = new(mongo.Cursor) return } @@ -46,24 +48,24 @@ func (this *DB_Comp) PushUserTask(uid string) { } func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { - _data := &mongo.Cursor{} + if uid == "" { - _data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) + this._data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) if err != nil { return err } } else { - _data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) + this._data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find()) if err != nil { return err } } - _delID := make([]string, 0) // 处理完成要删除的id - for _data.Next(context.TODO()) { // 处理删除逻辑 + _delID := make([]string, 0) // 处理完成要删除的id + for this._data.Next(context.TODO()) { // 处理删除逻辑 data := &comm.Autogenerated{} - if err = _data.Decode(data); err != nil { + if err = this._data.Decode(data); err != nil { log.Errorf("Decode Data err : %v", err) continue } @@ -126,7 +128,10 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { } _key := data.D[0].(string) where := data.D[1].(bson.D) - _obj := &QueryStruct{} + _obj := &QueryStruct{ + Selector: make(map[string]interface{}), + Query: make(map[string]interface{}), + } for _, v := range where { _obj.Selector[v.Key] = v } @@ -150,11 +155,14 @@ func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) { } _delID = append(_delID, data.ID) // 都操作都成功了记录要删除的key } - // 批量删除已处理的数据 - _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) - if err != nil { - log.Errorf("del err %v", err) + + if len(_delID) > 0 { + _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) // 批量删除已处理的数据 + if err != nil { + log.Errorf("del err %v", err) + } } + return } diff --git a/services/dbservice/main.go b/services/dbservice/main.go new file mode 100644 index 000000000..84e957a20 --- /dev/null +++ b/services/dbservice/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "flag" + "fmt" + "go_dreamfactory/modules/dbservice" + "go_dreamfactory/services" + "go_dreamfactory/sys/cache" + "go_dreamfactory/sys/db" + + "go_dreamfactory/lego" + "go_dreamfactory/lego/base/rpcx" + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/sys/log" +) + +/* + 服务类型:dbservice + 服务描述:处理梦工厂的具体业务需求,包含 user,pack,mail,friend...功能业务模块 +*/ +var ( + conf = flag.String("conf", "./conf/dbservice.yaml", "获取需要启动的服务配置文件") //启动服务的Id +) + +/*服务启动的入口函数*/ +func main() { + flag.Parse() + s := NewService( + rpcx.SetConfPath(*conf), + rpcx.SetVersion("1.0.0.0"), + ) + s.OnInstallComp( //装备组件 + //services.NewGateRouteComp(), //此服务需要接受用户的消息 需要装备网关组件 + ) + lego.Run(s, //运行模块 + dbservice.NewModule(), + ) + +} + +func NewService(ops ...rpcx.Option) core.IService { + s := new(Service) + s.Configure(ops...) + return s +} + +//worker 的服务对象定义 +type Service struct { + services.ServiceBase +} + +//初始化worker需要的一些系统工具 +func (this *Service) InitSys() { + this.ServiceBase.InitSys() + //缓存系统 + if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil { + panic(fmt.Sprintf("init sys.cache err: %s", err.Error())) + } else { + log.Infof("init sys.cache success!") + } + //存储系统 + if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil { + panic(fmt.Sprintf("init sys.db err: %s", err.Error())) + } else { + log.Infof("init sys.db success!") + } +}