196 lines
5.5 KiB
Go
196 lines
5.5 KiB
Go
package dbservice
|
|
|
|
import (
|
|
"context"
|
|
"go_dreamfactory/comm"
|
|
"go_dreamfactory/lego/core"
|
|
"go_dreamfactory/lego/sys/log"
|
|
"go_dreamfactory/modules"
|
|
"time"
|
|
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
type DB_Comp struct {
|
|
modules.MCompModel
|
|
task chan string
|
|
isInit bool
|
|
}
|
|
|
|
func (this *DB_Comp) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) {
|
|
this.MCompModel.Init(service, module, comp, options)
|
|
this.task = make(chan string, TaskMaxNum)
|
|
|
|
return
|
|
}
|
|
|
|
func (this *DB_Comp) Start() (err error) {
|
|
err = this.MCompModel.Start()
|
|
model_count := this.Model_TotalCount()
|
|
if model_count > 0 { //1000
|
|
this.Redis.Set(comm.DBServiceStatus, true, -1)
|
|
this.isInit = false
|
|
} else {
|
|
this.isInit = true
|
|
}
|
|
go this.run()
|
|
return
|
|
}
|
|
|
|
func (this *DB_Comp) run() {
|
|
for {
|
|
select {
|
|
case v := <-this.task:
|
|
this.Model_UpdateDBByLog(v)
|
|
case <-time.After(time.Second * 2):
|
|
this.Model_UpdateDBByLog("")
|
|
}
|
|
if !this.isInit && this.Model_TotalCount() <= 0 {
|
|
this.Redis.Delete(comm.DBServiceStatus)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (this *DB_Comp) PushUserTask(uid string) {
|
|
this.task <- uid
|
|
}
|
|
|
|
func (this *DB_Comp) Model_UpdateDBByLog(uid string) (err error) {
|
|
var _data *mongo.Cursor
|
|
if uid == "" {
|
|
_data, err = this.DB.Find(DB_ModelTable, bson.M{}, options.Find().SetLimit(int64(WriteMaxNum)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
_data, err = this.DB.Find(DB_ModelTable, bson.M{"uid": uid}, options.Find())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
_delID := make([]string, 0) // 处理完成要删除的id
|
|
for _data.Next(context.TODO()) { // 处理删除逻辑
|
|
|
|
data := &comm.Autogenerated{}
|
|
if err = _data.Decode(data); err != nil {
|
|
log.Errorf("Decode Data err : %v", err)
|
|
continue
|
|
}
|
|
|
|
if data.Act == string(comm.LogHandleType_Insert) {
|
|
if len(data.D) < 2 { // 参数校验
|
|
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
|
continue
|
|
}
|
|
_obj := make(bson.A, len(data.D[1].(bson.A)))
|
|
for i, v := range data.D[1].(bson.A) {
|
|
_obj[i] = v
|
|
}
|
|
_key := data.D[0].(string)
|
|
|
|
_, err := this.DB.InsertMany(core.SqlTable(_key), _obj)
|
|
if err != nil {
|
|
log.Errorf("insert %s db err:%v", (core.SqlTable(_key)), err)
|
|
ErrorLogCount[data.ID]++
|
|
if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧
|
|
log.Errorf("insert db err max num %s db err:%v", data.ID, err)
|
|
_, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
|
|
if err != nil {
|
|
log.Errorf("insert %s db err:%+v", data.ID, err)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
// _, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
|
|
// if err != nil {
|
|
// log.Errorf("delete %s db err:%v", data.ID, err)
|
|
// }
|
|
} else if data.Act == string(comm.LogHandleType_Delete) {
|
|
if len(data.D) < 2 { // 参数校验
|
|
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
|
continue
|
|
}
|
|
_key := data.D[0].(string)
|
|
_objKey := make([]string, 0)
|
|
for _, v := range data.D[1].(bson.D) {
|
|
_objKey = append(_objKey, v.Value.(string))
|
|
}
|
|
_, err = this.DB.DeleteMany(core.SqlTable(_key), bson.M{"_id": bson.M{"$in": _objKey}}, options.Delete())
|
|
if err != nil {
|
|
log.Errorf("delete %s db err:%v", core.SqlTable(_key), err)
|
|
ErrorLogCount[data.ID]++
|
|
if ErrorLogCount[data.ID] >= ErrorMaxNum { // 实在是写失败了那就删除吧
|
|
log.Errorf("del db err max num %s db err:%v", data.ID, err)
|
|
_, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
|
|
if err != nil {
|
|
log.Errorf("insert %s db err:%+v", data.ID, err)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
} else { // update
|
|
if len(data.D) < 3 { // 参数校验
|
|
log.Errorf("parameter len _id : %s,uid : %s d.len:%v", data.ID, data.UID, len(data.D))
|
|
continue
|
|
}
|
|
|
|
_tableName := data.D[0].(string) //表名
|
|
Where := bson.M{} //data.D[1].(bson.M)
|
|
Query := bson.M{} //data.D[2].(bson.M)
|
|
|
|
for _, v := range data.D[1].(bson.D) {
|
|
Where[v.Key] = v.Value
|
|
}
|
|
for _, v := range data.D[2].(bson.D) {
|
|
Query[v.Key] = v.Value
|
|
}
|
|
_, err = this.DB.UpdateMany(core.SqlTable(_tableName), Where, bson.M{"$set": Query})
|
|
if err != nil {
|
|
log.Errorf("Update %s db err:%v", core.SqlTable(_tableName), err)
|
|
ErrorLogCount[data.ID]++
|
|
if ErrorLogCount[data.ID] >= ErrorMaxNum { // 超过一定次数写失败了那就删除吧
|
|
log.Errorf("update db err max num %s db err:%v", data.ID, err)
|
|
_, err = this.DB.DeleteOne(DB_ModelTable, bson.M{"_id": data.ID})
|
|
if err != nil {
|
|
log.Errorf("insert %s db err:%+v", data.ID, err)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
_delID = append(_delID, data.ID) // 都操作都成功了记录要删除的key
|
|
}
|
|
|
|
if len(_delID) > 0 {
|
|
_, err = this.DB.DeleteMany(DB_ModelTable, bson.M{"_id": bson.M{"$in": _delID}}, options.Delete()) // 批量删除已处理的数据
|
|
if err != nil {
|
|
log.Errorf("del err %v", err)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// 写入日志数据
|
|
func (this *DB_Comp) Model_InsertDBByLog(data *comm.Autogenerated) (err error) {
|
|
|
|
_, err = this.DB.InsertOne(DB_ModelTable, data)
|
|
if err != nil {
|
|
log.Errorf("insert model db err %v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// 查询 当前日志列表还有没有处理完条数
|
|
func (this *DB_Comp) Model_TotalCount() int {
|
|
|
|
_data, err := this.DB.Find("DB_ModelTable", bson.M{})
|
|
if err == nil {
|
|
return _data.RemainingBatchLength()
|
|
}
|
|
return 0
|
|
}
|