This commit is contained in:
liwei1dao 2022-06-15 13:36:13 +08:00
commit c7517d345a
9 changed files with 59 additions and 138 deletions

View File

@ -1,23 +0,0 @@
package modules
import (
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/redis"
"go_dreamfactory/sys/cache"
)
/*
基础组件 缓存组件 读写缓存数据
*/
type MComp_CacheComp struct {
cbase.ModuleCompBase
Redis redis.ISys
}
//组件初始化接口
func (this *MComp_CacheComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options)
this.Redis = cache.Redis()
return
}

View File

@ -23,13 +23,13 @@ type (
IAPI_Comp interface {
}
ICache_Comp interface {
}
IDB_Comp interface {
// 向db 写日志信息
InsertModelLogs(table string, uID string, target interface{}) (err error)
DeleteModelLogs(table string, uID string, where interface{}) (err error)
UpdateModelLogs(table string, uID string, where interface{}, target interface{}) (err error)
}
IDB_Comp interface {
}
IConfigure_Comp interface {
LoadConfigure(name string, fn interface{}) (err error)
GetConfigure(name string) (v interface{}, err error)

View File

@ -17,7 +17,7 @@ const (
)
type DB_Comp struct {
modules.MComp_DBComp
modules.Model_Comp
}
type IMail interface {
Mail_QueryUserMail(uId string) (mail []*pb.DB_MailData, err error)

View File

@ -2,14 +2,13 @@ package model
import (
"context"
"errors"
"go_dreamfactory/comm"
"go_dreamfactory/lego/core"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/modules"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
@ -18,7 +17,7 @@ const (
)
type DB_Comp struct {
modules.MComp_DBComp
modules.Model_Comp
}
// type data struct {
@ -39,18 +38,22 @@ const (
type IModel interface {
Model_UpdateDBByLog() (err error) // 读取日志并更新对应的表
Model_InsertDBByLog(data *comm.Autogenerated) (err error) // 插入日志
Model_UpdateUserDataByUid(uid string) (err error) // 读取指定玩家信息到db中
}
func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
//_startTime := time.Now().UnixNano()
_delID := make([]string, 0) // 处理完成要删除的id
_data, err := this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
if err != nil {
return err
func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) {
_data := &mongo.Cursor{}
if uid == "" {
_data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
if err != nil {
return err
}
} else {
_data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find())
if err != nil {
return err
}
}
_delID := make([]string, 0) // 处理完成要删除的id
for _data.Next(context.TODO()) { // 处理删除逻辑
data := &comm.Autogenerated{}
@ -61,23 +64,20 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
continue
}
//log.Debugf("======= insert log : %+v =======", data)
if data.Act == string(comm.LogHandleType_Insert) {
if len(data.D) < 2 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
break
}
query := data.D[1]
_obj := bson.M{}
for _, v := range query.(bson.D) {
for _, v := range data.D[1].(bson.D) {
_obj[v.Key] = v.Value
}
_key := data.D[0].(string)
_, err := this.DB.InsertOne(core.SqlTable(_key), _obj)
if err != nil {
log.Errorf("insert %s db err:%v", "mail", err)
log.Errorf("insert %s db err:%v", (core.SqlTable(_key)), err)
}
// _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
// if err != nil {
@ -86,25 +86,29 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
} else if data.Act == string(comm.LogHandleType_Delete) {
if len(data.D) < 2 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
break
}
_, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
_objKey := make([]string, 0)
for _, v := range data.D[1].(bson.D) {
_objKey = append(_objKey, v.Value.(string))
}
_, err = this.DB.DeleteMany(data.D[0].(core.SqlTable), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete())
if err != nil {
log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err)
}
} else { // update
if len(data.D) < 3 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
break
}
where := data.D[1].(map[string]interface{})
where := data.D[1].(bson.D)
_obj := &QueryStruct{}
for k, v := range where {
_obj.Selector[k] = v
for _, v := range where {
_obj.Selector[v.Key] = v
}
query := data.D[2].(map[string]interface{})
for k, v := range query {
_obj.Query[k] = v
query := data.D[2].(bson.D)
for _, v := range query {
_obj.Query[v.Key] = v
}
this.DB.UpdateMany(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
}
@ -114,8 +118,6 @@ func (this *DB_Comp) Model_UpdateDBByLog() (err error) {
if err != nil {
log.Errorf("del err %v", err)
}
// _endTime := time.Now().UnixNano()
// log.Debugf("==============subTime=%d===========del len = %d", _endTime-_startTime, len(_delID))
return
}
@ -127,67 +129,3 @@ func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) {
}
return err
}
func (this *DB_Comp) Model_UpdateUserDataByUid(uid string) (err error) {
_startTime := time.Now().UnixNano()
_delID := make([]string, 0) // 处理完成要删除的id
_data, err := this.DB.Find(DB_ModelTable, bson.M{"userid": uid}, options.Find())
if err != nil {
return err
}
for _data.Next(context.TODO()) { // 处理删除逻辑
data := &comm.Autogenerated{}
if err = _data.Decode(data); err == nil {
_delID = append(_delID, data.ID)
} else {
log.Errorf("Decode Data err : %v", err)
continue
}
log.Debugf("======= insert log : %+v =======", data)
if data.Act == string(comm.LogHandleType_Insert) {
if len(data.D) < 2 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
}
_, err := this.DB.InsertMany(data.D[0].(core.SqlTable), data.D[1].([]interface{}))
if err != nil {
log.Errorf("insert %s db err:%v", data.D[0].(core.SqlTable), err)
}
} else if data.Act == string(comm.LogHandleType_Delete) {
if len(data.D) < 2 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
}
_, err := this.DB.DeleteMany(data.D[0].(core.SqlTable), data.D[1].(map[string]interface{}))
if err != nil {
log.Errorf("delete %s db err:%v", data.D[0].(core.SqlTable), err)
}
} else { // update
if len(data.D) < 3 { // 参数校验
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
return errors.New("parameter len err")
}
where := data.D[1].(map[string]interface{})
_obj := &QueryStruct{}
for k, v := range where {
_obj.Selector[k] = v
}
query := data.D[2].(map[string]interface{})
for k, v := range query {
_obj.Query[k] = v
}
this.DB.FindOneAndUpdate(data.D[0].(core.SqlTable), _obj.Selector, _obj.Query)
}
}
// 批量删除已处理的数据
_, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$inc": _delID}}, options.Delete())
if err != nil {
log.Errorf("del err %v", err)
}
_endTime := time.Now().UnixNano()
log.Debugf("==============subTime============del len = %d", _endTime-_startTime, len(_delID))
return
}

View File

@ -28,9 +28,9 @@ func (this *DBService_Comp) run() {
for {
select {
case v := <-this.task:
this.module.db_comp.Model_UpdateUserDataByUid(v)
this.module.db_comp.Model_UpdateDBByLog(v)
case <-time.After(time.Second * 2):
this.module.db_comp.Model_UpdateDBByLog()
this.module.db_comp.Model_UpdateDBByLog("")
}
}
}

View File

@ -6,33 +6,39 @@ import (
"go_dreamfactory/lego/core/cbase"
"go_dreamfactory/lego/sys/log"
"go_dreamfactory/lego/sys/mgo"
"go_dreamfactory/lego/sys/redis"
"go_dreamfactory/sys/cache"
"go_dreamfactory/sys/db"
"go.mongodb.org/mongo-driver/bson/primitive"
)
const (
DB_ModelTable core.SqlTable = "model"
)
/*
基础组件 存储组件 读写缓存数据
基础组件 缓存组件 读写缓存数据
DB组件也封装进来
*/
type MComp_DBComp struct {
type Model_Comp struct {
cbase.ModuleCompBase
DB mgo.ISys
Redis redis.ISys
DB mgo.ISys
}
const ()
//组件初始化接口
func (this *MComp_DBComp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
func (this *Model_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
this.ModuleCompBase.Init(service, module, comp, options)
this.Redis = cache.Redis()
this.DB = db.Mgo()
return
}
func (this *MComp_DBComp) Start() (err error) {
func (this *Model_Comp) Start() (err error) {
err = this.ModuleCompBase.Start()
return
}
func (this *MComp_DBComp) InsertModelLogs(table string, uID string, target interface{}) (err error) {
func (this *Model_Comp) InsertModelLogs(table string, uID string, target interface{}) (err error) {
data := &comm.Autogenerated{
ID: primitive.NewObjectID().Hex(),
@ -42,14 +48,14 @@ func (this *MComp_DBComp) InsertModelLogs(table string, uID string, target inter
data.D = append(data.D, table) // D[0]
data.D = append(data.D, target) // D[1]
_, err = this.DB.InsertOne("model", data)
_, err = this.DB.InsertOne(DB_ModelTable, data)
if err != nil {
log.Errorf("insert model db err %v", err)
}
return err
}
func (this *MComp_DBComp) DeleteModelLogs(table string, uID string, where interface{}) (err error) {
func (this *Model_Comp) DeleteModelLogs(table string, uID string, where interface{}) (err error) {
data := &comm.Autogenerated{
ID: primitive.NewObjectID().Hex(),
@ -60,7 +66,7 @@ func (this *MComp_DBComp) DeleteModelLogs(table string, uID string, where interf
data.D = append(data.D, table) // D[0]
data.D = append(data.D, where) // D[1]
_, err = this.DB.InsertOne("model", data)
_, err = this.DB.InsertOne(DB_ModelTable, data)
if err != nil {
log.Errorf("insert model db err %v", err)
}
@ -68,7 +74,7 @@ func (this *MComp_DBComp) DeleteModelLogs(table string, uID string, where interf
return err
}
func (this *MComp_DBComp) UpdateModelLogs(table string, uID string, where interface{}, target interface{}) (err error) {
func (this *Model_Comp) UpdateModelLogs(table string, uID string, where interface{}, target interface{}) (err error) {
data := &comm.Autogenerated{
ID: primitive.NewObjectID().Hex(),
@ -79,7 +85,7 @@ func (this *MComp_DBComp) UpdateModelLogs(table string, uID string, where interf
data.D = append(data.D, where) // D[1]
data.D = append(data.D, target) // D[2]
_, err = this.DB.InsertOne("model", data)
_, err = this.DB.InsertOne(DB_ModelTable, data)
if err != nil {
log.Errorf("insert model db err %v", err)
}

View File

@ -14,7 +14,7 @@ import (
///背包缓存数据管理组件
type Cache_Comp struct {
modules.MComp_CacheComp
modules.Model_Comp
module *Pack
}

View File

@ -12,7 +12,7 @@ import (
///背包数据库数据管理组件
type DB_Comp struct {
modules.MComp_DBComp
modules.Model_Comp
}
///查询用户背包数据

View File

@ -42,7 +42,7 @@ func (this *Pack) OnInstallComp() {
this.ModuleBase.OnInstallComp()
this.api_comp = this.RegisterComp(new(Api_Comp)).(*Api_Comp)
this.cache_comp = this.RegisterComp(new(Cache_Comp)).(*Cache_Comp)
this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp)
//this.db_comp = this.RegisterComp(new(DB_Comp)).(*DB_Comp)
this.configure_comp = this.RegisterComp(new(Configure_Comp)).(*Configure_Comp)
}