优化redis 批量读写

This commit is contained in:
liwei1dao 2022-07-15 16:03:03 +08:00
parent 653ddb0b8c
commit a2273fa1d1
8 changed files with 182 additions and 48 deletions

View File

@ -1,6 +1,8 @@
package cluster package cluster
import ( import (
"go_dreamfactory/lego/sys/redis/core"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -178,8 +180,12 @@ Redis Hset 命令用于为哈希表中的字段赋值
*/ */
func (this *Redis) HSet(key string, field string, value interface{}) (err error) { func (this *Redis) HSet(key string, field string, value interface{}) (err error) {
var resultvalue []byte var resultvalue []byte
if resultvalue, err = this.codec.Marshal(value); err == nil { if !core.IsBaseType(value) {
err = this.client.Do(this.client.Context(), "HSET", key, field, resultvalue).Err() if resultvalue, err = this.codec.Marshal(value); err == nil {
err = this.client.Do(this.client.Context(), "HSET", key, field, resultvalue).Err()
}
} else {
err = this.client.Do(this.client.Context(), "HSET", key, field, value).Err()
} }
return return
} }

View File

@ -1,5 +1,10 @@
package core package core
import (
"encoding"
"time"
)
type ( type (
ICodec interface { ICodec interface {
Marshal(v interface{}) ([]byte, error) Marshal(v interface{}) ([]byte, error)
@ -10,3 +15,12 @@ type (
UnmarshalSlice(data []string, val interface{}) (err error) UnmarshalSlice(data []string, val interface{}) (err error)
} }
) )
func IsBaseType(v interface{}) bool {
switch v.(type) {
case nil, string, []byte, int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool, time.Time, time.Duration, encoding.BinaryMarshaler:
return true
default:
return false
}
}

View File

@ -370,10 +370,6 @@ func (this *Redis) ScriptExists(ctx context.Context, hashes ...string) *redis.Bo
return this.client.ScriptExists(ctx, hashes...) return this.client.ScriptExists(ctx, hashes...)
} }
// func (this *Redis) ScriptLoad(ctx context.Context, script string) *redis.StringCmd {
// return this.client.ScriptLoad(ctx, script)
// }
//Codec--------------------------------------------------------------------------------------------------------------------------------------- //Codec---------------------------------------------------------------------------------------------------------------------------------------
func (this *Redis) Marshal(v interface{}) ([]byte, error) { func (this *Redis) Marshal(v interface{}) ([]byte, error) {
if this.options.Codec != nil { if this.options.Codec != nil {

View File

@ -1,6 +1,8 @@
package single package single
import ( import (
"go_dreamfactory/lego/sys/redis/core"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
@ -168,8 +170,12 @@ Redis Hset 命令用于为哈希表中的字段赋值
*/ */
func (this *Redis) HSet(key string, field string, value interface{}) (err error) { func (this *Redis) HSet(key string, field string, value interface{}) (err error) {
var resultvalue []byte var resultvalue []byte
if resultvalue, err = this.codec.Marshal(value); err == nil { if !core.IsBaseType(value) {
err = this.client.Do(this.client.Context(), "HSET", key, field, resultvalue).Err() if resultvalue, err = this.codec.Marshal(value); err == nil {
err = this.client.Do(this.client.Context(), "HSET", key, field, resultvalue).Err()
}
} else {
err = this.client.Do(this.client.Context(), "HSET", key, field, value).Err()
} }
return return
} }

View File

@ -122,7 +122,7 @@ func Test_Redis_Encoder_Hash(t *testing.T) {
} }
//测试redis lua 脚本 //测试redis lua 脚本
func Test_Redis_Lua(t *testing.T) { func Test_Redis_Lua_HGETALL(t *testing.T) {
script := redis.NewScript(` script := redis.NewScript(`
local key = tostring(KEYS[1]) local key = tostring(KEYS[1])
local keys = redis.call("HGETALL", key) local keys = redis.call("HGETALL", key)
@ -144,6 +144,47 @@ func Test_Redis_Lua(t *testing.T) {
if result, err := ret.Result(); err != nil { if result, err := ret.Result(); err != nil {
fmt.Printf("Execute Redis err: %v", err.Error()) fmt.Printf("Execute Redis err: %v", err.Error())
} else { } else {
fmt.Printf("userid: %v", result) temp1 := result.([]interface{})
data := make([]map[string]string, len(temp1))
for i, v := range temp1 {
temp2 := v.([]interface{})
data[i] = make(map[string]string)
for n := 0; n < len(temp2); n += 2 {
data[i][temp2[n].(string)] = temp2[n+1].(string)
}
}
fmt.Printf("data: %v", data)
}
}
//测试redis lua 脚本
func Test_Redis_Lua_HSETALL(t *testing.T) {
script := redis.NewScript(`
local n = 1
local key = ""
for i, v in ipairs(KEYS) do
key = v
local argv = {}
for i=n,#ARGV,1 do
n = n+1
if ARGV[i] == "#end" then
redis.call("HMSet", key,unpack(argv))
break
else
table.insert(argv, ARGV[i])
end
end
end
return "OK"
`)
sha, err := script.Result()
if err != nil {
fmt.Println(err)
}
ret := redis.EvalSha(redis.Context(), sha, []string{"test_HMSet", "test_HMSet_1"}, "a", "1", "b", "2", "#end", "a1", "11", "b", "21", "#end")
if result, err := ret.Result(); err != nil {
fmt.Printf("Execute Redis err: %v", err.Error())
} else {
fmt.Printf("data: %v", result)
} }
} }

View File

@ -24,15 +24,51 @@ import (
"go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo"
) )
//Redis 自定义脚本 批量读取列表数据
var LusScriptgetList = `
local key = tostring(KEYS[1])
local keys = redis.call("HGETALL", key)
local data = {}
local n = 1
for i, v in ipairs(keys) do
if i%2 == 0 then
data[n] = redis.call("HGETALL", v)
n = n+1
end
end
return data
`
//Redis 自定义脚本 批量写入列表数据
var LusScriptsetList = `
local n = 1
for i, v in ipairs(KEYS) do
local key = v
local argv = {}
for i=n,#ARGV,1 do
n = n+1
if ARGV[i] == "#end" then
redis.call("HMSet", key,unpack(argv))
break
else
table.insert(argv, ARGV[i])
end
end
end
return "OK"
`
/* /*
基础组件 缓存组件 读写缓存数据 基础组件 缓存组件 读写缓存数据
DB组件也封装进来 DB组件也封装进来
*/ */
type MCompModel struct { type MCompModel struct {
cbase.ModuleCompBase cbase.ModuleCompBase
Redis redis.ISys Redis redis.ISys
DB mgo.ISys DB mgo.ISys
TableName string //redis key前缀 TableName string //redis key前缀
getListSha1 string //getList LusScript 的shal值
setListSha1 string //getList LusScript 的shal值
} }
const ( const (
@ -48,6 +84,12 @@ func (this *MCompModel) Init(service core.IService, module core.IModule, comp co
} }
func (this *MCompModel) Start() (err error) { func (this *MCompModel) Start() (err error) {
err = this.ModuleCompBase.Start() err = this.ModuleCompBase.Start()
if this.getListSha1, err = this.Redis.NewScript(LusScriptgetList).Result(); err != nil {
return
}
if this.setListSha1, err = this.Redis.NewScript(LusScriptsetList).Result(); err != nil {
return
}
return return
} }
@ -170,7 +212,7 @@ func (this *MCompModel) AddLists(uid string, data interface{}, attrs ...*cache.O
lists[i] = valuedata lists[i] = valuedata
} }
if err = this.Redis.HMSet(this.ukey(uid), listskeys); err != nil { if err = this.Redis.HMSetForMap(this.ukey(uid), listskeys); err != nil {
return return
} }
if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil { if ret := cache.OperationAttrs(attrs).Find(cache.ATTR_EXPIRE).Unwrap_Or(nil); ret != nil {
@ -334,7 +376,9 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) {
n int n int
ok bool ok bool
keys map[string]string keys map[string]string
cdata map[string]string cdata []map[string]string
wdata map[string]map[string]string
tempdata map[string]string
c *mongo.Cursor c *mongo.Cursor
) )
keys = make(map[string]string) keys = make(map[string]string)
@ -361,21 +405,16 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) {
return return
} }
sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem() sliceelemType = sliceelemType.(*reflect2.UnsafePtrType).Elem()
err = this.Redis.HGetAll(this.ukey(uid), &keys) if cdata, err = this.batchgetlists(this.ukey(uid)); err == nil {
if err == nil { for _, v := range cdata {
n = 0
for _, v := range keys {
if cdata, err = this.Redis.HGetAllToMapString(v); err != nil {
return
}
sliceType.UnsafeGrow(dptr, n+1) sliceType.UnsafeGrow(dptr, n+1)
elemPtr = sliceType.UnsafeGetIndex(dptr, n) elemPtr = sliceType.UnsafeGetIndex(dptr, n)
if *((*unsafe.Pointer)(elemPtr)) == nil { if *((*unsafe.Pointer)(elemPtr)) == nil {
newPtr := sliceelemType.UnsafeNew() newPtr := sliceelemType.UnsafeNew()
decoder.DecodeForMapJson(newPtr, cdata) decoder.DecodeForMapJson(newPtr, v)
*((*unsafe.Pointer)(elemPtr)) = newPtr *((*unsafe.Pointer)(elemPtr)) = newPtr
} else { } else {
decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), cdata) decoder.DecodeForMapJson(*((*unsafe.Pointer)(elemPtr)), v)
} }
n++ n++
} }
@ -390,6 +429,7 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) {
return return
} }
n = 0 n = 0
wdata = make(map[string]map[string]string)
for c.Next(context.Background()) { for c.Next(context.Background()) {
_id := c.Current.Lookup("_id").StringValue() _id := c.Current.Lookup("_id").StringValue()
sliceType.UnsafeGrow(dptr, n+1) sliceType.UnsafeGrow(dptr, n+1)
@ -399,23 +439,20 @@ func (this *MCompModel) GetList(uid string, data interface{}) (err error) {
*((*unsafe.Pointer)(elemPtr)) = newPtr *((*unsafe.Pointer)(elemPtr)) = newPtr
} }
elem := sliceType.GetIndex(data, n) elem := sliceType.GetIndex(data, n)
n++
if err = c.Decode(elem); err != nil { if err = c.Decode(elem); err != nil {
return return
} }
if cdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr))); err != nil { if tempdata, err = encoder.EncodeToMapJson(*((*unsafe.Pointer)(elemPtr))); err != nil {
return return
} }
key := this.ukeylist(uid, _id) key := this.ukeylist(uid, _id)
if err = this.Redis.HMSetForMap(key, cdata); err != nil { wdata[key] = tempdata
return
}
keys[_id] = key keys[_id] = key
n++
} }
if len(keys) > 0 { if len(wdata) > 0 {
if err = this.Redis.HMSet(this.ukey(uid), keys); err != nil { wdata[this.ukey(uid)] = keys
return err = this.batchsetlists(wdata)
}
} }
} }
} }
@ -481,6 +518,54 @@ func (this *MCompModel) GetUserExpand(uid string) (result *pb.DBUserExpand, err
return return
} }
//批量读取列表数据
func (this *MCompModel) batchgetlists(key string) (result []map[string]string, err error) {
var data interface{}
ret := this.Redis.EvalSha(this.Redis.Context(), this.getListSha1, []string{key})
if data, err = ret.Result(); err != nil {
fmt.Printf("Execute batchgetlists err: %v", err.Error())
} else {
temp1 := data.([]interface{})
result = make([]map[string]string, len(temp1))
for i, v := range temp1 {
temp2 := v.([]interface{})
result[i] = make(map[string]string)
for n := 0; n < len(temp2); n += 2 {
result[i][temp2[n].(string)] = temp2[n+1].(string)
}
}
if len(result) == 0 {
err = redis.RedisNil
return
}
}
return
}
//批量写入数据
func (this *MCompModel) batchsetlists(data map[string]map[string]string) (err error) {
var (
n int
keys []string
values []interface{}
)
keys = make([]string, len(data))
values = make([]interface{}, len(data))
for k, v := range data {
keys[n] = k
for k1, v1 := range v {
values = append(values, k1, v1)
}
values = append(values, "#end")
}
ret := this.Redis.EvalSha(this.Redis.Context(), this.getListSha1, keys, values...)
if _, err := ret.Result(); err != nil {
fmt.Printf("Execute batchsetlists err: %v", err.Error())
}
return
}
//日志操作可选项 //日志操作可选项
func (this *MCompModel) logOpt(uid string, data interface{}, attrs ...*cache.OperationAttr) error { func (this *MCompModel) logOpt(uid string, data interface{}, attrs ...*cache.OperationAttr) error {
ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil) ret := cache.OperationAttrs(attrs).Find(cache.ATTR_MGOLOG).Unwrap_Or(nil)

View File

@ -304,20 +304,6 @@ func (this *ModelItemsComp) pack_addItemToUserPack(uid string, items []*pb.DB_Us
return return
} }
if leftnum > 0 { //还没有放完 寻找空的格子填充 if leftnum > 0 { //还没有放完 寻找空的格子填充
for _, v := range items {
if leftnum <= int64(conf.Maxnum) {
v.ItemId = itemId
v.Amount = uint32(leftnum)
leftnum = 0
update = append(update, v)
break
} else {
leftnum -= int64(conf.Maxnum)
v.ItemId = itemId
v.Amount = uint32(conf.Maxnum)
update = append(update, v)
}
}
index := int32(len(items)) index := int32(len(items))
for leftnum > 0 { //需要补充格子 for leftnum > 0 { //需要补充格子
if leftnum <= int64(conf.Maxnum) { if leftnum <= int64(conf.Maxnum) {

View File

@ -85,7 +85,7 @@ func TestMain(m *testing.M) {
func Test_Modules(t *testing.T) { func Test_Modules(t *testing.T) {
data, _ := ptypes.MarshalAny(&pb.ItemsGetlistReq{IType: 9}) data, _ := ptypes.MarshalAny(&pb.ItemsGetlistReq{IType: 9})
reply := &pb.RPCMessageReply{} reply := &pb.RPCMessageReply{}
s_gateComp.ReceiveMsg(context.Background(), &pb.AgentMessage{UserId: "0_62cd5952f72cc4bdc2d85f6b", MainType: "items", SubType: "getlist", Message: data}, reply) s_gateComp.ReceiveMsg(context.Background(), &pb.AgentMessage{UserId: "0_62c259916d8cf3e4e06311a8", MainType: "items", SubType: "getlist", Message: data}, reply)
log.Debugf("reply:%v", reply) log.Debugf("reply:%v", reply)
} }