From 7737d3be7814bd5df735a5c16c07885000c7ae32 Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Tue, 14 Jun 2022 15:26:47 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E7=BB=93=E6=9E=84=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/model/db_comp.go | 4 ++-- modules/model/dbservice_comp.go | 10 +++++----- modules/{ => model}/mail_test.go | 0 sys/cache/init_test.go | 33 ++++++++++++++++++++++++++++++++ 4 files changed, 40 insertions(+), 7 deletions(-) rename modules/{ => model}/mail_test.go (100%) diff --git a/modules/model/db_comp.go b/modules/model/db_comp.go index a6f37b8d6..083e8fbba 100644 --- a/modules/model/db_comp.go +++ b/modules/model/db_comp.go @@ -5,9 +5,9 @@ import ( "errors" "go_dreamfactory/comm" "go_dreamfactory/lego/core" - "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/mgo" + "go_dreamfactory/modules" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" @@ -18,7 +18,7 @@ const ( ) type DB_Comp struct { - cbase.ModuleCompBase + modules.MComp_DBComp mgo mgo.ISys } diff --git a/modules/model/dbservice_comp.go b/modules/model/dbservice_comp.go index ade6efae5..cfaa543b4 100644 --- a/modules/model/dbservice_comp.go +++ b/modules/model/dbservice_comp.go @@ -8,13 +8,13 @@ import ( type DBService_Comp struct { cbase.ModuleCompBase - task chan string - DB_Comp + task chan string + module *Model } func (this *DBService_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.ModuleCompBase.Init(service, module, comp, options) - + this.module = module.(*Model) return } @@ -27,9 +27,9 @@ func (this *DBService_Comp) run() { for { select { case v := <-this.task: - this.Model_UpdateUserDataByUid(v) + this.module.DB().(*DB_Comp).Model_UpdateUserDataByUid(v) case <-time.After(time.Second): - this.Model_UpdateDBByLog() + this.module.DB().(*DB_Comp).Model_UpdateDBByLog() } } } diff --git a/modules/mail_test.go b/modules/model/mail_test.go similarity index 100% rename from modules/mail_test.go rename to modules/model/mail_test.go diff --git a/sys/cache/init_test.go b/sys/cache/init_test.go index af9cc838e..0e420d0bf 100644 --- a/sys/cache/init_test.go +++ b/sys/cache/init_test.go @@ -2,10 +2,16 @@ package cache_test import ( "fmt" + "go_dreamfactory/comm" + "go_dreamfactory/lego/sys/log" + "go_dreamfactory/pb" "go_dreamfactory/sys/cache" "go_dreamfactory/sys/db" "os" "testing" + "time" + + "go.mongodb.org/mongo-driver/bson/primitive" ) //测试环境下初始化db和cache 系统 @@ -19,6 +25,33 @@ func TestMain(m *testing.M) { return } + _mail := &pb.DB_MailData{ + + UserId: "uid123", + Title: "系统邮件", + + Contex: "恭喜获得专属礼包一份", + CreateTime: uint64(time.Now().Unix()), + DueTime: uint64(time.Now().Unix()) + 30*24*3600, + Check: false, + Reward: false, + } + //db.InsertModelLogs("mail", "uid123", _mail) + //InsertModelLogs("mail", "uid123", _mail) + data := &comm.Autogenerated{ + ID: primitive.NewObjectID().Hex(), + UID: "uid123", + Act: string(comm.LogHandleType_Insert), + } + data.D = append(data.D, "mail") // D[0] + data.D = append(data.D, _mail) // D[1] + for i := 0; i < 100000; i++ { + _, err1 := db.Defsys.Mgo().InsertOne("model", data) + if err1 != nil { + log.Errorf("insert model db err %v", err1) + } + } + defer os.Exit(m.Run()) } From ea15865a4a68c6280f8df9141d585fd346db9597 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Tue, 14 Jun 2022 15:32:15 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E8=84=9A=E6=9C=AC=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/redis/options.go | 2 +- modules/pack/pack_test.go | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/lego/sys/redis/options.go b/lego/sys/redis/options.go index 93fd9b111..427ec10ac 100644 --- a/lego/sys/redis/options.go +++ b/lego/sys/redis/options.go @@ -63,7 +63,7 @@ func SetRedis_Single_PoolSize(v int) Option { o.Redis_Single_PoolSize = v } } -func Redis_Cluster_Addr(v []string) Option { +func SetRedis_Cluster_Addr(v []string) Option { return func(o *Options) { o.Redis_Cluster_Addr = v } diff --git a/modules/pack/pack_test.go b/modules/pack/pack_test.go index 2d794c304..3fefd0aea 100644 --- a/modules/pack/pack_test.go +++ b/modules/pack/pack_test.go @@ -2,20 +2,24 @@ package pack_test import ( "fmt" - "go_dreamfactory/sys/cache" + "go_dreamfactory/lego/sys/mgo" + "go_dreamfactory/lego/sys/redis" "go_dreamfactory/sys/configure" - "go_dreamfactory/sys/db" "os" "testing" ) +var db mgo.ISys +var cache redis.ISys + //测试环境下初始化db和cache 系统 func TestMain(m *testing.M) { - if err := db.OnInit(nil, db.Set_MongodbUrl("mongodb://admin:123456@10.0.0.9:27018"), db.Set_MongodbDatabase("dreamfactory")); err != nil { + var err error + if db, err = mgo.NewSys(nil, mgo.SetMongodbUrl("mongodb://admin:123456@10.0.0.9:27018"), mgo.SetMongodbDatabase("dreamfactory")); err != nil { fmt.Printf("err:%v\n", err) return } - if err := cache.OnInit(nil, cache.Set_Redis_Addr([]string{"10.0.0.9:9001", "10.0.0.9:9002", "10.0.0.9:9003", "10.0.1.45:9004", "10.0.1.45:9005", "10.0.1.45:9006"}), cache.Set_Redis_Password("")); err != nil { + if cache, err = redis.NewSys(nil, redis.SetRedisType(redis.Redis_Cluster), redis.SetRedis_Cluster_Addr([]string{"10.0.0.9:9001", "10.0.0.9:9002", "10.0.0.9:9003", "10.0.1.45:9004", "10.0.1.45:9005", "10.0.1.45:9006"}), redis.SetRedis_Cluster_Password("")); err != nil { fmt.Printf("err:%v\n", err) return } @@ -25,3 +29,7 @@ func TestMain(m *testing.M) { } defer os.Exit(m.Run()) } + +func Test_Log(t *testing.T) { + db.InsertOne("", 1) +} From 7081e9a8cb6155cfaf74084022b7234585802ef2 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Tue, 14 Jun 2022 15:36:58 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/model/dbservice_comp.go | 4 ++-- modules/model/mail_test.go | 8 ++------ modules/model/module.go | 2 ++ sys/cache/cache.go | 2 +- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/modules/model/dbservice_comp.go b/modules/model/dbservice_comp.go index cfaa543b4..bf6056c8a 100644 --- a/modules/model/dbservice_comp.go +++ b/modules/model/dbservice_comp.go @@ -27,9 +27,9 @@ func (this *DBService_Comp) run() { for { select { case v := <-this.task: - this.module.DB().(*DB_Comp).Model_UpdateUserDataByUid(v) + this.module.db_comp.Model_UpdateUserDataByUid(v) case <-time.After(time.Second): - this.module.DB().(*DB_Comp).Model_UpdateDBByLog() + this.module.db_comp.Model_UpdateDBByLog() } } } diff --git a/modules/model/mail_test.go b/modules/model/mail_test.go index 4329db094..a16685f61 100644 --- a/modules/model/mail_test.go +++ b/modules/model/mail_test.go @@ -1,4 +1,4 @@ -package modules +package model import ( "go_dreamfactory/lego/sys/log" @@ -7,10 +7,6 @@ import ( "time" ) -var ( - obj MComp_DBComp -) - func TestCreatemoudles(t *testing.T) { _mail := &pb.DB_MailData{ @@ -23,7 +19,7 @@ func TestCreatemoudles(t *testing.T) { Check: false, Reward: false, } - obj.InsertModelLogs("mail", "uid123", _mail) + // obj.InsertModelLogs("mail", "uid123", _mail) log.Debugf("insert : %v", _mail) } diff --git a/modules/model/module.go b/modules/model/module.go index 0b32766bd..e2ac54346 100644 --- a/modules/model/module.go +++ b/modules/model/module.go @@ -15,6 +15,7 @@ func NewModule() core.IModule { type Model struct { modules.ModuleBase api_comp *Api_Comp + db_comp *DB_Comp configure_comp *Configure_Comp } @@ -37,6 +38,7 @@ func (this *Model) GetType() core.M_Modules { func (this *Model) OnInstallComp() { this.ModuleBase.OnInstallComp() this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) + this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp) this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp) } diff --git a/sys/cache/cache.go b/sys/cache/cache.go index cb284640a..924bbaa54 100644 --- a/sys/cache/cache.go +++ b/sys/cache/cache.go @@ -21,7 +21,7 @@ type Cache struct { func (this *Cache) init() (err error) { this.redis, err = redis.NewSys( redis.SetRedisType(redis.Redis_Cluster), - redis.Redis_Cluster_Addr(this.options.Redis_Addr), + redis.SetRedis_Cluster_Addr(this.options.Redis_Addr), redis.SetRedis_Cluster_Password(this.options.Redis_Password)) return } From 3bf2feba6b59929e3cda2e7e1eb32df915bfad07 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Tue, 14 Jun 2022 17:33:22 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=20=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=20=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- modules/pack/pack_test.go | 75 +++++++++++++++++++++++++++++---------- 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/modules/pack/pack_test.go b/modules/pack/pack_test.go index 3fefd0aea..d17b20e44 100644 --- a/modules/pack/pack_test.go +++ b/modules/pack/pack_test.go @@ -1,35 +1,72 @@ -package pack_test +package pack import ( "fmt" - "go_dreamfactory/lego/sys/mgo" - "go_dreamfactory/lego/sys/redis" + "go_dreamfactory/lego" + "go_dreamfactory/lego/base/rpcx" + "go_dreamfactory/lego/core" + "go_dreamfactory/lego/sys/log" + "go_dreamfactory/services" + "go_dreamfactory/sys/cache" "go_dreamfactory/sys/configure" + "go_dreamfactory/sys/db" "os" "testing" + "time" ) -var db mgo.ISys -var cache redis.ISys +func newService(ops ...rpcx.Option) core.IService { + s := new(TestService) + s.Configure(ops...) + return s +} + +//梦工厂基础服务对象 +type TestService struct { + rpcx.RPCXService +} + +//初始化相关系统 +func (this *TestService) InitSys() { + this.RPCXService.InitSys() + if err := cache.OnInit(this.GetSettings().Sys["cache"]); err != nil { + panic(fmt.Sprintf("init sys.cache err: %s", err.Error())) + } else { + log.Infof("init sys.cache success!") + } + if err := db.OnInit(this.GetSettings().Sys["db"]); err != nil { + panic(fmt.Sprintf("init sys.db err: %s", err.Error())) + } else { + log.Infof("init sys.db success!") + } + if err := configure.OnInit(this.GetSettings().Sys["configure"]); err != nil { + panic(fmt.Sprintf("init sys.configure err: %s", err.Error())) + } else { + log.Infof("init sys.configure success!") + } +} + +var module = new(Pack) //测试环境下初始化db和cache 系统 func TestMain(m *testing.M) { - var err error - if db, err = mgo.NewSys(nil, mgo.SetMongodbUrl("mongodb://admin:123456@10.0.0.9:27018"), mgo.SetMongodbDatabase("dreamfactory")); err != nil { - fmt.Printf("err:%v\n", err) - return - } - if cache, err = redis.NewSys(nil, redis.SetRedisType(redis.Redis_Cluster), redis.SetRedis_Cluster_Addr([]string{"10.0.0.9:9001", "10.0.0.9:9002", "10.0.0.9:9003", "10.0.1.45:9004", "10.0.1.45:9005", "10.0.1.45:9006"}), redis.SetRedis_Cluster_Password("")); err != nil { - fmt.Printf("err:%v\n", err) - return - } - if err := configure.OnInit(nil); err != nil { - fmt.Printf("err:%v\n", err) - return - } + s := newService( + rpcx.SetConfPath("../../bin/conf/worker_1.yaml"), + rpcx.SetVersion("1.0.0.0"), + ) + s.OnInstallComp( //装备组件 + services.NewGateRouteComp(), //此服务需要接受用户的消息 需要装备网关组件 + ) + go func() { + lego.Run(s, //运行模块 + module, + ) + }() + time.Sleep(time.Second) defer os.Exit(m.Run()) } func Test_Log(t *testing.T) { - db.InsertOne("", 1) + items, err := module.db_comp.Pack_QueryUserPack("liwei1dao") + log.Debugf("item:%v err:%v", items, err) } From b0a5b7ffe9b098fb42c864c9d676a09a2f5762e6 Mon Sep 17 00:00:00 2001 From: liwei1dao Date: Tue, 14 Jun 2022 19:28:47 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E4=BC=98=E5=8C=96mgo=E9=A9=B1=E5=8A=A8?= =?UTF-8?q?=E5=BA=93=E9=BB=98=E8=AE=A4=E9=85=8D=E7=BD=AE=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lego/sys/mgo/mgo.go | 22 ++++++++++------------ lego/sys/mgo/options.go | 4 ++-- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/lego/sys/mgo/mgo.go b/lego/sys/mgo/mgo.go index 6fb81cdf9..f35e2fa7d 100644 --- a/lego/sys/mgo/mgo.go +++ b/lego/sys/mgo/mgo.go @@ -3,7 +3,6 @@ package mgo import ( "context" "fmt" - "time" "go_dreamfactory/lego/core" @@ -12,7 +11,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readconcern" "go.mongodb.org/mongo-driver/mongo/readpref" - "go.mongodb.org/mongo-driver/mongo/writeconcern" ) func newSys(options Options) (sys *Mongodb, err error) { @@ -28,20 +26,20 @@ type Mongodb struct { } func (this *Mongodb) init() (err error) { - want, err := readpref.New(readpref.SecondaryMode) //表示只使用辅助节点 - if err != nil { - return fmt.Errorf("数据库设置辅助节点 err=%s", err.Error()) - } - wc := writeconcern.New(writeconcern.WMajority()) + // want, err := readpref.New(readpref.SecondaryMode) //表示只使用辅助节点 + // if err != nil { + // return fmt.Errorf("数据库设置辅助节点 err=%s", err.Error()) + // } + // wc := writeconcern.New(writeconcern.W(1)) readconcern.Majority() //链接mongo服务 opt := options.Client().ApplyURI(this.options.MongodbUrl) - opt.SetLocalThreshold(3 * time.Second) //只使用与mongo操作耗时小于3秒的 - opt.SetMaxConnIdleTime(5 * time.Second) //指定连接可以保持空闲的最大毫秒数 + // opt.SetLocalThreshold(3 * time.Second) //只使用与mongo操作耗时小于3秒的 + // opt.SetMaxConnIdleTime(5 * time.Second) //指定连接可以保持空闲的最大毫秒数 opt.SetMaxPoolSize(this.options.MaxPoolSize) //使用最大的连接数 - opt.SetReadPreference(want) //表示只使用辅助节点 - opt.SetReadConcern(readconcern.Majority()) //指定查询应返回实例的最新数据确认为,已写入副本集中的大多数成员 - opt.SetWriteConcern(wc) //请求确认写操作传播到大多数mongod实例 + // opt.SetReadPreference(want) //表示只使用辅助节点 + // opt.SetReadConcern(readconcern.Majority()) //指定查询应返回实例的最新数据确认为,已写入副本集中的大多数成员 + // opt.SetWriteConcern(wc) //请求确认写操作传播到大多数mongod实例 if client, err := mongo.Connect(this.getContext(), opt); err != nil { return fmt.Errorf("连接数据库错误 err=%s", err.Error()) } else { diff --git a/lego/sys/mgo/options.go b/lego/sys/mgo/options.go index 18c1b95ae..0abb5c603 100644 --- a/lego/sys/mgo/options.go +++ b/lego/sys/mgo/options.go @@ -40,7 +40,7 @@ func SetTimeOut(v time.Duration) Option { func newOptions(config map[string]interface{}, opts ...Option) Options { options := Options{ - MaxPoolSize: 1000, + MaxPoolSize: 100, TimeOut: time.Second * 3, } if config != nil { @@ -54,7 +54,7 @@ func newOptions(config map[string]interface{}, opts ...Option) Options { func newOptionsByOption(opts ...Option) Options { options := Options{ - MaxPoolSize: 1000, + MaxPoolSize: 100, TimeOut: time.Second * 3, } for _, o := range opts { From c7313573013bc62e6d21464da5cbde0633f13377 Mon Sep 17 00:00:00 2001 From: meixiongfeng <766881921@qq.com> Date: Tue, 14 Jun 2022 19:32:02 +0800 Subject: [PATCH 6/6] =?UTF-8?q?model=20=E6=95=B0=E6=8D=AE=E8=90=BD?= =?UTF-8?q?=E5=9C=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- comm/core.go | 2 +- modules/model/api.go | 5 ++++ modules/model/db_comp.go | 53 ++++++++++++++++++++++----------- modules/model/dbservice_comp.go | 7 +++-- modules/model/module.go | 16 ++-------- sys/cache/init_test.go | 44 ++++++++++++++------------- 6 files changed, 71 insertions(+), 56 deletions(-) diff --git a/comm/core.go b/comm/core.go index 02d6d204c..d92382beb 100644 --- a/comm/core.go +++ b/comm/core.go @@ -56,7 +56,7 @@ type ISC_GateRouteComp interface { } type Autogenerated struct { - ID string `json:"_id"` + ID string `json:"ID,omitempty" bson:"_id"` UID string `json:"uid"` Act string `json:"act"` // insert update delete D []interface{} diff --git a/modules/model/api.go b/modules/model/api.go index 777cf3ff2..1a72633b9 100644 --- a/modules/model/api.go +++ b/modules/model/api.go @@ -18,3 +18,8 @@ func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core this.module = module.(*Model) return } + +func (this *Api_Comp) Start() (err error) { + + return +} diff --git a/modules/model/db_comp.go b/modules/model/db_comp.go index 083e8fbba..0331596f8 100644 --- a/modules/model/db_comp.go +++ b/modules/model/db_comp.go @@ -6,8 +6,8 @@ import ( "go_dreamfactory/comm" "go_dreamfactory/lego/core" "go_dreamfactory/lego/sys/log" - "go_dreamfactory/lego/sys/mgo" "go_dreamfactory/modules" + "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" @@ -19,7 +19,6 @@ const ( type DB_Comp struct { modules.MComp_DBComp - mgo mgo.ISys } // type data struct { @@ -44,13 +43,16 @@ type IModel interface { } func (this *DB_Comp) Model_UpdateDBByLog() (err error) { + //_startTime := time.Now().UnixNano() _delID := make([]string, 0) // 处理完成要删除的id - _data, err := this.mgo.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) + _data, err := this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum))) if err != nil { return err } + for _data.Next(context.TODO()) { // 处理删除逻辑 + data := &comm.Autogenerated{} if err = _data.Decode(data); err == nil { _delID = append(_delID, data.ID) @@ -59,23 +61,34 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) { continue } - log.Debugf("======= insert log : %+v =======", data) + //log.Debugf("======= insert log : %+v =======", data) if data.Act == string(comm.LogHandleType_Insert) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) return errors.New("parameter len err") } - query := data.D[1].([]interface{}) - _, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), query) - if err != nil { - log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err) + + query := data.D[1] + _obj := bson.M{} + for _, v := range query.(bson.D) { + _obj[v.Key] = v.Value } + _key := data.D[0].(string) + + _, err := this.DB.InsertOne(core.SqlTable(_key), _obj) + if err != nil { + log.Errorf("insert %s db err:%v", "mail", err) + } + // _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID}) + // if err != nil { + // log.Errorf("insert %s db err:%v", data.ID, err) + // } } else if data.Act == string(comm.LogHandleType_Delete) { if len(data.D) < 2 { // 参数校验 log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) return errors.New("parameter len err") } - _, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) + _, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) if err != nil { log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) } @@ -93,20 +106,22 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) { for k, v := range query { _obj.Query[k] = v } - this.mgo.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) + this.DB.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) } } // 批量删除已处理的数据 - _, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete()) + _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) if err != nil { log.Errorf("del err %v", err) } + // _endTime := time.Now().UnixNano() + // log.Debugf("==============subTime=%d===========del len = %d", _endTime-_startTime, len(_delID)) return } func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) { - _, err = this.mgo.InsertOne(DB_ModelTable, data) + _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } @@ -114,9 +129,10 @@ func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) { } func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) { + _startTime := time.Now().UnixNano() _delID := make([]string, 0) // 处理完成要删除的id - _data, err := this.mgo.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find()) + _data, err := this.DB.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find()) if err != nil { return err } @@ -135,7 +151,7 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) { log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) return errors.New("parameter len err") } - _, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), data.D[1].([]interface{})) + _, err := this.DB.InsertMany(data.D[0].(core.SqlTable), data.D[1].([]interface{})) if err != nil { log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err) } @@ -144,7 +160,7 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) { log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D)) return errors.New("parameter len err") } - _, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) + _, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{})) if err != nil { log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err) } @@ -162,13 +178,16 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) { for k, v := range query { _obj.Query[k] = v } - this.mgo.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) + this.DB.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query) } } // 批量删除已处理的数据 - _, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete()) + _, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete()) if err != nil { log.Errorf("del err %v", err) } + + _endTime := time.Now().UnixNano() + log.Debugf("==============subTime============del len = %d", _endTime-_startTime, len(_delID)) return } diff --git a/modules/model/dbservice_comp.go b/modules/model/dbservice_comp.go index bf6056c8a..1d03a1bc2 100644 --- a/modules/model/dbservice_comp.go +++ b/modules/model/dbservice_comp.go @@ -18,9 +18,10 @@ func (this *DBService_Comp) Init(service core.IService, module core.IModule, com return } -func (this *DBService_Comp) Start(err error) { - this.ModuleCompBase.Start() +func (this *DBService_Comp) Start() (err error) { + err = this.ModuleCompBase.Start() go this.run() + return } func (this *DBService_Comp) run() { @@ -28,7 +29,7 @@ func (this *DBService_Comp) run() { select { case v := <-this.task: this.module.db_comp.Model_UpdateUserDataByUid(v) - case <-time.After(time.Second): + case <-time.After(time.Second * 2): this.module.db_comp.Model_UpdateDBByLog() } } diff --git a/modules/model/module.go b/modules/model/module.go index e2ac54346..2842df047 100644 --- a/modules/model/module.go +++ b/modules/model/module.go @@ -4,7 +4,6 @@ import ( "go_dreamfactory/comm" "go_dreamfactory/lego/core" "go_dreamfactory/modules" - "time" ) func NewModule() core.IModule { @@ -16,6 +15,7 @@ type Model struct { modules.ModuleBase api_comp *Api_Comp db_comp *DB_Comp + db_service *DBService_Comp configure_comp *Configure_Comp } @@ -25,12 +25,6 @@ func (this *Model) Init(service core.IService, module core.IModule, options core return } -func (this *Model) Start() (err error) { - err = this.ModuleBase.Start() - //go this.RunWriteDB() - return -} - func (this *Model) GetType() core.M_Modules { return comm.SM_LogModelModule } @@ -39,12 +33,6 @@ func (this *Model) OnInstallComp() { this.ModuleBase.OnInstallComp() this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp) this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp) + this.db_service = this.RegisterComp(new(DBService_Comp)).(*DBService_Comp) this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp) } - -func (this *Model) RunWriteDB() { - for { - time.Sleep(time.Second) - - } -} diff --git a/sys/cache/init_test.go b/sys/cache/init_test.go index 0e420d0bf..45d150459 100644 --- a/sys/cache/init_test.go +++ b/sys/cache/init_test.go @@ -24,34 +24,36 @@ func TestMain(m *testing.M) { fmt.Printf("err:%v\n", err) return } + for i := 0; i < 50000; i++ { + //go func() { + _mail := &pb.DB_MailData{ + ObjId: primitive.NewObjectID().Hex(), + UserId: "uid123", + Title: "系统邮件", - _mail := &pb.DB_MailData{ + Contex: "恭喜获得专属礼包一份", + CreateTime: uint64(time.Now().Unix()), + DueTime: uint64(time.Now().Unix()) + 30*24*3600, + Check: false, + Reward: false, + } + //db.InsertModelLogs("mail", "uid123", _mail) + //InsertModelLogs("mail", "uid123", _mail) + data := &comm.Autogenerated{ + ID: primitive.NewObjectID().Hex(), + UID: "uid123", + Act: string(comm.LogHandleType_Insert), + } + data.D = append(data.D, "mail") // D[0] + data.D = append(data.D, _mail) // D[1] - UserId: "uid123", - Title: "系统邮件", - - Contex: "恭喜获得专属礼包一份", - CreateTime: uint64(time.Now().Unix()), - DueTime: uint64(time.Now().Unix()) + 30*24*3600, - Check: false, - Reward: false, - } - //db.InsertModelLogs("mail", "uid123", _mail) - //InsertModelLogs("mail", "uid123", _mail) - data := &comm.Autogenerated{ - ID: primitive.NewObjectID().Hex(), - UID: "uid123", - Act: string(comm.LogHandleType_Insert), - } - data.D = append(data.D, "mail") // D[0] - data.D = append(data.D, _mail) // D[1] - for i := 0; i < 100000; i++ { _, err1 := db.Defsys.Mgo().InsertOne("model", data) if err1 != nil { log.Errorf("insert model db err %v", err1) } + //}() } - + time.Sleep(time.Second * 10) defer os.Exit(m.Run()) }