From 462e9dd6961501307cb3d430c5d4c10bcca21380 Mon Sep 17 00:00:00 2001 From: Yening Qin <710leo@gmail.com> Date: Sat, 11 Mar 2023 11:44:01 +0800 Subject: [PATCH] refactor: host heartbeat (#1417) * refactor host heartbeat --- alert/alert.go | 15 ++- alert/eval/alert_rule.go | 12 +- alert/eval/eval.go | 31 +++-- alert/naming/hashring.go | 1 + alert/naming/heartbeat.go | 51 +++++--- alert/naming/leader.go | 24 ---- alert/process/process.go | 8 +- center/center.go | 14 ++- center/idents/idents.go | 133 +++++++++++++++++++++ center/router/router.go | 7 +- center/router/router_heartbeat.go | 41 +++++++ center/router/router_target.go | 95 ++++----------- cli/upgrade/upgrade.go | 6 - cmd/alert/main.go | 1 + cmd/center/main.go | 1 + conf/conf.go | 4 - memsto/target_cache.go | 106 ++++++++++++++--- models/host_meta.go | 23 ++++ models/target.go | 111 +++++++---------- pushgw/idents/idents.go | 178 ---------------------------- pushgw/pconf/conf.go | 2 - pushgw/pushgw.go | 7 +- pushgw/router/fns.go | 12 +- pushgw/router/router.go | 10 +- pushgw/router/router_datadog.go | 5 - pushgw/router/router_openfalcon.go | 4 - pushgw/router/router_opentsdb.go | 5 - pushgw/router/router_remotewrite.go | 7 -- storage/redis.go | 2 + 29 files changed, 455 insertions(+), 461 deletions(-) delete mode 100644 alert/naming/leader.go create mode 100644 center/idents/idents.go create mode 100644 center/router/router_heartbeat.go create mode 100644 models/host_meta.go delete mode 100644 pushgw/idents/idents.go diff --git a/alert/alert.go b/alert/alert.go index 34a1e9dc..614bfb98 100644 --- a/alert/alert.go +++ b/alert/alert.go @@ -43,10 +43,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { } ctx := ctx.NewContext(context.Background(), db) + redis, err := storage.NewRedis(config.Redis) + if err != nil { + return nil, err + } + syncStats := memsto.NewSyncStats() alertStats := astats.NewSyncStats() - targetCache := memsto.NewTargetCache(ctx, syncStats) + targetCache := memsto.NewTargetCache(ctx, syncStats, redis) busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats) alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats) @@ -56,7 +61,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { externalProcessors := process.NewExternalProcessors() - Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients) + Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, false) r := httpx.GinEngine(config.Global.RunMode, config.HTTP) rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) @@ -71,7 +76,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { } func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType, - alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) { + alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) { userCache := memsto.NewUserCache(ctx, syncStats) userGroupCache := memsto.NewUserGroupCache(ctx, syncStats) alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats) @@ -81,12 +86,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir) - naming := naming.NewNaming(ctx, alertc.Heartbeat) + naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter) writers := writer.NewWriters(pushgwc) record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats) - eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats) + eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats) dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, webhookCache, notifyScript, alertc.Alerting, alertc.Ibex, ctx) consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp) diff --git a/alert/eval/alert_rule.go b/alert/eval/alert_rule.go index 20654d9e..24004dc8 100644 --- a/alert/eval/alert_rule.go +++ b/alert/eval/alert_rule.go @@ -15,6 +15,7 @@ import ( ) type Scheduler struct { + isCenter bool // key: hash alertRules map[string]*AlertRuleWorker @@ -35,10 +36,11 @@ type Scheduler struct { stats *astats.Stats } -func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType, +func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, promClients *prom.PromClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats) *Scheduler { scheduler := &Scheduler{ + isCenter: isCenter, aconf: aconf, alertRules: make(map[string]*AlertRuleWorker), // recordRules: make(map[string]RuleContext), @@ -96,8 +98,12 @@ func (s *Scheduler) syncAlertRules() { alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx) alertRuleWorkers[alertRule.Hash()] = alertRule } - } else if rule.IsHostRule() && s.naming.IamLeader() { - // all host rule will be processed by leader + } else if rule.IsHostRule() && s.isCenter { + // all host rule will be processed by center instance + + if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) { + continue + } processor := process.NewProcessor(rule, 0, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.promClients, s.ctx, s.stats) alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx) alertRuleWorkers[alertRule.Hash()] = alertRule diff --git a/alert/eval/eval.go b/alert/eval/eval.go index 7f9b7011..fe17bfb1 100644 --- a/alert/eval/eval.go +++ b/alert/eval/eval.go @@ -188,36 +188,43 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom query := models.GetHostsQuery(rule.Queries) switch trigger.Type { - case "target_miss", "offset": - t := now - int64(trigger.Duration) - if trigger.Type == "offset" { - t = int64(trigger.Duration) - } - - hosts, err := models.TargetGetsByFilter(arw.ctx, query, trigger.Type, t, 0, 0) + case "target_miss": + targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0) if err != nil { logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err) continue } + t := now - int64(trigger.Duration) + hosts := arw.processor.TargetCache.GetMissHost(targets, t) for _, host := range hosts { m := make(map[string]string) - m["ident"] = host.Ident + m["ident"] = host lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity)) } - case "pct_target_miss": - AllCount, err := models.TargetCountByFilter(arw.ctx, query, "", 0) + case "offset": + t := int64(trigger.Duration) + targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0) if err != nil { logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err) continue } - missCount, err := models.TargetCountByFilter(arw.ctx, query, trigger.Type, now-int64(trigger.Duration)) + hosts := arw.processor.TargetCache.GetOffsetHost(targets, t) + for _, host := range hosts { + m := make(map[string]string) + m["ident"] = host + lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity)) + } + case "pct_target_miss": + targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0) if err != nil { logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err) continue } - pct := float64(missCount) / float64(AllCount) * 100 + t := now - int64(trigger.Duration) + hosts := arw.processor.TargetCache.GetMissHost(targets, t) + pct := float64(len(hosts)) / float64(len(targets)) * 100 if pct >= float64(trigger.Percent) { lst = append(lst, common.NewAnomalyPoint(trigger.Type, nil, now, pct, trigger.Severity)) } diff --git a/alert/naming/hashring.go b/alert/naming/hashring.go index 87e6569b..073db0c7 100644 --- a/alert/naming/hashring.go +++ b/alert/naming/hashring.go @@ -15,6 +15,7 @@ type DatasourceHashRingType struct { } // for alert_rule sharding +var HostDatasource int64 = 100000 var DatasourceHashRing = DatasourceHashRingType{Rings: make(map[int64]*consistent.Consistent)} func NewConsistentHashRing(replicas int32, nodes []string) *consistent.Consistent { diff --git a/alert/naming/heartbeat.go b/alert/naming/heartbeat.go index d21c8a6a..fb3c6066 100644 --- a/alert/naming/heartbeat.go +++ b/alert/naming/heartbeat.go @@ -14,14 +14,16 @@ import ( ) type Naming struct { - ctx *ctx.Context - Heartbeat aconf.HeartbeatConfig + ctx *ctx.Context + heartbeatConfig aconf.HeartbeatConfig + isCenter bool } -func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming { +func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming { naming := &Naming{ - ctx: ctx, - Heartbeat: heartbeat, + ctx: ctx, + heartbeatConfig: heartbeat, + isCenter: isCenter, } naming.Heartbeats() return naming @@ -58,7 +60,7 @@ func (n *Naming) DeleteInactiveInstances() { } func (n *Naming) loopHeartbeat() { - interval := time.Duration(n.Heartbeat.Interval) * time.Millisecond + interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond for { time.Sleep(interval) if err := n.heartbeat(); err != nil { @@ -72,19 +74,19 @@ func (n *Naming) heartbeat() error { var err error // 在页面上维护实例和集群的对应关系 - datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.Heartbeat.ClusterName) + datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.heartbeatConfig.ClusterName) if err != nil { return err } if len(datasourceIds) == 0 { - err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, 0) + err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, 0) if err != nil { logger.Warningf("heartbeat with cluster %s err:%v", "", err) } } else { for i := 0; i < len(datasourceIds); i++ { - err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, datasourceIds[i]) + err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, datasourceIds[i]) if err != nil { logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err) } @@ -110,6 +112,32 @@ func (n *Naming) heartbeat() error { localss[datasourceIds[i]] = newss } + if n.isCenter { + // 如果是中心节点,还需要处理 host 类型的告警规则,host 类型告警规则,和数据源无关,想复用下数据源的 hash ring,想用一个虚假的数据源 id 来处理 + // if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it + err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, HostDatasource) + if err != nil { + logger.Warningf("heartbeat with cluster %s err:%v", "", err) + } + + servers, err := n.ActiveServers(HostDatasource) + if err != nil { + logger.Warningf("hearbeat %d get active server err:%v", HostDatasource, err) + return nil + } + + sort.Strings(servers) + newss := strings.Join(servers, " ") + + oldss, exists := localss[HostDatasource] + if exists && oldss == newss { + return nil + } + + RebuildConsistentHashRing(HostDatasource, servers) + localss[HostDatasource] = newss + } + return nil } @@ -121,8 +149,3 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) { // 30秒内有心跳,就认为是活的 return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30) } - -func (n *Naming) AllActiveServers() ([]string, error) { - // 30秒内有心跳,就认为是活的 - return models.AlertingEngineGetsInstances(n.ctx, "clock > ?", time.Now().Unix()-30) -} diff --git a/alert/naming/leader.go b/alert/naming/leader.go deleted file mode 100644 index 15192c9a..00000000 --- a/alert/naming/leader.go +++ /dev/null @@ -1,24 +0,0 @@ -package naming - -import ( - "sort" - - "github.com/toolkits/pkg/logger" -) - -func (n *Naming) IamLeader() bool { - servers, err := n.AllActiveServers() - if err != nil { - logger.Errorf("failed to get active servers: %v", err) - return false - } - - if len(servers) == 0 { - logger.Errorf("active servers empty") - return false - } - - sort.Strings(servers) - - return n.Heartbeat.Endpoint == servers[0] -} diff --git a/alert/process/process.go b/alert/process/process.go index bc1bc54e..5d8ad3b3 100644 --- a/alert/process/process.go +++ b/alert/process/process.go @@ -59,7 +59,7 @@ type Processor struct { groupName string atertRuleCache *memsto.AlertRuleCacheType - targetCache *memsto.TargetCacheType + TargetCache *memsto.TargetCacheType busiGroupCache *memsto.BusiGroupCacheType alertMuteCache *memsto.AlertMuteCacheType @@ -90,7 +90,7 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me quit: make(chan struct{}), rule: rule, - targetCache: targetCache, + TargetCache: targetCache, busiGroupCache: busiGroupCache, alertMuteCache: alertMuteCache, atertRuleCache: atertRuleCache, @@ -126,7 +126,7 @@ func (arw *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, i // 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了 hash := event.Hash alertingKeys[hash] = struct{}{} - if mute.IsMuted(cachedRule, event, arw.targetCache, arw.alertMuteCache) { + if mute.IsMuted(cachedRule, event, arw.TargetCache, arw.alertMuteCache) { logger.Debugf("rule_eval:%s event:%v is muted", arw.Key(), event) continue } @@ -392,7 +392,7 @@ func (arw *Processor) fillTags(anomalyPoint common.AnomalyPoint) { func (arw *Processor) mayHandleIdent() { // handle ident if ident, has := arw.tagsMap["ident"]; has { - if target, exists := arw.targetCache.Get(ident); exists { + if target, exists := arw.TargetCache.Get(ident); exists { arw.target = target.Ident arw.targetNote = target.Note } diff --git a/center/center.go b/center/center.go index 9aa111c3..0958524b 100644 --- a/center/center.go +++ b/center/center.go @@ -8,6 +8,7 @@ import ( "github.com/ccfos/nightingale/v6/alert/astats" "github.com/ccfos/nightingale/v6/alert/process" "github.com/ccfos/nightingale/v6/center/cconf" + "github.com/ccfos/nightingale/v6/center/idents" "github.com/ccfos/nightingale/v6/center/sso" "github.com/ccfos/nightingale/v6/conf" "github.com/ccfos/nightingale/v6/memsto" @@ -17,7 +18,6 @@ import ( "github.com/ccfos/nightingale/v6/pkg/i18nx" "github.com/ccfos/nightingale/v6/pkg/logx" "github.com/ccfos/nightingale/v6/prom" - "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/storage" @@ -54,13 +54,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { return nil, err } + idents := idents.New(db, redis) + syncStats := memsto.NewSyncStats() alertStats := astats.NewSyncStats() - idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset) + // idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset) sso := sso.Init(config.Center, ctx) busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats) - targetCache := memsto.NewTargetCache(ctx, syncStats) + targetCache := memsto.NewTargetCache(ctx, syncStats, redis) dsCache := memsto.NewDatasourceCache(ctx, syncStats) alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats) alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats) @@ -68,13 +70,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat) externalProcessors := process.NewExternalProcessors() - alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients) + alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, true) writers := writer.NewWriters(config.Pushgw) alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors) - centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx) - pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx) + centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx, idents) + pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx) r := httpx.GinEngine(config.Global.RunMode, config.HTTP) diff --git a/center/idents/idents.go b/center/idents/idents.go new file mode 100644 index 00000000..62aa84df --- /dev/null +++ b/center/idents/idents.go @@ -0,0 +1,133 @@ +package idents + +import ( + "context" + "sync" + "time" + + "github.com/ccfos/nightingale/v6/models" + "github.com/ccfos/nightingale/v6/storage" + + "github.com/toolkits/pkg/logger" + "github.com/toolkits/pkg/slice" + "gorm.io/gorm" +) + +type Set struct { + sync.RWMutex + items map[string]models.HostMeta + db *gorm.DB + redis storage.Redis +} + +func New(db *gorm.DB, redis storage.Redis) *Set { + set := &Set{ + items: make(map[string]models.HostMeta), + db: db, + redis: redis, + } + + set.Init() + return set +} + +func (s *Set) Init() { + go s.LoopPersist() +} + +func (s *Set) MSet(items map[string]models.HostMeta) { + s.Lock() + defer s.Unlock() + for ident, meta := range items { + s.items[ident] = meta + } +} + +func (s *Set) Set(ident string, meta models.HostMeta) { + s.Lock() + defer s.Unlock() + s.items[ident] = meta +} + +func (s *Set) LoopPersist() { + for { + time.Sleep(time.Second) + s.persist() + } +} + +func (s *Set) persist() { + var items map[string]models.HostMeta + + s.Lock() + if len(s.items) == 0 { + s.Unlock() + return + } + + items = s.items + s.items = make(map[string]models.HostMeta) + s.Unlock() + + s.updateMeta(items) +} + +func (s *Set) updateMeta(items map[string]models.HostMeta) { + m := make(map[string]models.HostMeta, 100) + now := time.Now().Unix() + num := 0 + + for _, meta := range items { + m[meta.Hostname] = meta + num++ + if num == 100 { + if err := s.updateTargets(m, now); err != nil { + logger.Errorf("failed to update targets: %v", err) + } + m = make(map[string]models.HostMeta, 100) + num = 0 + } + } + + if err := s.updateTargets(m, now); err != nil { + logger.Errorf("failed to update targets: %v", err) + } +} + +func (s *Set) updateTargets(m map[string]models.HostMeta, now int64) error { + count := int64(len(m)) + if count == 0 { + return nil + } + + var values []interface{} + for ident, meta := range m { + values = append(values, ident) + values = append(values, meta) + } + err := s.redis.MSet(context.Background(), values...).Err() + if err != nil { + return err + } + var lst []string + for ident := range m { + lst = append(lst, ident) + } + + // there are some idents not found in db, so insert them + var exists []string + err = s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error + if err != nil { + return err + } + + news := slice.SubString(lst, exists) + for i := 0; i < len(news); i++ { + err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error + if err != nil { + logger.Error("failed to insert target:", news[i], "error:", err) + } + } + + return nil +} diff --git a/center/router/router.go b/center/router/router.go index b553c8a0..26db0357 100644 --- a/center/router/router.go +++ b/center/router/router.go @@ -9,6 +9,7 @@ import ( "github.com/ccfos/nightingale/v6/center/cconf" "github.com/ccfos/nightingale/v6/center/cstats" + "github.com/ccfos/nightingale/v6/center/idents" "github.com/ccfos/nightingale/v6/center/sso" "github.com/ccfos/nightingale/v6/memsto" "github.com/ccfos/nightingale/v6/pkg/aop" @@ -28,12 +29,13 @@ type Router struct { DatasourceCache *memsto.DatasourceCacheType PromClients *prom.PromClientMap Redis storage.Redis + IdentSet *idents.Set Sso *sso.SsoClient Ctx *ctx.Context } func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operation, ds *memsto.DatasourceCacheType, pc *prom.PromClientMap, - redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context) *Router { + redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, identSet *idents.Set) *Router { return &Router{ HTTP: httpConfig, Center: center, @@ -43,6 +45,7 @@ func New(httpConfig httpx.Config, center cconf.Center, operations cconf.Operatio Redis: redis, Sso: sso, Ctx: ctx, + IdentSet: identSet, } } @@ -342,6 +345,8 @@ func (rt *Router) Config(r *gin.Engine) { service.POST("/conf-prop/encrypt", rt.confPropEncrypt) service.POST("/conf-prop/decrypt", rt.confPropDecrypt) + + service.POST("/heartbeat", rt.heartbeat) } rt.configNoRoute(r) diff --git a/center/router/router_heartbeat.go b/center/router/router_heartbeat.go new file mode 100644 index 00000000..81175315 --- /dev/null +++ b/center/router/router_heartbeat.go @@ -0,0 +1,41 @@ +package router + +import ( + "compress/gzip" + "encoding/json" + "io/ioutil" + "time" + + "github.com/ccfos/nightingale/v6/models" + + "github.com/gin-gonic/gin" + "github.com/toolkits/pkg/ginx" +) + +func (rt *Router) heartbeat(c *gin.Context) { + var bs []byte + var err error + var r *gzip.Reader + var req models.HostMeta + if c.GetHeader("Content-Encoding") == "gzip" { + r, err = gzip.NewReader(c.Request.Body) + if err != nil { + c.String(400, err.Error()) + return + } + defer r.Close() + bs, err = ioutil.ReadAll(r) + ginx.Dangerous(err) + } else { + defer c.Request.Body.Close() + bs, err = ioutil.ReadAll(c.Request.Body) + ginx.Dangerous(err) + } + + err = json.Unmarshal(bs, &req) + ginx.Dangerous(err) + + req.Offset = (time.Now().UnixMilli() - req.UnixTime) + rt.IdentSet.Set(req.Hostname, req) + ginx.NewRender(c).Message(nil) +} diff --git a/center/router/router_target.go b/center/router/router_target.go index 55c8d160..fcd2bdf5 100644 --- a/center/router/router_target.go +++ b/center/router/router_target.go @@ -2,15 +2,13 @@ package router import ( "context" + "encoding/json" "fmt" "net/http" - "strconv" "strings" "time" - "github.com/ccfos/nightingale/v6/alert/common" "github.com/ccfos/nightingale/v6/models" - "github.com/ccfos/nightingale/v6/pkg/prom" "github.com/gin-gonic/gin" "github.com/prometheus/common/model" @@ -29,10 +27,10 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) { query := models.GetHostsQuery(f.Filters) - hosts, err := models.TargetGetsByFilter(rt.Ctx, query, "", 0, f.Limit, (f.P-1)*f.Limit) + hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit) ginx.Dangerous(err) - total, err := models.TargetCountByFilter(rt.Ctx, query, "", 0) + total, err := models.TargetCountByFilter(rt.Ctx, query) ginx.Dangerous(err) ginx.NewRender(c).Data(gin.H{ @@ -45,7 +43,6 @@ func (rt *Router) targetGets(c *gin.Context) { bgid := ginx.QueryInt64(c, "bgid", -1) query := ginx.QueryStr(c, "query", "") limit := ginx.QueryInt(c, "limit", 30) - mins := ginx.QueryInt(c, "mins", 2) dsIds := queryDatasourceIds(c) total, err := models.TargetTotal(rt.Ctx, bgid, dsIds, query) @@ -57,55 +54,33 @@ func (rt *Router) targetGets(c *gin.Context) { if err == nil { now := time.Now() cache := make(map[int64]*models.BusiGroup) - targetsMap := make(map[string]*models.Target) + + var keys []string for i := 0; i < len(list); i++ { ginx.Dangerous(list[i].FillGroup(rt.Ctx, cache)) - targetsMap[strconv.FormatInt(list[i].DatasourceId, 10)+list[i].Ident] = list[i] - if now.Unix()-list[i].UpdateAt < 60 { - list[i].TargetUp = 1 - } + keys = append(keys, list[i].Ident) } - // query LoadPerCore / MemUtil / TargetUp / DiskUsedPercent from prometheus - // map key: cluster, map value: ident list - targets := make(map[int64][]string) - for i := 0; i < len(list); i++ { - targets[list[i].DatasourceId] = append(targets[list[i].DatasourceId], list[i].Ident) - } - - for dsId := range targets { - cc := rt.PromClients.GetCli(dsId) - - targetArr := targets[dsId] - if len(targetArr) == 0 { + metaMap := make(map[string]*models.HostMeta) + vals := rt.Redis.MGet(context.Background(), keys...).Val() + for _, value := range vals { + var meta models.HostMeta + if value == nil { continue } - - targetRe := strings.Join(targetArr, "|") - valuesMap := make(map[string]map[string]float64) - - for metric, ql := range rt.Center.TargetMetrics { - promql := fmt.Sprintf(ql, targetRe, mins) - values, err := instantQuery(context.Background(), cc, promql, now) - ginx.Dangerous(err) - valuesMap[metric] = values + err := json.Unmarshal([]byte(value.(string)), &meta) + if err != nil { + continue } + metaMap[meta.Hostname] = &meta + } - // handle values - for metric, values := range valuesMap { - for ident := range values { - mapkey := strconv.FormatInt(dsId, 10) + ident - if t, has := targetsMap[mapkey]; has { - switch metric { - case "LoadPerCore": - t.LoadPerCore = values[ident] - case "MemUtil": - t.MemUtil = values[ident] - case "DiskUtil": - t.DiskUtil = values[ident] - } - } + for i := 0; i < len(list); i++ { + if meta, ok := metaMap[list[i].Ident]; ok { + if now.UnixMilli()-meta.UnixTime < 60000 { + list[i].TargetUp = 1 } + list[i].FillMeta(meta) } } } @@ -116,30 +91,6 @@ func (rt *Router) targetGets(c *gin.Context) { }, nil) } -func instantQuery(ctx context.Context, c prom.API, promql string, ts time.Time) (map[string]float64, error) { - ret := make(map[string]float64) - - val, warnings, err := c.Query(ctx, promql, ts) - if err != nil { - return ret, err - } - - if len(warnings) > 0 { - return ret, fmt.Errorf("instant query occur warnings, promql: %s, warnings: %v", promql, warnings) - } - - // TODO 替换函数 - vectors := common.ConvertAnomalyPoints(val) - for i := range vectors { - ident, has := vectors[i].Labels["ident"] - if has { - ret[string(ident)] = vectors[i].Value - } - } - - return ret, nil -} - func (rt *Router) targetGetTags(c *gin.Context) { idents := ginx.QueryStr(c, "idents", "") idents = strings.ReplaceAll(idents, ",", " ") @@ -152,10 +103,6 @@ type targetTagsForm struct { Tags []string `json:"tags" binding:"required"` } -func (t targetTagsForm) Verify() { - -} - func (rt *Router) targetBindTagsByFE(c *gin.Context) { var f targetTagsForm ginx.BindJSON(c, &f) diff --git a/cli/upgrade/upgrade.go b/cli/upgrade/upgrade.go index 4a6abdda..950132f0 100644 --- a/cli/upgrade/upgrade.go +++ b/cli/upgrade/upgrade.go @@ -95,12 +95,6 @@ func Upgrade(configFile string) error { return err } - // target - err = models.TargetUpgradeToV6(ctx, m) - if err != nil { - return err - } - // recoding rule err = models.RecordingRuleUpgradeToV6(ctx, m) if err != nil { diff --git a/cmd/alert/main.go b/cmd/alert/main.go index e43b005b..7c1874aa 100644 --- a/cmd/alert/main.go +++ b/cmd/alert/main.go @@ -11,6 +11,7 @@ import ( "github.com/ccfos/nightingale/v6/alert" "github.com/ccfos/nightingale/v6/pkg/osx" "github.com/ccfos/nightingale/v6/pkg/version" + "github.com/toolkits/pkg/runner" ) diff --git a/cmd/center/main.go b/cmd/center/main.go index 48fb7632..97003ad8 100644 --- a/cmd/center/main.go +++ b/cmd/center/main.go @@ -11,6 +11,7 @@ import ( "github.com/ccfos/nightingale/v6/center" "github.com/ccfos/nightingale/v6/pkg/osx" "github.com/ccfos/nightingale/v6/pkg/version" + "github.com/toolkits/pkg/runner" ) diff --git a/conf/conf.go b/conf/conf.go index aa47e148..23874ad0 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -47,10 +47,6 @@ func InitConfig(configDir, cryptoKey string) (*ConfigType, error) { return nil, err } - if config.Pushgw.DatasourceId == 0 { - return nil, fmt.Errorf("datasourceId is 0") - } - if config.Alert.Heartbeat.IP == "" { // auto detect // config.Alert.Heartbeat.IP = fmt.Sprint(GetOutboundIP()) diff --git a/memsto/target_cache.go b/memsto/target_cache.go index 540dae50..35e95bb6 100644 --- a/memsto/target_cache.go +++ b/memsto/target_cache.go @@ -1,13 +1,15 @@ package memsto import ( + "context" + "encoding/json" "log" - "strings" "sync" "time" "github.com/ccfos/nightingale/v6/models" "github.com/ccfos/nightingale/v6/pkg/ctx" + "github.com/ccfos/nightingale/v6/storage" "github.com/pkg/errors" "github.com/toolkits/pkg/logger" @@ -20,17 +22,19 @@ type TargetCacheType struct { statLastUpdated int64 ctx *ctx.Context stats *Stats + redis storage.Redis sync.RWMutex targets map[string]*models.Target // key: ident } -func NewTargetCache(ctx *ctx.Context, stats *Stats) *TargetCacheType { +func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType { tc := &TargetCacheType{ statTotal: -1, statLastUpdated: -1, ctx: ctx, stats: stats, + redis: redis, targets: make(map[string]*models.Target), } @@ -72,19 +76,40 @@ func (tc *TargetCacheType) Get(ident string) (*models.Target, bool) { return val, has } -func (tc *TargetCacheType) GetDeads(actives map[string]struct{}) map[string]*models.Target { - ret := make(map[string]*models.Target) - +func (tc *TargetCacheType) GetMissHost(targets []*models.Target, ts int64) []string { tc.RLock() defer tc.RUnlock() - - for ident, target := range tc.targets { - if _, has := actives[ident]; !has { - ret[ident] = target + var missHosts []string + for _, target := range targets { + target, exists := tc.targets[target.Ident] + if !exists { + missHosts = append(missHosts, target.Ident) + continue + } + if target.UnixTime < ts { + missHosts = append(missHosts, target.Ident) } } - return ret + return missHosts +} + +func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, ts int64) []string { + tc.RLock() + defer tc.RUnlock() + var hosts []string + for _, target := range targets { + target, exists := tc.targets[target.Ident] + if !exists { + continue + } + + if target.Offset > ts { + hosts = append(hosts, target.Ident) + } + } + + return hosts } func (tc *TargetCacheType) SyncTargets() { @@ -126,18 +151,14 @@ func (tc *TargetCacheType) syncTargets() error { return errors.WithMessage(err, "failed to call TargetGetsAll") } + metaMap := tc.GetHostMetas(lst) + m := make(map[string]*models.Target) for i := 0; i < len(lst); i++ { - lst[i].TagsJSON = strings.Fields(lst[i].Tags) - lst[i].TagsMap = make(map[string]string) - for _, item := range lst[i].TagsJSON { - arr := strings.Split(item, "=") - if len(arr) != 2 { - continue - } - lst[i].TagsMap[arr[0]] = arr[1] + lst[i].FillTagsMap() + if meta, ok := metaMap[lst[i].Ident]; ok { + lst[i].FillMeta(meta) } - m[lst[i].Ident] = lst[i] } @@ -150,3 +171,50 @@ func (tc *TargetCacheType) syncTargets() error { return nil } + +func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*models.HostMeta { + metaMap := make(map[string]*models.HostMeta) + if tc.redis == nil { + return metaMap + } + var metas []*models.HostMeta + num := 0 + var keys []string + for i := 0; i < len(targets); i++ { + keys = append(keys, targets[i].Ident) + num++ + if num == 100 { + vals := tc.redis.MGet(context.Background(), keys...).Val() + for _, value := range vals { + var meta models.HostMeta + if value == nil { + continue + } + err := json.Unmarshal([]byte(value.(string)), &meta) + if err != nil { + logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value) + continue + } + metaMap[meta.Hostname] = &meta + } + keys = keys[:0] + metas = metas[:0] + num = 0 + } + } + + vals := tc.redis.MGet(context.Background(), keys...).Val() + for _, value := range vals { + var meta models.HostMeta + if value == nil { + continue + } + err := json.Unmarshal([]byte(value.(string)), &meta) + if err != nil { + continue + } + metaMap[meta.Hostname] = &meta + } + + return metaMap +} diff --git a/models/host_meta.go b/models/host_meta.go new file mode 100644 index 00000000..a3127e4e --- /dev/null +++ b/models/host_meta.go @@ -0,0 +1,23 @@ +package models + +import "encoding/json" + +type HostMeta struct { + AgentVersion string `json:"agent_version"` + Os string `json:"os"` + Arch string `json:"arch"` + Hostname string `json:"hostname"` + CpuNum int `json:"cpu_num"` + CpuUtil float64 `json:"cpu_util"` + MemUtil float64 `json:"mem_util"` + Offset int64 `json:"offset"` + UnixTime int64 `json:"unixtime"` +} + +func (h HostMeta) MarshalBinary() ([]byte, error) { + return json.Marshal(h) +} + +func (h *HostMeta) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, h) +} diff --git a/models/target.go b/models/target.go index 9bfdbc05..e697f577 100644 --- a/models/target.go +++ b/models/target.go @@ -6,56 +6,34 @@ import ( "time" "github.com/ccfos/nightingale/v6/pkg/ctx" - "github.com/toolkits/pkg/logger" "github.com/pkg/errors" "gorm.io/gorm" ) type Target struct { - Id int64 `json:"id" gorm:"primaryKey"` - GroupId int64 `json:"group_id"` - GroupObj *BusiGroup `json:"group_obj" gorm:"-"` - Cluster string `json:"cluster"` - DatasourceId int64 `json:"datasource_id"` - Ident string `json:"ident"` - Note string `json:"note"` - Tags string `json:"-"` - TagsJSON []string `json:"tags" gorm:"-"` - TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series - UpdateAt int64 `json:"update_at"` - Offset int64 `json:"offset"` + Id int64 `json:"id" gorm:"primaryKey"` + GroupId int64 `json:"group_id"` + GroupObj *BusiGroup `json:"group_obj" gorm:"-"` + Cluster string `json:"cluster"` + Ident string `json:"ident"` + Note string `json:"note"` + Tags string `json:"-"` + TagsJSON []string `json:"tags" gorm:"-"` + TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series - TargetUp float64 `json:"target_up" gorm:"-"` - LoadPerCore float64 `json:"load_per_core" gorm:"-"` - MemUtil float64 `json:"mem_util" gorm:"-"` - DiskUtil float64 `json:"disk_util" gorm:"-"` + UnixTime int64 `json:"unixtime" gorm:"-"` + Offset int64 `json:"offset" gorm:"-"` + TargetUp float64 `json:"target_up" gorm:"-"` + MemUtil float64 `json:"mem_util" gorm:"-"` + CpuNum int `json:"cpu_num" gorm:"-"` + CpuUtil float64 `json:"cpu_util" gorm:"-"` } func (t *Target) TableName() string { return "target" } -func (t *Target) Add(ctx *ctx.Context) error { - obj, err := TargetGet(ctx, "ident = ?", t.Ident) - if err != nil { - return err - } - - if obj == nil { - return Insert(ctx, t) - } - - if obj.DatasourceId != t.DatasourceId { - return DB(ctx).Model(&Target{}).Where("ident = ?", t.Ident).Updates(map[string]interface{}{ - "datasource_id": t.DatasourceId, - "update_at": t.UpdateAt, - }).Error - } - - return nil -} - func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error { if t.GroupId <= 0 { return nil @@ -136,9 +114,9 @@ func TargetGets(ctx *ctx.Context, bgid int64, dsIds []int64, query string, limit } // 根据 DatasourceIds, groupids, tags, hosts 查询 targets -func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64, limit, offset int) ([]*Target, error) { +func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, offset int) ([]*Target, error) { var lst []*Target - session := TargetFilterQueryBuild(ctx, query, typ, ts, limit, offset) + session := TargetFilterQueryBuild(ctx, query, limit, offset) err := session.Order("ident").Find(&lst).Error cache := make(map[int64]*BusiGroup) for i := 0; i < len(lst); i++ { @@ -149,24 +127,17 @@ func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, typ stri return lst, err } -func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64) (int64, error) { - session := TargetFilterQueryBuild(ctx, query, typ, ts, 0, 0) +func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}) (int64, error) { + session := TargetFilterQueryBuild(ctx, query, 0, 0) return Count(session) } -func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, typ string, ts int64, limit, offset int) *gorm.DB { +func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limit, offset int) *gorm.DB { session := DB(ctx).Model(&Target{}) for k, v := range query { session = session.Where(k, v) } - switch typ { - case "target_miss", "pct_target_miss": - session = session.Where("update_at < ?", ts) - case "offset": - session = session.Where("offset > ?", ts) - } - if limit > 0 { session = session.Limit(limit).Offset(offset) } @@ -288,6 +259,26 @@ func (t *Target) DelTags(ctx *ctx.Context, tags []string) error { }).Error } +func (t *Target) FillTagsMap() { + t.TagsJSON = strings.Fields(t.Tags) + t.TagsMap = make(map[string]string) + for _, item := range t.TagsJSON { + arr := strings.Split(item, "=") + if len(arr) != 2 { + continue + } + t.TagsMap[arr[0]] = arr[1] + } +} + +func (t *Target) FillMeta(meta *HostMeta) { + t.MemUtil = meta.MemUtil + t.CpuUtil = meta.CpuUtil + t.CpuNum = meta.CpuNum + t.UnixTime = meta.UnixTime + t.Offset = meta.Offset +} + func TargetIdents(ctx *ctx.Context, ids []int64) ([]string, error) { var ret []string @@ -323,25 +314,3 @@ func IdentsFilter(ctx *ctx.Context, idents []string, where string, args ...inter func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}) error { return DB(ctx).Model(m).Updates(fields).Error } - -func TargetUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error { - var lst []*Target - err := DB(ctx).Find(&lst).Error - if err != nil { - return err - } - - for i := 0; i < len(lst); i++ { - ds, exists := dsm[lst[i].Cluster] - if !exists { - continue - } - lst[i].DatasourceId = ds.Id - - err = lst[i].UpdateFieldsMap(ctx, map[string]interface{}{"datasource_id": lst[i].DatasourceId}) - if err != nil { - logger.Errorf("update target:%d datasource ids failed, %v", lst[i].Id, err) - } - } - return nil -} diff --git a/pushgw/idents/idents.go b/pushgw/idents/idents.go deleted file mode 100644 index 0c744c2c..00000000 --- a/pushgw/idents/idents.go +++ /dev/null @@ -1,178 +0,0 @@ -package idents - -import ( - "math" - "sync" - "time" - - "github.com/toolkits/pkg/logger" - "github.com/toolkits/pkg/slice" - "gorm.io/gorm" -) - -type Set struct { - sync.Mutex - items map[string]int64 - db *gorm.DB - datasourceId int64 - maxOffset int64 -} - -func New(db *gorm.DB, dsId, maxOffset int64) *Set { - if maxOffset <= 0 { - maxOffset = 500 - } - set := &Set{ - items: make(map[string]int64), - db: db, - datasourceId: dsId, - maxOffset: maxOffset, - } - - set.Init() - return set -} - -func (s *Set) Init() { - go s.LoopPersist() -} - -func (s *Set) MSet(items map[string]int64) { - s.Lock() - defer s.Unlock() - for ident, ts := range items { - s.items[ident] = ts - } -} - -func (s *Set) LoopPersist() { - for { - time.Sleep(time.Second) - s.persist() - } -} - -func (s *Set) persist() { - var items map[string]int64 - - s.Lock() - if len(s.items) == 0 { - s.Unlock() - return - } - - items = s.items - s.items = make(map[string]int64) - s.Unlock() - - s.updateTimestamp(items) -} - -func (s *Set) updateTimestamp(items map[string]int64) { - lst := make([]string, 0, 100) - offsetLst := make(map[string]int64) - now := time.Now().Unix() - num := 0 - - largeOffsetTargets, _ := s.GetLargeOffsetTargets() - - for ident, ts := range items { - // 和当前时间相差 maxOffset 毫秒以上的,更新偏移的时间 - // compare with current time, if offset is larger than maxOffset, update offset - offset := int64(math.Abs(float64(ts - time.Now().UnixMilli()))) - if offset >= s.maxOffset { - offsetLst[ident] = offset - } - - // 如果是大偏移的,也更新时间 - // if offset is large, update timestamp - if _, ok := largeOffsetTargets[ident]; ok { - offsetLst[ident] = offset - } - - lst = append(lst, ident) - num++ - if num == 100 { - if err := s.updateTargets(lst, now); err != nil { - logger.Errorf("failed to update targets: %v", err) - } - lst = lst[:0] - num = 0 - } - } - - if err := s.updateTargets(lst, now); err != nil { - logger.Errorf("failed to update targets: %v", err) - } - - for ident, offset := range offsetLst { - if err := s.updateTargetsAndOffset(ident, offset, now); err != nil { - logger.Errorf("failed to update offset: %v", err) - } - } -} - -func (s *Set) updateTargets(lst []string, now int64) error { - count := int64(len(lst)) - if count == 0 { - return nil - } - - ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now) - if ret.Error != nil { - return ret.Error - } - - if ret.RowsAffected == count { - return nil - } - - // there are some idents not found in db, so insert them - var exists []string - err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error - if err != nil { - return err - } - - news := slice.SubString(lst, exists) - for i := 0; i < len(news); i++ { - err = s.db.Exec("INSERT INTO target(ident, update_at, datasource_id) VALUES(?, ?, ?)", news[i], now, s.datasourceId).Error - if err != nil { - logger.Error("failed to insert target:", news[i], "error:", err) - } - } - - return nil -} - -func (s *Set) updateTargetsAndOffset(ident string, offset, now int64) error { - ret := s.db.Table("target").Where("ident = ?", ident).Updates(map[string]interface{}{"update_at": now, "offset": offset}) - if ret.Error != nil { - return ret.Error - } - if ret.RowsAffected == 1 { - return nil - } - - // there are some idents not found in db, so insert them - err := s.db.Exec("INSERT INTO target(ident, offset, update_at, datasource_id) VALUES(?, ?, ?, ?)", ident, offset, now, s.datasourceId).Error - if err != nil { - logger.Error("failed to insert target:", ident, "error:", err) - } - - return nil -} - -func (s *Set) GetLargeOffsetTargets() (map[string]struct{}, error) { - var targets []string - err := s.db.Table("target").Where("offset > ?", s.maxOffset).Pluck("ident", &targets).Error - if err != nil { - return nil, err - } - - var m = make(map[string]struct{}, len(targets)) - for i := 0; i < len(targets); i++ { - m[targets[i]] = struct{}{} - } - return m, nil -} diff --git a/pushgw/pconf/conf.go b/pushgw/pconf/conf.go index 7028c100..ddb8f6f2 100644 --- a/pushgw/pconf/conf.go +++ b/pushgw/pconf/conf.go @@ -10,9 +10,7 @@ import ( ) type Pushgw struct { - DatasourceId int64 BusiGroupLabelKey string - MaxOffset int64 LabelRewrite bool ForceUseServerTS bool DebugSample map[string]string diff --git a/pushgw/pushgw.go b/pushgw/pushgw.go index 0d1b9c0c..0cbf82ff 100644 --- a/pushgw/pushgw.go +++ b/pushgw/pushgw.go @@ -9,14 +9,12 @@ import ( "github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/httpx" "github.com/ccfos/nightingale/v6/pkg/logx" - "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/router" "github.com/ccfos/nightingale/v6/pushgw/writer" "github.com/ccfos/nightingale/v6/storage" ) type PushgwProvider struct { - Ident *idents.Set Router *router.Router } @@ -37,16 +35,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) { } ctx := ctx.NewContext(context.Background(), db) - idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset) stats := memsto.NewSyncStats() busiGroupCache := memsto.NewBusiGroupCache(ctx, stats) - targetCache := memsto.NewTargetCache(ctx, stats) + targetCache := memsto.NewTargetCache(ctx, stats, nil) writers := writer.NewWriters(config.Pushgw) r := httpx.GinEngine(config.Global.RunMode, config.HTTP) - rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx) + rt := router.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx) rt.Config(r) httpClean := httpx.Init(config.HTTP, r) diff --git a/pushgw/router/fns.go b/pushgw/router/fns.go index fff637f7..969d1cc9 100644 --- a/pushgw/router/fns.go +++ b/pushgw/router/fns.go @@ -64,13 +64,13 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC } } -func getTs(pt *prompb.TimeSeries) int64 { - if len(pt.Samples) == 0 { - return 0 - } +// func getTs(pt *prompb.TimeSeries) int64 { +// if len(pt.Samples) == 0 { +// return 0 +// } - return pt.Samples[0].Timestamp -} +// return pt.Samples[0].Timestamp +// } func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) { filter := rt.Pushgw.DebugSample diff --git a/pushgw/router/router.go b/pushgw/router/router.go index eafa7128..387f9947 100644 --- a/pushgw/router/router.go +++ b/pushgw/router/router.go @@ -6,7 +6,6 @@ import ( "github.com/ccfos/nightingale/v6/memsto" "github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/httpx" - "github.com/ccfos/nightingale/v6/pushgw/idents" "github.com/ccfos/nightingale/v6/pushgw/pconf" "github.com/ccfos/nightingale/v6/pushgw/writer" ) @@ -16,14 +15,13 @@ type Router struct { Pushgw pconf.Pushgw TargetCache *memsto.TargetCacheType BusiGroupCache *memsto.BusiGroupCacheType - IdentSet *idents.Set - Writers *writer.WritersType - Ctx *ctx.Context + // IdentSet *idents.Set + Writers *writer.WritersType + Ctx *ctx.Context } -func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, set *idents.Set, writers *writer.WritersType, ctx *ctx.Context) *Router { +func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType, writers *writer.WritersType, ctx *ctx.Context) *Router { return &Router{ - IdentSet: set, HTTP: httpConfig, Writers: writers, Ctx: ctx, diff --git a/pushgw/router/router_datadog.go b/pushgw/router/router_datadog.go index 74262eb3..648f2f64 100644 --- a/pushgw/router/router_datadog.go +++ b/pushgw/router/router_datadog.go @@ -224,7 +224,6 @@ func (r *Router) datadogSeries(c *gin.Context) { succ int fail int msg = "received" - ids = make(map[string]int64) ) for i := 0; i < cnt; i++ { @@ -246,9 +245,6 @@ func (r *Router) datadogSeries(c *gin.Context) { } if ident != "" { - // register host - ids[ident] = getTs(pt) - // fill tags target, has := r.TargetCache.Get(ident) if has { @@ -273,7 +269,6 @@ func (r *Router) datadogSeries(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ)) - r.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_openfalcon.go b/pushgw/router/router_openfalcon.go index 38db8cb0..1f9e61c7 100644 --- a/pushgw/router/router_openfalcon.go +++ b/pushgw/router/router_openfalcon.go @@ -180,7 +180,6 @@ func (rt *Router) falconPush(c *gin.Context) { fail int msg = "received" ts = time.Now().Unix() - ids = make(map[string]int64) ) for i := 0; i < len(arr); i++ { @@ -196,8 +195,6 @@ func (rt *Router) falconPush(c *gin.Context) { } if ident != "" { - // register host - ids[ident] = getTs(pt) // fill tags target, has := rt.TargetCache.Get(ident) if has { @@ -222,7 +219,6 @@ func (rt *Router) falconPush(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ)) - rt.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_opentsdb.go b/pushgw/router/router_opentsdb.go index 04113947..f665da5d 100644 --- a/pushgw/router/router_opentsdb.go +++ b/pushgw/router/router_opentsdb.go @@ -166,7 +166,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) { fail int msg = "received" ts = time.Now().Unix() - ids = make(map[string]int64) ) for i := 0; i < len(arr); i++ { @@ -191,9 +190,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) { host, has := arr[i].Tags["ident"] if has { - // register host - ids[host] = getTs(pt) - // fill tags target, has := rt.TargetCache.Get(host) if has { @@ -218,7 +214,6 @@ func (rt *Router) openTSDBPut(c *gin.Context) { if succ > 0 { CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ)) - rt.IdentSet.MSet(ids) } c.JSON(200, gin.H{ diff --git a/pushgw/router/router_remotewrite.go b/pushgw/router/router_remotewrite.go index db4cceb6..3b98c24a 100644 --- a/pushgw/router/router_remotewrite.go +++ b/pushgw/router/router_remotewrite.go @@ -4,7 +4,6 @@ import ( "io" "io/ioutil" "net/http" - "time" "github.com/gin-gonic/gin" "github.com/gogo/protobuf/proto" @@ -79,8 +78,6 @@ func (rt *Router) remoteWrite(c *gin.Context) { } var ( - now = time.Now().Unix() - ids = make(map[string]int64) ident string metric string ) @@ -101,9 +98,6 @@ func (rt *Router) remoteWrite(c *gin.Context) { } if len(ident) > 0 { - // register host - ids[ident] = now - // fill tags target, has := rt.TargetCache.Get(ident) if has { @@ -125,7 +119,6 @@ func (rt *Router) remoteWrite(c *gin.Context) { } CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count)) - rt.IdentSet.MSet(ids) } // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling diff --git a/storage/redis.go b/storage/redis.go index dc2eef90..a1ad80f1 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -29,6 +29,8 @@ type Redis interface { Del(ctx context.Context, keys ...string) *redis.IntCmd Get(ctx context.Context, key string) *redis.StringCmd Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd + MSet(ctx context.Context, values ...interface{}) *redis.StatusCmd + MGet(ctx context.Context, keys ...string) *redis.SliceCmd HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd