model 数据落地
This commit is contained in:
parent
3bf2feba6b
commit
c731357301
@ -56,7 +56,7 @@ type ISC_GateRouteComp interface {
|
||||
}
|
||||
|
||||
type Autogenerated struct {
|
||||
ID string `json:"_id"`
|
||||
ID string `json:"ID,omitempty" bson:"_id"`
|
||||
UID string `json:"uid"`
|
||||
Act string `json:"act"` // insert update delete
|
||||
D []interface{}
|
||||
|
@ -18,3 +18,8 @@ func (this *Api_Comp) Init(service core.IService, module core.IModule, comp core
|
||||
this.module = module.(*Model)
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Api_Comp) Start() (err error) {
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -6,8 +6,8 @@ import (
|
||||
"go_dreamfactory/comm"
|
||||
"go_dreamfactory/lego/core"
|
||||
"go_dreamfactory/lego/sys/log"
|
||||
"go_dreamfactory/lego/sys/mgo"
|
||||
"go_dreamfactory/modules"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
@ -19,7 +19,6 @@ const (
|
||||
|
||||
type DB_Comp struct {
|
||||
modules.MComp_DBComp
|
||||
mgo mgo.ISys
|
||||
}
|
||||
|
||||
// type data struct {
|
||||
@ -44,13 +43,16 @@ type IModel interface {
|
||||
}
|
||||
|
||||
func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
|
||||
//_startTime := time.Now().UnixNano()
|
||||
|
||||
_delID := make([]string, 0) // 处理完成要删除的id
|
||||
_data, err := this.mgo.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
|
||||
_data, err := this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _data.Next(context.TODO()) { // 处理删除逻辑
|
||||
|
||||
data := &comm.Autogenerated{}
|
||||
if err = _data.Decode(data); err == nil {
|
||||
_delID = append(_delID, data.ID)
|
||||
@ -59,23 +61,34 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("======= insert log : %+v =======", data)
|
||||
//log.Debugf("======= insert log : %+v =======", data)
|
||||
if data.Act == string(comm.LogHandleType_Insert) {
|
||||
if len(data.D) < 2 { // 参数校验
|
||||
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
||||
return errors.New("parameter len err")
|
||||
}
|
||||
query := data.D[1].([]interface{})
|
||||
_, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), query)
|
||||
if err != nil {
|
||||
log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err)
|
||||
|
||||
query := data.D[1]
|
||||
_obj := bson.M{}
|
||||
for _, v := range query.(bson.D) {
|
||||
_obj[v.Key] = v.Value
|
||||
}
|
||||
_key := data.D[0].(string)
|
||||
|
||||
_, err := this.DB.InsertOne(core.SqlTable(_key), _obj)
|
||||
if err != nil {
|
||||
log.Errorf("insert %s db err:%v", "mail", err)
|
||||
}
|
||||
// _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
|
||||
// if err != nil {
|
||||
// log.Errorf("insert %s db err:%v", data.ID, err)
|
||||
// }
|
||||
} else if data.Act == string(comm.LogHandleType_Delete) {
|
||||
if len(data.D) < 2 { // 参数校验
|
||||
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
||||
return errors.New("parameter len err")
|
||||
}
|
||||
_, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
|
||||
_, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
|
||||
if err != nil {
|
||||
log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err)
|
||||
}
|
||||
@ -93,20 +106,22 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
|
||||
for k, v := range query {
|
||||
_obj.Query[k] = v
|
||||
}
|
||||
this.mgo.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
|
||||
this.DB.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
|
||||
}
|
||||
}
|
||||
// 批量删除已处理的数据
|
||||
_, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete())
|
||||
_, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete())
|
||||
if err != nil {
|
||||
log.Errorf("del err %v", err)
|
||||
}
|
||||
// _endTime := time.Now().UnixNano()
|
||||
// log.Debugf("==============subTime=%d===========del len = %d", _endTime-_startTime, len(_delID))
|
||||
return
|
||||
}
|
||||
|
||||
func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) {
|
||||
|
||||
_, err = this.mgo.InsertOne(DB_ModelTable, data)
|
||||
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
||||
if err != nil {
|
||||
log.Errorf("insert model db err %v", err)
|
||||
}
|
||||
@ -114,9 +129,10 @@ func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) {
|
||||
}
|
||||
|
||||
func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
|
||||
_startTime := time.Now().UnixNano()
|
||||
|
||||
_delID := make([]string, 0) // 处理完成要删除的id
|
||||
_data, err := this.mgo.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find())
|
||||
_data, err := this.DB.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -135,7 +151,7 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
|
||||
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
||||
return errors.New("parameter len err")
|
||||
}
|
||||
_, err := this.mgo.InsertMany(data.D[0].(core.SqlTable), data.D[1].([]interface{}))
|
||||
_, err := this.DB.InsertMany(data.D[0].(core.SqlTable), data.D[1].([]interface{}))
|
||||
if err != nil {
|
||||
log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err)
|
||||
}
|
||||
@ -144,7 +160,7 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
|
||||
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
||||
return errors.New("parameter len err")
|
||||
}
|
||||
_, err := this.mgo.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
|
||||
_, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
|
||||
if err != nil {
|
||||
log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err)
|
||||
}
|
||||
@ -162,13 +178,16 @@ func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
|
||||
for k, v := range query {
|
||||
_obj.Query[k] = v
|
||||
}
|
||||
this.mgo.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
|
||||
this.DB.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
|
||||
}
|
||||
}
|
||||
// 批量删除已处理的数据
|
||||
_, err = this.mgo.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete())
|
||||
_, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete())
|
||||
if err != nil {
|
||||
log.Errorf("del err %v", err)
|
||||
}
|
||||
|
||||
_endTime := time.Now().UnixNano()
|
||||
log.Debugf("==============subTime============del len = %d", _endTime-_startTime, len(_delID))
|
||||
return
|
||||
}
|
||||
|
@ -18,9 +18,10 @@ func (this *DBService_Comp) Init(service core.IService, module core.IModule, com
|
||||
return
|
||||
}
|
||||
|
||||
func (this *DBService_Comp) Start(err error) {
|
||||
this.ModuleCompBase.Start()
|
||||
func (this *DBService_Comp) Start() (err error) {
|
||||
err = this.ModuleCompBase.Start()
|
||||
go this.run()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *DBService_Comp) run() {
|
||||
@ -28,7 +29,7 @@ func (this *DBService_Comp) run() {
|
||||
select {
|
||||
case v := <-this.task:
|
||||
this.module.db_comp.Model_UpdateUserDataByUid(v)
|
||||
case <-time.After(time.Second):
|
||||
case <-time.After(time.Second * 2):
|
||||
this.module.db_comp.Model_UpdateDBByLog()
|
||||
}
|
||||
}
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"go_dreamfactory/comm"
|
||||
"go_dreamfactory/lego/core"
|
||||
"go_dreamfactory/modules"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewModule() core.IModule {
|
||||
@ -16,6 +15,7 @@ type Model struct {
|
||||
modules.ModuleBase
|
||||
api_comp *Api_Comp
|
||||
db_comp *DB_Comp
|
||||
db_service *DBService_Comp
|
||||
configure_comp *Configure_Comp
|
||||
}
|
||||
|
||||
@ -25,12 +25,6 @@ func (this *Model) Init(service core.IService, module core.IModule, options core
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Model) Start() (err error) {
|
||||
err = this.ModuleBase.Start()
|
||||
//go this.RunWriteDB()
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Model) GetType() core.M_Modules {
|
||||
return comm.SM_LogModelModule
|
||||
}
|
||||
@ -39,12 +33,6 @@ func (this *Model) OnInstallComp() {
|
||||
this.ModuleBase.OnInstallComp()
|
||||
this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
|
||||
this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp)
|
||||
this.db_service = this.RegisterComp(new(DBService_Comp)).(*DBService_Comp)
|
||||
this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp)
|
||||
}
|
||||
|
||||
func (this *Model) RunWriteDB() {
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
|
||||
}
|
||||
}
|
||||
|
44
sys/cache/init_test.go
vendored
44
sys/cache/init_test.go
vendored
@ -24,34 +24,36 @@ func TestMain(m *testing.M) {
|
||||
fmt.Printf("err:%v\n", err)
|
||||
return
|
||||
}
|
||||
for i := 0; i < 50000; i++ {
|
||||
//go func() {
|
||||
_mail := &pb.DB_MailData{
|
||||
ObjId: primitive.NewObjectID().Hex(),
|
||||
UserId: "uid123",
|
||||
Title: "系统邮件",
|
||||
|
||||
_mail := &pb.DB_MailData{
|
||||
Contex: "恭喜获得专属礼包一份",
|
||||
CreateTime: uint64(time.Now().Unix()),
|
||||
DueTime: uint64(time.Now().Unix()) + 30*24*3600,
|
||||
Check: false,
|
||||
Reward: false,
|
||||
}
|
||||
//db.InsertModelLogs("mail", "uid123", _mail)
|
||||
//InsertModelLogs("mail", "uid123", _mail)
|
||||
data := &comm.Autogenerated{
|
||||
ID: primitive.NewObjectID().Hex(),
|
||||
UID: "uid123",
|
||||
Act: string(comm.LogHandleType_Insert),
|
||||
}
|
||||
data.D = append(data.D, "mail") // D[0]
|
||||
data.D = append(data.D, _mail) // D[1]
|
||||
|
||||
UserId: "uid123",
|
||||
Title: "系统邮件",
|
||||
|
||||
Contex: "恭喜获得专属礼包一份",
|
||||
CreateTime: uint64(time.Now().Unix()),
|
||||
DueTime: uint64(time.Now().Unix()) + 30*24*3600,
|
||||
Check: false,
|
||||
Reward: false,
|
||||
}
|
||||
//db.InsertModelLogs("mail", "uid123", _mail)
|
||||
//InsertModelLogs("mail", "uid123", _mail)
|
||||
data := &comm.Autogenerated{
|
||||
ID: primitive.NewObjectID().Hex(),
|
||||
UID: "uid123",
|
||||
Act: string(comm.LogHandleType_Insert),
|
||||
}
|
||||
data.D = append(data.D, "mail") // D[0]
|
||||
data.D = append(data.D, _mail) // D[1]
|
||||
for i := 0; i < 100000; i++ {
|
||||
_, err1 := db.Defsys.Mgo().InsertOne("model", data)
|
||||
if err1 != nil {
|
||||
log.Errorf("insert model db err %v", err1)
|
||||
}
|
||||
//}()
|
||||
}
|
||||
|
||||
time.Sleep(time.Second * 10)
|
||||
defer os.Exit(m.Run())
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user