diff --git a/go.mod b/go.mod index 1b5f2648..5999a531 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,6 @@ require ( github.com/mailru/easyjson v0.7.7 github.com/mattn/go-isatty v0.0.19 github.com/mojocn/base64Captcha v1.3.6 - github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml/v2 v2.0.8 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.16.0 diff --git a/go.sum b/go.sum index 1fc24f0a..62de545d 100644 --- a/go.sum +++ b/go.sum @@ -196,8 +196,6 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mojocn/base64Captcha v1.3.6 h1:gZEKu1nsKpttuIAQgWHO+4Mhhls8cAKyiV2Ew03H+Tw= github.com/mojocn/base64Captcha v1.3.6/go.mod h1:i5CtHvm+oMbj1UzEPXaA8IH/xHFZ3DGY3Wh3dBpZ28E= -github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= -github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= diff --git a/memsto/drop_ident.go b/memsto/drop_ident.go new file mode 100644 index 00000000..6d84a706 --- /dev/null +++ b/memsto/drop_ident.go @@ -0,0 +1,100 @@ +package memsto + +import ( + "sync" + "time" +) + +type Item struct { + Count int + Ts int64 +} + +type IdentCountCacheType struct { + sync.RWMutex + idents map[string]Item +} + +func NewIdentCountCache() *IdentCountCacheType { + d := &IdentCountCacheType{ + idents: make(map[string]Item), + } + go d.CronDeleteExpired() + return d +} + +// Set ident +func (c *IdentCountCacheType) Set(ident string, count int, ts int64) { + c.Lock() + item := Item{ + Count: count, + Ts: ts, + } + c.idents[ident] = item + c.Unlock() +} + +func (c *IdentCountCacheType) Increment(ident string, num int) { + now := time.Now().Unix() + c.Lock() + if item, exists := c.idents[ident]; exists { + item.Count += num + item.Ts = now + c.idents[ident] = item + } else { + item := Item{ + Count: num, + Ts: now, + } + c.idents[ident] = item + } + c.Unlock() +} + +// check exists ident +func (c *IdentCountCacheType) Exists(ident string) bool { + c.RLock() + _, exists := c.idents[ident] + c.RUnlock() + return exists +} + +func (c *IdentCountCacheType) Get(ident string) int { + c.RLock() + defer c.RUnlock() + item, exists := c.idents[ident] + if !exists { + return 0 + } + return item.Count +} + +func (c *IdentCountCacheType) GetsAndFlush() map[string]Item { + c.Lock() + data := make(map[string]Item) + for k, v := range c.idents { + data[k] = v + } + c.idents = make(map[string]Item) + c.Unlock() + return data +} + +func (c *IdentCountCacheType) CronDeleteExpired() { + for { + time.Sleep(60 * time.Second) + c.deleteExpired() + } +} + +// cron delete expired ident +func (c *IdentCountCacheType) deleteExpired() { + c.Lock() + now := time.Now().Unix() + for ident, item := range c.idents { + if item.Ts < now-120 { + delete(c.idents, ident) + } + } + c.Unlock() +} diff --git a/pushgw/pconf/conf.go b/pushgw/pconf/conf.go index 73d7be06..22c5a304 100644 --- a/pushgw/pconf/conf.go +++ b/pushgw/pconf/conf.go @@ -13,6 +13,7 @@ type Pushgw struct { BusiGroupLabelKey string IdentMetrics []string IdentStatsThreshold int + IdentDropThreshold int WriteConcurrency int LabelRewrite bool ForceUseServerTS bool @@ -79,7 +80,11 @@ func (p *Pushgw) PreCheck() { } if p.IdentStatsThreshold <= 0 { - p.IdentStatsThreshold = 400 + p.IdentStatsThreshold = 1500 + } + + if p.IdentDropThreshold <= 0 { + p.IdentDropThreshold = 20000 } for _, writer := range p.Writers { diff --git a/pushgw/router/fns.go b/pushgw/router/fns.go index cdca77ec..d3bb93c0 100644 --- a/pushgw/router/fns.go +++ b/pushgw/router/fns.go @@ -97,7 +97,7 @@ func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) { logger.Debugf("--> debug sample from: %s, sample: %s", remoteAddr, v.String()) } -func (rt *Router) DropSample(remoteAddr string, v *prompb.TimeSeries) bool { +func (rt *Router) DropSample(v *prompb.TimeSeries) bool { filters := rt.Pushgw.DropSample if len(filters) == 0 { return false @@ -141,9 +141,15 @@ func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSe return } - IdentStatsInc(ident) - if rt.DropSample(clientIP, v) { - CounterDropSampleTotal.WithLabelValues(clientIP).Inc() + IdentStats.Increment(ident, 1) + if rt.DropSample(v) { + CounterDropSampleTotal.WithLabelValues(ident).Inc() + return + } + + count := IdentStats.Get(ident) + if count > rt.Pushgw.IdentDropThreshold { + CounterDropSampleTotal.WithLabelValues(ident).Inc() return } @@ -156,9 +162,9 @@ func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.Time return } - IdentStatsInc(metric) - if rt.DropSample(clientIP, v) { - CounterDropSampleTotal.WithLabelValues(clientIP).Inc() + IdentStats.Increment(metric, 1) + if rt.DropSample(v) { + CounterDropSampleTotal.WithLabelValues(metric).Inc() return } diff --git a/pushgw/router/ident_stats.go b/pushgw/router/ident_stats.go index 95ae5e74..f7bf762d 100644 --- a/pushgw/router/ident_stats.go +++ b/pushgw/router/ident_stats.go @@ -3,33 +3,24 @@ package router import ( "time" - "github.com/patrickmn/go-cache" + "github.com/ccfos/nightingale/v6/memsto" ) -var IdentStats *cache.Cache +var IdentStats *memsto.IdentCountCacheType func init() { - IdentStats = cache.New(2*time.Minute, 5*time.Minute) + IdentStats = memsto.NewIdentCountCache() } func (rt *Router) ReportIdentStats() (interface{}, bool) { for { time.Sleep(60 * time.Second) - m := IdentStats.Items() - IdentStats.Flush() + m := IdentStats.GetsAndFlush() for k, v := range m { - count := v.Object.(int) + count := v.Count if count > rt.Pushgw.IdentStatsThreshold { CounterSampleReceivedByIdent.WithLabelValues(k).Add(float64(count)) } } } } - -func IdentStatsInc(name string) { - _, exists := IdentStats.Get(name) - if !exists { - IdentStats.Set(name, 1, cache.DefaultExpiration) - } - IdentStats.Increment(name, 1) -}