refactor: host heartbeat (#1417)

* refactor host heartbeat
This commit is contained in:
Yening Qin 2023-03-11 11:44:01 +08:00 committed by GitHub
parent 4f6a0bf56b
commit 462e9dd696
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 455 additions and 461 deletions

View File

@ -43,10 +43,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
ctx := ctx.NewContext(context.Background(), db)
redis, err := storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
targetCache := memsto.NewTargetCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
@ -56,7 +61,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, false)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
@ -71,7 +76,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
@ -81,12 +86,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
naming := naming.NewNaming(ctx, alertc.Heartbeat)
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)
writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats)
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, webhookCache, notifyScript, alertc.Alerting, alertc.Ibex, ctx)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)

View File

@ -15,6 +15,7 @@ import (
)
type Scheduler struct {
isCenter bool
// key: hash
alertRules map[string]*AlertRuleWorker
@ -35,10 +36,11 @@ type Scheduler struct {
stats *astats.Stats
}
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
scheduler := &Scheduler{
isCenter: isCenter,
aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker),
// recordRules: make(map[string]RuleContext),
@ -96,8 +98,12 @@ func (s *Scheduler) syncAlertRules() {
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() && s.naming.IamLeader() {
// all host rule will be processed by leader
} else if rule.IsHostRule() && s.isCenter {
// all host rule will be processed by center instance
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue
}
processor := process.NewProcessor(rule, 0, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.promClients, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule

View File

@ -188,36 +188,43 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
query := models.GetHostsQuery(rule.Queries)
switch trigger.Type {
case "target_miss", "offset":
t := now - int64(trigger.Duration)
if trigger.Type == "offset" {
t = int64(trigger.Duration)
}
hosts, err := models.TargetGetsByFilter(arw.ctx, query, trigger.Type, t, 0, 0)
case "target_miss":
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}
t := now - int64(trigger.Duration)
hosts := arw.processor.TargetCache.GetMissHost(targets, t)
for _, host := range hosts {
m := make(map[string]string)
m["ident"] = host.Ident
m["ident"] = host
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity))
}
case "pct_target_miss":
AllCount, err := models.TargetCountByFilter(arw.ctx, query, "", 0)
case "offset":
t := int64(trigger.Duration)
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}
missCount, err := models.TargetCountByFilter(arw.ctx, query, trigger.Type, now-int64(trigger.Duration))
hosts := arw.processor.TargetCache.GetOffsetHost(targets, t)
for _, host := range hosts {
m := make(map[string]string)
m["ident"] = host
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity))
}
case "pct_target_miss":
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}
pct := float64(missCount) / float64(AllCount) * 100
t := now - int64(trigger.Duration)
hosts := arw.processor.TargetCache.GetMissHost(targets, t)
pct := float64(len(hosts)) / float64(len(targets)) * 100
if pct >= float64(trigger.Percent) {
lst = append(lst, common.NewAnomalyPoint(trigger.Type, nil, now, pct, trigger.Severity))
}

View File

@ -15,6 +15,7 @@ type DatasourceHashRingType struct {
}
// for alert_rule sharding
var HostDatasource int64 = 100000
var DatasourceHashRing = DatasourceHashRingType{Rings: make(map[int64]*consistent.Consistent)}
func NewConsistentHashRing(replicas int32, nodes []string) *consistent.Consistent {

View File

@ -15,13 +15,15 @@ import (
type Naming struct {
ctx *ctx.Context
Heartbeat aconf.HeartbeatConfig
heartbeatConfig aconf.HeartbeatConfig
isCenter bool
}
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
naming := &Naming{
ctx: ctx,
Heartbeat: heartbeat,
heartbeatConfig: heartbeat,
isCenter: isCenter,
}
naming.Heartbeats()
return naming
@ -58,7 +60,7 @@ func (n *Naming) DeleteInactiveInstances() {
}
func (n *Naming) loopHeartbeat() {
interval := time.Duration(n.Heartbeat.Interval) * time.Millisecond
interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := n.heartbeat(); err != nil {
@ -72,19 +74,19 @@ func (n *Naming) heartbeat() error {
var err error
// 在页面上维护实例和集群的对应关系
datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.Heartbeat.ClusterName)
datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.heartbeatConfig.ClusterName)
if err != nil {
return err
}
if len(datasourceIds) == 0 {
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, 0)
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, 0)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", "", err)
}
} else {
for i := 0; i < len(datasourceIds); i++ {
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, datasourceIds[i])
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, datasourceIds[i])
if err != nil {
logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err)
}
@ -110,6 +112,32 @@ func (n *Naming) heartbeat() error {
localss[datasourceIds[i]] = newss
}
if n.isCenter {
// 如果是中心节点,还需要处理 host 类型的告警规则host 类型告警规则,和数据源无关,想复用下数据源的 hash ring想用一个虚假的数据源 id 来处理
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, HostDatasource)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", "", err)
}
servers, err := n.ActiveServers(HostDatasource)
if err != nil {
logger.Warningf("hearbeat %d get active server err:%v", HostDatasource, err)
return nil
}
sort.Strings(servers)
newss := strings.Join(servers, " ")
oldss, exists := localss[HostDatasource]
if exists && oldss == newss {
return nil
}
RebuildConsistentHashRing(HostDatasource, servers)
localss[HostDatasource] = newss
}
return nil
}
@ -121,8 +149,3 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
// 30秒内有心跳就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}
func (n *Naming) AllActiveServers() ([]string, error) {
// 30秒内有心跳就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "clock > ?", time.Now().Unix()-30)
}

