From 679c5892a451e7adfad00f8b90ec160a7d1c9b11 Mon Sep 17 00:00:00 2001 From: UlricQin Date: Wed, 27 Jan 2021 23:51:42 +0800 Subject: [PATCH] Revert "add Prometheus as a plugin for prober (#556)" (#558) This reverts commit 1dac755787e05be3c67799f76a68c3e9ed5e3b90. --- changelog | 6 - go.mod | 6 +- go.sum | 19 + src/common/address/address.go | 4 +- src/modules/monapi/plugins/.gitignore | 1 + src/modules/monapi/plugins/all/all.go | 1 - .../monapi/plugins/github/github_test.go | 13 - .../monapi/plugins/prometheus/prometheus.go | 94 ----- .../plugins/prometheus/prometheus/README.md | 171 -------- .../prometheus/prometheus/kubernetes.go | 237 ----------- .../prometheus/prometheus/kubernetes_test.go | 155 ------- .../plugins/prometheus/prometheus/parser.go | 320 -------------- .../prometheus/prometheus/parser_test.go | 167 -------- .../prometheus/prometheus/prometheus.go | 398 ------------------ .../prometheus/prometheus/prometheus_test.go | 236 ----------- .../plugins/prometheus/prometheus_test.go | 56 --- src/modules/monapi/plugins/util.go | 30 -- src/modules/prober/manager/accumulator.go | 258 ------------ src/modules/prober/manager/collectrule.go | 218 +++++++--- src/modules/prober/manager/manager.go | 20 +- 20 files changed, 188 insertions(+), 2222 deletions(-) create mode 100644 src/modules/monapi/plugins/.gitignore delete mode 100644 src/modules/monapi/plugins/github/github_test.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/README.md delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/parser.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/parser_test.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/prometheus.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go delete mode 100644 src/modules/monapi/plugins/prometheus/prometheus_test.go delete mode 100644 src/modules/prober/manager/accumulator.go diff --git a/changelog b/changelog index 22934d3c..93ccc57c 100644 --- a/changelog +++ b/changelog @@ -84,9 +84,3 @@ 升级方法: - 替换n9e-prober n9e-monapi二进制,升级pub下的前端资源文件 -3.5.2 -升级内容: - - prober模板支持匿名结构体,结构体嵌套 - - prober插件添加了对TLS的支持 - - 修复prober上报没有port的问题 - diff --git a/go.mod b/go.mod index 365dbfdb..531631b5 100644 --- a/go.mod +++ b/go.mod @@ -8,9 +8,7 @@ require ( github.com/codegangsta/negroni v1.0.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/dgryski/go-tsz v0.0.0-20180227144327-03b7d791f4fe - github.com/ericchiang/k8s v1.2.0 github.com/garyburd/redigo v1.6.2 - github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 github.com/gin-contrib/pprof v1.3.0 github.com/gin-gonic/gin v1.6.3 github.com/go-sql-driver/mysql v1.5.0 @@ -24,12 +22,9 @@ require ( github.com/m3db/m3 v0.15.17 github.com/mattn/go-isatty v0.0.12 github.com/mattn/go-sqlite3 v1.14.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/mojocn/base64Captcha v1.3.1 github.com/open-falcon/rrdlite v0.0.0-20200214140804-bf5829f786ad github.com/pquerna/cachecontrol v0.0.0-20200819021114-67c6ae64274f // indirect - github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.9.1 github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62 // indirect github.com/shirou/gopsutil v3.20.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 @@ -42,6 +37,7 @@ require ( go.uber.org/automaxprocs v1.3.0 // indirect golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d golang.org/x/text v0.3.3 + google.golang.org/protobuf v1.25.0 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df gopkg.in/ldap.v3 v3.1.0 diff --git a/go.sum b/go.sum index 26d755b9..b6163a23 100644 --- a/go.sum +++ b/go.sum @@ -403,6 +403,13 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= github.com/golang/protobuf v1.3.5 h1:F768QJ1E9tib+q5Sc8MkdJi1RxLTbRcTf8LJV56aRls= github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= @@ -415,6 +422,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM= @@ -1310,6 +1318,8 @@ google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24 h1:IGPykv426z7LZSVPlaPufOyphngM4at5uZ7x5alaFvE= google.golang.org/genproto v0.0.0-20200317114155-1f3552e48f24/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1328,6 +1338,15 @@ google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc h1:2gGKlE2+asNV9m7xrywl36YYNnBG5ZQ0r/BOOxqPpmk= diff --git a/src/common/address/address.go b/src/common/address/address.go index 73606c73..8d250818 100644 --- a/src/common/address/address.go +++ b/src/common/address/address.go @@ -46,7 +46,7 @@ func convPort(module, listen, portType string) int { } port, err := strconv.Atoi(strings.Split(listen, splitChar)[1]) if err != nil { - fmt.Printf("address: %s.%s invalid", module, portType) + fmt.Printf("%s.%s invalid", module, portType) os.Exit(1) } @@ -101,7 +101,7 @@ func getMod(modKey string) Module { mod, has := mods[modKey] if !has { - fmt.Printf("address: module(%s) configuration section not found", modKey) + fmt.Printf("module(%s) configuration section not found", modKey) os.Exit(1) } diff --git a/src/modules/monapi/plugins/.gitignore b/src/modules/monapi/plugins/.gitignore new file mode 100644 index 00000000..6dd5e90c --- /dev/null +++ b/src/modules/monapi/plugins/.gitignore @@ -0,0 +1 @@ +prometheus/ diff --git a/src/modules/monapi/plugins/all/all.go b/src/modules/monapi/plugins/all/all.go index 80b79154..0bbea17d 100644 --- a/src/modules/monapi/plugins/all/all.go +++ b/src/modules/monapi/plugins/all/all.go @@ -7,7 +7,6 @@ import ( _ "github.com/didi/nightingale/src/modules/monapi/plugins/github" _ "github.com/didi/nightingale/src/modules/monapi/plugins/mongodb" _ "github.com/didi/nightingale/src/modules/monapi/plugins/mysql" - _ "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus" _ "github.com/didi/nightingale/src/modules/monapi/plugins/redis" // local diff --git a/src/modules/monapi/plugins/github/github_test.go b/src/modules/monapi/plugins/github/github_test.go deleted file mode 100644 index 0a04a64e..00000000 --- a/src/modules/monapi/plugins/github/github_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package github - -import ( - "testing" - - "github.com/didi/nightingale/src/modules/monapi/plugins" -) - -func TestCollect(t *testing.T) { - plugins.PluginTest(t, &GitHubRule{ - Repositories: []string{"didi/nightingale"}, - }) -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus.go b/src/modules/monapi/plugins/prometheus/prometheus.go deleted file mode 100644 index 75526f55..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus.go +++ /dev/null @@ -1,94 +0,0 @@ -package prometheus - -import ( - "fmt" - "time" - - "github.com/didi/nightingale/src/modules/monapi/collector" - "github.com/didi/nightingale/src/modules/monapi/plugins" - "github.com/didi/nightingale/src/modules/monapi/plugins/prometheus/prometheus" - "github.com/didi/nightingale/src/toolkits/i18n" - "github.com/influxdata/telegraf" -) - -func init() { - collector.CollectorRegister(NewPrometheusCollector()) // for monapi - i18n.DictRegister(langDict) -} - -var ( - langDict = map[string]map[string]string{ - "zh": map[string]string{ - "URLs": "网址", - "An array of urls to scrape metrics from": "采集数据的网址", - "URL Tag": "网址标签", - "Url tag name (tag containing scrapped url. optional, default is \"url\")": "url 标签名称,默认值 \"url\"", - "An array of Kubernetes services to scrape metrics from": "采集kube服务的地址", - "Kubernetes config file contenct to create client from": "kube config 文件内容,用来连接kube服务", - "Use bearer token for authorization. ('bearer_token' takes priority)": "用户的Bearer令牌,优先级高于 username/password", - "HTTP Basic Authentication username": "HTTP认证用户名", - "HTTP Basic Authentication password": "HTTP认证密码", - "RESP Timeout": "请求超时时间", - "Specify timeout duration for slower prometheus clients": "k8s请求超时时间, 单位: 秒", - }, - } -) - -type PrometheusCollector struct { - *collector.BaseCollector -} - -func NewPrometheusCollector() *PrometheusCollector { - return &PrometheusCollector{BaseCollector: collector.NewBaseCollector( - "prometheus", - collector.RemoteCategory, - func() collector.TelegrafPlugin { return &PrometheusRule{} }, - )} -} - -type PrometheusRule struct { - URLs []string `label:"URLs" json:"urls,required" description:"An array of urls to scrape metrics from" example:"http://my-service-exporter:8080/metrics"` - // URLTag string `label:"URL Tag" json:"url_tag" description:"Url tag name (tag containing scrapped url. optional, default is \"url\")" example:"scrapeUrl"` - // KubernetesServices []string `label:"Kube Services" json:"kubernetes_services" description:"An array of Kubernetes services to scrape metrics from" example:"http://my-service-dns.my-namespace:9100/metrics"` - // KubeConfigContent string `label:"Kube Conf" json:"kube_config_content" format:"file" description:"Kubernetes config file contenct to create client from"` - // MonitorPods bool `label:"Monitor Pods" json:"monitor_kubernetes_pods" description:"Scrape Kubernetes pods for the following prometheus annotations:
- prometheus.io/scrape: Enable scraping for this pod
- prometheus.io/scheme: If the metrics endpoint is secured then you will need to
set this to 'https' & most likely set the tls config.
- prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
- prometheus.io/port: If port is not 9102 use this annotation"` - // PodNamespace string `label:"Pod Namespace" json:"monitor_kubernetes_pods_namespace" description:"Restricts Kubernetes monitoring to a single namespace" example:"default"` - // KubernetesLabelSelector string `label:"Kube Label Selector" json:"kubernetes_label_selector" description:"label selector to target pods which have the label" example:"env=dev,app=nginx"` - // KubernetesFieldSelector string `label:"Kube Field Selector" json:"kubernetes_field_selector" description:"field selector to target pods
eg. To scrape pods on a specific node" example:"spec.nodeName=$HOSTNAME"` - // BearerTokenString string `label:"Bearer Token" json:"bearer_token_string" format:"file" description:"Use bearer token for authorization. ('bearer_token' takes priority)"` - // Username string `label:"Username" json:"username" description:"HTTP Basic Authentication username"` - // Password string `label:"Password" json:"password" format:"password" description:"HTTP Basic Authentication password"` - ResponseTimeout int `label:"RESP Timeout" json:"response_timeout" default:"3" description:"Specify timeout duration for slower prometheus clients"` - plugins.ClientConfig -} - -func (p *PrometheusRule) Validate() error { - if len(p.URLs) == 0 || p.URLs[0] == "" { - return fmt.Errorf(" prometheus.rule unable to get urls") - } - return nil -} - -func (p *PrometheusRule) TelegrafInput() (telegraf.Input, error) { - if err := p.Validate(); err != nil { - return nil, err - } - - return &prometheus.Prometheus{ - URLs: p.URLs, - URLTag: "target", - // KubernetesServices: p.KubernetesServices, - // KubeConfigContent: p.KubeConfigContent, - // MonitorPods: p.MonitorPods, - // PodNamespace: p.PodNamespace, - // KubernetesLabelSelector: p.KubernetesLabelSelector, - // KubernetesFieldSelector: p.KubernetesFieldSelector, - // BearerTokenString: p.BearerTokenString, - // Username: p.Username, - // Password: p.Password, - ResponseTimeout: time.Second * time.Duration(p.ResponseTimeout), - MetricVersion: 2, - Log: plugins.GetLogger(), - ClientConfig: p.ClientConfig.TlsClientConfig(), - }, nil -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/README.md b/src/modules/monapi/plugins/prometheus/prometheus/README.md deleted file mode 100644 index e9dd119c..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/README.md +++ /dev/null @@ -1,171 +0,0 @@ -# Prometheus Input Plugin - -The prometheus input plugin gathers metrics from HTTP servers exposing metrics -in Prometheus format. - -### Configuration: - -```toml -# Read metrics from one or many prometheus clients -[[inputs.prometheus]] - ## An array of urls to scrape metrics from. - urls = ["http://localhost:9100/metrics"] - - ## Metric version controls the mapping from Prometheus metrics into - ## Telegraf metrics. When using the prometheus_client output, use the same - ## value in both plugins to ensure metrics are round-tripped without - ## modification. - ## - ## example: metric_version = 1; deprecated in 1.13 - ## metric_version = 2; recommended version - # metric_version = 1 - - ## An array of Kubernetes services to scrape metrics from. - # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] - - ## Kubernetes config file to create client from. - # kube_config = "/path/to/kubernetes.config" - - ## Scrape Kubernetes pods for the following prometheus annotations: - ## - prometheus.io/scrape: Enable scraping for this pod - ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to - ## set this to `https` & most likely set the tls config. - ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. - ## - prometheus.io/port: If port is not 9102 use this annotation - # monitor_kubernetes_pods = true - ## Restricts Kubernetes monitoring to a single namespace - ## ex: monitor_kubernetes_pods_namespace = "default" - # monitor_kubernetes_pods_namespace = "" - # label selector to target pods which have the label - # kubernetes_label_selector = "env=dev,app=nginx" - # field selector to target pods - # eg. To scrape pods on a specific node - # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" - - ## Use bearer token for authorization. ('bearer_token' takes priority) - # bearer_token = "/path/to/bearer/token" - ## OR - # bearer_token_string = "abc_123" - - ## HTTP Basic Authentication username and password. ('bearer_token' and - ## 'bearer_token_string' take priority) - # username = "" - # password = "" - - ## Specify timeout duration for slower prometheus clients (default is 3s) - # response_timeout = "3s" - - ## Optional TLS Config - # tls_ca = /path/to/cafile - # tls_cert = /path/to/certfile - # tls_key = /path/to/keyfile - ## Use TLS but skip chain & host verification - # insecure_skip_verify = false -``` - -`urls` can contain a unix socket as well. If a different path is required (default is `/metrics` for both http[s] and unix) for a unix socket, add `path` as a query parameter as follows: `unix:///var/run/prometheus.sock?path=/custom/metrics` - -#### Kubernetes Service Discovery - -URLs listed in the `kubernetes_services` parameter will be expanded -by looking up all A records assigned to the hostname as described in -[Kubernetes DNS service discovery](https://kubernetes.io/docs/concepts/services-networking/service/#dns). - -This method can be used to locate all -[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services). - -#### Kubernetes scraping - -Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes -pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig -file to determine where to monitor. -Currently the following annotation are supported: - -* `prometheus.io/scrape` Enable scraping for this pod. -* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http') -* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics') -* `prometheus.io/port` Used to override the port. (default 9102) - -Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping. - -#### Bearer Token - -If set, the file specified by the `bearer_token` parameter will be read on -each interval and its contents will be appended to the Bearer string in the -Authorization header. - -### Usage for Caddy HTTP server - -If you want to monitor Caddy, you need to use Caddy with its Prometheus plugin: - -* Download Caddy+Prometheus plugin [here](https://caddyserver.com/download/linux/amd64?plugins=http.prometheus) -* Add the `prometheus` directive in your `CaddyFile` -* Restart Caddy -* Configure Telegraf to fetch metrics on it: - -```toml -[[inputs.prometheus]] -# ## An array of urls to scrape metrics from. - urls = ["http://localhost:9180/metrics"] -``` - -> This is the default URL where Caddy Prometheus plugin will send data. -> For more details, please read the [Caddy Prometheus documentation](https://github.com/miekg/caddy-prometheus/blob/master/README.md). - -### Metrics: - -Measurement names are based on the Metric Family and tags are created for each -label. The value is added to a field named based on the metric type. - -All metrics receive the `url` tag indicating the related URL specified in the -Telegraf configuration. If using Kubernetes service discovery the `address` -tag is also added indicating the discovered ip address. - -### Example Output: - -**Source** -``` -# HELP go_gc_duration_seconds A summary of the GC invocation durations. -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds{quantile="0"} 7.4545e-05 -go_gc_duration_seconds{quantile="0.25"} 7.6999e-05 -go_gc_duration_seconds{quantile="0.5"} 0.000277935 -go_gc_duration_seconds{quantile="0.75"} 0.000706591 -go_gc_duration_seconds{quantile="1"} 0.000706591 -go_gc_duration_seconds_sum 0.00113607 -go_gc_duration_seconds_count 4 -# HELP go_goroutines Number of goroutines that currently exist. -# TYPE go_goroutines gauge -go_goroutines 15 -# HELP cpu_usage_user Telegraf collected metric -# TYPE cpu_usage_user gauge -cpu_usage_user{cpu="cpu0"} 1.4112903225816156 -cpu_usage_user{cpu="cpu1"} 0.702106318955865 -cpu_usage_user{cpu="cpu2"} 2.0161290322588776 -cpu_usage_user{cpu="cpu3"} 1.5045135406226022 -``` - -**Output** -``` -go_gc_duration_seconds,url=http://example.org:9273/metrics 1=0.001336611,count=14,sum=0.004527551,0=0.000057965,0.25=0.000083812,0.5=0.000286537,0.75=0.000365303 1505776733000000000 -go_goroutines,url=http://example.org:9273/metrics gauge=21 1505776695000000000 -cpu_usage_user,cpu=cpu0,url=http://example.org:9273/metrics gauge=1.513622603430151 1505776751000000000 -cpu_usage_user,cpu=cpu1,url=http://example.org:9273/metrics gauge=5.829145728641773 1505776751000000000 -cpu_usage_user,cpu=cpu2,url=http://example.org:9273/metrics gauge=2.119071644805144 1505776751000000000 -cpu_usage_user,cpu=cpu3,url=http://example.org:9273/metrics gauge=1.5228426395944945 1505776751000000000 -``` - -**Output (when metric_version = 2)** -``` -prometheus,quantile=1,url=http://example.org:9273/metrics go_gc_duration_seconds=0.005574303 1556075100000000000 -prometheus,quantile=0.75,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0001046 1556075100000000000 -prometheus,quantile=0.5,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000719 1556075100000000000 -prometheus,quantile=0.25,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000579 1556075100000000000 -prometheus,quantile=0,url=http://example.org:9273/metrics go_gc_duration_seconds=0.0000349 1556075100000000000 -prometheus,url=http://example.org:9273/metrics go_gc_duration_seconds_count=324,go_gc_duration_seconds_sum=0.091340353 1556075100000000000 -prometheus,url=http://example.org:9273/metrics go_goroutines=15 1556075100000000000 -prometheus,cpu=cpu0,url=http://example.org:9273/metrics cpu_usage_user=1.513622603430151 1505776751000000000 -prometheus,cpu=cpu1,url=http://example.org:9273/metrics cpu_usage_user=5.829145728641773 1505776751000000000 -prometheus,cpu=cpu2,url=http://example.org:9273/metrics cpu_usage_user=2.119071644805144 1505776751000000000 -prometheus,cpu=cpu3,url=http://example.org:9273/metrics cpu_usage_user=1.5228426395944945 1505776751000000000 -``` diff --git a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go deleted file mode 100644 index 93f6c7e4..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes.go +++ /dev/null @@ -1,237 +0,0 @@ -package prometheus - -import ( - "context" - "log" - "net" - "net/url" - "sync" - "time" - - "github.com/ericchiang/k8s" - corev1 "github.com/ericchiang/k8s/apis/core/v1" - "github.com/ghodss/yaml" -) - -type payload struct { - eventype string - pod *corev1.Pod -} - -// loadClient parses a kubeconfig from a file and returns a Kubernetes -// client. It does not support extensions or client auth providers. -func loadClient(kubeconfig string) (*k8s.Client, error) { - // data, err := ioutil.ReadFile(kubeconfigPath) - // if err != nil { - // return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err) - // } - - // Unmarshal YAML into a Kubernetes config object. - var config k8s.Config - if err := yaml.Unmarshal([]byte(kubeconfig), &config); err != nil { - return nil, err - } - return k8s.NewClient(&config) -} - -func (p *Prometheus) start(ctx context.Context) error { - client, err := k8s.NewInClusterClient() - if err != nil { - // u, err := user.Current() - // if err != nil { - // return fmt.Errorf("Failed to get current user - %v", err) - // } - - // configLocation := filepath.Join(u.HomeDir, ".kube/config") - // if p.KubeConfig != "" { - // configLocation = p.KubeConfig - // } - client, err = loadClient(p.KubeConfigContent) - if err != nil { - return err - } - } - - p.wg = sync.WaitGroup{} - - p.wg.Add(1) - go func() { - defer p.wg.Done() - for { - select { - case <-ctx.Done(): - return - case <-time.After(time.Second): - err := p.watch(ctx, client) - if err != nil { - p.Log.Errorf("Unable to watch resources: %s", err.Error()) - } - } - } - }() - - return nil -} - -// An edge case exists if a pod goes offline at the same time a new pod is created -// (without the scrape annotations). K8s may re-assign the old pod ip to the non-scrape -// pod, causing errors in the logs. This is only true if the pod going offline is not -// directed to do so by K8s. -func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error { - - selectors := podSelector(p) - - pod := &corev1.Pod{} - watcher, err := client.Watch(ctx, p.PodNamespace, &corev1.Pod{}, selectors...) - if err != nil { - return err - } - defer watcher.Close() - - for { - select { - case <-ctx.Done(): - return nil - default: - pod = &corev1.Pod{} - // An error here means we need to reconnect the watcher. - eventType, err := watcher.Next(pod) - if err != nil { - return err - } - - // If the pod is not "ready", there will be no ip associated with it. - if pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] != "true" || - !podReady(pod.Status.GetContainerStatuses()) { - continue - } - - switch eventType { - case k8s.EventAdded: - registerPod(pod, p) - case k8s.EventModified: - // To avoid multiple actions for each event, unregister on the first event - // in the delete sequence, when the containers are still "ready". - if pod.Metadata.GetDeletionTimestamp() != nil { - unregisterPod(pod, p) - } else { - registerPod(pod, p) - } - } - } - } -} - -func podReady(statuss []*corev1.ContainerStatus) bool { - if len(statuss) == 0 { - return false - } - for _, cs := range statuss { - if !cs.GetReady() { - return false - } - } - return true -} - -func podSelector(p *Prometheus) []k8s.Option { - options := []k8s.Option{} - - if len(p.KubernetesLabelSelector) > 0 { - options = append(options, k8s.QueryParam("labelSelector", p.KubernetesLabelSelector)) - } - - if len(p.KubernetesFieldSelector) > 0 { - options = append(options, k8s.QueryParam("fieldSelector", p.KubernetesFieldSelector)) - } - - return options - -} - -func registerPod(pod *corev1.Pod, p *Prometheus) { - if p.kubernetesPods == nil { - p.kubernetesPods = map[string]URLAndAddress{} - } - targetURL := getScrapeURL(pod) - if targetURL == nil { - return - } - - log.Printf("D! [inputs.prometheus] will scrape metrics from %q", *targetURL) - // add annotation as metrics tags - tags := pod.GetMetadata().GetAnnotations() - if tags == nil { - tags = map[string]string{} - } - tags["pod_name"] = pod.GetMetadata().GetName() - tags["namespace"] = pod.GetMetadata().GetNamespace() - // add labels as metrics tags - for k, v := range pod.GetMetadata().GetLabels() { - tags[k] = v - } - URL, err := url.Parse(*targetURL) - if err != nil { - log.Printf("E! [inputs.prometheus] could not parse URL %q: %s", *targetURL, err.Error()) - return - } - podURL := p.AddressToURL(URL, URL.Hostname()) - p.lock.Lock() - p.kubernetesPods[podURL.String()] = URLAndAddress{ - URL: podURL, - Address: URL.Hostname(), - OriginalURL: URL, - Tags: tags, - } - p.lock.Unlock() -} - -func getScrapeURL(pod *corev1.Pod) *string { - ip := pod.Status.GetPodIP() - if ip == "" { - // return as if scrape was disabled, we will be notified again once the pod - // has an IP - return nil - } - - scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"] - path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"] - port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"] - - if scheme == "" { - scheme = "http" - } - if port == "" { - port = "9102" - } - if path == "" { - path = "/metrics" - } - - u := &url.URL{ - Scheme: scheme, - Host: net.JoinHostPort(ip, port), - Path: path, - } - - x := u.String() - - return &x -} - -func unregisterPod(pod *corev1.Pod, p *Prometheus) { - url := getScrapeURL(pod) - if url == nil { - return - } - - log.Printf("D! [inputs.prometheus] registered a delete request for %q in namespace %q", - pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace()) - - p.lock.Lock() - defer p.lock.Unlock() - if _, ok := p.kubernetesPods[*url]; ok { - delete(p.kubernetesPods, *url) - log.Printf("D! [inputs.prometheus] will stop scraping for %q", *url) - } -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go b/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go deleted file mode 100644 index 8568ac94..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/kubernetes_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package prometheus - -import ( - "github.com/ericchiang/k8s" - "testing" - - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - - v1 "github.com/ericchiang/k8s/apis/core/v1" - metav1 "github.com/ericchiang/k8s/apis/meta/v1" -) - -func TestScrapeURLNoAnnotations(t *testing.T) { - p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} - p.GetMetadata().Annotations = map[string]string{} - url := getScrapeURL(p) - assert.Nil(t, url) -} - -func TestScrapeURLAnnotationsNoScrape(t *testing.T) { - p := &v1.Pod{Metadata: &metav1.ObjectMeta{}} - p.Metadata.Name = str("myPod") - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "false"} - url := getScrapeURL(p) - assert.Nil(t, url) -} - -func TestScrapeURLAnnotations(t *testing.T) { - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - url := getScrapeURL(p) - assert.Equal(t, "http://127.0.0.1:9102/metrics", *url) -} - -func TestScrapeURLAnnotationsCustomPort(t *testing.T) { - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/port": "9000"} - url := getScrapeURL(p) - assert.Equal(t, "http://127.0.0.1:9000/metrics", *url) -} - -func TestScrapeURLAnnotationsCustomPath(t *testing.T) { - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "mymetrics"} - url := getScrapeURL(p) - assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) -} - -func TestScrapeURLAnnotationsCustomPathWithSep(t *testing.T) { - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true", "prometheus.io/path": "/mymetrics"} - url := getScrapeURL(p) - assert.Equal(t, "http://127.0.0.1:9102/mymetrics", *url) -} - -func TestAddPod(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}} - - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - registerPod(p, prom) - assert.Equal(t, 1, len(prom.kubernetesPods)) -} - -func TestAddMultipleDuplicatePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}} - - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - registerPod(p, prom) - p.Metadata.Name = str("Pod2") - registerPod(p, prom) - assert.Equal(t, 1, len(prom.kubernetesPods)) -} - -func TestAddMultiplePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}} - - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - registerPod(p, prom) - p.Metadata.Name = str("Pod2") - p.Status.PodIP = str("127.0.0.2") - registerPod(p, prom) - assert.Equal(t, 2, len(prom.kubernetesPods)) -} - -func TestDeletePods(t *testing.T) { - prom := &Prometheus{Log: testutil.Logger{}} - - p := pod() - p.Metadata.Annotations = map[string]string{"prometheus.io/scrape": "true"} - registerPod(p, prom) - unregisterPod(p, prom) - assert.Equal(t, 0, len(prom.kubernetesPods)) -} - -func TestPodSelector(t *testing.T) { - - cases := []struct { - expected []k8s.Option - labelselector string - fieldselector string - }{ - { - expected: []k8s.Option{ - k8s.QueryParam("labelSelector", "key1=val1,key2=val2,key3"), - k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"), - }, - labelselector: "key1=val1,key2=val2,key3", - fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com", - }, - { - expected: []k8s.Option{ - k8s.QueryParam("labelSelector", "key1"), - k8s.QueryParam("fieldSelector", "spec.nodeName=ip-1-2-3-4.acme.com"), - }, - labelselector: "key1", - fieldselector: "spec.nodeName=ip-1-2-3-4.acme.com", - }, - { - expected: []k8s.Option{ - k8s.QueryParam("labelSelector", "key1"), - k8s.QueryParam("fieldSelector", "somefield"), - }, - labelselector: "key1", - fieldselector: "somefield", - }, - } - - for _, c := range cases { - prom := &Prometheus{ - Log: testutil.Logger{}, - KubernetesLabelSelector: c.labelselector, - KubernetesFieldSelector: c.fieldselector, - } - - output := podSelector(prom) - - assert.Equal(t, len(output), len(c.expected)) - } -} - -func pod() *v1.Pod { - p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}} - p.Status.PodIP = str("127.0.0.1") - p.Metadata.Name = str("myPod") - p.Metadata.Namespace = str("default") - return p -} - -func str(x string) *string { - return &x -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/parser.go b/src/modules/monapi/plugins/prometheus/prometheus/parser.go deleted file mode 100644 index 0726c877..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/parser.go +++ /dev/null @@ -1,320 +0,0 @@ -package prometheus - -// Parser inspired from -// https://github.com/prometheus/prom2json/blob/master/main.go - -import ( - "bufio" - "bytes" - "fmt" - "io" - "math" - "mime" - "net/http" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/metric" - "github.com/matttproud/golang_protobuf_extensions/pbutil" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" -) - -// Parse returns a slice of Metrics from a text representation of a -// metrics -func ParseV2(buf []byte, header http.Header) ([]telegraf.Metric, error) { - var metrics []telegraf.Metric - var parser expfmt.TextParser - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if ierr == io.EOF { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %s", err) - } - } - - // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds - now := time.Now() - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := makeLabels(m) - - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - telegrafMetrics := makeQuantilesV2(m, tags, metricName, mf.GetType(), now) - metrics = append(metrics, telegrafMetrics...) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - telegrafMetrics := makeBucketsV2(m, tags, metricName, mf.GetType(), now) - metrics = append(metrics, telegrafMetrics...) - } else { - // standard metric - // reading fields - fields := getNameAndValueV2(m, metricName) - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - metric, err := metric.New("prometheus", tags, fields, t, valueType(mf.GetType())) - if err == nil { - metrics = append(metrics, metric) - } - } - } - } - } - - return metrics, err -} - -// Get Quantiles for summary metric & Buckets for histogram -func makeQuantilesV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { - var metrics []telegraf.Metric - fields := make(map[string]interface{}) - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - fields[metricName+"_count"] = float64(m.GetSummary().GetSampleCount()) - fields[metricName+"_sum"] = float64(m.GetSummary().GetSampleSum()) - met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, met) - } - - for _, q := range m.GetSummary().Quantile { - newTags := tags - fields = make(map[string]interface{}) - - newTags["quantile"] = fmt.Sprint(q.GetQuantile()) - fields[metricName] = float64(q.GetValue()) - - quantileMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, quantileMetric) - } - } - return metrics -} - -// Get Buckets from histogram metric -func makeBucketsV2(m *dto.Metric, tags map[string]string, metricName string, metricType dto.MetricType, now time.Time) []telegraf.Metric { - var metrics []telegraf.Metric - fields := make(map[string]interface{}) - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - fields[metricName+"_count"] = float64(m.GetHistogram().GetSampleCount()) - fields[metricName+"_sum"] = float64(m.GetHistogram().GetSampleSum()) - - met, err := metric.New("prometheus", tags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, met) - } - - for _, b := range m.GetHistogram().Bucket { - newTags := tags - fields = make(map[string]interface{}) - newTags["le"] = fmt.Sprint(b.GetUpperBound()) - fields[metricName+"_bucket"] = float64(b.GetCumulativeCount()) - - histogramMetric, err := metric.New("prometheus", newTags, fields, t, valueType(metricType)) - if err == nil { - metrics = append(metrics, histogramMetric) - } - } - return metrics -} - -// Parse returns a slice of Metrics from a text representation of a -// metrics -func Parse(buf []byte, header http.Header) ([]telegraf.Metric, error) { - var metrics []telegraf.Metric - var parser expfmt.TextParser - // parse even if the buffer begins with a newline - buf = bytes.TrimPrefix(buf, []byte("\n")) - // Read raw data - buffer := bytes.NewBuffer(buf) - reader := bufio.NewReader(buffer) - - mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type")) - // Prepare output - metricFamilies := make(map[string]*dto.MetricFamily) - - if err == nil && mediatype == "application/vnd.google.protobuf" && - params["encoding"] == "delimited" && - params["proto"] == "io.prometheus.client.MetricFamily" { - for { - mf := &dto.MetricFamily{} - if _, ierr := pbutil.ReadDelimited(reader, mf); ierr != nil { - if ierr == io.EOF { - break - } - return nil, fmt.Errorf("reading metric family protocol buffer failed: %s", ierr) - } - metricFamilies[mf.GetName()] = mf - } - } else { - metricFamilies, err = parser.TextToMetricFamilies(reader) - if err != nil { - return nil, fmt.Errorf("reading text format failed: %s", err) - } - } - - // make sure all metrics have a consistent timestamp so that metrics don't straddle two different seconds - now := time.Now() - // read metrics - for metricName, mf := range metricFamilies { - for _, m := range mf.Metric { - // reading tags - tags := makeLabels(m) - // reading fields - var fields map[string]interface{} - if mf.GetType() == dto.MetricType_SUMMARY { - // summary metric - fields = makeQuantiles(m) - fields["count"] = float64(m.GetSummary().GetSampleCount()) - fields["sum"] = float64(m.GetSummary().GetSampleSum()) - } else if mf.GetType() == dto.MetricType_HISTOGRAM { - // histogram metric - fields = makeBuckets(m) - fields["count"] = float64(m.GetHistogram().GetSampleCount()) - fields["sum"] = float64(m.GetHistogram().GetSampleSum()) - - } else { - // standard metric - fields = getNameAndValue(m) - } - // converting to telegraf metric - if len(fields) > 0 { - var t time.Time - if m.TimestampMs != nil && *m.TimestampMs > 0 { - t = time.Unix(0, *m.TimestampMs*1000000) - } else { - t = now - } - metric, err := metric.New(metricName, tags, fields, t, valueType(mf.GetType())) - if err == nil { - metrics = append(metrics, metric) - } - } - } - } - - return metrics, err -} - -func valueType(mt dto.MetricType) telegraf.ValueType { - switch mt { - case dto.MetricType_COUNTER: - return telegraf.Counter - case dto.MetricType_GAUGE: - return telegraf.Gauge - case dto.MetricType_SUMMARY: - return telegraf.Summary - case dto.MetricType_HISTOGRAM: - return telegraf.Histogram - default: - return telegraf.Untyped - } -} - -// Get Quantiles from summary metric -func makeQuantiles(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - for _, q := range m.GetSummary().Quantile { - if !math.IsNaN(q.GetValue()) { - fields[fmt.Sprint(q.GetQuantile())] = float64(q.GetValue()) - } - } - return fields -} - -// Get Buckets from histogram metric -func makeBuckets(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - for _, b := range m.GetHistogram().Bucket { - fields[fmt.Sprint(b.GetUpperBound())] = float64(b.GetCumulativeCount()) - } - return fields -} - -// Get labels from metric -func makeLabels(m *dto.Metric) map[string]string { - result := map[string]string{} - for _, lp := range m.Label { - result[lp.GetName()] = lp.GetValue() - } - return result -} - -// Get name and value from metric -func getNameAndValue(m *dto.Metric) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - fields["gauge"] = float64(m.GetGauge().GetValue()) - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - fields["counter"] = float64(m.GetCounter().GetValue()) - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - fields["value"] = float64(m.GetUntyped().GetValue()) - } - } - return fields -} - -// Get name and value from metric -func getNameAndValueV2(m *dto.Metric, metricName string) map[string]interface{} { - fields := make(map[string]interface{}) - if m.Gauge != nil { - if !math.IsNaN(m.GetGauge().GetValue()) { - fields[metricName] = float64(m.GetGauge().GetValue()) - } - } else if m.Counter != nil { - if !math.IsNaN(m.GetCounter().GetValue()) { - fields[metricName] = float64(m.GetCounter().GetValue()) - } - } else if m.Untyped != nil { - if !math.IsNaN(m.GetUntyped().GetValue()) { - fields[metricName] = float64(m.GetUntyped().GetValue()) - } - } - return fields -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go b/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go deleted file mode 100644 index 7b2bfeca..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/parser_test.go +++ /dev/null @@ -1,167 +0,0 @@ -package prometheus - -import ( - "net/http" - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -var exptime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - -const validUniqueGauge = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. -# TYPE cadvisor_version_info gauge -cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 -` - -const validUniqueCounter = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source -# TYPE get_token_fail_count counter -get_token_fail_count 0 -` - -const validUniqueLine = `# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source -` - -const validUniqueSummary = `# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. -# TYPE http_request_duration_microseconds summary -http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 -http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 -http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 -http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 -http_request_duration_microseconds_count{handler="prometheus"} 9 -` - -const validUniqueHistogram = `# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. -# TYPE apiserver_request_latencies histogram -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 -apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 -apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 -` - -const validData = `# HELP cadvisor_version_info A metric with a constant '1' value labeled by kernel version, OS version, docker version, cadvisor version & cadvisor revision. -# TYPE cadvisor_version_info gauge -cadvisor_version_info{cadvisorRevision="",cadvisorVersion="",dockerVersion="1.8.2",kernelVersion="3.10.0-229.20.1.el7.x86_64",osVersion="CentOS Linux 7 (Core)"} 1 -# HELP go_gc_duration_seconds A summary of the GC invocation durations. -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds{quantile="0"} 0.013534896000000001 -go_gc_duration_seconds{quantile="0.25"} 0.02469263 -go_gc_duration_seconds{quantile="0.5"} 0.033727822000000005 -go_gc_duration_seconds{quantile="0.75"} 0.03840335 -go_gc_duration_seconds{quantile="1"} 0.049956604 -go_gc_duration_seconds_sum 1970.341293002 -go_gc_duration_seconds_count 65952 -# HELP http_request_duration_microseconds The HTTP request latencies in microseconds. -# TYPE http_request_duration_microseconds summary -http_request_duration_microseconds{handler="prometheus",quantile="0.5"} 552048.506 -http_request_duration_microseconds{handler="prometheus",quantile="0.9"} 5.876804288e+06 -http_request_duration_microseconds{handler="prometheus",quantile="0.99"} 5.876804288e+06 -http_request_duration_microseconds_sum{handler="prometheus"} 1.8909097205e+07 -http_request_duration_microseconds_count{handler="prometheus"} 9 -# HELP get_token_fail_count Counter of failed Token() requests to the alternate token source -# TYPE get_token_fail_count counter -get_token_fail_count 0 -# HELP apiserver_request_latencies Response latency distribution in microseconds for each verb, resource and client. -# TYPE apiserver_request_latencies histogram -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="125000"} 1994 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="250000"} 1997 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="500000"} 2000 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="1e+06"} 2005 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="2e+06"} 2012 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="4e+06"} 2017 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="8e+06"} 2024 -apiserver_request_latencies_bucket{resource="bindings",verb="POST",le="+Inf"} 2025 -apiserver_request_latencies_sum{resource="bindings",verb="POST"} 1.02726334e+08 -apiserver_request_latencies_count{resource="bindings",verb="POST"} 2025 -` - -const prometheusMulti = ` -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -` - -const prometheusMultiSomeInvalid = ` -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,cpu=cpu3, host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -cpu,cpu=cpu4 , usage_idle=99,usage_busy=1 -cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1 -` - -func TestParseValidPrometheus(t *testing.T) { - // Gauge value - metrics, err := Parse([]byte(validUniqueGauge), http.Header{}) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "cadvisor_version_info", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ - "gauge": float64(1), - }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ - "osVersion": "CentOS Linux 7 (Core)", - "cadvisorRevision": "", - "cadvisorVersion": "", - "dockerVersion": "1.8.2", - "kernelVersion": "3.10.0-229.20.1.el7.x86_64", - }, metrics[0].Tags()) - - // Counter value - metrics, err = Parse([]byte(validUniqueCounter), http.Header{}) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "get_token_fail_count", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ - "counter": float64(0), - }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) - - // Summary data - //SetDefaultTags(map[string]string{}) - metrics, err = Parse([]byte(validUniqueSummary), http.Header{}) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "http_request_duration_microseconds", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ - "0.5": 552048.506, - "0.9": 5.876804288e+06, - "0.99": 5.876804288e+06, - "count": 9.0, - "sum": 1.8909097205e+07, - }, metrics[0].Fields()) - assert.Equal(t, map[string]string{"handler": "prometheus"}, metrics[0].Tags()) - - // histogram data - metrics, err = Parse([]byte(validUniqueHistogram), http.Header{}) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "apiserver_request_latencies", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ - "500000": 2000.0, - "count": 2025.0, - "sum": 1.02726334e+08, - "250000": 1997.0, - "2e+06": 2012.0, - "4e+06": 2017.0, - "8e+06": 2024.0, - "+Inf": 2025.0, - "125000": 1994.0, - "1e+06": 2005.0, - }, metrics[0].Fields()) - assert.Equal(t, - map[string]string{"verb": "POST", "resource": "bindings"}, - metrics[0].Tags()) - -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go b/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go deleted file mode 100644 index 5226b4bd..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/prometheus.go +++ /dev/null @@ -1,398 +0,0 @@ -package prometheus - -import ( - "context" - "errors" - "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "sync" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/plugins/common/tls" -) - -const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1` - -type Prometheus struct { - // An array of urls to scrape metrics from. - URLs []string `toml:"urls"` - - // An array of Kubernetes services to scrape metrics from. - KubernetesServices []string - - // Content of kubernetes config file - KubeConfigContent string - - // Label Selector/s for Kubernetes - KubernetesLabelSelector string `toml:"kubernetes_label_selector"` - - // Field Selector/s for Kubernetes - KubernetesFieldSelector string `toml:"kubernetes_field_selector"` - - // Bearer Token authorization file path - BearerToken string `toml:"bearer_token"` - BearerTokenString string `toml:"bearer_token_string"` - - // Basic authentication credentials - Username string `toml:"username"` - Password string `toml:"password"` - - ResponseTimeout time.Duration `toml:"response_timeout"` - - MetricVersion int `toml:"metric_version"` - - URLTag string `toml:"url_tag"` - - tls.ClientConfig - - Log telegraf.Logger - - client *http.Client - - // Should we scrape Kubernetes services for prometheus annotations - MonitorPods bool `toml:"monitor_kubernetes_pods"` - PodNamespace string `toml:"monitor_kubernetes_pods_namespace"` - lock sync.Mutex - kubernetesPods map[string]URLAndAddress - cancel context.CancelFunc - wg sync.WaitGroup -} - -var sampleConfig = ` - ## An array of urls to scrape metrics from. - urls = ["http://localhost:9100/metrics"] - - ## Metric version controls the mapping from Prometheus metrics into - ## Telegraf metrics. When using the prometheus_client output, use the same - ## value in both plugins to ensure metrics are round-tripped without - ## modification. - ## - ## example: metric_version = 1; deprecated in 1.13 - ## metric_version = 2; recommended version - # metric_version = 1 - - ## Url tag name (tag containing scrapped url. optional, default is "url") - # url_tag = "scrapeUrl" - - ## An array of Kubernetes services to scrape metrics from. - # kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"] - - ## Kubernetes config file to create client from. - # kube_config = "/path/to/kubernetes.config" - - ## Scrape Kubernetes pods for the following prometheus annotations: - ## - prometheus.io/scrape: Enable scraping for this pod - ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to - ## set this to 'https' & most likely set the tls config. - ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. - ## - prometheus.io/port: If port is not 9102 use this annotation - # monitor_kubernetes_pods = true - ## Restricts Kubernetes monitoring to a single namespace - ## ex: monitor_kubernetes_pods_namespace = "default" - # monitor_kubernetes_pods_namespace = "" - # label selector to target pods which have the label - # kubernetes_label_selector = "env=dev,app=nginx" - # field selector to target pods - # eg. To scrape pods on a specific node - # kubernetes_field_selector = "spec.nodeName=$HOSTNAME" - - ## Use bearer token for authorization. ('bearer_token' takes priority) - # bearer_token = "/path/to/bearer/token" - ## OR - # bearer_token_string = "abc_123" - - ## HTTP Basic Authentication username and password. ('bearer_token' and - ## 'bearer_token_string' take priority) - # username = "" - # password = "" - - ## Specify timeout duration for slower prometheus clients (default is 3s) - # response_timeout = "3s" - - ## Optional TLS Config - # tls_ca = /path/to/cafile - # tls_cert = /path/to/certfile - # tls_key = /path/to/keyfile - ## Use TLS but skip chain & host verification - # insecure_skip_verify = false -` - -func (p *Prometheus) SampleConfig() string { - return sampleConfig -} - -func (p *Prometheus) Description() string { - return "Read metrics from one or many prometheus clients" -} - -func (p *Prometheus) Init() error { - if p.MetricVersion != 2 { - p.Log.Warnf("Use of deprecated configuration: 'metric_version = 1'; please update to 'metric_version = 2'") - } - - return nil -} - -var ErrProtocolError = errors.New("prometheus protocol error") - -func (p *Prometheus) AddressToURL(u *url.URL, address string) *url.URL { - host := address - if u.Port() != "" { - host = address + ":" + u.Port() - } - reconstructedURL := &url.URL{ - Scheme: u.Scheme, - Opaque: u.Opaque, - User: u.User, - Path: u.Path, - RawPath: u.RawPath, - ForceQuery: u.ForceQuery, - RawQuery: u.RawQuery, - Fragment: u.Fragment, - Host: host, - } - return reconstructedURL -} - -type URLAndAddress struct { - OriginalURL *url.URL - URL *url.URL - Address string - Tags map[string]string -} - -func (p *Prometheus) GetAllURLs() (map[string]URLAndAddress, error) { - allURLs := make(map[string]URLAndAddress, 0) - for _, u := range p.URLs { - URL, err := url.Parse(u) - if err != nil { - p.Log.Errorf("Could not parse %q, skipping it. Error: %s", u, err.Error()) - continue - } - allURLs[URL.String()] = URLAndAddress{URL: URL, OriginalURL: URL} - } - - p.lock.Lock() - defer p.lock.Unlock() - // loop through all pods scraped via the prometheus annotation on the pods - for k, v := range p.kubernetesPods { - allURLs[k] = v - } - - for _, service := range p.KubernetesServices { - URL, err := url.Parse(service) - if err != nil { - return nil, err - } - - resolvedAddresses, err := net.LookupHost(URL.Hostname()) - if err != nil { - p.Log.Errorf("Could not resolve %q, skipping it. Error: %s", URL.Host, err.Error()) - continue - } - for _, resolved := range resolvedAddresses { - serviceURL := p.AddressToURL(URL, resolved) - allURLs[serviceURL.String()] = URLAndAddress{ - URL: serviceURL, - Address: resolved, - OriginalURL: URL, - } - } - } - return allURLs, nil -} - -// Reads stats from all configured servers accumulates stats. -// Returns one of the errors encountered while gather stats (if any). -func (p *Prometheus) Gather(acc telegraf.Accumulator) error { - if p.client == nil { - client, err := p.createHTTPClient() - if err != nil { - return err - } - p.client = client - } - - var wg sync.WaitGroup - - allURLs, err := p.GetAllURLs() - if err != nil { - return err - } - for _, URL := range allURLs { - wg.Add(1) - go func(serviceURL URLAndAddress) { - defer wg.Done() - acc.AddError(p.gatherURL(serviceURL, acc)) - }(URL) - } - - wg.Wait() - - return nil -} - -func (p *Prometheus) createHTTPClient() (*http.Client, error) { - tlsCfg, err := p.ClientConfig.TLSConfig() - if err != nil { - return nil, err - } - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - DisableKeepAlives: true, - }, - Timeout: p.ResponseTimeout, - } - - return client, nil -} - -func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error { - var req *http.Request - var err error - var uClient *http.Client - var metrics []telegraf.Metric - if u.URL.Scheme == "unix" { - path := u.URL.Query().Get("path") - if path == "" { - path = "/metrics" - } - addr := "http://localhost" + path - req, err = http.NewRequest("GET", addr, nil) - if err != nil { - return fmt.Errorf("unable to create new request '%s': %s", addr, err) - } - - // ignore error because it's been handled before getting here - tlsCfg, _ := p.ClientConfig.TLSConfig() - uClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tlsCfg, - DisableKeepAlives: true, - Dial: func(network, addr string) (net.Conn, error) { - c, err := net.Dial("unix", u.URL.Path) - return c, err - }, - }, - Timeout: p.ResponseTimeout, - } - } else { - if u.URL.Path == "" { - u.URL.Path = "/metrics" - } - req, err = http.NewRequest("GET", u.URL.String(), nil) - if err != nil { - return fmt.Errorf("unable to create new request '%s': %s", u.URL.String(), err) - } - } - - req.Header.Add("Accept", acceptHeader) - - if p.BearerToken != "" { - token, err := ioutil.ReadFile(p.BearerToken) - if err != nil { - return err - } - req.Header.Set("Authorization", "Bearer "+string(token)) - } else if p.BearerTokenString != "" { - req.Header.Set("Authorization", "Bearer "+p.BearerTokenString) - } else if p.Username != "" || p.Password != "" { - req.SetBasicAuth(p.Username, p.Password) - } - - var resp *http.Response - if u.URL.Scheme != "unix" { - resp, err = p.client.Do(req) - } else { - resp, err = uClient.Do(req) - } - if err != nil { - return fmt.Errorf("error making HTTP request to %s: %s", u.URL, err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("%s returned HTTP status %s", u.URL, resp.Status) - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading body: %s", err) - } - - if p.MetricVersion == 2 { - metrics, err = ParseV2(body, resp.Header) - } else { - metrics, err = Parse(body, resp.Header) - } - - if err != nil { - return fmt.Errorf("error reading metrics for %s: %s", - u.URL, err) - } - - for _, metric := range metrics { - tags := metric.Tags() - // strip user and password from URL - u.OriginalURL.User = nil - if p.URLTag != "" { - tags[p.URLTag] = u.OriginalURL.String() - } - if u.Address != "" { - tags["address"] = u.Address - } - for k, v := range u.Tags { - tags[k] = v - } - - switch metric.Type() { - case telegraf.Counter: - acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time()) - case telegraf.Gauge: - acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time()) - case telegraf.Summary: - acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time()) - case telegraf.Histogram: - acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time()) - default: - acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) - } - } - - return nil -} - -// Start will start the Kubernetes scraping if enabled in the configuration -func (p *Prometheus) Start(a telegraf.Accumulator) error { - if p.MonitorPods { - var ctx context.Context - ctx, p.cancel = context.WithCancel(context.Background()) - return p.start(ctx) - } - return nil -} - -func (p *Prometheus) Stop() { - if p.MonitorPods { - p.cancel() - } - p.wg.Wait() -} - -/* -func init() { - inputs.Add("prometheus", func() telegraf.Input { - return &Prometheus{ - ResponseTimeout: internal.Duration{Duration: time.Second * 3}, - kubernetesPods: map[string]URLAndAddress{}, - URLTag: "url", - } - }) -} -*/ diff --git a/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go b/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go deleted file mode 100644 index d33cba27..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus/prometheus_test.go +++ /dev/null @@ -1,236 +0,0 @@ -package prometheus - -import ( - "fmt" - "math" - "net/http" - "net/http/httptest" - "net/url" - "testing" - "time" - - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const sampleTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 -go_gc_duration_seconds{quantile="0.25"} 0.000139108 -go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002 -go_gc_duration_seconds{quantile="0.75"} 0.000331463 -go_gc_duration_seconds{quantile="1"} 0.000667154 -go_gc_duration_seconds_sum 0.0018183950000000002 -go_gc_duration_seconds_count 7 -# HELP go_goroutines Number of goroutines that currently exist. -# TYPE go_goroutines gauge -go_goroutines 15 -# HELP test_metric An untyped metric with a timestamp -# TYPE test_metric untyped -test_metric{label="value"} 1.0 1490802350000 -` -const sampleSummaryTextFormat = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds{quantile="0"} 0.00010425500000000001 -go_gc_duration_seconds{quantile="0.25"} 0.000139108 -go_gc_duration_seconds{quantile="0.5"} 0.00015749400000000002 -go_gc_duration_seconds{quantile="0.75"} 0.000331463 -go_gc_duration_seconds{quantile="1"} 0.000667154 -go_gc_duration_seconds_sum 0.0018183950000000002 -go_gc_duration_seconds_count 7 -` -const sampleGaugeTextFormat = ` -# HELP go_goroutines Number of goroutines that currently exist. -# TYPE go_goroutines gauge -go_goroutines 15 1490802350000 -` - -func TestPrometheusGeneratesMetrics(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleTextFormat) - })) - defer ts.Close() - - p := &Prometheus{ - Log: testutil.Logger{}, - URLs: []string{ts.URL}, - URLTag: "url", - } - - var acc testutil.Accumulator - - err := acc.GatherError(p.Gather) - require.NoError(t, err) - - assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) - assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) - assert.True(t, acc.HasFloatField("test_metric", "value")) - assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) - assert.False(t, acc.HasTag("test_metric", "address")) - assert.True(t, acc.TagValue("test_metric", "url") == ts.URL+"/metrics") -} - -func TestPrometheusGeneratesMetricsWithHostNameTag(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleTextFormat) - })) - defer ts.Close() - - p := &Prometheus{ - Log: testutil.Logger{}, - KubernetesServices: []string{ts.URL}, - URLTag: "url", - } - u, _ := url.Parse(ts.URL) - tsAddress := u.Hostname() - - var acc testutil.Accumulator - - err := acc.GatherError(p.Gather) - require.NoError(t, err) - - assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) - assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) - assert.True(t, acc.HasFloatField("test_metric", "value")) - assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) - assert.True(t, acc.TagValue("test_metric", "address") == tsAddress) - assert.True(t, acc.TagValue("test_metric", "url") == ts.URL) -} - -func TestPrometheusGeneratesMetricsAlthoughFirstDNSFails(t *testing.T) { - if testing.Short() { - t.Skip("Skipping integration test in short mode") - } - - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleTextFormat) - })) - defer ts.Close() - - p := &Prometheus{ - Log: testutil.Logger{}, - URLs: []string{ts.URL}, - KubernetesServices: []string{"http://random.telegraf.local:88/metrics"}, - } - - var acc testutil.Accumulator - - err := acc.GatherError(p.Gather) - require.NoError(t, err) - - assert.True(t, acc.HasFloatField("go_gc_duration_seconds", "count")) - assert.True(t, acc.HasFloatField("go_goroutines", "gauge")) - assert.True(t, acc.HasFloatField("test_metric", "value")) - assert.True(t, acc.HasTimestamp("test_metric", time.Unix(1490802350, 0))) -} - -func TestPrometheusGeneratesSummaryMetricsV2(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleSummaryTextFormat) - })) - defer ts.Close() - - p := &Prometheus{ - URLs: []string{ts.URL}, - URLTag: "url", - MetricVersion: 2, - } - - var acc testutil.Accumulator - - err := acc.GatherError(p.Gather) - require.NoError(t, err) - - assert.True(t, acc.TagSetValue("prometheus", "quantile") == "0") - assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_sum")) - assert.True(t, acc.HasFloatField("prometheus", "go_gc_duration_seconds_count")) - assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics") - -} - -func TestSummaryMayContainNaN(t *testing.T) { - const data = `# HELP go_gc_duration_seconds A summary of the GC invocation durations. -# TYPE go_gc_duration_seconds summary -go_gc_duration_seconds{quantile="0"} NaN -go_gc_duration_seconds{quantile="1"} NaN -go_gc_duration_seconds_sum 42.0 -go_gc_duration_seconds_count 42 -` - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, data) - })) - defer ts.Close() - - p := &Prometheus{ - URLs: []string{ts.URL}, - URLTag: "", - MetricVersion: 2, - } - - var acc testutil.Accumulator - - err := p.Gather(&acc) - require.NoError(t, err) - - expected := []telegraf.Metric{ - testutil.MustMetric( - "prometheus", - map[string]string{ - "quantile": "0", - }, - map[string]interface{}{ - "go_gc_duration_seconds": math.NaN(), - }, - time.Unix(0, 0), - telegraf.Summary, - ), - testutil.MustMetric( - "prometheus", - map[string]string{ - "quantile": "1", - }, - map[string]interface{}{ - "go_gc_duration_seconds": math.NaN(), - }, - time.Unix(0, 0), - telegraf.Summary, - ), - testutil.MustMetric( - "prometheus", - map[string]string{}, - map[string]interface{}{ - "go_gc_duration_seconds_sum": 42.0, - "go_gc_duration_seconds_count": 42.0, - }, - time.Unix(0, 0), - telegraf.Summary, - ), - } - - testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics(), - testutil.IgnoreTime(), testutil.SortMetrics()) -} - -func TestPrometheusGeneratesGaugeMetricsV2(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintln(w, sampleGaugeTextFormat) - })) - defer ts.Close() - - p := &Prometheus{ - URLs: []string{ts.URL}, - URLTag: "url", - MetricVersion: 2, - } - - var acc testutil.Accumulator - - err := acc.GatherError(p.Gather) - require.NoError(t, err) - - assert.True(t, acc.HasFloatField("prometheus", "go_goroutines")) - assert.True(t, acc.TagValue("prometheus", "url") == ts.URL+"/metrics") - assert.True(t, acc.HasTimestamp("prometheus", time.Unix(1490802350, 0))) -} diff --git a/src/modules/monapi/plugins/prometheus/prometheus_test.go b/src/modules/monapi/plugins/prometheus/prometheus_test.go deleted file mode 100644 index 90f395b3..00000000 --- a/src/modules/monapi/plugins/prometheus/prometheus_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package prometheus - -import ( - "context" - "fmt" - "net/http" - "testing" - "time" - - "github.com/didi/nightingale/src/modules/monapi/plugins" -) - -const sampleTextFormat = `# HELP test_metric An untyped metric with a timestamp -# TYPE test_metric untyped -test_metric{label="value"} 1.0 1490802350000 -# HELP helo_stats_test_timer helo_stats_test_timer summary -# TYPE helo_stats_test_timer summary -helo_stats_test_timer{region="bj",zone="test_1",quantile="0.5"} 0.501462767 -helo_stats_test_timer{region="bj",zone="test_1",quantile="0.75"} 0.751876572 -helo_stats_test_timer{region="bj",zone="test_1",quantile="0.95"} 0.978413628 -helo_stats_test_timer{region="bj",zone="test_1",quantile="0.99"} 0.989530661 -helo_stats_test_timer{region="bj",zone="test_1",quantile="0.999"} 0.989530661 -helo_stats_test_timer_sum{region="bj",zone="test_1"} 39.169514066999994 -helo_stats_test_timer_count{region="bj",zone="test_1"} 74 -# HELP helo_stats_test_histogram helo_stats_test_histogram histogram -# TYPE helo_stats_test_histogram histogram -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0"} 0 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.05"} 0 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.1"} 2 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.25"} 13 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="0.5"} 24 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="1"} 56 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="3"} 56 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="6"} 56 -helo_stats_test_histogram_bucket{region="bj",zone="test_1",le="+Inf"} 56 -helo_stats_test_histogram_sum{region="bj",zone="test_1"} 40.45 -helo_stats_test_histogram_count{region="bj",zone="test_1"} 56 -# HELP go_goroutines Number of goroutines that currently exist. -# TYPE go_goroutines gauge -go_goroutines 15 1490802350000 -` - -func TestCollect(t *testing.T) { - http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, sampleTextFormat) }) - server := &http.Server{Addr: ":18080"} - go func() { - server.ListenAndServe() - }() - defer server.Shutdown(context.Background()) - - time.Sleep(time.Millisecond * 100) - - plugins.PluginTest(t, &PrometheusRule{ - URLs: []string{"http://localhost:18080/metrics"}, - }) -} diff --git a/src/modules/monapi/plugins/util.go b/src/modules/monapi/plugins/util.go index ac9cfee0..267f065f 100644 --- a/src/modules/monapi/plugins/util.go +++ b/src/modules/monapi/plugins/util.go @@ -2,11 +2,7 @@ package plugins import ( "fmt" - "testing" - "github.com/didi/nightingale/src/common/dataobj" - "github.com/didi/nightingale/src/modules/prober/manager" - "github.com/influxdata/telegraf" "github.com/toolkits/pkg/logger" ) @@ -43,29 +39,3 @@ func (l *Logger) Infof(format string, args ...interface{}) { func (l *Logger) Info(args ...interface{}) { logger.LogDepth(logger.INFO, 1, fmt.Sprint(args...)) } - -type telegrafPlugin interface { - TelegrafInput() (telegraf.Input, error) -} - -func PluginTest(t *testing.T, plugin telegrafPlugin) { - metrics := []*dataobj.MetricValue{} - - input, err := plugin.TelegrafInput() - if err != nil { - t.Error(err) - } - - acc, err := manager.NewAccumulator(manager.AccumulatorOptions{Name: "github-test", Metrics: &metrics}) - if err != nil { - t.Error(err) - } - - if err = input.Gather(acc); err != nil { - t.Error(err) - } - - for k, v := range metrics { - t.Logf("%d %s %s %f", k, v.CounterType, v.PK(), v.Value) - } -} diff --git a/src/modules/prober/manager/accumulator.go b/src/modules/prober/manager/accumulator.go deleted file mode 100644 index 46e8b9ed..00000000 --- a/src/modules/prober/manager/accumulator.go +++ /dev/null @@ -1,258 +0,0 @@ -package manager - -import ( - "fmt" - "strings" - "sync" - "time" - - "github.com/didi/nightingale/src/common/dataobj" - "github.com/influxdata/telegraf" - "github.com/toolkits/pkg/logger" -) - -type AccumulatorOptions struct { - Name string - Tags map[string]string - Metrics *[]*dataobj.MetricValue -} - -func (p *AccumulatorOptions) Validate() error { - if p.Name == "" { - return fmt.Errorf("unable to get Name") - } - if p.Metrics == nil { - return fmt.Errorf("unable to get metrics") - } - - return nil -} - -// NewAccumulator return telegraf.Accumulator -func NewAccumulator(opt AccumulatorOptions) (telegraf.Accumulator, error) { - if err := opt.Validate(); err != nil { - return nil, err - } - - return &accumulator{ - name: opt.Name, - tags: opt.Tags, - metrics: opt.Metrics, - precision: time.Second, - }, nil -} - -type accumulator struct { - sync.RWMutex - name string - tags map[string]string - precision time.Duration - metrics *[]*dataobj.MetricValue -} - -func (p *accumulator) AddFields( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Untyped, t...) -} - -func (p *accumulator) AddGauge( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Gauge, t...) -} - -func (p *accumulator) AddCounter( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Counter, t...) -} - -func (p *accumulator) AddSummary( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Summary, t...) -} - -func (p *accumulator) AddHistogram( - measurement string, - fields map[string]interface{}, - tags map[string]string, - t ...time.Time, -) { - p.addFields(measurement, tags, fields, telegraf.Histogram, t...) -} - -func (p *accumulator) AddMetric(m telegraf.Metric) { - m.SetTime(m.Time().Round(p.precision)) - if metrics := p.makeMetric(m); m != nil { - p.pushMetrics(metrics) - } -} - -func (p *accumulator) SetPrecision(precision time.Duration) { - p.precision = precision -} - -// AddError passes a runtime error to the accumulator. -// The error will be tagged with the plugin name and written to the log. -func (p *accumulator) AddError(err error) { - if err == nil { - return - } - logger.Debugf("accumulator %s Error: %s", p.name, err) -} - -func (p *accumulator) WithTracking(maxTracked int) telegraf.TrackingAccumulator { - return nil -} - -func (p *accumulator) addFields( - measurement string, - tags map[string]string, - fields map[string]interface{}, - tp telegraf.ValueType, - t ...time.Time, -) { - m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp) - if err != nil { - return - } - if metrics := p.makeMetric(m); m != nil { - p.pushMetrics(metrics) - } -} - -func (p *accumulator) getTime(t []time.Time) time.Time { - var timestamp time.Time - if len(t) > 0 { - timestamp = t[0] - } else { - timestamp = time.Now() - } - return timestamp.Round(p.precision) -} - -// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ -func (p *accumulator) makeMetric(metric telegraf.Metric) []*dataobj.MetricValue { - tags := map[string]string{} - for _, v := range metric.TagList() { - tags[v.Key] = v.Value - } - - for k, v := range p.tags { - tags[k] = v - } - - switch metric.Type() { - case telegraf.Counter: - return makeCounter(metric, tags) - case telegraf.Summary, telegraf.Histogram: - logger.Debugf("unsupported type summary, histogram, skip") - return nil - // return makeSummary(metric, tags) - default: - return makeGauge(metric, tags) - } - -} - -func makeSummary(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { - name := metric.Name() - ts := metric.Time().Unix() - fields := metric.Fields() - ms := make([]*dataobj.MetricValue, 0, len(fields)) - - for k, v := range fields { - f, ok := v.(float64) - if !ok { - continue - } - - countType := "GAUGE" - if strings.HasSuffix(k, "_count") || - strings.HasSuffix(k, "_sum") { - countType = "COUNTER" - } - - ms = append(ms, &dataobj.MetricValue{ - Metric: name + "_" + k, - CounterType: countType, - Timestamp: ts, - TagsMap: tags, - Value: f, - ValueUntyped: f, - }) - - } - return ms -} - -func makeCounter(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { - name := metric.Name() - ts := metric.Time().Unix() - fields := metric.Fields() - ms := make([]*dataobj.MetricValue, 0, len(fields)) - - for k, v := range fields { - f, ok := v.(float64) - if !ok { - continue - } - - ms = append(ms, &dataobj.MetricValue{ - Metric: name + "_" + k, - CounterType: "COUNTER", - Timestamp: ts, - TagsMap: tags, - Value: f, - ValueUntyped: f, - }) - } - - return ms -} - -func makeGauge(metric telegraf.Metric, tags map[string]string) []*dataobj.MetricValue { - name := metric.Name() - ts := metric.Time().Unix() - fields := metric.Fields() - ms := make([]*dataobj.MetricValue, 0, len(fields)) - - for k, v := range fields { - f, ok := v.(float64) - if !ok { - continue - } - - ms = append(ms, &dataobj.MetricValue{ - Metric: name + "_" + k, - CounterType: "GAUGE", - Timestamp: ts, - TagsMap: tags, - Value: f, - ValueUntyped: f, - }) - } - - return ms - -} - -func (p *accumulator) pushMetrics(metrics []*dataobj.MetricValue) { - p.Lock() - defer p.Unlock() - *p.metrics = append(*p.metrics, metrics...) -} diff --git a/src/modules/prober/manager/collectrule.go b/src/modules/prober/manager/collectrule.go index 0beff6c4..c0c6581c 100644 --- a/src/modules/prober/manager/collectrule.go +++ b/src/modules/prober/manager/collectrule.go @@ -1,9 +1,9 @@ package manager import ( - "fmt" "strconv" "sync" + "time" "github.com/didi/nightingale/src/common/dataobj" "github.com/didi/nightingale/src/models" @@ -13,14 +13,14 @@ import ( "github.com/toolkits/pkg/logger" ) +// not thread-safe type collectRule struct { sync.RWMutex + telegraf.Input *models.CollectRule - - input telegraf.Input - acc telegraf.Accumulator - metrics *[]*dataobj.MetricValue tags map[string]string + precision time.Duration + metrics []*dataobj.MetricValue lastAt int64 updatedAt int64 } @@ -41,72 +41,43 @@ func newCollectRule(rule *models.CollectRule) (*collectRule, error) { return nil, err } - metrics := []*dataobj.MetricValue{} - - acc, err := NewAccumulator(AccumulatorOptions{ - Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), - Tags: tags, - Metrics: &metrics}) - if err != nil { - return nil, err - } - return &collectRule{ + Input: input, CollectRule: rule, - input: input, - acc: acc, - metrics: &metrics, tags: tags, + metrics: []*dataobj.MetricValue{}, + precision: time.Second, updatedAt: rule.UpdatedAt, }, nil } -func (p *collectRule) reset() { - p.Lock() - defer p.Unlock() - - *p.metrics = (*p.metrics)[:0] -} - -func (p *collectRule) Metrics() []*dataobj.MetricValue { - p.RLock() - defer p.RUnlock() - - return *p.metrics -} - // prepareMetrics -func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err error) { - p.RLock() - defer p.RUnlock() - - if len(*p.metrics) == 0 { - return +func (p *collectRule) prepareMetrics() error { + if len(p.metrics) == 0 { + return nil } - - metrics = *p.metrics - ts := metrics[0].Timestamp + ts := p.metrics[0].Timestamp nid := strconv.FormatInt(p.Nid, 10) pluginConfig, ok := config.GetPluginConfig(p.PluginName()) if !ok { - return + return nil } vars := map[string]*dataobj.MetricValue{} - for _, v := range metrics { + for _, v := range p.metrics { logger.Debugf("get v[%s] %f", v.Metric, v.Value) vars[v.Metric] = v } - metrics = metrics[:0] + p.metrics = p.metrics[:0] for _, metric := range pluginConfig.ExprMetrics { f, err := metric.Calc(vars) if err != nil { logger.Debugf("calc err %s", err) continue } - metrics = append(metrics, &dataobj.MetricValue{ + p.metrics = append(p.metrics, &dataobj.MetricValue{ Nid: nid, Metric: metric.Name, Timestamp: ts, @@ -120,7 +91,7 @@ func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err erro for k, v := range vars { if metric, ok := pluginConfig.Metrics[k]; ok { - metrics = append(metrics, &dataobj.MetricValue{ + p.metrics = append(p.metrics, &dataobj.MetricValue{ Nid: nid, Metric: k, Timestamp: ts, @@ -134,7 +105,7 @@ func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err erro if pluginConfig.Mode == config.PluginModeWhitelist { continue } - metrics = append(metrics, &dataobj.MetricValue{ + p.metrics = append(p.metrics, &dataobj.MetricValue{ Nid: nid, Metric: k, Timestamp: ts, @@ -144,15 +115,13 @@ func (p *collectRule) prepareMetrics() (metrics []*dataobj.MetricValue, err erro Value: v.Value, ValueUntyped: v.ValueUntyped, }) + } } - return + return nil } func (p *collectRule) update(rule *models.CollectRule) error { - p.Lock() - defer p.Unlock() - if p.updatedAt == rule.UpdatedAt { return nil } @@ -170,18 +139,145 @@ func (p *collectRule) update(rule *models.CollectRule) error { return err } - acc, err := NewAccumulator(AccumulatorOptions{ - Name: fmt.Sprintf("%s-%d", rule.CollectType, rule.Id), - Tags: tags, - Metrics: p.metrics}) - if err != nil { - return err - } - - p.input = input + p.Input = input p.CollectRule = rule - p.acc = acc + p.tags = tags p.UpdatedAt = rule.UpdatedAt return nil } + +// https://docs.influxdata.com/telegraf/v1.14/data_formats/output/prometheus/ +func (p *collectRule) MakeMetric(metric telegraf.Metric) []*dataobj.MetricValue { + tags := map[string]string{} + for _, v := range metric.TagList() { + tags[v.Key] = v.Value + } + + for k, v := range p.tags { + tags[k] = v + } + + name := metric.Name() + ts := metric.Time().Unix() + + fields := metric.Fields() + ms := make([]*dataobj.MetricValue, 0, len(fields)) + for k, v := range fields { + f, ok := v.(float64) + if !ok { + continue + } + + ms = append(ms, &dataobj.MetricValue{ + Metric: name + "_" + k, + Timestamp: ts, + TagsMap: tags, + Value: f, + ValueUntyped: f, + }) + } + + return ms +} + +func (p *collectRule) AddFields( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Untyped, t...) +} + +func (p *collectRule) AddGauge( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Gauge, t...) +} + +func (p *collectRule) AddCounter( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Counter, t...) +} + +func (p *collectRule) AddSummary( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Summary, t...) +} + +func (p *collectRule) AddHistogram( + measurement string, + fields map[string]interface{}, + tags map[string]string, + t ...time.Time, +) { + p.addFields(measurement, tags, fields, telegraf.Histogram, t...) +} + +func (p *collectRule) AddMetric(m telegraf.Metric) { + m.SetTime(m.Time().Round(p.precision)) + if metrics := p.MakeMetric(m); m != nil { + p.pushMetrics(metrics) + } +} + +func (p *collectRule) pushMetrics(metrics []*dataobj.MetricValue) { + p.Lock() + defer p.Unlock() + p.metrics = append(p.metrics, metrics...) +} + +func (p *collectRule) addFields( + measurement string, + tags map[string]string, + fields map[string]interface{}, + tp telegraf.ValueType, + t ...time.Time, +) { + m, err := NewMetric(measurement, tags, fields, p.getTime(t), tp) + if err != nil { + return + } + if metrics := p.MakeMetric(m); m != nil { + p.pushMetrics(metrics) + } +} + +// AddError passes a runtime error to the accumulator. +// The error will be tagged with the plugin name and written to the log. +func (p *collectRule) AddError(err error) { + if err == nil { + return + } + logger.Debugf("collectRule %s.%s(%d) Error: %s", p.CollectType, p.Name, p.Id, err) +} + +func (p *collectRule) SetPrecision(precision time.Duration) { + p.precision = precision +} + +func (p *collectRule) getTime(t []time.Time) time.Time { + var timestamp time.Time + if len(t) > 0 { + timestamp = t[0] + } else { + timestamp = time.Now() + } + return timestamp.Round(p.precision) +} + +func (p *collectRule) WithTracking(maxTracked int) telegraf.TrackingAccumulator { + return nil +} diff --git a/src/modules/prober/manager/manager.go b/src/modules/prober/manager/manager.go index 7fe9a2b8..465fd8b4 100644 --- a/src/modules/prober/manager/manager.go +++ b/src/modules/prober/manager/manager.go @@ -3,7 +3,6 @@ package manager import ( "container/heap" "context" - "fmt" "log" "time" @@ -168,7 +167,7 @@ func (p *worker) loop(id int) { return case rule := <-p.collectRuleCh: if err := p.do(rule); err != nil { - logger.Debugf("work[%d].do %s", id, err) + log.Printf("work[%d].do err %s", id, err) } } } @@ -176,22 +175,19 @@ func (p *worker) loop(id int) { } func (p *worker) do(rule *collectRule) error { - rule.reset() + rule.metrics = rule.metrics[:0] // telegraf - err := rule.input.Gather(rule.acc) - if err != nil { - return fmt.Errorf("gather %s", err) + err := rule.Input.Gather(rule) + if len(rule.metrics) == 0 { + return err } // eval expression metrics - metrics, err := rule.prepareMetrics() - if err != nil { - return fmt.Errorf("prepareMetrics %s", err) - } + rule.prepareMetrics() - // push to transfer - core.Push(metrics) + // send + core.Push(rule.metrics) return err }