feat: auto drop metrics by ident threshold (#1845)

* auto drop data by ident

* refactor drop ident
This commit is contained in:
Yening Qin 2024-01-19 19:01:13 +08:00 committed by GitHub
parent e52a76921f
commit 840221d9ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 124 additions and 25 deletions

1
go.mod
View File

@ -21,7 +21,6 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-isatty v0.0.19
github.com/mojocn/base64Captcha v1.3.6
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml/v2 v2.0.8
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0

2
go.sum
View File

@ -196,8 +196,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mojocn/base64Captcha v1.3.6 h1:gZEKu1nsKpttuIAQgWHO+4Mhhls8cAKyiV2Ew03H+Tw=
github.com/mojocn/base64Captcha v1.3.6/go.mod h1:i5CtHvm+oMbj1UzEPXaA8IH/xHFZ3DGY3Wh3dBpZ28E=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=

100
memsto/drop_ident.go Normal file
View File

@ -0,0 +1,100 @@
package memsto
import (
"sync"
"time"
)
// Item is the value stored per ident in IdentCountCacheType: a running count
// plus the timestamp recorded with it.
type Item struct {
// Count is the accumulated counter value for the ident.
Count int
// Ts is the timestamp stored alongside Count. Increment writes the current
// Unix time in seconds here, and deleteExpired compares it against Unix
// seconds; Set stores whatever value the caller passes.
Ts int64
}
// IdentCountCacheType is an in-memory map from ident name to Item, guarded by
// an embedded read/write mutex. Use NewIdentCountCache to obtain an instance
// whose map is initialised.
type IdentCountCacheType struct {
// Embedded lock; the methods of this type take it (Lock/RLock) around every
// access to idents.
sync.RWMutex
// idents holds the per-ident records, keyed by ident name.
idents map[string]Item
}
// NewIdentCountCache builds an empty cache and launches its expiry loop
// (CronDeleteExpired) in a background goroutine before returning it.
// The background goroutine is never stopped.
func NewIdentCountCache() *IdentCountCacheType {
	cache := new(IdentCountCacheType)
	cache.idents = map[string]Item{}

	go cache.CronDeleteExpired()

	return cache
}
// Set stores count and ts for ident, overwriting any record already held
// under that name. The write happens under the exclusive lock.
func (c *IdentCountCacheType) Set(ident string, count int, ts int64) {
	c.Lock()
	c.idents[ident] = Item{Count: count, Ts: ts}
	c.Unlock()
}
// Increment adds num to the count stored for ident and stamps the record with
// the current Unix time in seconds. An ident that is not yet tracked starts
// from a count of zero, so its first record has Count == num.
func (c *IdentCountCacheType) Increment(ident string, num int) {
	// The timestamp is taken before acquiring the lock.
	stamp := time.Now().Unix()

	c.Lock()
	// A missing key yields the zero Item, which covers the "new ident" case
	// without a separate branch.
	entry := c.idents[ident]
	entry.Count += num
	entry.Ts = stamp
	c.idents[ident] = entry
	c.Unlock()
}
// Exists reports whether a record is currently stored for ident.
// The lookup is done under the read lock.
func (c *IdentCountCacheType) Exists(ident string) bool {
	c.RLock()
	defer c.RUnlock()

	_, found := c.idents[ident]
	return found
}
// Get returns the count stored for ident, or 0 when the ident is not tracked.
// The lookup is done under the read lock.
func (c *IdentCountCacheType) Get(ident string) int {
	c.RLock()
	// Indexing a missing key yields the zero Item, whose Count is 0.
	count := c.idents[ident].Count
	c.RUnlock()

	return count
}
// GetsAndFlush returns every tracked ident with its Item and leaves the cache
// empty. The returned map is never nil and is owned by the caller.
//
// Rather than copying each entry while holding the exclusive lock, the
// internal map is detached and handed to the caller, and a fresh map takes
// its place. This keeps the critical section constant-time, so concurrent
// Increment/Get callers are not stalled for the duration of a full copy.
// Every other method re-reads c.idents under the lock, so nothing else keeps
// a reference to the detached map.
func (c *IdentCountCacheType) GetsAndFlush() map[string]Item {
	c.Lock()
	data := c.idents
	c.idents = make(map[string]Item)
	c.Unlock()

	// A zero-value cache has a nil map; keep returning an empty, non-nil map
	// in that case, as callers have always received.
	if data == nil {
		data = make(map[string]Item)
	}
	return data
}
// CronDeleteExpired loops forever: it sleeps for 60 seconds, purges expired
// idents via deleteExpired, and repeats. It never returns, so it blocks the
// calling goroutine; NewIdentCountCache runs it in a goroutine of its own.
func (c *IdentCountCacheType) CronDeleteExpired() {
	const pause = 60 * time.Second

	for {
		time.Sleep(pause)
		c.deleteExpired()
	}
}
// deleteExpired removes every ident whose Ts is more than 120 seconds older
// than the current Unix time. It holds the exclusive lock for the whole scan.
func (c *IdentCountCacheType) deleteExpired() {
	const maxAgeSeconds = 120

	c.Lock()
	defer c.Unlock()

	// The current time is read after the lock is held.
	cutoff := time.Now().Unix() - maxAgeSeconds
	for name, entry := range c.idents {
		if entry.Ts >= cutoff {
			continue
		}
		// Deleting during range is permitted for Go maps.
		delete(c.idents, name)
	}
}

View File

@ -13,6 +13,7 @@ type Pushgw struct {
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int
IdentDropThreshold int
WriteConcurrency int
LabelRewrite bool
ForceUseServerTS bool
@ -79,7 +80,11 @@ func (p *Pushgw) PreCheck() {
}
if p.IdentStatsThreshold <= 0 {
p.IdentStatsThreshold = 400
p.IdentStatsThreshold = 1500
}
if p.IdentDropThreshold <= 0 {
p.IdentDropThreshold = 20000
}
for _, writer := range p.Writers {

View File

@ -97,7 +97,7 @@ func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) {
logger.Debugf("--> debug sample from: %s, sample: %s", remoteAddr, v.String())
}
func (rt *Router) DropSample(remoteAddr string, v *prompb.TimeSeries) bool {
func (rt *Router) DropSample(v *prompb.TimeSeries) bool {
filters := rt.Pushgw.DropSample
if len(filters) == 0 {
return false
@ -141,9 +141,15 @@ func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSe
return
}
IdentStatsInc(ident)
if rt.DropSample(clientIP, v) {
CounterDropSampleTotal.WithLabelValues(clientIP).Inc()
IdentStats.Increment(ident, 1)
if rt.DropSample(v) {
CounterDropSampleTotal.WithLabelValues(ident).Inc()
return
}
count := IdentStats.Get(ident)
if count > rt.Pushgw.IdentDropThreshold {
CounterDropSampleTotal.WithLabelValues(ident).Inc()
return
}
@ -156,9 +162,9 @@ func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.Time
return
}
IdentStatsInc(metric)
if rt.DropSample(clientIP, v) {
CounterDropSampleTotal.WithLabelValues(clientIP).Inc()
IdentStats.Increment(metric, 1)
if rt.DropSample(v) {
CounterDropSampleTotal.WithLabelValues(metric).Inc()
return
}

View File

@ -3,33 +3,24 @@ package router
import (
"time"
"github.com/patrickmn/go-cache"
"github.com/ccfos/nightingale/v6/memsto"
)
var IdentStats *cache.Cache
var IdentStats *memsto.IdentCountCacheType
func init() {
IdentStats = cache.New(2*time.Minute, 5*time.Minute)
IdentStats = memsto.NewIdentCountCache()
}
func (rt *Router) ReportIdentStats() (interface{}, bool) {
for {
time.Sleep(60 * time.Second)
m := IdentStats.Items()
IdentStats.Flush()
m := IdentStats.GetsAndFlush()
for k, v := range m {
count := v.Object.(int)
count := v.Count
if count > rt.Pushgw.IdentStatsThreshold {
CounterSampleReceivedByIdent.WithLabelValues(k).Add(float64(count))
}
}
}
}
func IdentStatsInc(name string) {
_, exists := IdentStats.Get(name)
if !exists {
IdentStats.Set(name, 1, cache.DefaultExpiration)
}
IdentStats.Increment(name, 1)
}