nightingale/storage/redis.go

137 lines
2.8 KiB
Go

package storage
import (
"context"
"errors"
"fmt"
"os"
"strings"
"github.com/ccfos/nightingale/v6/pkg/tlsx"
"github.com/redis/go-redis/v9"
"github.com/toolkits/pkg/logger"
)
type RedisConfig struct {
Address string
Username string
Password string
DB int
tlsx.ClientConfig
RedisType string
MasterName string
SentinelUsername string
SentinelPassword string
}
type Redis redis.Cmdable
func NewRedis(cfg RedisConfig) (Redis, error) {
var redisClient Redis
switch cfg.RedisType {
case "standalone", "":
redisOptions := &redis.Options{
Addr: cfg.Address,
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.DB,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
redisClient = redis.NewClient(redisOptions)
case "cluster":
redisOptions := &redis.ClusterOptions{
Addrs: strings.Split(cfg.Address, ","),
Username: cfg.Username,
Password: cfg.Password,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
redisClient = redis.NewClusterClient(redisOptions)
case "sentinel":
redisOptions := &redis.FailoverOptions{
MasterName: cfg.MasterName,
SentinelAddrs: strings.Split(cfg.Address, ","),
Username: cfg.Username,
Password: cfg.Password,
DB: cfg.DB,
SentinelUsername: cfg.SentinelUsername,
SentinelPassword: cfg.SentinelPassword,
}
if cfg.UseTLS {
tlsConfig, err := cfg.TLSConfig()
if err != nil {
fmt.Println("failed to init redis tls config:", err)
os.Exit(1)
}
redisOptions.TLSConfig = tlsConfig
}
redisClient = redis.NewFailoverClient(redisOptions)
default:
fmt.Println("failed to init redis , redis type is illegal:", cfg.RedisType)
os.Exit(1)
}
err := redisClient.Ping(context.Background()).Err()
if err != nil {
fmt.Println("failed to ping redis:", err)
os.Exit(1)
}
return redisClient, nil
}
func MGet(ctx context.Context, r Redis, keys []string) [][]byte {
var vals [][]byte
pipe := r.Pipeline()
for _, key := range keys {
pipe.Get(ctx, key)
}
cmds, _ := pipe.Exec(ctx)
for i, key := range keys {
cmd := cmds[i]
if errors.Is(cmd.Err(), redis.Nil) {
continue
}
if cmd.Err() != nil {
logger.Errorf("failed to get key: %s, err: %s", key, cmd.Err())
continue
}
val := []byte(cmd.(*redis.StringCmd).Val())
vals = append(vals, val)
}
return vals
}
func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
pipe := r.Pipeline()
for k, v := range m {
pipe.Set(ctx, k, v, 0)
}
_, err := pipe.Exec(ctx)
return err
}