View File

@ -1,24 +0,0 @@
package naming
import (
"sort"
"github.com/toolkits/pkg/logger"
)
func (n *Naming) IamLeader() bool {
servers, err := n.AllActiveServers()
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false
}
if len(servers) == 0 {
logger.Errorf("active servers empty")
return false
}
sort.Strings(servers)
return n.Heartbeat.Endpoint == servers[0]
}

View File

@ -59,7 +59,7 @@ type Processor struct {
groupName string
atertRuleCache *memsto.AlertRuleCacheType
targetCache *memsto.TargetCacheType
TargetCache *memsto.TargetCacheType
busiGroupCache *memsto.BusiGroupCacheType
alertMuteCache *memsto.AlertMuteCacheType
@ -90,7 +90,7 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me
quit: make(chan struct{}),
rule: rule,
targetCache: targetCache,
TargetCache: targetCache,
busiGroupCache: busiGroupCache,
alertMuteCache: alertMuteCache,
atertRuleCache: atertRuleCache,
@ -126,7 +126,7 @@ func (arw *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, i
// 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了
hash := event.Hash
alertingKeys[hash] = struct{}{}
if mute.IsMuted(cachedRule, event, arw.targetCache, arw.alertMuteCache) {
if mute.IsMuted(cachedRule, event, arw.TargetCache, arw.alertMuteCache) {
logger.Debugf("rule_eval:%s event:%v is muted", arw.Key(), event)
continue
}
@ -392,7 +392,7 @@ func (arw *Processor) fillTags(anomalyPoint common.AnomalyPoint) {
func (arw *Processor) mayHandleIdent() {
// handle ident
if ident, has := arw.tagsMap["ident"]; has {
if target, exists := arw.targetCache.Get(ident); exists {
if target, exists := arw.TargetCache.Get(ident); exists {
arw.target = target.Ident
arw.targetNote = target.Note
}

View File

@ -8,6 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/center/cconf"
"github.com/ccfos/nightingale/v6/center/idents"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/conf"
"github.com/ccfos/nightingale/v6/memsto"
@ -17,7 +18,6 @@ import (
"github.com/ccfos/nightingale/v6/pkg/i18nx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
@ -54,13 +54,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
idents := idents.New(db, redis)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)
// idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)
sso := sso.Init(config.Center, ctx)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
dsCache := memsto.NewDatasourceCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)
@ -68,13 +70,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, true)
writers := writer.NewWriters(config.Pushgw)
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx, idents)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)

133
center/idents/idents.go Normal file
View File

@ -0,0 +1,133 @@
package idents
import (
"context"
"sync"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)
type Set struct {
sync.RWMutex
items map[string]models.HostMeta
db *gorm.DB
redis storage.Redis
}
func New(db *gorm.DB, redis storage.Redis) *Set {
set := &Set{
items: make(map[string]models.HostMeta),
db: db,
redis: redis,
}
set.Init()
return set
}
func (s *Set) Init() {
go s.LoopPersist()
}
func (s *Set) MSet(items map[string]models.HostMeta) {
s.Lock()
defer s.Unlock()
for ident, meta := range items {
s.items[ident] = meta
}
}
func (s *Set) Set(ident string, meta models.HostMeta) {
s.Lock()
defer s.Unlock()
s.items[ident] = meta
}
func (s *Set) LoopPersist() {
for {
time.Sleep(time.Second)
s.persist()
}
}
func (s *Set) persist() {
var items map[string]models.HostMeta
s.Lock()
if len(s.items) == 0 {
s.Unlock()
return
}
items = s.items
s.items = make(map[string]models.HostMeta)
s.Unlock()
s.updateMeta(items)
}
func (s *Set) updateMeta(items map[string]models.HostMeta) {
m := make(map[string]models.HostMeta, 100)
now := time.Now().Unix()
num := 0
for _, meta := range items {
m[meta.Hostname] = meta
num++
if num == 100 {
if err := s.updateTargets(m, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
m = make(map[string]models.HostMeta, 100)
num = 0
}
}
if err := s.updateTargets(m, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
}
func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error {
count := int64(len(m))
if count == 0 {
return nil
}
var values []interface{}
for ident, meta := range m {
values = append(values, ident)
values = append(values, meta)
}
err := s.redis.MSet(context.Background(), values...).Err()
if err != nil {
return err
}
var lst []string
for ident := range m {
lst = append(lst, ident)
}
// there are some idents not found in db, so insert them
var exists []string
err = s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}
news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}
}
return nil
}

View File

@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/center/cconf"
"github.com/ccfos/nightingale/v6/center/cstats"
"github.com/ccfos/nightingale/v6/center/idents"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/aop"
@ -28,12 +29,13 @@ type Router struct {
DatasourceCache *memsto.DatasourceCacheType
PromClients *prom.PromClientMap
Redis storage.Redis
IdentSet *idents.Set
Sso *sso.SsoClient
Ctx *ctx.Context
}
func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, pc *prom.PromClientMap,
redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context) *Router {
redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, identSet *idents.Set) *Router {
return &Router{
HTTP: httpConfig,
Center: center,
@ -43,6 +45,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio
Redis: redis,
Sso: sso,
Ctx: ctx,
IdentSet: identSet,
}
}
@ -342,6 +345,8 @@ func (rt *Router) Config(r *gin.Engine) {
service.POST("/conf-prop/encrypt", rt.confPropEncrypt)
service.POST("/conf-prop/decrypt", rt.confPropDecrypt)
service.POST("/heartbeat", rt.heartbeat)
}
rt.configNoRoute(r)

View File

@ -0,0 +1,41 @@
package router
import (
"compress/gzip"
"encoding/json"
"io/ioutil"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
func (rt *Router) heartbeat(c *gin.Context) {
var bs []byte
var err error
var r *gzip.Reader
var req models.HostMeta
if c.GetHeader("Content-Encoding") == "gzip" {
r, err = gzip.NewReader(c.Request.Body)
if err != nil {
c.String(400, err.Error())
return
}
defer r.Close()
bs, err = ioutil.ReadAll(r)
ginx.Dangerous(err)
} else {
defer c.Request.Body.Close()
bs, err = ioutil.ReadAll(c.Request.Body)
ginx.Dangerous(err)
}
err = json.Unmarshal(bs, &req)
ginx.Dangerous(err)
req.Offset = (time.Now().UnixMilli() - req.UnixTime)
rt.IdentSet.Set(req.Hostname, req)
ginx.NewRender(c).Message(nil)
}

View File

@ -2,15 +2,13 @@ package router
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
@ -29,10 +27,10 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
query := models.GetHostsQuery(f.Filters)
hosts, err := models.TargetGetsByFilter(rt.Ctx, query, "", 0, f.Limit, (f.P-1)*f.Limit)
hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
ginx.Dangerous(err)
total, err := models.TargetCountByFilter(rt.Ctx, query, "", 0)
total, err := models.TargetCountByFilter(rt.Ctx, query)
ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
@ -45,7 +43,6 @@ func (rt *Router) targetGets(c *gin.Context) {
bgid := ginx.QueryInt64(c, "bgid", -1)
query := ginx.QueryStr(c, "query", "")
limit := ginx.QueryInt(c, "limit", 30)
mins := ginx.QueryInt(c, "mins", 2)
dsIds := queryDatasourceIds(c)
total, err := models.TargetTotal(rt.Ctx, bgid, dsIds, query)
@ -57,55 +54,33 @@ func (rt *Router) targetGets(c *gin.Context) {
if err == nil {
now := time.Now()
cache := make(map[int64]*models.BusiGroup)
targetsMap := make(map[string]*models.Target)
var keys []string
for i := 0; i < len(list); i++ {
ginx.Dangerous(list[i].FillGroup(rt.Ctx, cache))
targetsMap[strconv.FormatInt(list[i].DatasourceId, 10)+list[i].Ident] = list[i]
if now.Unix()-list[i].UpdateAt < 60 {
list[i].TargetUp = 1
}
keys = append(keys, list[i].Ident)
}
// query LoadPerCore / MemUtil / TargetUp / DiskUsedPercent from prometheus
// map key: cluster, map value: ident list
targets := make(map[int64][]string)
for i := 0; i < len(list); i++ {
targets[list[i].DatasourceId] = append(targets[list[i].DatasourceId], list[i].Ident)
}
for dsId := range targets {
cc := rt.PromClients.GetCli(dsId)
targetArr := targets[dsId]
if len(targetArr) == 0 {
metaMap := make(map[string]*models.HostMeta)
vals := rt.Redis.MGet(context.Background(), keys...).Val()
for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
targetRe := strings.Join(targetArr, "|")
valuesMap := make(map[string]map[string]float64)
for metric, ql := range rt.Center.TargetMetrics {
promql := fmt.Sprintf(ql, targetRe, mins)
values, err := instantQuery(context.Background(), cc, promql, now)
ginx.Dangerous(err)
valuesMap[metric] = values
err := json.Unmarshal([]byte(value.(string)), &meta)
if err != nil {
continue
}
metaMap[meta.Hostname] = &meta
}
// handle values
for metric, values := range valuesMap {
for ident := range values {
mapkey := strconv.FormatInt(dsId, 10) + ident
if t, has := targetsMap[mapkey]; has {
switch metric {
case "LoadPerCore":
t.LoadPerCore = values[ident]
case "MemUtil":
t.MemUtil = values[ident]
case "DiskUtil":
t.DiskUtil = values[ident]
}
}
for i := 0; i < len(list); i++ {
if meta, ok := metaMap[list[i].Ident]; ok {
if now.UnixMilli()-meta.UnixTime < 60000 {
list[i].TargetUp = 1
}
list[i].FillMeta(meta)
}
}
}
@ -116,30 +91,6 @@ func (rt *Router) targetGets(c *gin.Context) {
}, nil)
}
func instantQuery(ctx context.Context, c prom.API, promql string, ts time.Time) (map[string]float64, error) {
ret := make(map[string]float64)
val, warnings, err := c.Query(ctx, promql, ts)
if err != nil {
return ret, err
}
if len(warnings) > 0 {
return ret, fmt.Errorf("instant query occur warnings, promql: %s, warnings: %v", promql, warnings)
}
// TODO 替换函数
vectors := common.ConvertAnomalyPoints(val)
for i := range vectors {
ident, has := vectors[i].Labels["ident"]
if has {
ret[string(ident)] = vectors[i].Value
}
}
return ret, nil
}
func (rt *Router) targetGetTags(c *gin.Context) {
idents := ginx.QueryStr(c, "idents", "")
idents = strings.ReplaceAll(idents, ",", " ")
@ -152,10 +103,6 @@ type targetTagsForm struct {
Tags []string `json:"tags" binding:"required"`
}
func (t targetTagsForm) Verify() {
}
func (rt *Router) targetBindTagsByFE(c *gin.Context) {
var f targetTagsForm
ginx.BindJSON(c, &f)

View File

@ -95,12 +95,6 @@ func Upgrade(configFile string) error {
return err
}
// target
err = models.TargetUpgradeToV6(ctx, m)
if err != nil {
return err
}
// recoding rule
err = models.RecordingRuleUpgradeToV6(ctx, m)
if err != nil {

View File

@ -11,6 +11,7 @@ import (
"github.com/ccfos/nightingale/v6/alert"
"github.com/ccfos/nightingale/v6/pkg/osx"
"github.com/ccfos/nightingale/v6/pkg/version"
"github.com/toolkits/pkg/runner"
)

View File

@ -11,6 +11,7 @@ import (
"github.com/ccfos/nightingale/v6/center"
"github.com/ccfos/nightingale/v6/pkg/osx"
"github.com/ccfos/nightingale/v6/pkg/version"
"github.com/toolkits/pkg/runner"
)

View File

@ -47,10 +47,6 @@ func InitConfig(configDir, cryptoKey string) (*ConfigType, error) {
return nil, err
}
if config.Pushgw.DatasourceId == 0 {
return nil, fmt.Errorf("datasourceId is 0")
}
if config.Alert.Heartbeat.IP == "" {
// auto detect
// config.Alert.Heartbeat.IP = fmt.Sprint(GetOutboundIP())

View File

@ -1,13 +1,15 @@
package memsto
import (
"context"
"encoding/json"
"log"
"strings"
"sync"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/storage"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
@ -20,17 +22,19 @@ type TargetCacheType struct {
statLastUpdated int64
ctx *ctx.Context
stats *Stats
redis storage.Redis
sync.RWMutex
targets map[string]*models.Target // key: ident
}
func NewTargetCache(ctx *ctx.Context, stats *Stats) *TargetCacheType {
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
tc := &TargetCacheType{
statTotal: -1,
statLastUpdated: -1,
ctx: ctx,
stats: stats,
redis: redis,
targets: make(map[string]*models.Target),
}
@ -72,19 +76,40 @@ func (tc *TargetCacheType) Get(ident string) (*models.Target, bool) {
return val, has
}
func (tc *TargetCacheType) GetDeads(actives map[string]struct{}) map[string]*models.Target {
ret := make(map[string]*models.Target)
func (tc *TargetCacheType) GetMissHost(targets []*models.Target, ts int64) []string {
tc.RLock()
defer tc.RUnlock()
for ident, target := range tc.targets {
if _, has := actives[ident]; !has {
ret[ident] = target
var missHosts []string
for _, target := range targets {
target, exists := tc.targets[target.Ident]
if !exists {
missHosts = append(missHosts, target.Ident)
continue
}
if target.UnixTime < ts {
missHosts = append(missHosts, target.Ident)
}
}
return ret
return missHosts
}
func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, ts int64) []string {
tc.RLock()
defer tc.RUnlock()
var hosts []string
for _, target := range targets {
target, exists := tc.targets[target.Ident]
if !exists {
continue
}
if target.Offset > ts {
hosts = append(hosts, target.Ident)
}
}
return hosts
}
func (tc *TargetCacheType) SyncTargets() {
@ -126,18 +151,14 @@ func (tc *TargetCacheType) syncTargets() error {
return errors.WithMessage(err, "failed to call TargetGetsAll")
}
metaMap := tc.GetHostMetas(lst)
m := make(map[string]*models.Target)
for i := 0; i < len(lst); i++ {
lst[i].TagsJSON = strings.Fields(lst[i].Tags)
lst[i].TagsMap = make(map[string]string)
for _, item := range lst[i].TagsJSON {
arr := strings.Split(item, "=")
if len(arr) != 2 {
continue
lst[i].FillTagsMap()
if meta, ok := metaMap[lst[i].Ident]; ok {
lst[i].FillMeta(meta)
}
lst[i].TagsMap[arr[0]] = arr[1]
}
m[lst[i].Ident] = lst[i]
}
@ -150,3 +171,50 @@ func (tc *TargetCacheType) syncTargets() error {
return nil
}
func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*models.HostMeta {
metaMap := make(map[string]*models.HostMeta)
if tc.redis == nil {
return metaMap
}
var metas []*models.HostMeta
num := 0
var keys []string
for i := 0; i < len(targets); i++ {
keys = append(keys, targets[i].Ident)
num++
if num == 100 {
vals := tc.redis.MGet(context.Background(), keys...).Val()
for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
err := json.Unmarshal([]byte(value.(string)), &meta)
if err != nil {
logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value)
continue
}
metaMap[meta.Hostname] = &meta
}
keys = keys[:0]
metas = metas[:0]
num = 0
}
}
vals := tc.redis.MGet(context.Background(), keys...).Val()
for _, value := range vals {
var meta models.HostMeta
if value == nil {
continue
}
err := json.Unmarshal([]byte(value.(string)), &meta)
if err != nil {
continue
}
metaMap[meta.Hostname] = &meta
}
return metaMap
}

23
models/host_meta.go Normal file
View File

@ -0,0 +1,23 @@
package models
import "encoding/json"
type HostMeta struct {
AgentVersion string `json:"agent_version"`
Os string `json:"os"`
Arch string `json:"arch"`
Hostname string `json:"hostname"`
CpuNum int `json:"cpu_num"`
CpuUtil float64 `json:"cpu_util"`
MemUtil float64 `json:"mem_util"`
Offset int64 `json:"offset"`
UnixTime int64 `json:"unixtime"`
}
func (h HostMeta) MarshalBinary() ([]byte, error) {
return json.Marshal(h)
}
func (h *HostMeta) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, h)
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
"github.com/pkg/errors"
"gorm.io/gorm"
@ -17,45 +16,24 @@ type Target struct {
GroupId int64 `json:"group_id"`
GroupObj *BusiGroup `json:"group_obj" gorm:"-"`
Cluster string `json:"cluster"`
DatasourceId int64 `json:"datasource_id"`
Ident string `json:"ident"`
Note string `json:"note"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series
UpdateAt int64 `json:"update_at"`
Offset int64 `json:"offset"`
UnixTime int64 `json:"unixtime" gorm:"-"`
Offset int64 `json:"offset" gorm:"-"`
TargetUp float64 `json:"target_up" gorm:"-"`
LoadPerCore float64 `json:"load_per_core" gorm:"-"`
MemUtil float64 `json:"mem_util" gorm:"-"`
DiskUtil float64 `json:"disk_util" gorm:"-"`
CpuNum int `json:"cpu_num" gorm:"-"`
CpuUtil float64 `json:"cpu_util" gorm:"-"`
}
func (t *Target) TableName() string {
return "target"
}
func (t *Target) Add(ctx *ctx.Context) error {
obj, err := TargetGet(ctx, "ident = ?", t.Ident)
if err != nil {
return err
}
if obj == nil {
return Insert(ctx, t)
}
if obj.DatasourceId != t.DatasourceId {
return DB(ctx).Model(&Target{}).Where("ident = ?", t.Ident).Updates(map[string]interface{}{
"datasource_id": t.DatasourceId,
"update_at": t.UpdateAt,
}).Error
}
return nil
}
func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
if t.GroupId <= 0 {
return nil
@ -136,9 +114,9 @@ func TargetGets(ctx *ctx.Context, bgid int64, dsIds []int64, query string, limit
}
// 根据 DatasourceIds, groupids, tags, hosts 查询 targets
func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64, limit, offset int) ([]*Target, error) {
func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, offset int) ([]*Target, error) {
var lst []*Target
session := TargetFilterQueryBuild(ctx, query, typ, ts, limit, offset)
session := TargetFilterQueryBuild(ctx, query, limit, offset)
err := session.Order("ident").Find(&lst).Error
cache := make(map[int64]*BusiGroup)
for i := 0; i < len(lst); i++ {
@ -149,24 +127,17 @@ func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, typ stri
return lst, err
}
func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64) (int64, error) {
session := TargetFilterQueryBuild(ctx, query, typ, ts, 0, 0)
func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}) (int64, error) {
session := TargetFilterQueryBuild(ctx, query, 0, 0)
return Count(session)
}
func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64, limit, offset int) *gorm.DB {
func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limit, offset int) *gorm.DB {
session := DB(ctx).Model(&Target{})
for k, v := range query {
session = session.Where(k, v)
}
switch typ {
case "target_miss", "pct_target_miss":
session = session.Where("update_at < ?", ts)
case "offset":
session = session.Where("offset > ?", ts)
}
if limit > 0 {
session = session.Limit(limit).Offset(offset)
}
@ -288,6 +259,26 @@ func (t *Target) DelTags(ctx *ctx.Context, tags []string) error {
}).Error
}
func (t *Target) FillTagsMap() {
t.TagsJSON = strings.Fields(t.Tags)
t.TagsMap = make(map[string]string)
for _, item := range t.TagsJSON {
arr := strings.Split(item, "=")
if len(arr) != 2 {
continue
}
t.TagsMap[arr[0]] = arr[1]
}
}
func (t *Target) FillMeta(meta *HostMeta) {
t.MemUtil = meta.MemUtil
t.CpuUtil = meta.CpuUtil
t.CpuNum = meta.CpuNum
t.UnixTime = meta.UnixTime
t.Offset = meta.Offset
}
func TargetIdents(ctx *ctx.Context, ids []int64) ([]string, error) {
var ret []string
@ -323,25 +314,3 @@ func IdentsFilter(ctx *ctx.Context, idents []string, where string, args ...inter
func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}) error {
return DB(ctx).Model(m).Updates(fields).Error
}
func TargetUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error {
var lst []*Target
err := DB(ctx).Find(&lst).Error
if err != nil {
return err
}
for i := 0; i < len(lst); i++ {
ds, exists := dsm[lst[i].Cluster]
if !exists {
continue
}
lst[i].DatasourceId = ds.Id
err = lst[i].UpdateFieldsMap(ctx, map[string]interface{}{"datasource_id": lst[i].DatasourceId})
if err != nil {
logger.Errorf("update target:%d datasource ids failed, %v", lst[i].Id, err)
}
}
return nil
}

View File

@ -1,178 +0,0 @@
package idents
import (
"math"
"sync"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)
type Set struct {
sync.Mutex
items map[string]int64
db *gorm.DB
datasourceId int64
maxOffset int64
}
func New(db *gorm.DB, dsId, maxOffset int64) *Set {
if maxOffset <= 0 {
maxOffset = 500
}
set := &Set{
items: make(map[string]int64),
db: db,
datasourceId: dsId,
maxOffset: maxOffset,
}
set.Init()
return set
}
func (s *Set) Init() {
go s.LoopPersist()
}
func (s *Set) MSet(items map[string]int64) {
s.Lock()
defer s.Unlock()
for ident, ts := range items {
s.items[ident] = ts
}
}
func (s *Set) LoopPersist() {
for {
time.Sleep(time.Second)
s.persist()
}
}
func (s *Set) persist() {
var items map[string]int64
s.Lock()
if len(s.items) == 0 {
s.Unlock()
return
}
items = s.items
s.items = make(map[string]int64)
s.Unlock()
s.updateTimestamp(items)
}
func (s *Set) updateTimestamp(items map[string]int64) {
lst := make([]string, 0, 100)
offsetLst := make(map[string]int64)
now := time.Now().Unix()
num := 0
largeOffsetTargets, _ := s.GetLargeOffsetTargets()
for ident, ts := range items {
// 和当前时间相差 maxOffset 毫秒以上的,更新偏移的时间
// compare with current time, if offset is larger than maxOffset, update offset
offset := int64(math.Abs(float64(ts - time.Now().UnixMilli())))
if offset >= s.maxOffset {
offsetLst[ident] = offset
}
// 如果是大偏移的,也更新时间
// if offset is large, update timestamp
if _, ok := largeOffsetTargets[ident]; ok {
offsetLst[ident] = offset
}
lst = append(lst, ident)
num++
if num == 100 {
if err := s.updateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
lst = lst[:0]
num = 0
}
}
if err := s.updateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
for ident, offset := range offsetLst {
if err := s.updateTargetsAndOffset(ident, offset, now); err != nil {
logger.Errorf("failed to update offset: %v", err)
}
}
}
func (s *Set) updateTargets(lst []string, now int64) error {
count := int64(len(lst))
if count == 0 {
return nil
}
ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
if ret.Error != nil {
return ret.Error
}
if ret.RowsAffected == count {
return nil
}
// there are some idents not found in db, so insert them
var exists []string
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}
news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at, datasource_id) VALUES(?, ?, ?)", news[i], now, s.datasourceId).Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}
}
return nil
}
func (s *Set) updateTargetsAndOffset(ident string, offset, now int64) error {
ret := s.db.Table("target").Where("ident = ?", ident).Updates(map[string]interface{}{"update_at": now, "offset": offset})
if ret.Error != nil {
return ret.Error
}
if ret.RowsAffected == 1 {
return nil
}
// there are some idents not found in db, so insert them
err := s.db.Exec("INSERT INTO target(ident, offset, update_at, datasource_id) VALUES(?, ?, ?, ?)", ident, offset, now, s.datasourceId).Error
if err != nil {
logger.Error("failed to insert target:", ident, "error:", err)
}
return nil
}
func (s *Set) GetLargeOffsetTargets() (map[string]struct{}, error) {
var targets []string
err := s.db.Table("target").Where("offset > ?", s.maxOffset).Pluck("ident", &targets).Error
if err != nil {
return nil, err
}
var m = make(map[string]struct{}, len(targets))
for i := 0; i < len(targets); i++ {
m[targets[i]] = struct{}{}
}
return m, nil
}

View File

@ -10,9 +10,7 @@ import (
)
type Pushgw struct {
DatasourceId int64
BusiGroupLabelKey string
MaxOffset int64
LabelRewrite bool
ForceUseServerTS bool
DebugSample map[string]string

View File

@ -9,14 +9,12 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)
type PushgwProvider struct {
Ident *idents.Set
Router *router.Router
}
@ -37,16 +35,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
ctx := ctx.NewContext(context.Background(), db)
idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)
stats := memsto.NewSyncStats()
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats, nil)
writers := writer.NewWriters(config.Pushgw)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)
rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx)
rt.Config(r)
httpClean := httpx.Init(config.HTTP, r)

View File

@ -64,13 +64,13 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
}
}
func getTs(pt *prompb.TimeSeries) int64 {
if len(pt.Samples) == 0 {
return 0
}
// func getTs(pt *prompb.TimeSeries) int64 {
// if len(pt.Samples) == 0 {
// return 0
// }
return pt.Samples[0].Timestamp
}
// return pt.Samples[0].Timestamp
// }
func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) {
filter := rt.Pushgw.DebugSample

View File

@ -6,7 +6,6 @@ import (
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
)
@ -16,14 +15,13 @@ type Router struct {
Pushgw pconf.Pushgw
TargetCache *memsto.TargetCacheType
BusiGroupCache *memsto.BusiGroupCacheType
IdentSet *idents.Set
// IdentSet *idents.Set
Writers *writer.WritersType
Ctx *ctx.Context
}
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, set *idents.Set, writers *writer.WritersType, ctx *ctx.Context) *Router {
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, writers *writer.WritersType, ctx *ctx.Context) *Router {
return &Router{
IdentSet: set,
HTTP: httpConfig,
Writers: writers,
Ctx: ctx,

View File

@ -224,7 +224,6 @@ func (r *Router) datadogSeries(c *gin.Context) {
succ int
fail int
msg = "received"
ids = make(map[string]int64)
)
for i := 0; i < cnt; i++ {
@ -246,9 +245,6 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = getTs(pt)
// fill tags
target, has := r.TargetCache.Get(ident)
if has {
@ -273,7 +269,6 @@ func (r *Router) datadogSeries(c *gin.Context) {
if succ > 0 {
CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
r.IdentSet.MSet(ids)
}
c.JSON(200, gin.H{

View File

@ -180,7 +180,6 @@ func (rt *Router) falconPush(c *gin.Context) {
fail int
msg = "received"
ts = time.Now().Unix()
ids = make(map[string]int64)
)
for i := 0; i < len(arr); i++ {
@ -196,8 +195,6 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = getTs(pt)
// fill tags
target, has := rt.TargetCache.Get(ident)
if has {
@ -222,7 +219,6 @@ func (rt *Router) falconPush(c *gin.Context) {
if succ > 0 {
CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
rt.IdentSet.MSet(ids)
}
c.JSON(200, gin.H{

View File

@ -166,7 +166,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
fail int
msg = "received"
ts = time.Now().Unix()
ids = make(map[string]int64)
)
for i := 0; i < len(arr); i++ {
@ -191,9 +190,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
host, has := arr[i].Tags["ident"]
if has {
// register host
ids[host] = getTs(pt)
// fill tags
target, has := rt.TargetCache.Get(host)
if has {
@ -218,7 +214,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
if succ > 0 {
CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
rt.IdentSet.MSet(ids)
}
c.JSON(200, gin.H{

View File

@ -4,7 +4,6 @@ import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
@ -79,8 +78,6 @@ func (rt *Router) remoteWrite(c *gin.Context) {
}
var (
now = time.Now().Unix()
ids = make(map[string]int64)
ident string
metric string
)
@ -101,9 +98,6 @@ func (rt *Router) remoteWrite(c *gin.Context) {
}
if len(ident) > 0 {
// register host
ids[ident] = now
// fill tags
target, has := rt.TargetCache.Get(ident)
if has {
@ -125,7 +119,6 @@ func (rt *Router) remoteWrite(c *gin.Context) {
}
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
rt.IdentSet.MSet(ids)
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling

View File

@ -29,6 +29,8 @@ type Redis interface {
Del(ctx context.Context, keys ...string) *redis.IntCmd
Get(ctx context.Context, key string) *redis.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd
MSet(ctx context.Context, values ...interface{}) *redis.StatusCmd
MGet(ctx context.Context, keys ...string) *redis.SliceCmd
HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd
HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd