package modules import ( "context" "fmt" "go_dreamfactory/comm" "go_dreamfactory/lego/core" "go_dreamfactory/lego/core/cbase" "go_dreamfactory/lego/sys/codec" ccore "go_dreamfactory/lego/sys/codec/core" "go_dreamfactory/lego/sys/log" "go_dreamfactory/lego/sys/mgo" "go_dreamfactory/lego/sys/redis" "go_dreamfactory/pb" "go_dreamfactory/sys/cache" "go_dreamfactory/sys/db" "reflect" "time" "unsafe" "github.com/modern-go/reflect2" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" ) //Redis 自定义脚本 批量读取列表数据 var LusScriptgetList = ` local key = tostring(KEYS[1]) local keys = redis.call("HGETALL", key) local data = {} local n = 1 for i, v in ipairs(keys) do if i%2 == 0 then data[n] = redis.call("HGETALL", v) n = n+1 end end return data ` //Redis 自定义脚本 批量写入列表数据 var LusScriptsetList = ` local n = 1 for i, v in ipairs(KEYS) do local key = v local argv = {} for i=n,#ARGV,1 do n = n+1 if ARGV[i] == "#end" then redis.call("HMSet", key,unpack(argv)) break else table.insert(argv, ARGV[i]) end end end return "OK" ` //Redis 自定义脚本 批量读取队列数据 var LusScriptgetQueue = ` local key = tostring(KEYS[1]) local keys = redis.call("LRANGE", key,0,-1) local data = {} for i, v in ipairs(keys) do data[i] = redis.call("HGETALL", v) end return data ` //Redis 自定义脚本 批量写入队列数据 var LusScriptsetQueue = ` local count = tonumber(ARGV[1]) local k = tostring(ARGV[3]) local keys = {} local out = {} local n = 1 for i, v in ipairs(KEYS) do if (i == 1) then for i=n,#ARGV,1 do n = n+1 if ARGV[i] == "#end" then break end end elseif (i == 2) then for i=n,#ARGV,1 do n = n+1 if ARGV[i] == "#end" then break end end else local key = v local argv = {} table.insert(keys, key) for i=n,#ARGV,1 do n = n+1 if ARGV[i] == "#end" then redis.call("HMSet", key,unpack(argv)) break else table.insert(argv, ARGV[i]) end end end end redis.call("RPush", k,unpack(keys)) local c = tonumber(redis.call("LLEN", k)) if (c > count) then local off = c-count out = redis.call("LRANGE", k,0,off-1) redis.call("LTRIM", k,off,-1) for i, v in ipairs(out) do redis.call("DEL", v) end end return out ` /* 基础组件 缓存组件 读写缓存数据 DB组件也封装进来 */ type MCompModel struct { cbase.ModuleCompBase Redis redis.ISys DB mgo.ISys TableName string //redis key前缀 getListSha1 string //getList LusScript 的shal值 setListSha1 string //getList LusScript 的shal值 getQueueSha1 string //getList LusScript 的shal值 setQueueSha1 string //getList LusScript 的shal值 } const ( DB_ModelTable core.SqlTable = "model_log" ) //组件初始化接口 func (this *MCompModel) Init(service core.IService, module core.IModule, comp core.IModuleComp, options core.IModuleOptions) (err error) { this.ModuleCompBase.Init(service, module, comp, options) this.Redis = cache.Redis() this.DB = db.Mgo() return } func (this *MCompModel) Start() (err error) { err = this.ModuleCompBase.Start() if this.getListSha1, err = this.Redis.NewScript(LusScriptgetList).Result(); err != nil { return } if this.setListSha1, err = this.Redis.NewScript(LusScriptsetList).Result(); err != nil { return } if this.getQueueSha1, err = this.Redis.NewScript(LusScriptgetQueue).Result(); err != nil { return } if this.setQueueSha1, err = this.Redis.NewScript(LusScriptsetQueue).Result(); err != nil { return } return } func (this *MCompModel) setTableName(suffix string) { this.TableName = fmt.Sprintf("%s_%s", this.TableName, suffix) } func (this *MCompModel) ukey(uid string) string { return fmt.Sprintf("%s:%s", this.TableName, uid) } func (this *MCompModel) ukeylist(uid string, id string) string { return fmt.Sprintf("%s:%s-%s", this.TableName, uid, id) } func (this *MCompModel) InsertModelLogs(table string, uID string, target interface{}) (err error) { data := &comm.Autogenerated{ ID: primitive.NewObjectID().Hex(), UID: uID, Act: string(comm.LogHandleType_Insert), } data.D = append(data.D, table) // D[0] data.D = append(data.D, target) // D[1] _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } return err } func (this *MCompModel) DeleteModelLogs(table string, uID string, where interface{}) (err error) { data := &comm.Autogenerated{ ID: primitive.NewObjectID().Hex(), UID: uID, Act: string(comm.LogHandleType_Delete), } data.D = append(data.D, table) // D[0] data.D = append(data.D, where) // D[1] _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } return err } func (this *MCompModel) UpdateModelLogs(table string, uID string, where bson.M, target interface{}) (err error) { data := &comm.Autogenerated{ ID: primitive.NewObjectID().Hex(), UID: uID, Act: string(comm.LogHandleType_Update), } data.D = append(data.D, table) // D[0] data.D = append(data.D, where) // D[1] data.D = append(data.D, target) // D[2] _, err = this.DB.InsertOne(DB_ModelTable, data) if err != nil { log.Errorf("insert model db err %v", err) } return err } //添加新的数据 func (this *MCompModel) Add(uid string, data interface{}, attrs ...*cache.OperationAttr) (err error) { if err = this.Redis.HMSet(this.ukey(uid), data); err != nil { return } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { err = this.Redis.Expire(this.ukey(uid), ret.(time.Duration)) } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil); ret == nil { err = this.InsertModelLogs(this.TableName, uid, []interface{}{data}) } return } //添加新的数据到列表 func (this *MCompModel) AddList(uid string, id string, data interface{}, attrs ...*cache.OperationAttr) (err error) { key := this.ukeylist(uid, id) if err = this.Redis.HMSet(key, data); err != nil { return } if err = this.Redis.HSet(this.ukey(uid), id, key); err != nil { return } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { err = this.Redis.Expire(this.ukey(uid), ret.(time.Duration)) } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil); ret == nil { err = this.InsertModelLogs(this.TableName, uid, []interface{}{data}) } return } //添加新的多个数据到列表 data map[string]type func (this *MCompModel) AddLists(uid string, data interface{}, attrs ...*cache.OperationAttr) (err error) { vof := reflect.ValueOf(data) if !vof.IsValid() { return fmt.Errorf("Model_Comp: AddLists(nil)") } if vof.Kind() != reflect.Map { return fmt.Errorf("Model_Comp: AddLists(non-pointer %T)", data) } listskeys := make(map[string]string) keys := vof.MapKeys() lists := make([]interface{}, len(keys)) for i, k := range keys { value := vof.MapIndex(k) keydata := k.Interface().(string) valuedata := value.Interface() key := this.ukeylist(uid, keydata) if err = this.Redis.HMSet(key, valuedata); err != nil { return } listskeys[keydata] = key lists[i] = valuedata } if err = this.Redis.HMSetForMap(this.ukey(uid), listskeys); err != nil { return } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { this.Redis.Expire(this.ukey(uid), ret.(time.Duration)) for _, v := range listskeys { this.Redis.Expire(v, ret.(time.Duration)) } } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil); ret == nil { err = this.InsertModelLogs(this.TableName, uid, lists) } return } //修改数据多个字段 uid 作为主键 func (this *MCompModel) Change(uid string, data map[string]interface{}, attrs ...*cache.OperationAttr) (err error) { if err = this.Redis.HMSet(this.ukey(uid), data); err != nil { return } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { this.Redis.Expire(this.ukey(uid), ret.(time.Duration)) } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil); ret == nil { err = this.UpdateModelLogs(this.TableName, uid, bson.M{"uid": uid}, data) } return nil } //修改数据多个字段 uid 作为主键 func (this *MCompModel) ChangeList(uid string, _id string, data map[string]interface{}, attrs ...*cache.OperationAttr) (err error) { if err = this.Redis.HMSet(this.ukeylist(uid, _id), data); err != nil { return } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { this.Redis.Expire(this.ukey(uid), ret.(time.Duration)) } if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil); ret == nil { err = this.UpdateModelLogs(this.TableName, uid, bson.M{"_id": _id, "uid": uid}, data) } return nil } //读取全部数据 func (this *MCompModel) Get(uid string, data interface{}) (err error) { if err = this.Redis.HGetAll(this.ukey(uid), data); err != nil && err != redis.RedisNil { return } if err == redis.RedisNil { if err = this.DB.FindOne(core.SqlTable(this.TableName), bson.M{"uid": uid}).Decode(data); err != nil { return } err = this.Redis.HMSet(this.ukey(uid), data) } return } //获取列表数据 注意 data 必须是 切片的指针 *[]type func (this *MCompModel) GetList(uid string, data interface{}) (err error) { var ( dtype reflect2.Type dkind reflect.Kind sType reflect2.Type sliceType *reflect2.UnsafeSliceType sliceelemType reflect2.Type decoder ccore.IDecoderMapJson encoder ccore.IEncoderMapJson dptr unsafe.Pointer elemPtr unsafe.Pointer n int ok bool keys map[string]string cdata []map[string]string wdata map[string]map[string]string tempdata map[string]string c *mongo.Cursor ) keys = make(map[string]string) dptr = reflect2.PtrOf(data) dtype = reflect2.TypeOf(data) dkind = dtype.Kind() if dkind != reflect.Ptr { err = fmt.Errorf("MCompModel: GetList(non-pointer %T)", data) return } sType = dtype.(*reflect2.UnsafePtrType).Elem() if sType.Kind() != reflect.Slice { err = fmt.Errorf("MCompModel: GetList(data no slice %T)", data) return } sliceType = sType.(*reflect2.UnsafeSliceType) sliceelemType = sliceType.Elem() if sliceelemType.Kind() != reflect.Ptr { err = fmt.Errorf("MCompModel: GetList(sliceelemType non-pointer %T)", data) return } if decoder, ok = codec.DecoderOf(sliceelemType).(ccore.IDecoderMapJson); !ok { err = fmt.Errorf("MCompModel: GetList(data not support MarshalMapJson %T)", data) return } sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem() if cdata, err = this.Batchgetlists(this.ukey(uid)); err == nil { for _, v := range cdata { sliceType.UnsafeGrow(dptr, n+1) elemPtr = sliceType.UnsafeGetIndex(dptr, n) if *((*unsafe.Pointer)(elemPtr)) == nil { newPtr := sliceelemType.UnsafeNew() if err = decoder.DecodeForMapJson(newPtr, v); err != nil { log.Errorf("err:%v", err) return } *((*unsafe.Pointer)(elemPtr)) = newPtr } else { decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), v) } n++ } } if err == redis.RedisNil { //query from mgo if c, err = this.DB.Find(core.SqlTable(this.TableName), bson.M{"uid": uid}); err != nil { return err } else { if encoder, ok = codec.EncoderOf(sliceelemType).(ccore.IEncoderMapJson); !ok { err = fmt.Errorf("MCompModel: GetList(data not support UnMarshalMapJson %T)", data) return } n = 0 wdata = make(map[string]map[string]string) for c.Next(context.Background()) { _id := c.Current.Lookup("_id").StringValue() sliceType.UnsafeGrow(dptr, n+1) elemPtr = sliceType.UnsafeGetIndex(dptr, n) if *((*unsafe.Pointer)(elemPtr)) == nil { newPtr := sliceelemType.UnsafeNew() *((*unsafe.Pointer)(elemPtr)) = newPtr } elem := sliceType.GetIndex(data, n) if err = c.Decode(elem); err != nil { return } if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr))); err != nil { return } key := this.ukeylist(uid, _id) wdata[key] = tempdata keys[_id] = key n++ } if len(wdata) > 0 { wdata[this.ukey(uid)] = keys err = this.Batchsetlists(wdata) } } } return err } //读取单个数据中 多个字段数据 func (this *MCompModel) GetFields(uid string, data interface{}, fields ...string) (err error) { this.Redis.HMGet(this.ukey(uid), data, fields...) return } //读取List列表中单个数据中 多个字段数据 func (this *MCompModel) GetListFields(uid string, id string, data interface{}, fields ...string) (err error) { this.Redis.HMGet(this.ukeylist(uid, id), data, fields...) return } //读取列表数据中单个数据 func (this *MCompModel) GetListObj(uid string, id string, data interface{}) (err error) { err = this.Redis.HGetAll(this.ukeylist(uid, id), data) return } //删除用户数据 func (this *MCompModel) Del(uid string) (err error) { err = this.Redis.Delete(this.ukey(uid)) if err != nil { return err } err = this.DeleteModelLogs(this.TableName, uid, bson.M{"uid": uid}) return nil } //删除多条数据 func (this *MCompModel) DelListlds(uid string, ids ...string) (err error) { listkey := this.ukey(uid) for _, v := range ids { key := this.ukeylist(uid, v) if err = this.Redis.Delete(key); err != nil { return } } if err = this.Redis.HDel(listkey, ids...); err == nil { err = this.DeleteModelLogs(this.TableName, uid, bson.M{"_id": bson.M{"$in": ids}}) } return } //获取用户通过扩展表 func (this *MCompModel) GetUserExpand(uid string) (result *pb.DBUserExpand, err error) { result = &pb.DBUserExpand{} key := fmt.Sprintf("userexpand:%s", uid) if err = this.Redis.HGetAll(key, result); err != nil && err != redis.RedisNil { return } if err == redis.RedisNil { if err = this.DB.FindOne(core.SqlTable("userexpand"), bson.M{"uid": uid}).Decode(result); err != nil { return } err = this.Redis.HMSet(key, result) } return } //修改用户扩展数据 func (this *MCompModel) ChanageUserExpand(uid string, value map[string]interface{}) (err error) { key := fmt.Sprintf("userexpand:%s", uid) if err = this.Redis.HMSet(key, value); err != nil && err != redis.RedisNil { return } err = this.UpdateModelLogs("userexpand", uid, bson.M{"uid": uid}, value) return } //批量读取列表数据 func (this *MCompModel) Batchgetlists(key string) (result []map[string]string, err error) { var data interface{} ret := this.Redis.EvalSha(this.Redis.Context(), this.getListSha1, []string{key}) if data, err = ret.Result(); err != nil { fmt.Printf("Execute batchgetlists err: %v", err.Error()) } else { temp1 := data.([]interface{}) result = make([]map[string]string, len(temp1)) for i, v := range temp1 { temp2 := v.([]interface{}) result[i] = make(map[string]string) for n := 0; n < len(temp2); n += 2 { result[i][temp2[n].(string)] = temp2[n+1].(string) } } if len(result) == 0 { err = redis.RedisNil return } } return } //批量写入数据 func (this *MCompModel) Batchsetlists(data map[string]map[string]string) (err error) { var ( n int keys []string values []interface{} ) keys = make([]string, len(data)) values = make([]interface{}, 0) for k, v := range data { keys[n] = k for k1, v1 := range v { values = append(values, k1, v1) } values = append(values, "#end") n++ } ret := this.Redis.EvalSha(this.Redis.Context(), this.setListSha1, keys, values...) if _, err := ret.Result(); err != nil { fmt.Printf("Execute batchsetlists err: %v", err.Error()) } return } //批量读取队列数据 func (this *MCompModel) Batchgetqueues(key string) (result []map[string]string, err error) { var data interface{} ret := this.Redis.EvalSha(this.Redis.Context(), this.getQueueSha1, []string{key}) if data, err = ret.Result(); err != nil { fmt.Printf("Execute batchgetqueues err: %v", err.Error()) } else { temp1 := data.([]interface{}) result = make([]map[string]string, len(temp1)) for i, v := range temp1 { temp2 := v.([]interface{}) result[i] = make(map[string]string) for n := 0; n < len(temp2); n += 2 { result[i][temp2[n].(string)] = temp2[n+1].(string) } } if len(result) == 0 { err = redis.RedisNil return } } return } //批量写入队列 并返回移除队列 func (this *MCompModel) Batchsetqueues(key string, count int32, ks []string, vs []map[string]string) (outkey []string, err error) { var ( n int keys []string values []interface{} result interface{} ) keys = make([]string, len(ks)+2) values = make([]interface{}, 0) keys[0] = "count" values = append(values, count) values = append(values, "#end") keys[1] = "key" values = append(values, key) values = append(values, "#end") n = 2 for i, v := range ks { keys[n] = v for k1, v1 := range vs[i] { values = append(values, k1, v1) } values = append(values, "#end") n++ } ret := this.Redis.EvalSha(this.Redis.Context(), this.setQueueSha1, keys, values...) if result, err = ret.Result(); err != nil { fmt.Printf("Execute batchsetqueues err: %v", err.Error()) } else { outkey = make([]string, len(result.([]interface{}))) for i, v := range result.([]interface{}) { outkey[i] = v.(string) } } return } //日志操作可选项 func (this *MCompModel) logOpt(uid string, data interface{}, attrs ...*cache.OperationAttr) error { ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil) if ret == nil { //启用mgolog ir := cache.OperationAttrs(attrs).Find(cache.ATTR_INSERT).Unwrap_Or(nil) if ir == nil { //updte opt where := bson.M{"uid": uid} if reflect.ValueOf(data).Kind() == reflect.Map { if m, ok := data.(map[string]interface{}); ok { where["_id"] = m["_id"] } else { return fmt.Errorf("have %v,but want map[string]interface{}", data) } } return this.UpdateModelLogs(this.TableName, uid, where, data) } else { //insert opt return this.InsertModelLogs(this.TableName, uid, data) } } return nil } //获取用户通过扩展表 func (this *MCompModel) GetUserRecord(uid string) (result *pb.DBUserRecord, err error) { result = &pb.DBUserRecord{} key := fmt.Sprintf("userrecord:%s", uid) if err = this.Redis.HGetAll(key, result); err != nil && err != redis.RedisNil { return } if err == redis.RedisNil { if err = this.DB.FindOne(core.SqlTable("userrecord"), bson.M{"uid": uid}).Decode(result); err != nil { return } err = this.Redis.HMSet(key, result) } return } //修改用户扩展数据 func (this *MCompModel) ChangeUserRecord(uid string, value map[string]interface{}) (err error) { value["mtime"] = time.Now().Unix() // 更新时间 key := fmt.Sprintf("userrecord:%s", uid) if err = this.Redis.HMSet(key, value); err != nil && err != redis.RedisNil { return } err = this.UpdateModelLogs("userrecord", uid, bson.M{"uid": uid}, value) return }