agent: support metrics collection (statsd) (#368)

qinyening 2020-10-29 16:54:48 +08:00 committed by GitHub
parent c6b5a5b400
commit 313144bebf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 4775 additions and 21 deletions

View File

@ -7,6 +7,18 @@ enable:
mon: true
job: true
report: true
metrics: true
udp:
enable: true
listen: :788
metrics:
maxProcs: 1
reportIntervalMs: 10
reportTimeoutMs: 2000
reportPacketSize: 100
sendToInfoFile: false
job:
metadir: ./meta

7
go.mod
View File

@ -5,13 +5,12 @@ go 1.12
require (
github.com/Shopify/sarama v1.19.0
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/caio/go-tdigest v3.1.0+incompatible
github.com/cespare/xxhash v1.1.0
github.com/codegangsta/negroni v1.0.0
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/garyburd/redigo v1.6.2
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.6.3
@ -19,14 +18,12 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/uuid v1.1.2
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/hashicorp/golang-lru v0.5.1
github.com/hpcloud/tail v1.0.0
github.com/influxdata/influxdb v1.8.0
github.com/mattn/go-isatty v0.0.12
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad
github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect

5
go.sum
View File

@ -51,6 +51,9 @@ github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJm
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/caio/go-tdigest v1.1.3 h1:dwSirEYz3a9cPJox2HCszM6TcE+7keac+spVV7LNWfw=
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -377,8 +380,6 @@ github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/toolkits/pkg v1.1.2 h1:BygBwfbL+kiYBH6Rlrx6hKC3WTvNQCsDDOy8keYFNCM=
github.com/toolkits/pkg v1.1.2/go.mod h1:ge83E8FQqUnFk+2wtVtZ8kvbmoSjE1l8FP3f+qmR0fY=
github.com/toolkits/pkg v1.1.3 h1:cjZMz9hmuTv4v7ivYERA9mWJCLKyr8JMd4S+CL/YzMM=
github.com/toolkits/pkg v1.1.3/go.mod h1:ge83E8FQqUnFk+2wtVtZ8kvbmoSjE1l8FP3f+qmR0fY=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=

View File

@ -14,6 +14,7 @@ import (
"github.com/didi/nightingale/src/modules/agent/http"
"github.com/didi/nightingale/src/modules/agent/log/worker"
"github.com/didi/nightingale/src/modules/agent/report"
"github.com/didi/nightingale/src/modules/agent/statsd"
"github.com/didi/nightingale/src/modules/agent/stra"
"github.com/didi/nightingale/src/modules/agent/sys"
"github.com/didi/nightingale/src/modules/agent/sys/funcs"
@ -21,6 +22,8 @@ import (
"github.com/didi/nightingale/src/modules/agent/sys/ports"
"github.com/didi/nightingale/src/modules/agent/sys/procs"
"github.com/didi/nightingale/src/modules/agent/timer"
"github.com/didi/nightingale/src/modules/agent/udp"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/runner"
@ -59,6 +62,7 @@ func main() {
parseConf()
loggeri.Init(config.Config.Logger)
stats.Init("agent")
if config.Config.Enable.Mon {
monStart()
@ -72,6 +76,16 @@ func main() {
reportStart()
}
if config.Config.Enable.Metrics {
// initialize the statsd service
statsd.Start()
// start the UDP listener and the UDP packet handling workers
udp.Start()
}
core.InitRpcClients()
http.Start()
endingProc()
@ -94,7 +108,6 @@ func monStart() {
sys.Init(config.Config.Sys)
stra.Init()
core.InitRpcClients()
funcs.BuildMappers()
funcs.Collect()

View File

@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"strings"
"time"
"github.com/spf13/viper"
"github.com/toolkits/pkg/file"
@ -21,12 +22,28 @@ type ConfigT struct {
Enable enableSection `yaml:"enable"`
Job jobSection `yaml:"job"`
Report reportSection `yaml:"report"`
Udp UdpSection `yaml:"udp"`
Metrics MetricsSection `yaml:"metrics"`
}
type UdpSection struct {
Enable bool `yaml:"enable"`
Listen string `yaml:"listen"`
}
type MetricsSection struct {
MaxProcs int `yaml:"maxProcs"`
ReportIntervalMs int `yaml:"reportIntervalMs"`
ReportTimeoutMs int `yaml:"reportTimeoutMs"`
ReportPacketSize int `yaml:"reportPacketSize"`
SendToInfoFile bool `yaml:"sendToInfoFile"`
Interval time.Duration
}
type enableSection struct {
Mon bool `yaml:"mon"`
Job bool `yaml:"job"`
Report bool `yaml:"report"`
Metrics bool `yaml:"metrics"`
}
type reportSection struct {

View File

@ -25,7 +25,7 @@ func Push(metricItems []*dataobj.MetricValue) error {
now := time.Now().Unix()
for _, item := range metricItems {
logger.Debug("->recv: ", item)
logger.Debugf("->recv:%+v", item)
if item.Endpoint == "" {
item.Endpoint = config.Endpoint
}
@ -48,7 +48,7 @@ func Push(metricItems []*dataobj.MetricValue) error {
continue
}
}
logger.Debug("push item: ", item)
logger.Debugf("push item: %+v", item)
items = append(items, item)
}

View File

@ -0,0 +1,178 @@
package statsd
/*
// raw configs
type MetricAgentConfig struct {
Updated int64 `json:"updated"` // timestamp when this config was generated
Version string `json:"version"` // config version
Hostname string `json:"hostname"`
Ip string `json:"ip"`
Aggr map[string]*AggrConfigItem `json:"aggr"` // ns --> x
}
type AggrConfigItem struct {
Ns string `json:"ns"`
Type string `json:"type"`
MetricTagks map[string]*AggrMetricTagks `json:"metric_tagks"`
}
type AggrMetricTagks struct {
Metric string `json:"metric"`
Tagks [][]string `json:"tagks"`
}
func (this MetricAgentConfig) UpdateLoop() {
if sconfig.Config.Cfg.Disable {
logger.Debugf("config update loop disabled")
return
}
for {
nc, err := this.getMetricAgentConfigFromRemote()
if err != nil {
logger.Debugf("get metric agent config error, [error: %s]", err.Error())
} else if nc == nil {
// this host has no local metrics aggregation configured
} else {
lac, err1 := nc.transToLocalAggrConfig()
if err1 != nil {
logger.Debugf("trans to local aggr config error, [error: %s]", err1.Error())
} else {
localAggrConfig.Update(lac, nc.Version, nc.Updated)
logger.Debugf("localAggrConfig updated at:%d", nc.Updated)
}
}
time.Sleep(time.Duration(sconfig.Config.Cfg.UdpateIntervalMs) * time.Millisecond)
}
}
func (this *MetricAgentConfig) transToLocalAggrConfig() (map[string]*NsAggrConfig, error) {
if len(this.Aggr) == 0 && this.Updated == 0 && this.Version == "" {
return nil, fmt.Errorf("bad aggr configs")
}
ret := make(map[string]*NsAggrConfig, 0)
for _, v := range this.Aggr {
if !(LocalAggrConfig{}.CheckType(v.Type)) {
logger.Debugf("bad aggr config type, [type: %s]", v.Type)
continue
}
// metric_tagks
mtks := make(map[string][][]string, 0)
for _, mtk := range v.MetricTagks {
if mtk == nil || len(mtk.Metric) == 0 || len(mtk.Tagks) == 0 {
continue
}
ttagks := make([][]string, 0)
for i := 0; i < len(mtk.Tagks); i++ {
mtksTagksMap := make(map[string]bool, 0)
for _, tk := range mtk.Tagks[i] {
mtksTagksMap[tk] = true
}
mktsTagsList := make([]string, 0)
for k, _ := range mtksTagksMap {
mktsTagsList = append(mktsTagsList, k)
}
sort.Strings(mktsTagsList)
ttagks = append(ttagks, mktsTagsList)
}
if (Func{}).HasSameSortedArray(ttagks) {
logger.Debugf("bad aggr config tagks, has same tagks: [ns: %s][metric: %s][tagks: %#v]",
v.Ns, mtk.Metric, mtk.Tagks)
logger.Debugf("drop aggr config of metric, [ns: %s][metric: %s]", v.Ns, mtk.Metric)
continue
}
mtks[mtk.Metric] = ttagks
}
if attks, ok := mtks[Const_AllMetrics]; ok && len(attks) > 0 {
for k, v := range mtks {
if k == Const_AllMetrics {
continue
}
mtks[k] = (Func{}).MergeSortedArrays(attks, v)
}
}
// metric_tagks
ret[v.Ns] = &NsAggrConfig{
Ns: v.Ns,
Type: v.Type,
MetricTagks: mtks,
}
}
return ret, nil
}
// local transfered configs
var (
localAggrConfig = &LocalAggrConfig{NsConfig: map[string]*NsAggrConfig{}, Updated: 0, Version: "init"}
)
func (this LocalAggrConfig) GetLocalAggrConfig() *LocalAggrConfig {
return localAggrConfig.Clone()
}
const (
// Type has three parts: ${metrics}:${aggregation dimensions}:${aggregate or not}
Const_AggrType_AllAnyNoaggr = "all:any:noaggr"
Const_AggrType_SomeSomeAggr = "some:some:aggr"
// all metrics
Const_AllMetrics = ".*"
)
var (
// constant: aggregation disabled
Const_NoAggrConfig = &NsAggrConfig{Ns: ".*", Type: Const_AggrType_AllAnyNoaggr}
)
type LocalAggrConfig struct {
sync.RWMutex
NsConfig map[string]*NsAggrConfig `json:"ns_config"`
Version string `json:"version"`
Updated int64 `json:"updated"`
}
type NsAggrConfig struct {
Ns string `json:"ns"`
Type string `json:"type"`
MetricTagks map[string][][]string `json:"metric_tagks"`
}
func (this *LocalAggrConfig) GetByNs(ns string) (nsAggrConfig *NsAggrConfig, found bool) {
// TODO: the daijia product line does its own aggregation, so metrics are not aggregated again here
if strings.HasSuffix(ns, ".daijia.n9e.com") {
nsAggrConfig = Const_NoAggrConfig
found = true
return
}
this.RLock()
nsAggrConfig, found = this.NsConfig[ns]
this.RUnlock()
return
}
func (this *LocalAggrConfig) Update(nac map[string]*NsAggrConfig, version string, updated int64) {
this.Lock()
this.NsConfig = nac
this.Version = version
this.Updated = updated
this.Unlock()
}
func (this *LocalAggrConfig) Clone() *LocalAggrConfig {
ret := &LocalAggrConfig{}
this.RLock()
ret.Updated = this.Updated
ret.NsConfig = this.NsConfig
this.RUnlock()
return ret
}
func (this LocalAggrConfig) CheckType(t string) bool {
switch t {
case Const_AggrType_AllAnyNoaggr, Const_AggrType_SomeSomeAggr:
return true
}
return false
}
*/

View File

@ -0,0 +1,171 @@
package statsd
import (
"fmt"
"sort"
"strconv"
)
type counterAggregator struct {
Counter float64
}
func (self *counterAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "c" {
return nil, BadAggregatorNameError
}
return &counterAggregator{}, nil
}
// The counter type accepts one or more values (merged-packet mode); there is no statusCode field and the SDK does not merge packets.
// e.g. 10{"\u2318"}1{"\u2318"}20
func (self *counterAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
delta := float64(0.0)
parsed, err := strconv.ParseFloat(values[i], 64)
if err != nil {
return err
}
delta = parsed
self.Counter += delta
}
return nil
}
func (self *counterAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
points = append(points, &Point{
Name: metric + ".counter",
Timestamp: timestamp,
Tags: tags,
Value: self.Counter,
})
return points, nil
}
func (self *counterAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
// prepare: ns/metric
//items, _ := Func{}.TranslateMetricLine(nsmetric)
//ns := items[0]
//metric := items[1]
// blacklist
// prepare: tags
tags, _, err := Func{}.TranslateArgLines(argLines)
if err != nil {
return
}
self.doAggr(tags, newAggrs)
// local aggregation
return
}
func (self *counterAggregator) merge(toMerge aggregator) (aggregator, error) {
that := toMerge.(*counterAggregator)
self.Counter += that.Counter
return self, nil
}
func (self *counterAggregator) toMap() (map[string]interface{}, error) {
return map[string]interface{}{
"__aggregator__": "counter",
"counter": self.Counter,
}, nil
}
func (self counterAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
return &counterAggregator{Counter: serialized["counter"].(float64)}, nil
}
// internals
func (self counterAggregator) addSummarizeAggregator(argLines string, toMerge *counterAggregator, newAggrs map[string]aggregator) {
aggr, ok := newAggrs[argLines]
if !(ok && aggr != nil) {
nAggr, err := toMerge.clone()
if err == nil {
newAggrs[argLines] = nAggr
}
} else {
aggr.merge(toMerge)
}
}
func (self *counterAggregator) clone() (aggregator, error) {
maps, err := self.toMap()
if err != nil {
return nil, err
}
aggr, err := counterAggregator{}.fromMap(maps)
if err != nil {
return nil, err
}
return aggr, nil
}
func (self *counterAggregator) doAggr(tags map[string]string, newAggrs map[string]aggregator, aggrTagksList ...[][]string) {
tagks := make([]string, 0)
for k, _ := range tags {
tagks = append(tagks, k)
}
tagkNum := len(tagks)
if tagkNum == 0 {
return
}
sort.Strings(tagks)
// get formator
formator := ""
for i := 0; i < tagkNum; i++ {
formator += tagks[i] + "=%s\n"
}
formator += "c"
// aggregate across all dimensions
ntagvs_all := make([]interface{}, tagkNum)
for i := 0; i < tagkNum; i++ {
ntagvs_all[i] = "<all>"
}
summarizedTags := fmt.Sprintf(formator, ntagvs_all...)
counterAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
// aggregate the specified dimensions
if len(aggrTagksList) > 0 {
for i := 0; i < len(aggrTagksList[0]); i++ {
aggrTagks := aggrTagksList[0][i]
// validity check
if !(len(aggrTagks) > 0 && len(aggrTagks) < tagkNum && // == tagkNum would duplicate the all-dimensions aggregation
(Func{}).IsSubKeys(aggrTagks, tags)) { // the data must carry the requested aggregation dimensions
continue
}
// aggregate
sometagks := make([]interface{}, tagkNum)
for i, tk := range tagks {
sometagks[i] = tags[tk]
}
for _, tk := range aggrTagks {
for i := 0; i < tagkNum; i++ {
if tk == tagks[i] {
sometagks[i] = "<all>"
break
}
}
}
summarizedTags := fmt.Sprintf(formator, sometagks...)
counterAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
}
}
}

View File

@ -0,0 +1,267 @@
package statsd
import (
"fmt"
"sort"
"strconv"
)
// maxAggregator
// counter enhance, aggr="ce"
type counterEAggregator struct {
Counter float64
Stats map[int64]float64 // no locking needed, single-threaded
lastTimestamp int64
delta float64
raw bool // raw stats (true) or aggregated stats (false); the bool zero value is false
}
func (self *counterEAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "ce" {
return nil, BadAggregatorNameError
}
return &counterEAggregator{
Stats: make(map[int64]float64),
lastTimestamp: GetTimestamp(),
delta: 0,
raw: true,
}, nil
}
// The counterE type accepts one or more values (merged-packet mode); there is no statusCode field and the SDK does not merge packets.
// e.g. 10{"\u2318"}1{"\u2318"}20
func (self *counterEAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
ts := GetTimestamp()
for i := range values {
delta := float64(0.0)
parsed, err := strconv.ParseFloat(values[i], 64)
if nil != err {
return err
}
delta = parsed
self.Counter += delta
if ts > self.lastTimestamp {
self.Stats[self.lastTimestamp] = self.delta
self.delta = delta
self.lastTimestamp = ts
} else {
self.delta += delta
}
}
return nil
}
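// dump emits the total counter and, for raw (non-aggregated) instances, the max/min/avg of the per-second deltas accumulated in Stats.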
func (self *counterEAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
points = append(points, &Point{
Name: metric + ".counter",
Timestamp: timestamp,
Tags: tags,
Value: self.Counter,
})
// only raw stats emit max/min values; aggregated results do not
if self.raw {
max := float64(0.0)
min := float64(0.0)
sum := float64(0.0)
cnt := len(self.Stats)
if cnt > 0 {
flag := true
for _, value := range self.Stats {
sum += value
if flag {
max = value
min = value
flag = false
continue
}
if value > max {
max = value
}
if value < min {
min = value
}
}
} else {
cnt = 1
}
points = append(points, &Point{
Name: metric + ".counter.max",
Timestamp: timestamp,
Tags: tags,
Value: max,
})
points = append(points, &Point{
Name: metric + ".counter.min",
Timestamp: timestamp,
Tags: tags,
Value: min,
})
points = append(points, &Point{
Name: metric + ".counter.avg",
Timestamp: timestamp,
Tags: tags,
Value: sum / float64(cnt),
})
}
return points, nil
}
func (self *counterEAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
// prepare: ns/metric
//items, _ := Func{}.TranslateMetricLine(nsmetric)
//ns := items[0]
//metric := items[1]
// blacklist
// prepare: tags
tags, _, err := Func{}.TranslateArgLines(argLines)
if err != nil {
return
}
// fold the not-yet-recorded delta into Stats
if self.raw && self.delta > 0 {
self.Stats[self.lastTimestamp] = self.delta
}
// only do the default aggregation
self.doAggr(tags, newAggrs)
// local aggregation
return
}
func (self *counterEAggregator) merge(toMerge aggregator) (aggregator, error) {
that := toMerge.(*counterEAggregator)
self.Counter += that.Counter
for ts, value := range that.Stats {
if _, found := self.Stats[ts]; found {
self.Stats[ts] += value
} else {
self.Stats[ts] = value
}
}
return self, nil
}
func (self *counterEAggregator) toMap() (map[string]interface{}, error) {
stats := map[int64]interface{}{}
for k, v := range self.Stats {
stats[k] = v
}
return map[string]interface{}{
"__aggregator__": "counterE",
"counter": self.Counter,
"stats": stats,
}, nil
}
func (self counterEAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
// the raw field defaults to false
aggregator := &counterEAggregator{Counter: serialized["counter"].(float64), Stats: map[int64]float64{}}
stats := (serialized["stats"]).(map[int64]interface{})
for k, v := range stats {
aggregator.Stats[k] = v.(float64)
}
return aggregator, nil
}
// internals
func (self counterEAggregator) addSummarizeAggregator(argLines string, toMerge *counterEAggregator, newAggrs map[string]aggregator) {
aggr, ok := newAggrs[argLines]
if !(ok && aggr != nil) {
nAggr, err := toMerge.clone()
if err == nil {
newAggrs[argLines] = nAggr
}
} else {
aggr.merge(toMerge)
}
}
func (self *counterEAggregator) clone() (aggregator, error) {
maps, err := self.toMap()
if err != nil {
return nil, err
}
aggr, err := counterEAggregator{}.fromMap(maps)
if err != nil {
return nil, err
}
return aggr, nil
}
func (self *counterEAggregator) doAggr(tags map[string]string, newAggrs map[string]aggregator, aggrTagksList ...[][]string) {
tagks := make([]string, 0)
for k, _ := range tags {
tagks = append(tagks, k)
}
tagkNum := len(tagks)
if tagkNum == 0 {
return
}
sort.Strings(tagks)
// get formator
formator := ""
for i := 0; i < tagkNum; i++ {
formator += tagks[i] + "=%s\n"
}
formator += "ce"
// aggregate across all dimensions
ntagvs_all := make([]interface{}, tagkNum)
for i := 0; i < tagkNum; i++ {
ntagvs_all[i] = "<all>"
}
summarizedTags := fmt.Sprintf(formator, ntagvs_all...)
counterEAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
// aggregate the specified dimensions
if len(aggrTagksList) > 0 {
for i := 0; i < len(aggrTagksList[0]); i++ {
aggrTagks := aggrTagksList[0][i]
// validity check
if !(len(aggrTagks) > 0 && len(aggrTagks) < tagkNum && // == tagkNum would duplicate the all-dimensions aggregation
(Func{}).IsSubKeys(aggrTagks, tags)) { // the data must carry the requested aggregation dimensions
continue
}
// aggregate
sometagks := make([]interface{}, tagkNum)
for i, tk := range tagks {
sometagks[i] = tags[tk]
}
for _, tk := range aggrTagks {
for i := 0; i < tagkNum; i++ {
if tk == tagks[i] {
sometagks[i] = "<all>"
break
}
}
}
summarizedTags := fmt.Sprintf(formator, sometagks...)
counterEAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
}
}
}

View File

@ -0,0 +1,69 @@
package statsd
import (
"fmt"
"strconv"
)
type gaugeAggregator struct {
Gauge float64
}
func (self *gaugeAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "g" {
return nil, BadAggregatorNameError
}
return &gaugeAggregator{}, nil
}
// The gauge type accepts one or more values (merged-packet mode); there is no statusCode field and the SDK does not merge packets.
// e.g. 10{"\u2318"}1{"\u2318"}20
func (self *gaugeAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
delta := float64(0.0)
parsed, err := strconv.ParseFloat(values[i], 64)
if err != nil {
return err
}
delta = parsed
self.Gauge = delta
}
return nil
}
func (self *gaugeAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
points = append(points, &Point{
Name: metric + ".gauge",
Timestamp: timestamp,
Tags: tags,
Value: self.Gauge,
})
return points, nil
}
// aggregation is not supported
func (self *gaugeAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
return
}
func (self *gaugeAggregator) merge(toMerge aggregator) (aggregator, error) {
return self, nil
}
func (self *gaugeAggregator) toMap() (map[string]interface{}, error) {
return map[string]interface{}{
"__aggregator__": "gauge",
"gauge": self.Gauge,
}, nil
}
func (self gaugeAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
return &gaugeAggregator{Gauge: serialized["gauge"].(float64)}, nil
}

View File

@ -0,0 +1,187 @@
package statsd
import (
"bytes"
"encoding/base64"
"fmt"
"strconv"
tdigest "github.com/didi/nightingale/src/toolkits/go-tdigest"
)
type histogramAggregator struct {
AggregatorNames []string
digest *tdigest.TDigest
max float64
min float64
sum float64
cnt int
}
func (self *histogramAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 {
return nil, BadAggregatorNameError
}
ni := self.newInstence(aggregatorNames)
return &ni, nil
}
// The histogram type accepts one or more values (merged-packet mode); there is no statusCode field.
// e.g. 10.1{"\u2318"}10.2{"\u2318"}20.8
func (self *histogramAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
parsed, err := strconv.ParseFloat(values[i], 64)
if nil != err {
return err
}
self.sum += parsed
self.cnt += 1
if self.max < parsed {
self.max = parsed
}
if self.min > parsed {
self.min = parsed
}
if err = self.digest.Add(parsed, 1); err != nil {
return err
}
}
return nil
}
func (self *histogramAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
for _, aggregatorName := range self.AggregatorNames {
value := 0.0
percentile := ""
switch aggregatorName {
case "p99":
value = self.digest.Quantile(0.99)
case "p95":
value = self.digest.Quantile(0.95)
case "p90":
value = self.digest.Quantile(0.90)
case "p75":
value = self.digest.Quantile(0.75)
case "p50":
value = self.digest.Quantile(0.5)
case "p25":
value = self.digest.Quantile(0.25)
case "p10":
value = self.digest.Quantile(0.10)
case "p5":
value = self.digest.Quantile(0.05)
case "p1":
value = self.digest.Quantile(0.01)
case "max":
value = self.max
percentile = "max"
case "min":
value = self.min
percentile = "min"
case "sum":
value = self.sum
percentile = "sum"
case "cnt":
value = float64(self.cnt)
percentile = "cnt"
case "avg":
if self.cnt > 0 {
value = self.sum / float64(self.cnt)
}
percentile = "avg"
default:
continue
}
// TODO: why are negative values not supported? Keep the current behavior for now; changing it might affect rpc latency metrics
if value < 0 {
value = 0
}
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
if percentile == "" {
myTags["percentile"] = aggregatorName[1:]
} else {
myTags["percentile"] = percentile
}
points = append(points, &Point{
Name: metric,
Timestamp: timestamp,
Tags: myTags,
Value: value,
})
}
return points, nil
}
// this aggregator does not support summarization, so the function below does not handle max/min/sum/cnt
func (self *histogramAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
return
}
// used when the aggr_rpc struct performs aggregation
func (self *histogramAggregator) merge(toMerge aggregator) (aggregator, error) {
that, ok := toMerge.(*histogramAggregator)
if !ok {
return nil, BadSummarizeAggregatorError
}
self.digest.Merge(that.digest)
return self, nil
}
func (self *histogramAggregator) toMap() (map[string]interface{}, error) {
digest, err := self.digest.AsBytes()
if nil != err {
return nil, err
}
aggregatorNames := make([]interface{}, 0)
for _, aggregatorName := range self.AggregatorNames {
aggregatorNames = append(aggregatorNames, aggregatorName)
}
return map[string]interface{}{
"__aggregator__": "histogram",
"aggregatorNames": aggregatorNames,
"digest": base64.StdEncoding.EncodeToString(digest),
}, nil
}
func (self *histogramAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
b, err := base64.StdEncoding.DecodeString(serialized["digest"].(string))
if nil != err {
return nil, fmt.Errorf("failed to deserialize: %v", serialized)
}
digest, err := tdigest.FromBytes(bytes.NewReader(b))
if nil != err {
return nil, fmt.Errorf("failed to deserialize: %v", serialized)
}
aggregator := &histogramAggregator{AggregatorNames: make([]string, 0), digest: digest}
aggregatorNames := (serialized["aggregatorNames"]).([]interface{})
for _, aggregatorName := range aggregatorNames {
aggregator.AggregatorNames = append(aggregator.AggregatorNames, aggregatorName.(string))
}
return aggregator, nil
}
// internal functions
func (self histogramAggregator) newInstence(aggregatorNames []string) histogramAggregator {
return histogramAggregator{
AggregatorNames: aggregatorNames,
digest: tdigest.New(100),
max: float64(0.0),
min: float64(0.0),
sum: float64(0.0),
cnt: int(0),
}
}

View File

@ -0,0 +1,12 @@
package statsd
// interface aggregator
type aggregator interface {
new(aggregatorNames []string) (aggregator, error)
collect(values []string, metric string, argLines string) error
dump(points []*Point, timestamp int64, tags map[string]string, metric string, argLines string) ([]*Point, error)
summarize(nsmetric, argLines string, newAggrs map[string]aggregator)
merge(toMerge aggregator) (aggregator, error)
toMap() (map[string]interface{}, error)
fromMap(map[string]interface{}) (aggregator, error)
}
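For orientation, here is a minimal sketch (not part of the change) of how an aggregator is driven through its lifecycle, using counterAggregator and GetTimestamp from this commit; the tag/aggregator line "host=web01\nc" follows the layout built by doAggr and is assumed here for illustration, and the real flow (state/receiver code) owns these calls.

// Illustrative only: new -> collect -> dump for a counter.
func exampleCounterLifecycle() ([]*Point, error) {
	aggr, err := (&counterAggregator{}).new([]string{"c"})
	if err != nil {
		return nil, err
	}
	// one merged packet carrying three samples
	if err := aggr.collect([]string{"10", "1", "20"}, "cpu.busy", "host=web01\nc"); err != nil {
		return nil, err
	}
	// emits cpu.busy.counter with value 31
	return aggr.dump(nil, GetTimestamp(), map[string]string{"host": "web01"}, "cpu.busy", "host=web01\nc")
}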

View File

@ -0,0 +1,200 @@
package statsd
import (
"fmt"
"strconv"
"strings"
)
type ratioAggregator struct {
Counters map[string]float64
}
func (self *ratioAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "r" {
return nil, BadAggregatorNameError
}
return &ratioAggregator{Counters: map[string]float64{}}, nil
}
// The ratio type accepts one or more values (merged-packet mode) and has a statusCode field
// old protocol, e.g.: ok{"\u2318"}error{"\u2318"}ok
// new protocol, e.g.: 1,ok{"\u2318"}1,error{"\u2318"}0,ok
func (self *ratioAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
/*
old protocol: "error" counts as 1; for "error,none" the code is error (values[0] here; "none" is truncated)
new protocol: "2,error" counts as 2; for "2,error,none" the code is error (values[1] here; "none" is truncated)
For backward compatibility:
1. a report of just "error" with no "," counts directly as 1
2. contains "," but values[0] cannot be parsed as a number: count as 1, code is values[0]
3. contains "," and was reported via the old protocol as "2,error": handled as the new protocol, the code becomes error instead of 2
*/
cvalues := strings.Split(values[i], CodeDelimiter)
if len(cvalues) == 0 {
continue
}
if len(cvalues) == 1 {
code := values[0]
self.Counters[code] += 1
continue
}
code := cvalues[1]
value, err := strconv.ParseFloat(cvalues[0], 64)
if err != nil {
value = float64(1) // old-protocol compatibility: e.g. "error,something" is handled as 1,error
code = values[0]
}
self.Counters[code] += value
}
return nil
}
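As a quick illustration of the compatibility rules above (a sketch, not part of the change; the metric name and tag line are placeholders):

// Illustrative only: collect on a merged packet mixing old- and new-protocol samples.
func exampleRatioCollect() map[string]float64 {
	r := &ratioAggregator{Counters: map[string]float64{}}
	// "ok"      -> old protocol, counted as 1
	// "1,error" -> new protocol, adds 1 to "error"
	// "2,ok"    -> new protocol, adds 2 to "ok"
	_ = r.collect([]string{"ok", "1,error", "2,ok"}, "api.req", "host=web01\nr")
	return r.Counters // {"ok": 3, "error": 1}
}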
func (self *ratioAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
return self._dump(false, points, timestamp, tags, metric, argLines)
}
func (self *ratioAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
return
}
func (self *ratioAggregator) merge(toMerge aggregator) (aggregator, error) {
that := toMerge.(*ratioAggregator)
for k, v2 := range that.Counters {
_, found := self.Counters[k]
if found {
self.Counters[k] += v2
} else {
self.Counters[k] = v2
}
}
return self, nil
}
func (self *ratioAggregator) toMap() (map[string]interface{}, error) {
counters := map[string]float64{}
for k, v := range self.Counters {
counters[k] = v
}
return map[string]interface{}{
"__aggregator__": "ratio",
"counters": counters,
}, nil
}
func (self *ratioAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
aggr := &ratioAggregator{Counters: map[string]float64{}}
counters := (serialized["counters"]).(map[string]interface{})
for k, v := range counters {
aggr.Counters[k] = v.(float64)
}
return aggr, nil
}
func (self *ratioAggregator) _dump(
asTags bool, points []*Point, timestamp int64, tags map[string]string,
metric string, argLines string) ([]*Point, error) {
// nothing was collected, so nothing to dump
if len(self.Counters) == 0 {
return points, nil
}
convertedCounters := map[string]float64{}
total := float64(0)
for code, byCodeCount := range self.Counters {
counter := byCodeCount
convertedCounters[code] = counter
total += counter
}
if total > 0 {
for code := range self.Counters {
myMetric := metric
myTags := tags
if asTags {
myTags = map[string]string{}
for tagk, tagv := range tags {
myTags[tagk] = tagv
}
myTags["code"] = code
myMetric = metric + ".ratio"
} else {
myMetric = metric + "." + code + ".ratio"
}
points = append(points, &Point{
Name: myMetric,
Timestamp: timestamp,
Tags: myTags,
Value: convertedCounters[code] / total * 100,
})
}
}
points = append(points, &Point{
Name: metric + ".counter",
Timestamp: timestamp,
Tags: tags,
Value: total,
})
return points, nil
}
////////////////////////////////////////////////////////////
// struct ratioAsTagsAggregator
////////////////////////////////////////////////////////////
type ratioAsTagsAggregator struct {
ratioAggregator
}
func (self *ratioAsTagsAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "rt" {
return nil, BadAggregatorNameError
}
return &ratioAsTagsAggregator{ratioAggregator: ratioAggregator{Counters: map[string]float64{}}}, nil
}
func (self *ratioAsTagsAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
return self._dump(true, points, timestamp, tags, metric, argLines)
}
func (self *ratioAsTagsAggregator) merge(toMerge aggregator) (aggregator, error) {
that := toMerge.(*ratioAsTagsAggregator)
merged, err := self.ratioAggregator.merge(&that.ratioAggregator)
if err != nil {
return self, err
}
self.ratioAggregator = *(merged.(*ratioAggregator))
return self, nil
}
func (self *ratioAsTagsAggregator) toMap() (map[string]interface{}, error) {
counters := map[string]float64{}
for k, v := range self.Counters {
counters[k] = v
}
return map[string]interface{}{
"__aggregator__": "ratioAsTags",
"counters": counters,
}, nil
}
func (self *ratioAsTagsAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
aggr, err := self.ratioAggregator.fromMap(serialized)
if err != nil {
return nil, err
}
raggr := aggr.(*ratioAggregator)
return &ratioAsTagsAggregator{ratioAggregator: *raggr}, nil
}

View File

@ -0,0 +1,441 @@
package statsd
import (
"fmt"
"sort"
"strconv"
"strings"
)
type rpcAggregator struct {
histogramAggregator
Counters map[string]float64
Latencys map[string]float64
}
func (self *rpcAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "rpc" {
return nil, BadAggregatorNameError
}
histogramAggregatorNames := []string{"p99", "p95", "p75", "p50"}
return &rpcAggregator{
histogramAggregator: histogramAggregator{}.newInstence(histogramAggregatorNames),
Counters: map[string]float64{},
Latencys: map[string]float64{},
}, nil
}
// The rpc type accepts one or more values (merged-packet mode), each with a statusCode field
// e.g. 10.1,ok{"\u2318"}10.2,error{"\u2318"}20.8,ok
func (self *rpcAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
cvalues := strings.Split(values[i], CodeDelimiter)
if len(cvalues) < 2 {
// bad values
continue
}
err := self.histogramAggregator.collect(cvalues[:1], metric, argLines)
if err != nil {
return err
}
latency, err := strconv.ParseFloat(cvalues[0], 64)
if err != nil {
return err
}
code := cvalues[1]
self.Counters[code] += 1
self.Latencys[code] += latency
}
return nil
}
// @input
// metric: $metric_name (without the ns)
func (self *rpcAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
var (
err error
)
// no data, so dump no points
if len(self.Counters) == 0 {
return points, nil
}
// validate tags: callee and caller must both be present
if _, ok := tags["caller"]; !ok {
return points, nil
}
callee, ok := tags["callee"]
if !ok {
return points, nil
}
tags["callee"] = Func{}.TrimRpcCallee(callee) // 修改callee字段
// rpc stats with extra tags would have the metric renamed to by_tags.$metric
//if len(tags) > 2 {
// metric = fmt.Sprintf("by_tags.%s", metric)
//}
totalCount := float64(0)
totalErrorCount := float64(0)
for code, count := range self.Counters {
if !(Func{}.IsOk(code)) {
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["code"] = code
points = append(points, &Point{
Name: metric + ".error.counter",
Timestamp: timestamp,
Tags: myTags,
Value: count,
})
totalErrorCount += count
}
totalCount += count
}
points = append(points, &Point{
Name: metric + ".counter",
Timestamp: timestamp,
Tags: tags,
Value: totalCount,
})
if totalCount > 0 {
points = append(points, &Point{
Name: metric + ".error.ratio",
Timestamp: timestamp,
Tags: tags,
Value: totalErrorCount / totalCount * 100,
})
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["code"] = "<all>"
points = append(points, &Point{
Name: metric + ".error.counter",
Timestamp: timestamp,
Tags: myTags,
Value: totalErrorCount,
})
}
// latency
latencyMetric := fmt.Sprintf("%s.latency", metric)
{ // avg
totalLatency := float64(0)
for _, latency := range self.Latencys {
totalLatency += latency
}
avgLatency := float64(0)
if totalCount > 0 && totalLatency > 0 {
avgLatency = totalLatency / totalCount
}
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["percentile"] = "avg"
points = append(points, &Point{
Name: latencyMetric,
Timestamp: timestamp,
Tags: myTags,
Value: avgLatency,
})
}
points, err = self.histogramAggregator.dump(points, timestamp, tags, latencyMetric, argLines) // percentile
return points, err
}
func (self *rpcAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
items, _ := Func{}.TranslateMetricLine(nsmetric)
//ns := items[0]
metric := items[1]
tags, _, err := Func{}.TranslateArgLines(argLines)
if err != nil {
return
}
// rpc_dirpc_call & rpc_dirpc_called
if metric == MetricToBeSummarized_DirpcCallConst || metric == MetricToBeSummarized_DirpcCalledConst {
if len(tags) != 5 {
return
}
callee, _ := tags["callee"]
calleef, _ := tags["callee-func"]
caller, _ := tags["caller"]
callerf, _ := tags["caller-func"]
su, _ := tags["su"]
if !(caller != "" && callerf != "" && callee != "" && calleef != "" && su != "") {
return
}
formator := "callee=%s\ncallee-func=%s\ncaller=%s\ncaller-func=%s\nsu=%s\nrpc"
if calleef != "<all>" {
summarizedCalleef := fmt.Sprintf(formator, callee, "<all>", caller, callerf, su)
rpcAggregator{}.addSummarizeAggregator(summarizedCalleef, self, newAggrs)
}
if callerf != "<all>" {
summarizedCallerf := fmt.Sprintf(formator, callee, calleef, caller, "<all>", su)
rpcAggregator{}.addSummarizeAggregator(summarizedCallerf, self, newAggrs)
}
if calleef != "<all>" && callerf != "<all>" {
summarizedCalleefCallerf := fmt.Sprintf(formator, callee, "<all>", caller, "<all>", su)
rpcAggregator{}.addSummarizeAggregator(summarizedCalleefCallerf, self, newAggrs)
}
return
}
// rpcdisf
if metric == MetricToBeSummarized_RpcdisfConst {
if len(tags) != 7 {
return
}
callee, _ := tags["callee"]
calleec, _ := tags["callee-cluster"]
calleef, _ := tags["callee-func"]
caller, _ := tags["caller"]
callerc, _ := tags["caller-cluster"]
callerf, _ := tags["caller-func"]
su, _ := tags["su"]
if !(caller != "" && callerc != "" && callerf != "" &&
callee != "" && calleec != "" && calleef != "" && su != "") {
return
}
formator := "callee=%s\ncallee-cluster=%s\ncallee-func=%s\ncaller=%s\ncaller-cluster=%s\ncaller-func=%s\nsu=%s\nrpc"
if calleef != "<all>" {
summarizedCalleef := fmt.Sprintf(formator, callee, calleec, "<all>", caller, callerc, callerf, su)
rpcAggregator{}.addSummarizeAggregator(summarizedCalleef, self, newAggrs)
}
if callerf != "<all>" {
summarizedCallerf := fmt.Sprintf(formator, callee, calleec, calleef, caller, callerc, "<all>", su)
rpcAggregator{}.addSummarizeAggregator(summarizedCallerf, self, newAggrs)
}
summarizedCalleefCallerf := fmt.Sprintf(formator, callee, calleec, "<all>", caller, callerc, "<all>", su)
rpcAggregator{}.addSummarizeAggregator(summarizedCalleefCallerf, self, newAggrs)
return
}
// rpcdfe
if metric == MetricToBeSummarized_RpcdfeConst {
if len(tags) != 5 {
return
}
callee, _ := tags["callee"]
caller, _ := tags["caller"]
domain, _ := tags["domain"]
scheme, _ := tags["scheme"]
upstream, _ := tags["upstream"]
if !(callee != "" && caller != "" && domain != "" &&
scheme != "" && upstream != "") {
return
}
formator := "callee=%s\ncaller=%s\ndomain=%s\nscheme=%s\nupstream=%s\nrpc"
if domain != "<all>" {
summarizedDomain := fmt.Sprintf(formator, callee, caller, "<all>", scheme, upstream)
rpcAggregator{}.addSummarizeAggregator(summarizedDomain, self, newAggrs)
}
if scheme != "<all>" {
summarizedScheme := fmt.Sprintf(formator, callee, caller, domain, "<all>", upstream)
rpcAggregator{}.addSummarizeAggregator(summarizedScheme, self, newAggrs)
}
if upstream != "<all>" {
summarizedUpstream := fmt.Sprintf(formator, callee, caller, domain, scheme, "<all>")
rpcAggregator{}.addSummarizeAggregator(summarizedUpstream, self, newAggrs)
}
summarizedDomainSchemeUp := fmt.Sprintf(formator, callee, caller, "<all>", "<all>", "<all>")
rpcAggregator{}.addSummarizeAggregator(summarizedDomainSchemeUp, self, newAggrs)
return
}
// blacklist
// only do the default aggregation
self.doAggr(tags, newAggrs)
// local aggregation
return
}
func (self *rpcAggregator) merge(toMerge aggregator) (aggregator, error) {
that, ok := toMerge.(*rpcAggregator)
if !ok {
return nil, BadSummarizeAggregatorError
}
_, err := self.histogramAggregator.merge(&that.histogramAggregator)
if err != nil {
return nil, err
}
for k, v2 := range that.Counters {
_, found := self.Counters[k]
if found {
self.Counters[k] += v2
} else {
self.Counters[k] = v2
}
}
for k, v2 := range that.Latencys {
_, found := self.Latencys[k]
if found {
self.Latencys[k] += v2
} else {
self.Latencys[k] = v2
}
}
return self, nil
}
func (self *rpcAggregator) toMap() (map[string]interface{}, error) {
counters := map[string]interface{}{}
for k, v := range self.Counters {
counters[k] = v
}
latencys := map[string]interface{}{}
for k, v := range self.Latencys {
latencys[k] = v
}
hm, err := self.histogramAggregator.toMap()
if err != nil {
return nil, err
}
return map[string]interface{}{
"__aggregator__": "rpc",
"counters": counters,
"latencys": latencys,
"histogram": hm,
}, nil
}
func (self rpcAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
aggregator := &rpcAggregator{Counters: map[string]float64{}, Latencys: map[string]float64{}}
counters := (serialized["counters"]).(map[string]interface{})
for k, v := range counters {
aggregator.Counters[k] = v.(float64)
}
latencys := (serialized["latencys"]).(map[string]interface{})
for k, v := range latencys {
aggregator.Latencys[k] = v.(float64)
}
histogram := (serialized["histogram"]).(map[string]interface{})
hm, err := self.histogramAggregator.fromMap(histogram)
if err != nil {
return nil, err
}
hmaggr, ok := hm.(*histogramAggregator)
if !ok {
return nil, BadDeserializeError
}
aggregator.histogramAggregator = *hmaggr
return aggregator, nil
}
// internal functions
func (self rpcAggregator) addSummarizeAggregator(argLines string, toMerge *rpcAggregator, newAggrs map[string]aggregator) {
aggr, ok := newAggrs[argLines]
if !(ok && aggr != nil) {
nAggr, err := toMerge.clone()
if err == nil {
newAggrs[argLines] = nAggr
}
} else {
aggr.merge(toMerge)
}
}
func (self *rpcAggregator) clone() (aggregator, error) {
maps, err := self.toMap()
if err != nil {
return nil, err
}
aggr, err := rpcAggregator{}.fromMap(maps)
if err != nil {
return nil, err
}
return aggr, nil
}
func (self *rpcAggregator) doAggr(tags map[string]string, newAggrs map[string]aggregator, aggrTagksList ...[][]string) {
tagks := make([]string, 0)
for k, _ := range tags {
tagks = append(tagks, k)
}
tagkNum := len(tagks)
if tagkNum == 0 {
return
}
sort.Strings(tagks)
// get formator
formator := ""
for i := 0; i < tagkNum; i++ {
formator += tagks[i] + "=%s\n"
}
formator += "rpc"
// aggregate across all dimensions
ntagvs_all := make([]interface{}, tagkNum)
for i := 0; i < tagkNum; i++ {
ntagvs_all[i] = "<all>"
}
summarizedTags := fmt.Sprintf(formator, ntagvs_all...)
rpcAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
// aggregate the specified dimensions
if len(aggrTagksList) > 0 {
for i := 0; i < len(aggrTagksList[0]); i++ {
aggrTagks := aggrTagksList[0][i]
// validity check
if !(len(aggrTagks) > 0 && len(aggrTagks) < tagkNum && // == tagkNum would duplicate the all-dimensions aggregation
(Func{}).IsSubKeys(aggrTagks, tags)) { // the data must carry the requested aggregation dimensions
continue
}
// aggregate
sometagks := make([]interface{}, tagkNum)
for i, tk := range tagks {
sometagks[i] = tags[tk]
}
for _, tk := range aggrTagks {
for i := 0; i < tagkNum; i++ {
if tk == tagks[i] {
sometagks[i] = "<all>"
break
}
}
}
summarizedTags := fmt.Sprintf(formator, sometagks...)
rpcAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
}
}
}

View File

@ -0,0 +1,470 @@
package statsd
import (
"fmt"
"sort"
"strconv"
"strings"
)
type rpcEAggregator struct {
histogramAggregator
Counters map[string]float64
Latencys map[string]float64
}
func (self *rpcEAggregator) new(aggregatorNames []string) (aggregator, error) {
if len(aggregatorNames) < 1 || aggregatorNames[0] != "rpce" {
return nil, BadAggregatorNameError
}
histogramAggregatorNames := []string{"p99", "p95", "p75", "p50"}
return &rpcEAggregator{
histogramAggregator: histogramAggregator{}.newInstence(histogramAggregatorNames),
Counters: map[string]float64{},
Latencys: map[string]float64{},
}, nil
}
func (self *rpcEAggregator) collect(values []string, metric string, argLines string) error {
if len(values) < 1 {
return fmt.Errorf("bad values")
}
for i := range values {
cvalues := strings.Split(values[i], CodeDelimiter)
if len(cvalues) < 2 {
// bad values
continue
}
err := self.histogramAggregator.collect(cvalues[:1], metric, argLines)
if err != nil {
return err
}
latency, err := strconv.ParseFloat(cvalues[0], 64)
if err != nil {
return err
}
code := cvalues[1]
self.Counters[code] += 1
self.Latencys[code] += latency
}
return nil
}
// @input
// metric: $metric_name (without the ns)
func (self *rpcEAggregator) dump(points []*Point, timestamp int64,
tags map[string]string, metric, argLines string) ([]*Point, error) {
var (
err error
)
// no data, so dump no points
if len(self.Counters) == 0 {
return points, nil
}
// validate tags: callee and caller must both be present
if _, ok := tags["caller"]; !ok {
return points, nil
}
callee, ok := tags["callee"]
if !ok {
return points, nil
}
tags["callee"] = Func{}.TrimRpcCallee(callee) // 修改callee字段
// rpc stats with extra tags would have the metric renamed to by_tags.$metric
//if len(tags) > 2 {
// metric = fmt.Sprintf("by_tags.%s", metric)
//}
totalCount := float64(0)
totalErrorCount := float64(0)
for code, count := range self.Counters {
if !(Func{}.IsOk(code)) {
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["code"] = code
points = append(points, &Point{
Name: metric + ".error.counter",
Timestamp: timestamp,
Tags: myTags,
Value: count,
})
totalErrorCount += count
}
totalCount += count
}
points = append(points, &Point{
Name: metric + ".counter",
Timestamp: timestamp,
Tags: tags,
Value: totalCount,
})
if totalCount > 0 {
for code, count := range self.Counters {
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["code"] = code
points = append(points, &Point{
Name: metric + ".code.ratio",
Timestamp: timestamp,
Tags: myTags,
Value: count / totalCount * 100,
})
}
points = append(points, &Point{
Name: metric + ".error.ratio",
Timestamp: timestamp,
Tags: tags,
Value: totalErrorCount / totalCount * 100,
})
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["code"] = "<all>"
points = append(points, &Point{
Name: metric + ".error.counter",
Timestamp: timestamp,
Tags: myTags,
Value: totalErrorCount,
})
}
// latency
latencyMetric := fmt.Sprintf("%s.latency", metric)
{ // avg
totalLatency := float64(0)
for _, latency := range self.Latencys {
totalLatency += latency
}
avgLatency := float64(0)
if totalCount > 0 && totalLatency > 0 {
avgLatency = totalLatency / totalCount
}
myTags := map[string]string{}
for k, v := range tags {
myTags[k] = v
}
myTags["percentile"] = "avg"
points = append(points, &Point{
Name: latencyMetric,
Timestamp: timestamp,
Tags: myTags,
Value: avgLatency,
})
}
points, err = self.histogramAggregator.dump(points, timestamp, tags, latencyMetric, argLines) // percentile
return points, err
}
func (self *rpcEAggregator) summarize(nsmetric, argLines string, newAggrs map[string]aggregator) {
items, _ := Func{}.TranslateMetricLine(nsmetric)
//ns := items[0]
metric := items[1]
tags, _, err := Func{}.TranslateArgLines(argLines)
if err != nil {
return
}
// rpc_dirpc_call & rpc_dirpc_called
if metric == MetricToBeSummarized_DirpcCallConst || metric == MetricToBeSummarized_DirpcCalledConst {
if len(tags) != 5 {
return
}
callee, _ := tags["callee"]
calleef, _ := tags["callee-func"]
caller, _ := tags["caller"]
callerf, _ := tags["caller-func"]
su, _ := tags["su"]
if !(caller != "" && callerf != "" && callee != "" && calleef != "" && su != "") {
return
}
formator := "callee=%s\ncallee-func=%s\ncaller=%s\ncaller-func=%s\nsu=%s\nrpce"
if calleef != "<all>" {
summarizedCalleef := fmt.Sprintf(formator, callee, "<all>", caller, callerf, su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCalleef, self, newAggrs)
}
if callerf != "<all>" {
summarizedCallerf := fmt.Sprintf(formator, callee, calleef, caller, "<all>", su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCallerf, self, newAggrs)
}
if calleef != "<all>" && callerf != "<all>" {
summarizedCalleefCallerf := fmt.Sprintf(formator, callee, "<all>", caller, "<all>", su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCalleefCallerf, self, newAggrs)
}
return
}
// rpcdisf
if metric == MetricToBeSummarized_RpcdisfConst {
if len(tags) != 7 {
return
}
callee, _ := tags["callee"]
calleec, _ := tags["callee-cluster"]
calleef, _ := tags["callee-func"]
caller, _ := tags["caller"]
callerc, _ := tags["caller-cluster"]
callerf, _ := tags["caller-func"]
su, _ := tags["su"]
if !(caller != "" && callerc != "" && callerf != "" &&
callee != "" && calleec != "" && calleef != "" && su != "") {
return
}
formator := "callee=%s\ncallee-cluster=%s\ncallee-func=%s\ncaller=%s\ncaller-cluster=%s\ncaller-func=%s\nsu=%s\nrpce"
if calleef != "<all>" {
summarizedCalleef := fmt.Sprintf(formator, callee, calleec, "<all>", caller, callerc, callerf, su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCalleef, self, newAggrs)
}
if callerf != "<all>" {
summarizedCallerf := fmt.Sprintf(formator, callee, calleec, calleef, caller, callerc, "<all>", su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCallerf, self, newAggrs)
}
summarizedCalleefCallerf := fmt.Sprintf(formator, callee, calleec, "<all>", caller, callerc, "<all>", su)
rpcEAggregator{}.addSummarizeAggregator(summarizedCalleefCallerf, self, newAggrs)
return
}
// rpcdfe
if metric == MetricToBeSummarized_RpcdfeConst {
tagks := make([]string, 0)
for k, _ := range tags {
tagks = append(tagks, k)
}
tagkLen := len(tagks)
if tagkLen < 3 {
return
}
sort.Strings(tagks)
callee, _ := tags["callee"]
caller, _ := tags["caller"]
service, _ := tags["service"]
if !(callee != "" && caller != "" && service != "") {
return
}
// aggregate callee, caller, service, and schema individually
for k, v := range tags {
if (k == "callee" && v != "<all>") || (k == "caller" && v != "<all>") ||
(k == "service" && v != "<all>") || (k == "schema" && v != "<all>") {
formator := ""
for i := 0; i < tagkLen; i++ {
formator += tagks[i] + "=%s\n"
}
formator += "rpce"
// aggregate this dimension to <all>
ntagvs_all := make([]interface{}, tagkLen)
for i := 0; i < tagkLen; i++ {
if tagks[i] == k {
ntagvs_all[i] = "<all>"
} else {
ntagvs_all[i] = tags[tagks[i]]
}
}
summarizedTags := fmt.Sprintf(formator, ntagvs_all...)
rpcEAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
}
}
// default aggregation across all tags
self.doAggr(tags, newAggrs)
return
}
// blacklist
// only do the default aggregation
self.doAggr(tags, newAggrs)
// local aggregation
return
}
func (self *rpcEAggregator) merge(toMerge aggregator) (aggregator, error) {
that, ok := toMerge.(*rpcEAggregator)
if !ok {
return nil, BadSummarizeAggregatorError
}
_, err := self.histogramAggregator.merge(&that.histogramAggregator)
if err != nil {
return nil, err
}
for k, v2 := range that.Counters {
_, found := self.Counters[k]
if found {
self.Counters[k] += v2
} else {
self.Counters[k] = v2
}
}
for k, v2 := range that.Latencys {
_, found := self.Latencys[k]
if found {
self.Latencys[k] += v2
} else {
self.Latencys[k] = v2
}
}
return self, nil
}
func (self *rpcEAggregator) toMap() (map[string]interface{}, error) {
counters := map[string]interface{}{}
for k, v := range self.Counters {
counters[k] = v
}
latencys := map[string]interface{}{}
for k, v := range self.Latencys {
latencys[k] = v
}
hm, err := self.histogramAggregator.toMap()
if err != nil {
return nil, err
}
return map[string]interface{}{
"__aggregator__": "rpce",
"counters": counters,
"latencys": latencys,
"histogram": hm,
}, nil
}
func (self rpcEAggregator) fromMap(serialized map[string]interface{}) (aggregator, error) {
aggregator := &rpcEAggregator{Counters: map[string]float64{}, Latencys: map[string]float64{}}
counters := (serialized["counters"]).(map[string]interface{})
for k, v := range counters {
aggregator.Counters[k] = v.(float64)
}
latencys := (serialized["latencys"]).(map[string]interface{})
for k, v := range latencys {
aggregator.Latencys[k] = v.(float64)
}
histogram := (serialized["histogram"]).(map[string]interface{})
hm, err := self.histogramAggregator.fromMap(histogram)
if err != nil {
return nil, err
}
hmaggr, ok := hm.(*histogramAggregator)
if !ok {
return nil, BadDeserializeError
}
aggregator.histogramAggregator = *hmaggr
return aggregator, nil
}
// internal functions
func (self rpcEAggregator) addSummarizeAggregator(argLines string, toMerge *rpcEAggregator, newAggrs map[string]aggregator) {
aggr, ok := newAggrs[argLines]
if !(ok && aggr != nil) {
nAggr, err := toMerge.clone()
if err == nil {
newAggrs[argLines] = nAggr
}
} else {
aggr.merge(toMerge)
}
}
func (self *rpcEAggregator) clone() (aggregator, error) {
maps, err := self.toMap()
if err != nil {
return nil, err
}
aggr, err := rpcEAggregator{}.fromMap(maps)
if err != nil {
return nil, err
}
return aggr, nil
}
func (self *rpcEAggregator) doAggr(tags map[string]string, newAggrs map[string]aggregator, aggrTagksList ...[][]string) {
tagks := make([]string, 0)
for k, _ := range tags {
tagks = append(tagks, k)
}
tagkNum := len(tagks)
if tagkNum == 0 {
return
}
sort.Strings(tagks)
// get formator
formator := ""
for i := 0; i < tagkNum; i++ {
formator += tagks[i] + "=%s\n"
}
formator += "rpce"
// aggregate across all dimensions
ntagvs_all := make([]interface{}, tagkNum)
for i := 0; i < tagkNum; i++ {
ntagvs_all[i] = "<all>"
}
summarizedTags := fmt.Sprintf(formator, ntagvs_all...)
rpcEAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
// aggregate the specified dimensions
if len(aggrTagksList) > 0 {
for i := 0; i < len(aggrTagksList[0]); i++ {
aggrTagks := aggrTagksList[0][i]
// validity check
if !(len(aggrTagks) > 0 && len(aggrTagks) < tagkNum && // == tagkNum would duplicate the all-dimensions aggregation
(Func{}).IsSubKeys(aggrTagks, tags)) { // the data must carry the requested aggregation dimensions
continue
}
// aggregate
sometagks := make([]interface{}, tagkNum)
for i, tk := range tagks {
sometagks[i] = tags[tk]
}
for _, tk := range aggrTagks {
for i := 0; i < tagkNum; i++ {
if tk == tagks[i] {
sometagks[i] = "<all>"
break
}
}
}
summarizedTags := fmt.Sprintf(formator, sometagks...)
rpcEAggregator{}.addSummarizeAggregator(summarizedTags, self, newAggrs)
}
}
}

View File

@ -0,0 +1,41 @@
package statsd
import (
"sync/atomic"
"time"
)
type Clock struct {
start int64
timestamp int64
}
var clock Clock
func init() {
ts := time.Now().Unix()
clock.start = ts
clock.timestamp = ts
go clock.modify()
}
func (t *Clock) modify() {
duration := time.Duration(100) * time.Millisecond
for {
now := time.Now().Unix()
t.set(now)
time.Sleep(duration)
}
}
func (t *Clock) set(ts int64) {
atomic.StoreInt64(&t.timestamp, ts)
}
func (t *Clock) get() int64 {
return atomic.LoadInt64(&t.timestamp)
}
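// GetTimestamp returns the cached wall-clock second, refreshed every 100ms by modify(), so hot paths avoid calling time.Now() per sample.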
func GetTimestamp() int64 {
return clock.get()
}

View File

@ -0,0 +1,82 @@
package statsd
import (
"sync"
)
var (
// aggregation types supported by metrics
CommonAggregatorsConst = map[string]bool{
"c": true, "ce": true, "rpc": true, "r": true, "rt": true,
"p1": true, "p5": true, "p25": true, "p50": true, "p75": true,
"p90": true, "p95": true, "p99": true, "rpce": true,
"max": true, "min": true, "sum": true, "avg": true, "cnt": true,
"g": true,
}
HistogramAggregatorsConst = map[string]bool{
"p1": true, "p5": true, "p25": true, "p50": true, "p75": true,
"p90": true, "p95": true, "p99": true,
"max": true, "min": true, "sum": true, "avg": true, "cnt": true,
}
Const_CommonAggregator_Rpc = "rpc"
Const_CommonAggregator_RpcE = "rpce"
// rpc status codes treated as success
RpcOkCodesConst = map[string]bool{"ok": true, "0": true,
"200": true, "201": true, "203": true}
// maximum number of tags supported by metrics
MaxTagsCntConst = 12
// ns prefix/suffix
NsPrefixConst = ""
NsSuffixConst = ""
// metrics that require local summarization
MetricToBeSummarized_RpcdisfConst = "rpcdisf"
MetricToBeSummarized_RpcdfeConst = "rpcdfe"
MetricToBeSummarized_DirpcCallConst = "rpc_dirpc_call"
MetricToBeSummarized_DirpcCalledConst = "rpc_dirpc_called"
// timeout for summarize to wait for collect to finish
SummarizeWaitCollectTimeoutMsConst = 2000
// tag key that carries the traceid
TagTraceId = "traceid"
// LRU cache size
MaxLRUCacheSize = 10000
// delimiter between samples in merged-packet mode
MergeDelimiter = "&"
// delimiter between $value and $statusCode; kept as "," for compatibility with the old protocol
CodeDelimiter = ","
)
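A sketch of how these two delimiters would combine on the wire (illustrative only: the helper that actually splits the value line is not part of this diff, the sample values are invented, and a "strings" import is assumed):

// Illustrative only: split a merged rpc value line into [latency, status code] pairs.
func exampleSplitMergedValues() [][]string {
	raw := "12.5,ok&30.1,error" // two rpc samples merged into one value line
	out := make([][]string, 0)
	for _, sample := range strings.Split(raw, MergeDelimiter) {
		out = append(out, strings.SplitN(sample, CodeDelimiter, 2)) // [latency, status code]
	}
	return out // [["12.5" "ok"] ["30.1" "error"]]
}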
var (
exitLock = &sync.RWMutex{}
isExited = false
)
func Start() {
isExited = false
// periodically pull config from the remote server
//go MetricAgentConfig{}.UpdateLoop()
// start reporting collected metrics
go StatsdReporter{}.Report()
}
func Exit() {
exitLock.Lock()
isExited = true
exitLock.Unlock()
}
func IsExited() bool {
exitLock.RLock()
r := isExited
exitLock.RUnlock()
return r
}

View File

@ -0,0 +1,43 @@
package statsd
import (
"strings"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
)
type StatsdReceiver struct{}
func (self StatsdReceiver) HandlePacket(packet string) {
lines := strings.SplitN(packet, "\n", 3)
if len(lines) != 3 {
logger.Warningf("invalid packet, [error: missing args][packet: %s]", packet)
return
}
value := lines[0]
//
argLines, aggrs, err := Func{}.FormatArgLines(lines[2], lines[1])
if err != nil {
if err.Error() == "ignore" {
return
}
logger.Warningf("invalid packet, [error: bad tags or aggr][msg: %s][packet: %s]", err.Error(), packet)
return
}
metric, err := Func{}.FormatMetricLine(lines[1], aggrs) // metric = $ns/$metric_name
if err != nil {
logger.Warningf("invalid packet, [error: bad metric line][msg: %s][packet %s]", err.Error(), packet)
return
}
stats.Counter.Set("metric.recv.packet", 1)
err = StatsdState{}.GetState().Collect(value, metric, argLines)
if err != nil {
logger.Warningf("invalid packet, [error: collect packet error][msg: %s][packet: %s]", err.Error(), packet)
return
}
}
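For reference, a minimal client sketch that feeds this handler over the new UDP listener (illustrative only: the port comes from udp.listen in etc/agent.yml above, the metric and tag values are placeholders, and the exact metric/tag line syntax is validated by FormatMetricLine/FormatArgLines, which are not shown in this diff):

package main

import (
	"log"
	"net"
)

func main() {
	conn, err := net.Dial("udp", "127.0.0.1:788") // udp.listen from etc/agent.yml
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Three-part layout expected by HandlePacket:
	//   line 1: value(s)
	//   line 2: metric line
	//   line 3+: tag lines followed by the aggregator name
	packet := "1\n" + "demo.api.req\n" + "host=web01\nc"
	if _, err := conn.Write([]byte(packet)); err != nil {
		log.Println("send failed:", err)
	}
}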

View File

@ -0,0 +1,255 @@
package statsd
import (
"fmt"
"strings"
"sync"
"time"
"github.com/didi/nightingale/src/common/dataobj"
"github.com/didi/nightingale/src/modules/agent/config"
"github.com/didi/nightingale/src/modules/agent/core"
"github.com/didi/nightingale/src/toolkits/exit"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
)
type StatsdReporter struct{}
// point to n9e-agent
type Point struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
Timestamp int64 `json:"timestamp"`
Tags map[string]string `json:"tags"`
Value float64 `json:"value"`
Step int `json:"step"`
}
func (self *Point) String() string {
return fmt.Sprintf("<namespace:%s, name:%s, timestamp:%d, value:%v, step:%d, tags:%v>",
self.Namespace, self.Name, self.Timestamp, self.Value, self.Step, self.Tags)
}
func (self Point) Strings(points []*Point) string {
pointsString := ""
for _, p := range points {
pointsString += p.String() + "\n"
}
return pointsString
}
var (
lastPointLock = &sync.RWMutex{}
lastPoints []*Point
)
var (
isFirstPeriod = true // first reporting period after metrics starts (not thread-safe)
)
func (self StatsdReporter) Report() {
// init schedule
schedule := &schedule{}
schedule.clearStateAt = self.nextTenSeconds(time.Now())
schedule.reportAt = schedule.clearStateAt
// send loop
for !IsExited() {
actions := schedule.listActions(time.Now())
if len(actions) != 0 {
self.handleActions(actions)
}
time.Sleep(time.Duration(config.Config.Metrics.ReportIntervalMs) * time.Millisecond)
}
}
func (self StatsdReporter) LastPoints() []*Point {
lastPointLock.RLock()
ret := lastPoints
lastPointLock.RUnlock()
return ret
}
func (self StatsdReporter) setLastPoints(ps []*Point) {
lastPointLock.Lock()
lastPoints = ps
lastPointLock.Unlock()
}
func (self StatsdReporter) handleActions(actions []action) {
defer func() {
if err := recover(); err != nil {
stack := exit.Stack(3)
logger.Warningf("report handler exit unexpected, [error: %v],[stack: %s]", err, stack)
}
}()
for _, action := range actions {
switch action.actionType {
case "report":
previousState := StatsdState{}.RollState()
//previousState.Summarize() // further aggregate the metrics to produce <all>-style tag values
// the first reporting period is inaccurate, drop it
if isFirstPeriod {
isFirstPeriod = false
break
}
// report cnt
// proc
stats.Counter.Set("metric.cache.size", previousState.Size())
//startTs := time.Now()
cnt := self.translateAndSend(previousState, action.toTime, 10, action.prefix)
stats.Counter.Set("metric.report.cnt", cnt)
// proc
//latencyMs := int64(time.Now().Sub(startTs).Nanoseconds() / 1000000)
default:
logger.Debugf("ignored action %s", action.actionType)
}
}
}
func (self StatsdReporter) nextTenSeconds(t time.Time) time.Time {
nowSec := t.Second()
clearStateSec := ((nowSec / 10) * 10)
diff := 10 - (nowSec - clearStateSec)
t = t.Add(time.Duration(-t.Nanosecond()) * time.Nanosecond)
return t.Add(time.Duration(diff) * time.Second)
}
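// Worked example: for t = 12:00:07.300, nowSec is 7, clearStateSec is 0 and
// diff is 3; the nanoseconds are stripped first, so the result is 12:00:10.000,
// i.e. the next 10-second boundary.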
func (self StatsdReporter) translateAndSend(state *state, reportTime time.Time,
frequency int, prefix string) (cnt int) {
cnt = 0
// points reported by the business side
oldPoints := self.translateToPoints(state, reportTime)
// points related to traceid accounting/filtering
oldTrace := traceHandler.rollHandler()
tracePoints := oldTrace.dumpPoints(reportTime)
if len(tracePoints) > 0 {
oldPoints = append(oldPoints, tracePoints...)
}
self.setLastPoints(oldPoints)
if len(oldPoints) == 0 {
return
}
buffer := make([]*dataobj.MetricValue, 0)
lastNamespace := oldPoints[0].Namespace
for _, point := range oldPoints {
n9ePoint := TranslateToN9EPoint(point)
if len(buffer) >= config.Config.Metrics.ReportPacketSize || point.Namespace != lastNamespace {
core.Push(buffer)
buffer = make([]*dataobj.MetricValue, 0)
}
n9ePoint.Step = int64(frequency)
buffer = append(buffer, n9ePoint)
lastNamespace = point.Namespace
}
core.Push(buffer)
return
}
func (self StatsdReporter) translateToPoints(state *state, reportTime time.Time) []*Point {
ts := reportTime.Unix()
allPoints := make([]*Point, 0)
for rawMetric, metricState := range state.Metrics {
// errors are not handled here: the metric line was strictly validated on ingestion
items, _ := Func{}.TranslateMetricLine(rawMetric)
namespace := items[0]
metric := items[1]
for key, aggregator := range metricState.Aggrs {
if nil == aggregator {
continue
}
var (
tags map[string]string
err error
)
// contains the <all> keyword, i.e. an aggregated result, which must not be looked up from the cache
if strings.Contains(key, "<all>") {
tags, _, err = Func{}.TranslateArgLines(key, true)
} else {
tags, _, err = Func{}.TranslateArgLines(key)
}
if err != nil {
logger.Warningf("post points to n9e-agent failed, tags/aggr error, "+
"[msg: %s][nid/metric: %s][tags/aggr: %s]", err.Error(), rawMetric, key)
continue
}
points := make([]*Point, 0)
points, err = aggregator.dump(points, ts, tags, metric, key)
if err != nil {
logger.Warningf("post points to n9e-agent failed, generate points error, "+
"[msg: %s][ns/metric: %s][tags/aggr: %s]", err.Error(), rawMetric, key)
continue
}
for _, point := range points {
point.Namespace = namespace
allPoints = append(allPoints, point)
}
}
}
return allPoints
}
func TranslateToN9EPoint(point *Point) *dataobj.MetricValue {
if point.Namespace != "" {
point.Tags["instance"] = config.Endpoint
}
obj := &dataobj.MetricValue{
Nid: point.Namespace,
Metric: point.Name,
Timestamp: point.Timestamp,
Step: int64(point.Step),
ValueUntyped: point.Value,
TagsMap: point.Tags,
}
return obj
}
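// Illustrative example (hypothetical values): a Point{Namespace: "12",
// Name: "cpu.idle", Value: 0.5, Step: 10, Tags: {host: web01}} becomes a
// dataobj.MetricValue with Nid "12", an extra "instance" tag set to the local
// endpoint, and the remaining fields copied as-is.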
//
type action struct {
actionType string
fromTime time.Time
toTime time.Time
fromFrequency int // in seconds
toFrequency int // in seconds
prefix string
}
//
type schedule struct {
clearStateAt time.Time
reportAt time.Time
}
func (self *schedule) listActions(now time.Time) []action {
actions := make([]action, 0)
if now.After(self.reportAt) {
actions = append(actions, action{
actionType: "report",
fromTime: self.reportAt.Add(-10 * time.Second),
toTime: self.reportAt,
toFrequency: 10,
prefix: "",
})
self.reportAt = StatsdReporter{}.nextTenSeconds(now)
}
return actions
}

View File

@ -0,0 +1,287 @@
package statsd
import (
"fmt"
"sync"
"time"
"github.com/didi/nightingale/src/toolkits/stats"
"github.com/toolkits/pkg/logger"
)
var (
currentState = &state{Metrics: map[string]*metricState{}, packageCounter: map[string]int{}}
currentStateLock = &sync.RWMutex{}
)
type StatsdState struct{}
func (self StatsdState) GetState() *state {
currentStateLock.RLock()
ptr := currentState
currentStateLock.RUnlock()
return ptr
}
func (self StatsdState) RollState() *state {
currentStateLock.Lock()
oldState := currentState
newState := &state{
Metrics: map[string]*metricState{},
packageCounter: map[string]int{},
}
currentState = newState
currentStateLock.Unlock()
return oldState
}
////////////////////////////////////////////////////////////
// struct state
// aggregators for all tag combinations of all metrics; there is exactly one instance globally
////////////////////////////////////////////////////////////
type state struct {
isCollecting bool
Metrics map[string]*metricState
packageCounter map[string]int // request count per ns/metric, used for the INFO log
}
// @input
// value: $value or $value,$status, where "," is ${CodeDelimiter};
// in merged-packet mode: $value${MergeDelimiter}$value or $value,$status${MergeDelimiter}$value,$status
// metric: $ns/$metric_name
// argLines: $tagk1=$tagv1\n...$tagkN=$tagvN\n$aggr
func (self *state) Collect(value string, metric string, argLines string) error {
self.isCollecting = true
metricState, err := self.getMetricState(metric)
if err != nil {
self.isCollecting = false
return err
}
// Metrics and packageCounter use the same map keys
if _, found := self.packageCounter[metric]; !found {
self.packageCounter[metric] = 1
} else {
self.packageCounter[metric] += 1
}
err = metricState.Collect(value, metric, argLines)
self.isCollecting = false
return err
}
func (self *state) Size() int {
cnt := 0
for _, ms := range self.Metrics {
cnt += len(ms.Aggrs)
}
return cnt
}
func (self *state) ToMap() (map[string]interface{}, error) {
serialized := map[string]interface{}{}
for k, v := range self.Metrics {
m, err := v.ToMap()
if err != nil {
return nil, err
}
serialized[k] = m
}
return map[string]interface{}{"metrics": serialized}, nil
}
func (self *state) Summarize() {
// wait for the last Collect call to finish, to avoid read/write conflicts on the state memory
var waitMs int
for waitMs = 0; waitMs < SummarizeWaitCollectTimeoutMsConst; waitMs += 5 {
time.Sleep(5 * time.Millisecond)
if !self.isCollecting {
break
}
}
if self.isCollecting {
logger.Warningf("summarize wait collect timeout(%dms), summarize skipped", SummarizeWaitCollectTimeoutMsConst)
return
}
// debug info
if waitMs > 0 {
logger.Debugf("system info: summarize wait collect %dms", waitMs)
}
for nsmetric, ms := range self.Metrics {
ms.Summarize(nsmetric)
}
}
func (self *state) getMetricState(metricName string) (*metricState, error) {
metric, ok := self.Metrics[metricName]
if ok && metric != nil {
return metric, nil
}
metric = &metricState{Aggrs: map[string]aggregator{}}
self.Metrics[metricName] = metric
return metric, nil
}
////////////////////////////////////////////////////////////
// struct metricState
// aggregators for all tag combinations of a single metric
////////////////////////////////////////////////////////////
type metricState struct {
Aggrs map[string]aggregator
}
// @input
// value: $value or $value,$status, where "," is ${CodeDelimiter};
// in merged-packet mode: $value${MergeDelimiter}$value or $value,$status${MergeDelimiter}$value,$status
// metric: $ns/$metric_name
// argLines: $tagk1=$tagv1\n...$tagkN=$tagvN\n$aggr
func (self *metricState) Collect(value string, metric string, argLines string) error {
aggregator, err := self.getAggregator(value, metric, argLines)
if err != nil {
return err
}
values, err := Func{}.TranslateValueLine(value)
if err != nil {
return err
}
// record the actual number of reported data points
stats.Counter.Set("metric.recv.cnt", len(values))
return aggregator.collect(values, metric, argLines)
}
func (self *metricState) ToMap() (map[string]interface{}, error) {
maps := map[string]interface{}{}
for k, v := range self.Aggrs {
m, err := v.toMap()
if err != nil {
return nil, err
}
maps[k] = m
}
return map[string]interface{}{"aggrs": maps}, nil
}
func (self *metricState) Summarize(nsmetric string) {
if len(self.Aggrs) == 0 {
return
}
newAggrs := make(map[string]aggregator, 0)
// copy
for argLines, aggr := range self.Aggrs {
key := argLines
ptrAggr := aggr
newAggrs[key] = ptrAggr
}
// summarize
for argLines, aggr := range self.Aggrs {
key := argLines
ptrAggr := aggr
if ptrAggr == nil {
continue
}
ptrAggr.summarize(nsmetric, key, newAggrs)
}
self.Aggrs = newAggrs
}
func (self *metricState) getAggregator(value, metric, argLines string) (aggregator, error) {
aggr, ok := self.Aggrs[argLines]
if ok && aggr != nil {
return aggr, nil
}
// create the aggregator
aggregatorNames, err := Func{}.GetAggrsFromArgLines(argLines)
if err != nil {
return nil, err
}
aggr, err = self.createAggregator(aggregatorNames, value, metric, argLines)
if err != nil {
return nil, err
}
self.Aggrs[argLines] = aggr
return aggr, nil
}
func (self *metricState) createAggregator(aggregatorNames []string, value, metric, argLines string) (aggregator, error) {
switch aggregatorNames[0] {
case "c":
return (&counterAggregator{}).new(aggregatorNames)
case "ce":
return (&counterEAggregator{}).new(aggregatorNames)
case "g":
return (&gaugeAggregator{}).new(aggregatorNames)
case "rpc":
return (&rpcAggregator{}).new(aggregatorNames)
case "rpce":
return (&rpcEAggregator{}).new(aggregatorNames)
case "r":
return (&ratioAggregator{}).new(aggregatorNames)
case "rt":
return (&ratioAsTagsAggregator{}).new(aggregatorNames)
case "p1", "p5", "p25", "p50", "p75", "p90", "p95", "p99", "max", "min", "avg", "sum", "cnt":
return (&histogramAggregator{}).new(aggregatorNames)
default:
return nil, fmt.Errorf("unknown aggregator %s", argLines)
}
}
// internals
func (self state) StateFromMap(serialized map[string]interface{}) (*state, error) {
state := &state{Metrics: map[string]*metricState{}}
for k, v := range serialized {
ms, err := (metricState{}.MetricFromMap(v.(map[string]interface{})))
if err != nil {
return nil, err
}
state.Metrics[k] = ms
}
return state, nil
}
func (self metricState) MetricFromMap(serialized map[string]interface{}) (*metricState, error) {
metricState := &metricState{Aggrs: map[string]aggregator{}}
keys := (serialized["aggrs"]).(map[string]interface{})
for k, v := range keys {
ret, err := self.aggregatorFromMap(v.(map[string]interface{}))
if err != nil {
return nil, err
}
metricState.Aggrs[k] = ret
}
return metricState, nil
}
func (self metricState) aggregatorFromMap(serialized map[string]interface{}) (aggregator, error) {
switch serialized["__aggregator__"] {
case "counter":
return (&counterAggregator{}).fromMap(serialized)
case "counterE":
return (&counterEAggregator{}).fromMap(serialized)
case "gauge":
return (&gaugeAggregator{}).fromMap(serialized)
case "ratio":
return (&ratioAggregator{}).fromMap(serialized)
case "ratioAsTags":
return (&ratioAsTagsAggregator{}).fromMap(serialized)
case "histogram":
return (&histogramAggregator{}).fromMap(serialized)
case "rpc":
return (&rpcAggregator{}).fromMap(serialized)
case "rpce":
return (&rpcEAggregator{}).fromMap(serialized)
default:
return nil, fmt.Errorf("unknown aggregator: %v", serialized)
}
}

View File

@ -0,0 +1,420 @@
package statsd
import (
"fmt"
"sort"
"strings"
"sync"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/spaolacci/murmur3"
)
type Func struct{}
var (
BadRpcMetricError = fmt.Errorf("bad rpc metric")
BadSummarizeAggregatorError = fmt.Errorf("bad summarize aggregator")
BadDeserializeError = fmt.Errorf("bad deserialize")
BadAggregatorNameError = fmt.Errorf("bad aggregator name")
cache *lru.Cache
)
func init() {
cache, _ = lru.New(MaxLRUCacheSize)
}
type ArgCacheUnit struct {
Aggrs []string
Tags map[string]string
ArgLine string
Error error
}
func NewArgCacheUnitWithError(err error) *ArgCacheUnit {
return &ArgCacheUnit{
Aggrs: []string{},
Tags: make(map[string]string),
ArgLine: "",
Error: err,
}
}
func NewArgCacheUnit(argline string, aggrs []string,
tags map[string]string) *ArgCacheUnit {
return &ArgCacheUnit{
Aggrs: aggrs,
Tags: tags,
ArgLine: argline,
Error: nil,
}
}
// tags+aggr lines
func (f Func) FormatArgLines(argLines string, metricLines string) (string, []string, error) {
// BUG: hash collisions could cause problems; not handled for now
key := murmur3.Sum32([]byte(argLines))
value, found := cache.Get(key)
if found {
unit, ok := value.(*ArgCacheUnit)
if ok {
return unit.ArgLine, unit.Aggrs, unit.Error
}
}
tags, agg, err := f.TranslateArgLines(argLines, true)
if err != nil {
cache.Add(key, NewArgCacheUnitWithError(err))
return "", []string{}, fmt.Errorf("translate to tags error, [lines: %s][error: %s]", argLines, err.Error())
}
// check
if err := f.checkTags(tags); err != nil {
cache.Add(key, NewArgCacheUnitWithError(err))
return "", []string{}, err
}
aggrs, err := f.formatAggr(agg)
if err != nil {
cache.Add(key, NewArgCacheUnitWithError(err))
return "", []string{}, err
}
if len(tags) == 0 {
cache.Add(key, NewArgCacheUnit(argLines, aggrs, tags))
return argLines, aggrs, nil
}
traceExist := false
if traceid, found := tags[TagTraceId]; found {
traceExist = true
delete(tags, TagTraceId)
ignore := traceHandler.collectAndIgnore(metricLines, traceid)
if ignore {
return "", []string{}, fmt.Errorf("ignore")
}
}
newLines := []string{}
var keys []string
for k, _ := range tags {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
v := tags[k]
if v == "<all>" { // <all> is a reserved keyword and must be rewritten
v = "all"
tags[k] = v // the cached tags must be updated to stay consistent
}
newLines = append(newLines, fmt.Sprintf("%s=%s", k, v))
}
newLines = append(newLines, agg)
newArgLines := strings.Join(newLines, "\n")
// contains a traceid, no point caching it since it would almost never hit
if !traceExist {
cache.Add(key, NewArgCacheUnit(newArgLines, aggrs, tags))
// the argLine changes after re-sorting (because of the tag map), so the new argLine must be cached too
if argLines != newArgLines {
newKey := murmur3.Sum32([]byte(newArgLines))
cache.Add(newKey, NewArgCacheUnit(newArgLines, aggrs, tags))
}
}
return newArgLines, aggrs, nil
}
func (f Func) GetAggrsFromArgLines(argLines string) ([]string, error) {
key := murmur3.Sum32([]byte(argLines))
value, found := cache.Get(key)
if found {
unit, ok := value.(*ArgCacheUnit)
if ok {
return unit.Aggrs, unit.Error
}
}
lines := strings.Split(argLines, "\n")
lineSize := len(lines)
if lineSize == 0 {
return nil, fmt.Errorf("empty aggr")
}
return strings.Split(lines[lineSize-1], ","), nil
}
func (f Func) TranslateArgLines(argLines string, aggrNeed ...bool) (map[string]string, string, error) {
// only the tags need to be extracted, try to get them from the cache
if len(aggrNeed) == 0 {
key := murmur3.Sum32([]byte(argLines))
value, found := cache.Get(key)
if found {
unit, ok := value.(*ArgCacheUnit)
if ok {
return unit.Tags, "", unit.Error
}
}
}
// not found in the cache, parse it (or cache lookups are not allowed)
tags := make(map[string]string)
lines := strings.Split(argLines, "\n")
lineSize := len(lines)
if lineSize == 0 {
return tags, "", fmt.Errorf("empty aggr")
}
agg := lines[lineSize-1]
if lineSize == 1 {
return tags, agg, nil
}
for _, line := range lines[:lineSize-1] {
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
tags[parts[0]] = parts[1]
} else {
return nil, "", fmt.Errorf("bad tag [%s]", line)
}
}
return tags, agg, nil
}
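// Illustrative example (hypothetical input): argLines "host=web01\nmodule=api\ng"
// yields tags {host: web01, module: api} and the aggregator line "g".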
func (f Func) checkTags(tags map[string]string) error {
tcnt := len(tags)
if tcnt > MaxTagsCntConst {
return fmt.Errorf("too many tags %v", tags)
}
return nil
}
func (f Func) TrimRpcCallee(callee string) string {
callee = strings.Replace(callee, "://", "|", -1)
return strings.Replace(callee, ":", "|", -1)
}
// metric line: $ns/$raw-metric
func (f Func) FormatMetricLine(metricLine string, aggrs []string) (string, error) {
ret, err := f.TranslateMetricLine(metricLine)
if err != nil {
return "", err
}
if len(ret) != 2 {
return "", fmt.Errorf("bad metric line, missing ns or metric")
}
// ns
ns := ret[0]
if !strings.HasPrefix(ns, NsPrefixConst) {
ns = NsPrefixConst + ns
}
if !strings.HasSuffix(ns, NsSuffixConst) {
ns = ns + NsSuffixConst
}
// metric
metric := ret[1]
if len(aggrs) > 0 &&
(aggrs[0] == Const_CommonAggregator_Rpc || aggrs[0] == Const_CommonAggregator_RpcE) {
// metric: rpc-type statistics must start with "rpc"
if !strings.HasPrefix(metric, "rpc") {
metric = "rpc_" + metric
}
}
return fmt.Sprintf("%s/%s", ns, metric), nil
}
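// Illustrative example (hypothetical values, assuming Const_CommonAggregator_Rpc
// is "rpc"): with aggrs ["rpc"], the metric line "12/dirpc_call" is rewritten to
// "12/rpc_dirpc_call"; for non-rpc aggregators only the ns prefix/suffix is applied.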
func (f Func) TranslateMetricLine(metricLine string) ([]string, error) {
return strings.SplitN(metricLine, "/", 2), nil
}
// aggr line
func (f Func) formatAggr(aggr string) ([]string, error) {
aggrNames, err := f.translateAggregator(aggr)
if err != nil {
return []string{}, err
}
if len(aggrNames) == 1 {
aggrName := aggrNames[0]
if _, ok := CommonAggregatorsConst[aggrName]; !ok {
return []string{}, fmt.Errorf("bad aggregator %s", aggrName)
}
} else {
for _, aggrName := range aggrNames {
if _, ok := HistogramAggregatorsConst[aggrName]; !ok {
return []string{}, fmt.Errorf("bad aggregator %s", aggrName)
}
}
}
return aggrNames, nil
}
func (f Func) translateAggregator(aggr string) ([]string, error) {
if len(aggr) == 0 {
return nil, fmt.Errorf("empty aggr")
}
return strings.Split(aggr, ","), nil
}
// value line
// split into substrings; each protocol handles its own substrings separately
func (f Func) TranslateValueLine(valueLine string) ([]string, error) {
if len(valueLine) == 0 {
return nil, fmt.Errorf("empty value line")
}
return strings.Split(valueLine, MergeDelimiter), nil
}
//
func (f Func) IsOk(code string) bool {
if ok, exist := RpcOkCodesConst[code]; exist && ok {
return true
}
return false
}
// check whether a is a subset of b's keys (subKeys)
func (f Func) IsSubKeys(a []string, b map[string]string) bool {
isAllSub := true
for i := 0; i < len(a) && isAllSub; i++ {
isSub := false
for k, _ := range b {
if a[i] == k {
isSub = true
break
}
}
if !isSub {
isAllSub = false
}
}
return isAllSub
}
// check whether the array of sorted string arrays a contains two identical arrays
func (f Func) HasSameSortedArray(a [][]string) bool {
hasSameArray := false
for i := 0; i < len(a) && !hasSameArray; i++ {
for k := i + 1; k < len(a) && !hasSameArray; k++ {
t1 := a[i]
t2 := a[k]
if len(t1) != len(t2) {
continue
}
isEqualArray := true
for j := 0; j < len(t1) && isEqualArray; j++ {
if t1[j] != t2[j] {
isEqualArray = false
}
}
if isEqualArray {
hasSameArray = true
}
}
}
return hasSameArray
}
// consts must not be modified, vars may be modified
func (f Func) MergeSortedArrays(consts, vars [][]string) [][]string {
for i := 0; i < len(consts); i++ {
// check same
hasSame := false
for j := 0; j < len(vars) && !hasSame; j++ {
if len(consts[i]) != len(vars[j]) {
continue
}
isAllItemSame := true
for k := 0; k < len(consts[i]) && isAllItemSame; k++ {
if consts[i][k] != vars[j][k] {
isAllItemSame = false
}
}
if isAllItemSame {
hasSame = true
}
}
if !hasSame {
vars = append(vars, consts[i])
}
}
return vars
}
type TraceHandler struct {
sync.RWMutex
SecurityScanCounter map[string]float64 // map[ns]counter
}
var traceHandler = &TraceHandler{SecurityScanCounter: map[string]float64{}}
func (t *TraceHandler) rollHandler() *TraceHandler {
t.Lock()
defer t.Unlock()
old := &TraceHandler{SecurityScanCounter: map[string]float64{}}
old.SecurityScanCounter = t.SecurityScanCounter
t.SecurityScanCounter = make(map[string]float64)
return old
}
// more could be done later, e.g. printing logs, correlating with 把脉 (tracing), etc.
func (t *TraceHandler) collectAndIgnore(nsMetric string, traceid string) bool {
t.Lock()
defer t.Unlock()
ignore := false
if strings.HasSuffix(traceid, "ff") {
ignore = true
if _, found := t.SecurityScanCounter[nsMetric]; !found {
t.SecurityScanCounter[nsMetric] = 1
} else {
t.SecurityScanCounter[nsMetric] += 1
}
}
return ignore
}
// no lock needed: this runs in a single thread, so there is no concurrent access
func (t *TraceHandler) dumpPoints(reportTime time.Time) []*Point {
var ret []*Point
if len(t.SecurityScanCounter) == 0 {
return ret
}
ts := reportTime.Unix()
for nsMetric, counter := range t.SecurityScanCounter {
slice := strings.Split(nsMetric, "/")
if len(slice) != 2 {
continue
}
ns := slice[0]
if !strings.HasPrefix(ns, NsPrefixConst) {
ns = NsPrefixConst + ns
}
ret = append(ret, &Point{
Namespace: ns,
Name: "security.scan.counter",
Timestamp: ts,
Tags: map[string]string{
"metric": slice[1],
},
Value: counter,
})
}
return ret
}

View File

@ -0,0 +1,41 @@
package udp
import (
"sync"
"github.com/didi/nightingale/src/modules/agent/statsd"
"github.com/didi/nightingale/src/toolkits/exit"
"github.com/toolkits/pkg/logger"
)
var ByteSlicePool = sync.Pool{
New: func() interface{} {
return make([]byte, 4096, 4096)
}}
func handleUdpPackets() {
defer func() {
if err := recover(); err != nil {
stack := exit.Stack(3)
logger.Warningf("udp handler exit unexpected, [error: %v],[stack: %s]", err, stack)
panic(err) // udp failure: panic fast to keep the metrics feature intact
}
// stop the udp service
stop()
}()
message := ByteSlicePool.Get().([]byte)
for !statsd.IsExited() {
n, _, err := udpConn.ReadFrom(message)
if err != nil {
logger.Warningf("read from udp error, [error: %s]", err.Error())
continue
}
packet := string(message[0:n])
ByteSlicePool.Put(message)
logger.Debugf("recv packet: %v\n", packet)
statsd.StatsdReceiver{}.HandlePacket(packet)
}
}

View File

@ -0,0 +1,42 @@
package udp
import (
"fmt"
"log"
"net"
"github.com/didi/nightingale/src/modules/agent/config"
)
var (
udpConn *net.UDPConn = nil
)
func Start() {
if !config.Config.Udp.Enable {
log.Println("udp server disabled")
return
}
address, _ := net.ResolveUDPAddr("udp4", config.Config.Udp.Listen)
conn, err := net.ListenUDP("udp4", address)
if err != nil {
errmsg := fmt.Sprintf("listen udp error, [addr: %s][error: %s]", config.Config.Udp.Listen, err.Error())
log.Println(errmsg)
panic(errmsg)
}
log.Println("udp start, listening on ", config.Config.Udp.Listen)
// keep the udp server connection
udpConn = conn
// start the goroutine that handles udp packets
go handleUdpPackets()
}
func stop() error {
if udpConn != nil {
udpConn.Close()
}
return nil
}

View File

@ -86,7 +86,7 @@ func popEvent(queues []interface{}) (*models.Event, bool) {
var curNodePath string
node, err := models.NodeGet("id=?", stra.Nid)
if err != nil {
if err != nil || node == nil {
logger.Warningf("get node failed, node id: %v, event: %+v, err: %v", stra.Nid, event, err)
} else {
nodePath = node.Path

View File

@ -270,6 +270,10 @@ func HostBindingsForMon(endpointList []string) ([]string, error) {
return list, err
}
if node == nil {
continue
}
list = append(list, node.Path)
}
return list, nil

View File

@ -28,6 +28,8 @@ func Config(r *gin.Engine) {
notLogin.POST("/auth/send-rst-code-by-sms", sendRstCodeBySms)
notLogin.POST("/auth/rst-password", rstPassword)
notLogin.GET("/v2/nodes", nodeGets)
}
hbs := r.Group("/api/hbs")

92
src/toolkits/exit/exit.go Normal file
View File

@ -0,0 +1,92 @@
package exit
import (
"bytes"
"fmt"
"io/ioutil"
"log"
"runtime"
)
var (
dunno = []byte("???")
centerDot = []byte("·")
dot = []byte(".")
slash = []byte("/")
)
func OnExit(onexits ...func()) {
if err := recover(); err != nil {
stack := Stack(3)
log.Printf("\napp exit unexpected, \n[error]: %v\n[stack]: %s", err, stack)
}
if len(onexits) != 0 {
for _, f := range onexits {
if f != nil {
f()
}
}
}
}
// Stack returns a nicely formatted stack frame, skipping skip frames
func Stack(skip int) []byte {
buf := new(bytes.Buffer) // the returned data
// As we loop, we open files and read them. These variables record the currently
// loaded file.
var lines [][]byte
var lastFile string
for i := skip; ; i++ { // Skip the expected number of frames
pc, file, line, ok := runtime.Caller(i)
if !ok {
break
}
// Print this much at least. If we can't find the source, it won't show.
fmt.Fprintf(buf, "%s:%d (0x%x)\n", file, line, pc)
if file != lastFile {
data, err := ioutil.ReadFile(file)
if err != nil {
continue
}
lines = bytes.Split(data, []byte{'\n'})
lastFile = file
}
fmt.Fprintf(buf, "\t%s: %s\n", function(pc), source(lines, line))
}
return buf.Bytes()
}
// source returns a space-trimmed slice of the n'th line.
func source(lines [][]byte, n int) []byte {
n-- // in stack trace, lines are 1-indexed but our array is 0-indexed
if n < 0 || n >= len(lines) {
return dunno
}
return bytes.TrimSpace(lines[n])
}
// function returns, if possible, the name of the function containing the PC.
func function(pc uintptr) []byte {
fn := runtime.FuncForPC(pc)
if fn == nil {
return dunno
}
name := []byte(fn.Name())
// The name includes the path name to the package, which is unnecessary
// since the file name is already included. Plus, it has center dots.
// That is, we see
// runtime/debug.*T·ptrmethod
// and want
// *T.ptrmethod
// Also the package path might contain dots (e.g. code.google.com/...),
// so first eliminate the path prefix
if lastslash := bytes.LastIndex(name, slash); lastslash >= 0 {
name = name[lastslash+1:]
}
if period := bytes.Index(name, dot); period >= 0 {
name = name[period+1:]
}
name = bytes.Replace(name, centerDot, dot, -1)
return name
}

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2015 Caio Romão Costa Nascimento
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,55 @@
# T-Digest
A map-reduce and parallel streaming friendly data-structure for accurate
quantile approximation.
This package provides a very crude implementation of Ted Dunning's t-digest
data structure in Go.
[![Build Status](https://travis-ci.org/caio/go-tdigest.svg?branch=master)](https://travis-ci.org/caio/go-tdigest)
[![GoDoc](https://godoc.org/github.com/caio/go-tdigest?status.svg)](http://godoc.org/github.com/caio/go-tdigest)
[![Coverage](http://gocover.io/_badge/github.com/caio/go-tdigest)](http://gocover.io/github.com/caio/go-tdigest)
[![Go Report Card](https://goreportcard.com/badge/github.com/caio/go-tdigest)](https://goreportcard.com/report/github.com/caio/go-tdigest)
## Installation
    go get github.com/caio/go-tdigest
## Usage
    package main
    import (
        "fmt"
        "math/rand"
        "github.com/caio/go-tdigest"
    )
    func main() {
        var t = tdigest.New(100)
        for i := 0; i < 10000; i++ {
            t.Add(rand.Float64(), 1)
        }
        fmt.Printf("p(.5) = %.6f\n", t.Quantile(0.5))
    }
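Since the digest is mergeable, results produced on separate workers can be
combined before querying. A minimal sketch, assuming two digests built as in
the example above (variable names are illustrative):
    t1 := tdigest.New(100)
    t2 := tdigest.New(100)
    // ... fill t1 and t2 on separate goroutines or nodes ...
    t1.Merge(t2)
    fmt.Printf("p(.99) = %.6f\n", t1.Quantile(0.99))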
## Disclaimer
I've written this solely with the purpose of understanding how the
data-structure works; it hasn't been thoroughly verified nor battle tested
in a production environment.
## References
This is a very simple port of the [reference][1] implementation with some
ideas borrowed from the [python version][2]. If you wanna get a quick grasp of
how it works and why it's useful, [this video and companion article is pretty
helpful][3].
[1]: https://github.com/tdunning/t-digest
[2]: https://github.com/CamDavidsonPilon/tdigest
[3]: https://www.mapr.com/blog/better-anomaly-detection-t-digest-whiteboard-walkthrough

View File

@ -0,0 +1,131 @@
package tdigest
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
)
const smallEncoding int32 = 2
var endianess = binary.BigEndian
// AsBytes serializes the digest into a byte array so it can be
// saved to disk or sent over the wire.
func (t TDigest) AsBytes() ([]byte, error) {
buffer := new(bytes.Buffer)
err := binary.Write(buffer, endianess, smallEncoding)
if err != nil {
return nil, err
}
err = binary.Write(buffer, endianess, t.compression)
if err != nil {
return nil, err
}
err = binary.Write(buffer, endianess, int32(t.summary.Len()))
if err != nil {
return nil, err
}
var x float64
t.summary.Iterate(func(item centroid) bool {
delta := item.mean - x
x = item.mean
err = binary.Write(buffer, endianess, float32(delta))
return err == nil
})
if err != nil {
return nil, err
}
t.summary.Iterate(func(item centroid) bool {
err = encodeUint(buffer, item.count)
return err == nil
})
if err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
// FromBytes reads a byte buffer with a serialized digest (from AsBytes)
// and deserializes it.
func FromBytes(buf *bytes.Reader) (*TDigest, error) {
var encoding int32
err := binary.Read(buf, endianess, &encoding)
if err != nil {
return nil, err
}
if encoding != smallEncoding {
return nil, fmt.Errorf("Unsupported encoding version: %d", encoding)
}
var compression float64
err = binary.Read(buf, endianess, &compression)
if err != nil {
return nil, err
}
t := New(compression)
var numCentroids int32
err = binary.Read(buf, endianess, &numCentroids)
if err != nil {
return nil, err
}
if numCentroids < 0 || numCentroids > 1<<22 {
return nil, errors.New("bad number of centroids in serialization")
}
means := make([]float64, numCentroids)
var delta float32
var x float64
for i := 0; i < int(numCentroids); i++ {
err = binary.Read(buf, endianess, &delta)
if err != nil {
return nil, err
}
x += float64(delta)
means[i] = x
}
for i := 0; i < int(numCentroids); i++ {
decUint, err := decodeUint(buf)
if err != nil {
return nil, err
}
t.Add(means[i], decUint)
}
return t, nil
}
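// Round-trip sketch (illustrative, not part of the library API docs):
//   d := New(100)
//   _ = d.Add(1.5, 1)
//   raw, _ := d.AsBytes()
//   restored, _ := FromBytes(bytes.NewReader(raw))
//   _ = restored.Quantile(0.5)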
func encodeUint(buf *bytes.Buffer, n uint32) error {
var b [binary.MaxVarintLen32]byte
l := binary.PutUvarint(b[:], uint64(n))
buf.Write(b[:l])
return nil
}
func decodeUint(buf *bytes.Reader) (uint32, error) {
v, err := binary.ReadUvarint(buf)
if v > 0xffffffff {
return 0, errors.New("Something wrong, this number looks too big")
}
return uint32(v), err
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,202 @@
package tdigest
import (
"fmt"
"math"
"sort"
)
type centroid struct {
mean float64
count uint32
index int
}
func (c centroid) isValid() bool {
return !math.IsNaN(c.mean) && c.count > 0
}
func (c *centroid) Update(x float64, weight uint32) {
c.count += weight
c.mean += float64(weight) * (x - c.mean) / float64(c.count)
}
var invalidCentroid = centroid{mean: math.NaN(), count: 0}
type summary struct {
keys []float64
counts []uint32
}
func newSummary(initialCapacity uint) *summary {
return &summary{
keys: make([]float64, 0, initialCapacity),
counts: make([]uint32, 0, initialCapacity),
}
}
func (s summary) Len() int {
return len(s.keys)
}
func (s *summary) Add(key float64, value uint32) error {
if math.IsNaN(key) {
return fmt.Errorf("Key must not be NaN")
}
if value == 0 {
return fmt.Errorf("Count must be >0")
}
idx := s.FindIndex(key)
if s.meanAtIndexIs(idx, key) {
s.updateAt(idx, key, value)
return nil
}
s.keys = append(s.keys, math.NaN())
s.counts = append(s.counts, 0)
copy(s.keys[idx+1:], s.keys[idx:])
copy(s.counts[idx+1:], s.counts[idx:])
s.keys[idx] = key
s.counts[idx] = value
return nil
}
func (s summary) Find(x float64) centroid {
idx := s.FindIndex(x)
if idx < s.Len() && s.keys[idx] == x {
return centroid{x, s.counts[idx], idx}
}
return invalidCentroid
}
func (s summary) FindIndex(x float64) int {
// FIXME When is linear scan better than binsearch()?
// should I even bother?
if len(s.keys) < 30 {
for i, item := range s.keys {
if item >= x {
return i
}
}
return len(s.keys)
}
return sort.Search(len(s.keys), func(i int) bool {
return s.keys[i] >= x
})
}
func (s summary) At(index int) centroid {
if s.Len()-1 < index || index < 0 {
return invalidCentroid
}
return centroid{s.keys[index], s.counts[index], index}
}
func (s summary) Iterate(f func(c centroid) bool) {
for i := 0; i < s.Len(); i++ {
if !f(centroid{s.keys[i], s.counts[i], i}) {
break
}
}
}
func (s summary) Min() centroid {
return s.At(0)
}
func (s summary) Max() centroid {
return s.At(s.Len() - 1)
}
func (s summary) Data() []centroid {
data := make([]centroid, 0, s.Len())
s.Iterate(func(c centroid) bool {
data = append(data, c)
return true
})
return data
}
func (s summary) successorAndPredecessorItems(mean float64) (centroid, centroid) {
idx := s.FindIndex(mean)
return s.At(idx + 1), s.At(idx - 1)
}
func (s summary) ceilingAndFloorItems(mean float64) (centroid, centroid) {
idx := s.FindIndex(mean)
// Case 1: item is greater than all items in the summary
if idx == s.Len() {
return invalidCentroid, s.Max()
}
item := s.At(idx)
// Case 2: item exists in the summary
if item.isValid() && mean == item.mean {
return item, item
}
// Case 3: item is smaller than all items in the summary
if idx == 0 {
return s.Min(), invalidCentroid
}
return item, s.At(idx - 1)
}
func (s summary) sumUntilMean(mean float64) uint32 {
var cumSum uint32
for i := range s.keys {
if s.keys[i] < mean {
cumSum += s.counts[i]
} else {
break
}
}
return cumSum
}
func (s *summary) updateAt(index int, mean float64, count uint32) {
c := centroid{s.keys[index], s.counts[index], index}
c.Update(mean, count)
oldMean := s.keys[index]
s.keys[index] = c.mean
s.counts[index] = c.count
if c.mean > oldMean {
s.adjustRight(index)
} else if c.mean < oldMean {
s.adjustLeft(index)
}
}
func (s *summary) adjustRight(index int) {
for i := index + 1; i < len(s.keys) && s.keys[i-1] > s.keys[i]; i++ {
s.keys[i-1], s.keys[i] = s.keys[i], s.keys[i-1]
s.counts[i-1], s.counts[i] = s.counts[i], s.counts[i-1]
}
}
func (s *summary) adjustLeft(index int) {
for i := index - 1; i >= 0 && s.keys[i] > s.keys[i+1]; i-- {
s.keys[i], s.keys[i+1] = s.keys[i+1], s.keys[i]
s.counts[i], s.counts[i+1] = s.counts[i+1], s.counts[i]
}
}
func (s summary) meanAtIndexIs(index int, mean float64) bool {
return index < len(s.keys) && s.keys[index] == mean
}

View File

@ -0,0 +1,239 @@
package tdigest
import (
"math"
"math/rand"
"sort"
"testing"
)
func TestBasics(t *testing.T) {
s := newSummary(2)
for _, n := range []float64{12, 13, 14, 15} {
item := s.Find(n)
if item.isValid() {
t.Errorf("Found something for non existing key %.0f: %v", n, item)
}
}
err := s.Add(1, 1)
if err != nil {
t.Errorf("Failed to add simple item")
}
if s.Add(math.NaN(), 1) == nil {
t.Errorf("Adding math.NaN() shouldn't be allowed")
}
if s.Add(1, 0) == nil {
t.Errorf("Adding count=0 shouldn't be allowed")
}
}
func checkSorted(s *summary, t *testing.T) {
if !sort.Float64sAreSorted(s.keys) {
t.Fatalf("Keys are not sorted! %v", s.keys)
}
}
func TestCore(t *testing.T) {
testData := make(map[float64]uint32)
const maxDataSize = 10000
s := newSummary(maxDataSize)
checkSorted(s, t)
if s.Len() != 0 {
t.Errorf("Initial size should be zero regardless of capacity. Got %d", s.Len())
}
for i := 0; i < maxDataSize; i++ {
k := rand.Float64()
v := rand.Uint32()
err := s.Add(k, v)
if err != nil {
_, exists := testData[k]
if !exists {
t.Errorf("Failed to insert %.2f even though it doesn't exist yet", k)
}
}
testData[k] = v
}
checkSorted(s, t)
if s.Len() != len(testData) {
t.Errorf("Got Len() == %d. Expected %d", s.Len(), len(testData))
}
for k, v := range testData {
c := s.Find(k)
if !c.isValid() || c.count != v {
t.Errorf("Find(%.0f) returned %d, expected %d", k, c.count, v)
}
}
}
func TestGetAt(t *testing.T) {
data := make(map[int]uint32)
const maxDataSize = 1000
s := newSummary(maxDataSize)
c := s.At(0)
if c.isValid() {
t.Errorf("At() on an empty structure should give invalid data. Got %v", c)
}
for i := 0; i < maxDataSize; i++ {
data[i] = rand.Uint32()
s.Add(float64(i), data[i])
}
for i, v := range data {
c := s.At(i)
if !c.isValid() || c.count != v {
t.Errorf("At(%d) = %d. Should've been %d", i, c.count, v)
}
}
c = s.At(s.Len())
if c.isValid() {
t.Errorf("At() past the slice length should give invalid data")
}
c = s.At(-10)
if c.isValid() {
t.Errorf("At() with negative index should give invalid data")
}
}
func TestIterate(t *testing.T) {
s := newSummary(10)
for _, i := range []uint32{1, 2, 3, 4, 5, 6} {
s.Add(float64(i), i*10)
}
c := 0
s.Iterate(func(i centroid) bool {
c++
return false
})
if c != 1 {
t.Errorf("Iterate must exit early if the closure returns false")
}
var tot uint32
s.Iterate(func(i centroid) bool {
tot += i.count
return true
})
if tot != 210 {
t.Errorf("Iterate must walk through the whole data if it always returns true")
}
}
func TestCeilingAndFloor(t *testing.T) {
s := newSummary(100)
ceil, floor := s.ceilingAndFloorItems(1)
if ceil.isValid() || floor.isValid() {
t.Errorf("Empty centroids must return invalid ceiling and floor items")
}
s.Add(0.4, 1)
ceil, floor = s.ceilingAndFloorItems(0.3)
if floor.isValid() || ceil.mean != 0.4 {
t.Errorf("Expected to find a ceil and NOT find a floor. ceil=%v, floor=%v", ceil, floor)
}
ceil, floor = s.ceilingAndFloorItems(0.5)
if ceil.isValid() || floor.mean != 0.4 {
t.Errorf("Expected to find a floor and NOT find a ceiling. ceil=%v, floor=%v", ceil, floor)
}
s.Add(0.1, 2)
ceil, floor = s.ceilingAndFloorItems(0.2)
if ceil.mean != 0.4 || floor.mean != 0.1 {
t.Errorf("Expected to find a ceiling and a floor. ceil=%v, floor=%v", ceil, floor)
}
s.Add(0.21, 3)
ceil, floor = s.ceilingAndFloorItems(0.2)
if ceil.mean != 0.21 || floor.mean != 0.1 {
t.Errorf("Ceil should've shrunk. ceil=%v, floor=%v", ceil, floor)
}
s.Add(0.1999, 1)
ceil, floor = s.ceilingAndFloorItems(0.2)
if ceil.mean != 0.21 || floor.mean != 0.1999 {
t.Errorf("Floor should've shrunk. ceil=%v, floor=%v", ceil, floor)
}
ceil, floor = s.ceilingAndFloorItems(10)
if ceil.isValid() {
t.Errorf("Expected an invalid ceil. Got %v", ceil)
}
ceil, floor = s.ceilingAndFloorItems(0.0001)
if floor.isValid() {
t.Errorf("Expected an invalid floor. Got %v", floor)
}
m := float64(0.42)
s.Add(m, 1)
ceil, floor = s.ceilingAndFloorItems(m)
if ceil.mean != m || floor.mean != m {
t.Errorf("ceiling and floor of an existing item should be the item itself")
}
}
func TestAdjustLeftRight(t *testing.T) {
keys := []float64{1, 2, 3, 4, 9, 5, 6, 7, 8}
counts := []uint32{1, 2, 3, 4, 9, 5, 6, 7, 8}
s := summary{keys: keys, counts: counts}
s.adjustRight(4)
if !sort.Float64sAreSorted(s.keys) || s.counts[4] != 5 {
t.Errorf("adjustRight should have fixed the keys/counts state. %v %v", s.keys, s.counts)
}
keys = []float64{1, 2, 3, 4, 0, 5, 6, 7, 8}
counts = []uint32{1, 2, 3, 4, 0, 5, 6, 7, 8}
s = summary{keys: keys, counts: counts}
s.adjustLeft(4)
if !sort.Float64sAreSorted(s.keys) || s.counts[4] != 4 {
t.Errorf("adjustLeft should have fixed the keys/counts state. %v %v", s.keys, s.counts)
}
}

View File

@ -0,0 +1,245 @@
// Package tdigest provides a highly accurate mergeable data-structure
// for quantile estimation.
package tdigest
import (
"fmt"
"math"
"math/rand"
)
// TDigest is a quantile approximation data structure.
// Typical T-Digest use cases involve accumulating metrics on several
// distinct nodes of a cluster and then merging them together to get
// a system-wide quantile overview. Things such as: sensory data from
// IoT devices, quantiles over enormous document datasets (think
// ElasticSearch), performance metrics for distributed systems, etc.
type TDigest struct {
summary *summary
compression float64
count uint32
}
// New creates a new digest.
// The compression parameter rules the threshold in which samples are
// merged together - the more often distinct samples are merged the more
// precision is lost. Compression should be tuned according to your data
// distribution, but a value of 100 is often good enough. A higher
// compression value means holding more centroids in memory (thus: better
// precision), which means a bigger serialization payload and higher
// memory footprint.
// Compression must be a value greater than or equal to 1, will panic
// otherwise.
func New(compression float64) *TDigest {
if compression < 1 {
panic("Compression must be >= 1.0")
}
return &TDigest{
compression: compression,
summary: newSummary(estimateCapacity(compression)),
count: 0,
}
}
// Quantile returns the desired percentile estimation.
// Values of p must be between 0 and 1 (inclusive), will panic otherwise.
func (t *TDigest) Quantile(q float64) float64 {
if q < 0 || q > 1 {
panic("q must be between 0 and 1 (inclusive)")
}
if t.summary.Len() == 0 {
return math.NaN()
} else if t.summary.Len() == 1 {
return t.summary.Min().mean
}
q *= float64(t.count)
var total float64
i := 0
found := false
var result float64
t.summary.Iterate(func(item centroid) bool {
k := float64(item.count)
if q < total+k {
if i == 0 || i+1 == t.summary.Len() {
result = item.mean
found = true
return false
}
succ, pred := t.summary.successorAndPredecessorItems(item.mean)
delta := (succ.mean - pred.mean) / 2
result = item.mean + ((q-total)/k-0.5)*delta
found = true
return false
}
i++
total += k
return true
})
if found {
return result
}
return t.summary.Max().mean
}
// Add registers a new sample in the digest.
// It's the main entry point for the digest and very likely the only
// method to be used for collecting samples. The count parameter is for
// when you are registering a sample that occurred multiple times - the
// most common value for this is 1.
func (t *TDigest) Add(value float64, count uint32) error {
if count == 0 {
return fmt.Errorf("Illegal datapoint <value: %.4f, count: %d>", value, count)
}
if t.summary.Len() == 0 {
t.summary.Add(value, count)
t.count = count
return nil
}
// Avoid allocation for our slice by using a local array here.
ar := [2]centroid{}
candidates := ar[:]
candidates[0], candidates[1] = t.findNearestCentroids(value)
if !candidates[1].isValid() {
candidates = candidates[:1]
}
for len(candidates) > 0 && count > 0 {
j := 0
if len(candidates) > 1 {
j = rand.Intn(len(candidates))
}
chosen := candidates[j]
quantile := t.computeCentroidQuantile(&chosen)
if float64(chosen.count+count) > t.threshold(quantile) {
candidates = append(candidates[:j], candidates[j+1:]...)
continue
}
t.summary.updateAt(chosen.index, value, uint32(count))
t.count += count
count = 0
}
if count > 0 {
t.summary.Add(value, count)
t.count += count
}
if float64(t.summary.Len()) > 20*t.compression {
t.Compress()
}
return nil
}
// Compress tries to reduce the number of individual centroids stored
// in the digest.
// Compression trades off accuracy for performance and happens
// automatically after a certain amount of distinct samples have been
// stored.
func (t *TDigest) Compress() {
if t.summary.Len() <= 1 {
return
}
oldTree := t.summary
t.summary = newSummary(estimateCapacity(t.compression))
t.count = 0
nodes := oldTree.Data()
shuffle(nodes)
for _, item := range nodes {
t.Add(item.mean, item.count)
}
}
// Merge joins a given digest into itself.
// Merging is useful when you have multiple TDigest instances running
// in separate threads and you want to compute quantiles over all the
// samples. This is particularly important on a scatter-gather/map-reduce
// scenario.
func (t *TDigest) Merge(other *TDigest) {
if other.summary.Len() == 0 {
return
}
nodes := other.summary.Data()
shuffle(nodes)
for _, item := range nodes {
t.Add(item.mean, item.count)
}
}
// Len returns the number of centroids in the TDigest.
func (t *TDigest) Len() int { return t.summary.Len() }
// ForEachCentroid calls the specified function for each centroid.
// Iteration stops when the supplied function returns false, or when all
// centroids have been iterated.
func (t *TDigest) ForEachCentroid(f func(mean float64, count uint32) bool) {
s := t.summary
for i := 0; i < s.Len(); i++ {
if !f(s.keys[i], s.counts[i]) {
break
}
}
}
func shuffle(data []centroid) {
for i := len(data) - 1; i > 1; i-- {
other := rand.Intn(i + 1)
tmp := data[other]
data[other] = data[i]
data[i] = tmp
}
}
func estimateCapacity(compression float64) uint {
return uint(compression) * 10
}
func (t *TDigest) threshold(q float64) float64 {
return (4 * float64(t.count) * q * (1 - q)) / t.compression
}
func (t *TDigest) computeCentroidQuantile(c *centroid) float64 {
cumSum := t.summary.sumUntilMean(c.mean)
return (float64(c.count)/2.0 + float64(cumSum)) / float64(t.count)
}
func (t *TDigest) findNearestCentroids(mean float64) (centroid, centroid) {
ceil, floor := t.summary.ceilingAndFloorItems(mean)
if !ceil.isValid() && !floor.isValid() {
panic("findNearestCentroids called on an empty tree")
}
if !ceil.isValid() {
return floor, invalidCentroid
}
if !floor.isValid() {
return ceil, invalidCentroid
}
if math.Abs(floor.mean-mean) < math.Abs(ceil.mean-mean) {
return floor, invalidCentroid
} else if math.Abs(floor.mean-mean) == math.Abs(ceil.mean-mean) && floor.mean != ceil.mean {
return floor, ceil
} else {
return ceil, invalidCentroid
}
}

View File

@ -0,0 +1,430 @@
package tdigest
import (
"math"
"math/rand"
"sort"
"testing"
)
// Test of tdigest internals and accuracy. Note no t.Parallel():
// during tests the default random seed is consistent, but varying
// concurrency scheduling mixes up the random values used in each test.
// Since there's a random number call inside tdigest this breaks repeatability
// for all tests. So, no test concurrency here.
func TestTInternals(t *testing.T) {
tdigest := New(100)
if !math.IsNaN(tdigest.Quantile(0.1)) {
t.Errorf("Quantile() on an empty digest should return NaN. Got: %.4f", tdigest.Quantile(0.1))
}
tdigest.Add(0.4, 1)
if tdigest.Quantile(0.1) != 0.4 {
t.Errorf("Quantile() on a single-sample digest should return the samples's mean. Got %.4f", tdigest.Quantile(0.1))
}
tdigest.Add(0.5, 1)
if tdigest.summary.Len() != 2 {
t.Errorf("Expected size 2, got %d", tdigest.summary.Len())
}
if tdigest.summary.Min().mean != 0.4 {
t.Errorf("Min() returned an unexpected centroid: %v", tdigest.summary.Min())
}
if tdigest.summary.Max().mean != 0.5 {
t.Errorf("Max() returned an unexpected centroid: %v", tdigest.summary.Max())
}
tdigest.Add(0.4, 2)
tdigest.Add(0.4, 3)
if tdigest.summary.Len() != 2 {
t.Errorf("Adding centroids of same mean shouldn't change size")
}
y := tdigest.summary.Find(0.4)
if y.count != 6 || y.mean != 0.4 {
t.Errorf("Adding centroids with same mean should increment the count only. Got %v", y)
}
err := tdigest.Add(0, 0)
if err == nil {
t.Errorf("Expected Add() to error out with input (0,0)")
}
if tdigest.Quantile(0.9999999) != tdigest.summary.Max().mean {
t.Errorf("High quantiles with little data should give out the MAX recorded mean")
}
if tdigest.Quantile(0.0000001) != tdigest.summary.Min().mean {
t.Errorf("Low quantiles with little data should give out the MIN recorded mean")
}
}
func assertDifferenceSmallerThan(tdigest *TDigest, p float64, m float64, t *testing.T) {
tp := tdigest.Quantile(p)
if math.Abs(tp-p) >= m {
t.Errorf("T-Digest.Quantile(%.4f) = %.4f. Diff (%.4f) >= %.4f", p, tp, math.Abs(tp-p), m)
}
}
func TestUniformDistribution(t *testing.T) {
tdigest := New(100)
for i := 0; i < 10000; i++ {
tdigest.Add(rand.Float64(), 1)
}
assertDifferenceSmallerThan(tdigest, 0.5, 0.02, t)
assertDifferenceSmallerThan(tdigest, 0.1, 0.01, t)
assertDifferenceSmallerThan(tdigest, 0.9, 0.01, t)
assertDifferenceSmallerThan(tdigest, 0.01, 0.005, t)
assertDifferenceSmallerThan(tdigest, 0.99, 0.005, t)
assertDifferenceSmallerThan(tdigest, 0.001, 0.001, t)
assertDifferenceSmallerThan(tdigest, 0.999, 0.001, t)
}
// Asserts quantile p is no greater than absolute m off from "true"
// fractional quantile for supplied data. So m must be scaled
// appropriately for source data range.
func assertDifferenceFromQuantile(data []float64, tdigest *TDigest, p float64, m float64, t *testing.T) {
q := quantile(p, data)
tp := tdigest.Quantile(p)
if math.Abs(tp-q) >= m {
t.Fatalf("T-Digest.Quantile(%.4f) = %.4f vs actual %.4f. Diff (%.4f) >= %.4f", p, tp, q, math.Abs(tp-q), m)
}
}
func TestSequentialInsertion(t *testing.T) {
tdigest := New(10)
data := make([]float64, 10000)
for i := 0; i < len(data); i++ {
data[i] = float64(i)
}
for i := 0; i < len(data); i++ {
tdigest.Add(data[i], 1)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.001, 1.0+0.001*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.01, 1.0+0.005*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.05, 1.0+0.01*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.25, 1.0+0.03*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.5, 1.0+0.03*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.75, 1.0+0.03*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.95, 1.0+0.01*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.99, 1.0+0.005*float64(i), t)
assertDifferenceFromQuantile(data[:i+1], tdigest, 0.999, 1.0+0.001*float64(i), t)
}
}
func TestNonUniformDistribution(t *testing.T) {
tdigest := New(10)
// Not quite a uniform distribution, but close.
data := make([]float64, 1000)
for i := 0; i < 500; i++ {
data[i] = 700.0 + rand.Float64()*100.0
}
for i := 500; i < 750; i++ {
data[i] = 100.0 + rand.Float64()*100.0
}
for i := 750; i < 1000; i++ {
data[i] = 600.0 + rand.Float64()*10.0
}
for i := 0; i < len(data); i++ {
tdigest.Add(data[i], 1)
}
max := float64(len(data))
sort.Float64s(data)
assertDifferenceFromQuantile(data, tdigest, 0.001, 1.0+0.001*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.01, 1.0+0.005*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.05, 1.0+0.01*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.25, 1.0+0.01*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.5, 1.0+0.05*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.75, 1.0+0.01*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.95, 1.0+0.01*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.99, 1.0+0.005*max, t)
assertDifferenceFromQuantile(data, tdigest, 0.999, 1.0+0.001*max, t)
}
func TestNonSequentialInsertion(t *testing.T) {
tdigest := New(10)
// Not quite a uniform distribution, but close.
data := make([]float64, 1000)
for i := 0; i < len(data); i++ {
tmp := (i * 1627) % len(data)
data[i] = float64(tmp)
}
sorted := make([]float64, 0, len(data))
for i := 0; i < len(data); i++ {
tdigest.Add(data[i], 1)
sorted = append(sorted, data[i])
// Estimated quantiles are all over the place for low counts, which is
// OK given that something like P99 is not very meaningful when there are
// 25 samples. To account for this, increase the error tolerance for
// smaller counts.
if i == 0 {
continue
}
max := float64(len(data))
fac := 1.0 + max/float64(i)
sort.Float64s(sorted)
assertDifferenceFromQuantile(sorted, tdigest, 0.001, fac+0.001*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.01, fac+0.005*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.05, fac+0.01*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.25, fac+0.01*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.5, fac+0.02*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.75, fac+0.01*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.95, fac+0.01*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.99, fac+0.005*max, t)
assertDifferenceFromQuantile(sorted, tdigest, 0.999, fac+0.001*max, t)
}
}
func TestWeights(t *testing.T) {
tdigest := New(10)
// Create data slice with repeats matching weights we gave to tdigest
data := []float64{}
for i := 0; i < 100; i++ {
tdigest.Add(float64(i), uint32(i))
for j := 0; j < i; j++ {
data = append(data, float64(i))
}
}
assertDifferenceFromQuantile(data, tdigest, 0.001, 1.0+0.001*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.01, 1.0+0.005*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.05, 1.0+0.01*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.25, 1.0+0.01*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.5, 1.0+0.02*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.75, 1.0+0.01*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.95, 1.0+0.01*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.99, 1.0+0.005*100.0, t)
assertDifferenceFromQuantile(data, tdigest, 0.999, 1.0+0.001*100.0, t)
}
func TestIntegers(t *testing.T) {
tdigest := New(100)
tdigest.Add(1, 1)
tdigest.Add(2, 1)
tdigest.Add(3, 1)
if tdigest.Quantile(0.5) != 2 {
t.Errorf("Expected p(0.5) = 2, Got %.2f instead", tdigest.Quantile(0.5))
}
tdigest = New(100)
for _, i := range []float64{1, 2, 2, 2, 2, 2, 2, 2, 3} {
tdigest.Add(i, 1)
}
if tdigest.Quantile(0.5) != 2 {
t.Errorf("Expected p(0.5) = 2, Got %.2f instead", tdigest.Quantile(0.5))
}
var tot uint32
tdigest.summary.Iterate(func(item centroid) bool {
tot += item.count
return true
})
if tot != 9 {
t.Errorf("Expected the centroid count to be 9, Got %d instead", tot)
}
}
func quantile(q float64, data []float64) float64 {
if len(data) == 0 {
return math.NaN()
}
if q == 1 || len(data) == 1 {
return data[len(data)-1]
}
index := q * (float64(len(data)) - 1)
return data[int(index)+1]*(index-float64(int(index))) + data[int(index)]*(float64(int(index)+1)-index)
}
func TestMerge(t *testing.T) {
if testing.Short() {
t.Skipf("Skipping merge test. Short flag is on")
}
const numItems = 10000
const numSubs = 5
data := make([]float64, numItems)
var subs [numSubs]*TDigest
dist1 := New(10)
for i := 0; i < numSubs; i++ {
subs[i] = New(10)
}
for i := 0; i < numItems; i++ {
num := rand.Float64()
data[i] = num
dist1.Add(num, 1)
for j := 0; j < numSubs; j++ {
subs[j].Add(num, 1)
}
}
dist2 := New(10)
for i := 0; i < numSubs; i++ {
dist2.Merge(subs[i])
}
// Merge empty. Should be no-op
dist2.Merge(New(10))
sort.Float64s(data)
for _, p := range []float64{0.001, 0.01, 0.1, 0.2, 0.3, 0.5} {
q := quantile(p, data)
p1 := dist1.Quantile(p)
p2 := dist2.Quantile(p)
e1 := math.Abs(p1 - q)
e2 := math.Abs(p2 - q)
if e2/p >= 0.3 {
t.Errorf("Relative error for %f above threshold. q=%f p1=%f p2=%f e1=%f e2=%f", p, q, p1, p2, e1, e2)
}
if e2 >= 0.015 {
t.Errorf("Absolute error for %f above threshold. q=%f p1=%f p2=%f e1=%f e2=%f", p, q, p1, p2, e1, e2)
}
}
}
func TestCompressDoesntChangeCount(t *testing.T) {
tdigest := New(100)
for i := 0; i < 1000; i++ {
tdigest.Add(rand.Float64(), 1)
}
initialCount := tdigest.count
tdigest.Compress()
if tdigest.count != initialCount {
t.Errorf("Compress() should not change count. Wanted %d, got %d", initialCount, tdigest.count)
}
}
func shouldPanic(f func(), t *testing.T, message string) {
defer func() {
tryRecover := recover()
if tryRecover == nil {
t.Errorf(message)
}
}()
f()
}
func TestPanic(t *testing.T) {
shouldPanic(func() {
New(0.5)
}, t, "Compression < 1 should panic!")
tdigest := New(100)
shouldPanic(func() {
tdigest.Quantile(-42)
}, t, "Quantile < 0 should panic!")
shouldPanic(func() {
tdigest.Quantile(42)
}, t, "Quantile > 1 should panic!")
shouldPanic(func() {
tdigest.findNearestCentroids(0.2)
}, t, "findNearestCentroids on empty summary should panic!")
}
func TestForEachCentroid(t *testing.T) {
t.Parallel()
tdigest := New(10)
for i := 0; i < 100; i++ {
tdigest.Add(float64(i), 1)
}
// Iterate limited number.
means := []float64{}
tdigest.ForEachCentroid(func(mean float64, count uint32) bool {
means = append(means, mean)
if len(means) == 3 {
return false
}
return true
})
if len(means) != 3 {
t.Errorf("ForEachCentroid handled incorrect number of data items")
}
// Iterate all datapoints.
means = []float64{}
tdigest.ForEachCentroid(func(mean float64, count uint32) bool {
means = append(means, mean)
return true
})
if len(means) != tdigest.Len() {
t.Errorf("ForEachCentroid did not handle all data")
}
}
func benchmarkAdd(compression float64, b *testing.B) {
t := New(compression)
data := make([]float64, b.N)
for n := 0; n < b.N; n++ {
data[n] = rand.Float64()
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
err := t.Add(data[n], 1)
if err != nil {
b.Error(err)
}
}
b.StopTimer()
}
func BenchmarkAdd1(b *testing.B) {
benchmarkAdd(1, b)
}
func BenchmarkAdd10(b *testing.B) {
benchmarkAdd(10, b)
}
func BenchmarkAdd100(b *testing.B) {
benchmarkAdd(100, b)
}