From a2a0b419099fb8a28b5d180c4e529b0b3f0bedd2 Mon Sep 17 00:00:00 2001 From: Yening Qin <710leo@gmail.com> Date: Tue, 28 Mar 2023 15:39:43 +0800 Subject: [PATCH] refactor: redis mset and mget (#1446) * refactor redis mset --- center/metas/metas.go | 7 +++--- center/router/router_target.go | 5 +++-- memsto/target_cache.go | 18 ++++++++++++---- storage/redis.go | 39 +++++++++++++++++++++++++++++++--- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/center/metas/metas.go b/center/metas/metas.go index c2841222..c023149f 100644 --- a/center/metas/metas.go +++ b/center/metas/metas.go @@ -95,11 +95,10 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error { return nil } - var values []interface{} + newMap := make(map[string]interface{}, count) for ident, meta := range m { - values = append(values, models.WrapIdent(ident)) - values = append(values, meta) + newMap[models.WrapIdent(ident)] = meta } - err := s.redis.MSet(context.Background(), values...).Err() + err := storage.MSet(context.Background(), s.redis, newMap) return err } diff --git a/center/router/router_target.go b/center/router/router_target.go index 7f74108a..f862932a 100644 --- a/center/router/router_target.go +++ b/center/router/router_target.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ccfos/nightingale/v6/models" + "github.com/ccfos/nightingale/v6/storage" "github.com/gin-gonic/gin" "github.com/prometheus/common/model" @@ -64,13 +65,13 @@ func (rt *Router) targetGets(c *gin.Context) { if len(keys) > 0 { metaMap := make(map[string]*models.HostMeta) - vals := rt.Redis.MGet(context.Background(), keys...).Val() + vals, _ := storage.MGet(context.Background(), rt.Redis, keys) for _, value := range vals { var meta models.HostMeta if value == nil { continue } - err := json.Unmarshal([]byte(value.(string)), &meta) + err := json.Unmarshal(value, &meta) if err != nil { logger.Warningf("unmarshal %v host meta failed: %v", value, err) continue diff --git a/memsto/target_cache.go b/memsto/target_cache.go index 9dcbce5a..2cce679a 100644 --- a/memsto/target_cache.go +++ b/memsto/target_cache.go @@ -177,13 +177,19 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo keys = append(keys, models.WrapIdent(targets[i].Ident)) num++ if num == 100 { - vals := tc.redis.MGet(context.Background(), keys...).Val() + vals, err := storage.MGet(context.Background(), tc.redis, keys) + if err != nil { + logger.Warningf("keys:%v get host meta err:%v", keys, err) + continue + } + for _, value := range vals { var meta models.HostMeta if value == nil { continue } - err := json.Unmarshal([]byte(value.(string)), &meta) + + err := json.Unmarshal(value, &meta) if err != nil { logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value) continue @@ -196,13 +202,17 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo } } - vals := tc.redis.MGet(context.Background(), keys...).Val() + vals, err := storage.MGet(context.Background(), tc.redis, keys) + if err != nil { + logger.Warningf("keys:%v get host meta err:%v", keys, err) + } for _, value := range vals { var meta models.HostMeta if value == nil { continue } - err := json.Unmarshal([]byte(value.(string)), &meta) + + err := json.Unmarshal(value, &meta) if err != nil { continue } diff --git a/storage/redis.go b/storage/redis.go index a1ad80f1..c5863599 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -8,8 +8,8 @@ import ( "time" "github.com/ccfos/nightingale/v6/pkg/tlsx" - "github.com/redis/go-redis/v9" + "github.com/toolkits/pkg/logger" ) type RedisConfig struct { @@ -26,11 +26,10 @@ type RedisConfig struct { } type Redis interface { + Pipeline() redis.Pipeliner Del(ctx context.Context, keys ...string) *redis.IntCmd Get(ctx context.Context, key string) *redis.StringCmd Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd - MSet(ctx context.Context, values ...interface{}) *redis.StatusCmd - MGet(ctx context.Context, keys ...string) *redis.SliceCmd HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd @@ -113,3 +112,37 @@ func NewRedis(cfg RedisConfig) (Redis, error) { } return redisClient, nil } + +func MGet(ctx context.Context, r Redis, keys []string) ([][]byte, error) { + var vals [][]byte + pipe := r.Pipeline() + for _, key := range keys { + pipe.Get(ctx, key) + } + cmds, err := pipe.Exec(ctx) + if err != nil { + logger.Errorf("failed to exec pipeline: %s", err) + return vals, err + } + + for i, key := range keys { + cmd := cmds[i] + if cmd.Err() != nil { + logger.Errorf("failed to get key: %s, err: %s", key, cmd.Err()) + continue + } + val := []byte(cmd.(*redis.StringCmd).Val()) + vals = append(vals, val) + } + + return vals, err +} + +func MSet(ctx context.Context, r Redis, m map[string]interface{}) error { + pipe := r.Pipeline() + for k, v := range m { + pipe.Set(ctx, k, v, 0) + } + _, err := pipe.Exec(ctx) + return err +}