Add metrics exporter

This commit is contained in:
photino 2023-01-11 23:25:53 +08:00
parent 96b16fe75f
commit e830067fd6
11 changed files with 167 additions and 72 deletions

View File

@ -27,4 +27,9 @@ username = "postgres"
password = "QAx01wnh1i5ER713zfHmZi6dIUYn/Iq9ag+iUGtvKzEFJFYW" password = "QAx01wnh1i5ER713zfHmZi6dIUYn/Iq9ag+iUGtvKzEFJFYW"
[tracing] [tracing]
filter = "info,sqlx=trace,tower_http=trace,zino=trace,zino_core=trace" filter = "info,sqlx=trace,tower_http=trace,zino=trace,zino_core=trace"
[metrics]
exporter = "prometheus"
host = "localhost"
port = 9000

View File

@ -27,4 +27,9 @@ username = "postgres"
password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B" password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B"
[tracing] [tracing]
filter = "info,sqlx=warn" filter = "info,sqlx=warn"
[metrics]
exporter = "prometheus"
host = "localhost"
port = 9000

View File

@ -25,6 +25,9 @@ http = { version = "0.2.8" }
http-body = { version = "0.4.5" } http-body = { version = "0.4.5" }
http-types = { version = "2.12.0" } http-types = { version = "2.12.0" }
lru = { version = "0.9.0" } lru = { version = "0.9.0" }
metrics = { version = "0.20.1" }
metrics-exporter-prometheus = { version = "0.11.0" }
metrics-exporter-tcp = { version = "0.7.0" }
parking_lot = { version = "0.12.1" } parking_lot = { version = "0.12.1" }
rand = { version = "0.8.5" } rand = { version = "0.8.5" }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }

View File

@ -1,10 +1,14 @@
use crate::{AsyncCronJob, CronJob, Job, JobScheduler, Map, State}; use crate::{AsyncCronJob, CronJob, Job, JobScheduler, Map, State};
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_exporter_tcp::TcpBuilder;
use std::{ use std::{
collections::HashMap, collections::HashMap,
env, io, env, io,
path::PathBuf, net::IpAddr,
path::{Path, PathBuf},
sync::{LazyLock, OnceLock}, sync::{LazyLock, OnceLock},
thread, thread,
time::Duration,
}; };
use toml::value::Table; use toml::value::Table;
use tracing::Level; use tracing::Level;
@ -86,16 +90,16 @@ pub trait Application {
LazyLock::force(&PROJECT_DIR) LazyLock::force(&PROJECT_DIR)
} }
/// Initializes the tracing subscriber. /// Initializes the application.
fn init_tracing_subscriber() { fn init() {
if TRACING_APPENDER_GUARD.get().is_some() { if TRACING_APPENDER_GUARD.get().is_some() {
tracing::warn!("the tracing subscriber has already been initialized"); tracing::warn!("the tracing subscriber has already been initialized");
return; return;
} }
let app_env = Self::env(); let app_env = Self::env();
let is_dev = app_env == "dev"; let mut log_dir = "./log";
let mut env_filter = if is_dev { let mut env_filter = if app_env == "dev" {
"info,sqlx=trace,zino=trace,zino_core=trace" "info,sqlx=trace,zino=trace,zino_core=trace"
} else { } else {
"info,sqlx=warn" "info,sqlx=warn"
@ -109,6 +113,9 @@ pub trait Application {
let config = Self::config(); let config = Self::config();
if let Some(tracing) = config.get("tracing").and_then(|t| t.as_table()) { if let Some(tracing) = config.get("tracing").and_then(|t| t.as_table()) {
if let Some(dir) = tracing.get("log-dir").and_then(|t| t.as_str()) {
log_dir = dir;
}
if let Some(filter) = tracing.get("filter").and_then(|t| t.as_str()) { if let Some(filter) = tracing.get("filter").and_then(|t| t.as_str()) {
env_filter = filter; env_filter = filter;
} }
@ -134,7 +141,23 @@ pub trait Application {
.unwrap_or(false); .unwrap_or(false);
} }
let subscriber = tracing_subscriber::fmt() let app_name = Self::name();
let log_dir = Path::new(log_dir);
let rolling_file_dir = if log_dir.exists() {
log_dir.to_path_buf()
} else {
let project_dir = Self::project_dir();
let log_dir = project_dir.join("./log");
if log_dir.exists() {
log_dir
} else {
project_dir.join("../log")
}
};
let file_appender = rolling::hourly(rolling_file_dir, format!("{app_name}.{app_env}"));
let (non_blocking_appender, worker_guard) = tracing_appender::non_blocking(file_appender);
let stderr = io::stderr.with_max_level(Level::WARN);
tracing_subscriber::fmt()
.json() .json()
.with_env_filter(env_filter) .with_env_filter(env_filter)
.with_target(display_target) .with_target(display_target)
@ -143,29 +166,82 @@ pub trait Application {
.with_thread_names(display_thread_names) .with_thread_names(display_thread_names)
.with_span_list(display_span_list) .with_span_list(display_span_list)
.with_current_span(display_current_span) .with_current_span(display_current_span)
.with_timer(time::LocalTime::rfc_3339()); .with_timer(time::LocalTime::rfc_3339())
.with_writer(stderr.and(non_blocking_appender))
let app_name = Self::name(); .init();
let project_dir = Self::project_dir();
let log_dir = project_dir.join("./log");
let rolling_file_dir = if log_dir.exists() {
log_dir
} else {
project_dir.join("../log")
};
let file_appender = rolling::hourly(rolling_file_dir, format!("{app_name}.{app_env}"));
let (non_blocking_appender, worker_guard) = tracing_appender::non_blocking(file_appender);
if is_dev {
let stdout = io::stdout.with_max_level(Level::WARN);
subscriber
.with_writer(stdout.and(non_blocking_appender))
.init();
} else {
subscriber.with_writer(non_blocking_appender).init();
}
TRACING_APPENDER_GUARD TRACING_APPENDER_GUARD
.set(worker_guard) .set(worker_guard)
.expect("fail to set the worker guard for the tracing appender"); .expect("failed to set the worker guard for the tracing appender");
if let Some(metrics) = config.get("metrics").and_then(|t| t.as_table()) {
let exporter = metrics
.get("exporter")
.and_then(|t| t.as_str())
.unwrap_or_default();
if exporter == "prometheus" {
let mut builder = match metrics.get("push-gateway").and_then(|t| t.as_str()) {
Some(endpoint) => {
let interval = metrics
.get("interval")
.and_then(|t| t.as_integer().and_then(|i| i.try_into().ok()))
.unwrap_or(60);
PrometheusBuilder::new()
.with_push_gateway(endpoint, Duration::from_secs(interval))
.expect("failed to configure the exporter to run in push gateway mode")
}
None => {
let host = config
.get("host")
.and_then(|t| t.as_str())
.unwrap_or("localhost");
let port = config
.get("port")
.and_then(|t| t.as_integer())
.and_then(|t| u16::try_from(t).ok())
.unwrap_or(9000);
let host_addr = host
.parse::<IpAddr>()
.unwrap_or_else(|_| panic!("invalid host address: {host}"));
PrometheusBuilder::new().with_http_listener((host_addr, port))
}
};
if let Some(quantiles) = config.get("quantiles").and_then(|t| t.as_array()) {
let quantiles = quantiles
.iter()
.filter_map(|q| q.as_float())
.collect::<Vec<_>>();
builder = builder
.set_quantiles(&quantiles)
.expect("the quantiles to use when rendering histograms are incorrect");
}
builder
.install()
.expect("failed to install Prometheus exporter");
} else if exporter == "tcp" {
let host = config
.get("host")
.and_then(|t| t.as_str())
.unwrap_or("localhost");
let port = config
.get("port")
.and_then(|t| t.as_integer())
.and_then(|t| u16::try_from(t).ok())
.unwrap_or(9000);
let buffer_size = config
.get("buffer_size")
.and_then(|t| t.as_integer())
.and_then(|t| usize::try_from(t).ok())
.unwrap_or(1024);
let host_addr = host
.parse::<IpAddr>()
.unwrap_or_else(|_| panic!("invalid host address: {host}"));
TcpBuilder::new()
.listen_address((host_addr, port))
.buffer_size(Some(buffer_size))
.install()
.expect("failed to install TCP exporter");
}
}
} }
} }

View File

@ -68,7 +68,7 @@ impl SecurityToken {
match base64::decode(&token) { match base64::decode(&token) {
Ok(data) => { Ok(data) => {
let authorization = crypto::decrypt(key, &data) let authorization = crypto::decrypt(key, &data)
.map_err(|_| DecodeError("fail to decrypt authorization".to_string()))?; .map_err(|_| DecodeError("failed to decrypt authorization".to_string()))?;
if let Some((assignee_id, timestamp)) = authorization.split_once(':') { if let Some((assignee_id, timestamp)) = authorization.split_once(':') {
match timestamp.parse() { match timestamp.parse() {
Ok(secs) => { Ok(secs) => {
@ -76,7 +76,7 @@ impl SecurityToken {
let expires = DateTime::from_timestamp(secs); let expires = DateTime::from_timestamp(secs);
let grantor_id = crypto::decrypt(key, assignee_id.as_ref()) let grantor_id = crypto::decrypt(key, assignee_id.as_ref())
.map_err(|_| { .map_err(|_| {
DecodeError("fail to decrypt grantor id".to_string()) DecodeError("failed to decrypt grantor id".to_string())
})?; })?;
Ok(Self { Ok(Self {
grantor_id: grantor_id.into(), grantor_id: grantor_id.into(),

View File

@ -141,5 +141,7 @@ static GLOBAL_CACHE: LazyLock<RwLock<LruCache<String, Value>>> = LazyLock::new(|
.expect("the `cache.capacity` field should be a positive integer"), .expect("the `cache.capacity` field should be a positive integer"),
None => 10000, None => 10000,
}; };
RwLock::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())) RwLock::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap_or(NonZeroUsize::MIN),
))
}); });

View File

@ -5,6 +5,7 @@
#![feature(async_fn_in_trait)] #![feature(async_fn_in_trait)]
#![feature(iter_intersperse)] #![feature(iter_intersperse)]
#![feature(let_chains)] #![feature(let_chains)]
#![feature(nonzero_min_max)]
#![feature(once_cell)] #![feature(once_cell)]
#![feature(string_leak)] #![feature(string_leak)]
#![feature(type_alias_impl_trait)] #![feature(type_alias_impl_trait)]
@ -40,5 +41,11 @@ pub type Map = serde_json::Map<String, serde_json::Value>;
/// A UUID is a unique 128-bit number, stored as 16 octets. /// A UUID is a unique 128-bit number, stored as 16 octets.
pub type Uuid = uuid::Uuid; pub type Uuid = uuid::Uuid;
/// An owned dynamically typed Future. /// An allocation-optimized string.
pub type SharedString = std::borrow::Cow<'static, str>;
/// A type-erased error type.
pub type BoxError = Box<dyn std::error::Error + Sync + Send + 'static>;
/// An owned dynamically typed future.
pub type BoxFuture<'a, T = ()> = futures::future::BoxFuture<'a, T>; pub type BoxFuture<'a, T = ()> = futures::future::BoxFuture<'a, T>;

View File

@ -1,4 +1,4 @@
use crate::{RequestContext, Uuid, Validation}; use crate::{RequestContext, SharedString, Uuid, Validation};
use bytes::Bytes; use bytes::Bytes;
use http::{ use http::{
self, self,
@ -9,6 +9,7 @@ use http_types::trace::{Metric, ServerTiming, TraceContext};
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use std::{ use std::{
borrow::{Borrow, Cow},
marker::PhantomData, marker::PhantomData,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -28,22 +29,22 @@ pub trait ResponseCode {
fn status_code(&self) -> u16; fn status_code(&self) -> u16;
/// Error code. /// Error code.
fn error_code(&self) -> Option<String>; fn error_code(&self) -> Option<SharedString>;
/// Returns `true` if the response is successful. /// Returns `true` if the response is successful.
fn is_success(&self) -> bool; fn is_success(&self) -> bool;
/// A URI reference that identifies the problem type. /// A URI reference that identifies the problem type.
/// For successful response, it should be `None`. /// For successful response, it should be `None`.
fn type_uri(&self) -> Option<String>; fn type_uri(&self) -> Option<SharedString>;
/// A short, human-readable summary of the problem type. /// A short, human-readable summary of the problem type.
/// For successful response, it should be `None`. /// For successful response, it should be `None`.
fn title(&self) -> Option<String>; fn title(&self) -> Option<SharedString>;
/// A context-specific descriptive message. If the response is not successful, /// A context-specific descriptive message. If the response is not successful,
/// it should be a human-readable explanation specific to this occurrence of the problem. /// it should be a human-readable explanation specific to this occurrence of the problem.
fn message(&self) -> Option<String>; fn message(&self) -> Option<SharedString>;
} }
/// An HTTP response. /// An HTTP response.
@ -53,28 +54,28 @@ pub struct Response<S> {
/// A URI reference that identifies the problem type. /// A URI reference that identifies the problem type.
#[serde(rename = "type")] #[serde(rename = "type")]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
type_uri: Option<String>, type_uri: Option<SharedString>,
/// A short, human-readable summary of the problem type. /// A short, human-readable summary of the problem type.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
title: Option<String>, title: Option<SharedString>,
/// Status code. /// Status code.
#[serde(rename = "status")] #[serde(rename = "status")]
status_code: u16, status_code: u16,
/// Error code. /// Error code.
#[serde(rename = "error")] #[serde(rename = "error")]
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
error_code: Option<String>, error_code: Option<SharedString>,
/// A human-readable explanation specific to this occurrence of the problem. /// A human-readable explanation specific to this occurrence of the problem.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
detail: Option<String>, detail: Option<SharedString>,
/// A URI reference that identifies the specific occurrence of the problem. /// A URI reference that identifies the specific occurrence of the problem.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
instance: Option<String>, instance: Option<SharedString>,
/// Indicates the response is successful or not. /// Indicates the response is successful or not.
success: bool, success: bool,
/// A context-specific descriptive message for successful response. /// A context-specific descriptive message for successful response.
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
message: Option<String>, message: Option<SharedString>,
/// Start time. /// Start time.
#[serde(skip)] #[serde(skip)]
start_time: Instant, start_time: Instant,
@ -86,7 +87,7 @@ pub struct Response<S> {
data: Value, data: Value,
/// Content type. /// Content type.
#[serde(skip)] #[serde(skip)]
content_type: Option<String>, content_type: Option<SharedString>,
/// Trace context. /// Trace context.
#[serde(skip)] #[serde(skip)]
trace_context: Option<TraceContext>, trace_context: Option<TraceContext>,
@ -138,7 +139,7 @@ impl<S: ResponseCode> Response<S> {
status_code: code.status_code(), status_code: code.status_code(),
error_code: code.error_code(), error_code: code.error_code(),
detail: None, detail: None,
instance: (!success).then(|| ctx.request_path().to_string()), instance: (!success).then(|| ctx.request_path().to_string().into()),
success, success,
message: None, message: None,
start_time: ctx.start_time(), start_time: ctx.start_time(),
@ -178,7 +179,7 @@ impl<S: ResponseCode> Response<S> {
/// Sets the request context. /// Sets the request context.
pub fn set_context<T: RequestContext>(&mut self, ctx: &T) { pub fn set_context<T: RequestContext>(&mut self, ctx: &T) {
self.instance = (!self.is_success()).then(|| ctx.request_path().to_string()); self.instance = (!self.is_success()).then(|| ctx.request_path().to_string().into());
self.start_time = ctx.start_time(); self.start_time = ctx.start_time();
self.request_id = ctx.request_id(); self.request_id = ctx.request_id();
self.trace_context = ctx.trace_context().map(|t| t.child()); self.trace_context = ctx.trace_context().map(|t| t.child());
@ -191,13 +192,13 @@ impl<S: ResponseCode> Response<S> {
} }
/// Sets a URI reference that identifies the specific occurrence of the problem. /// Sets a URI reference that identifies the specific occurrence of the problem.
pub fn set_instance(&mut self, instance: impl Into<Option<String>>) { pub fn set_instance(&mut self, instance: impl Into<Option<SharedString>>) {
self.instance = instance.into(); self.instance = instance.into();
} }
/// Sets the message. If the response is not successful, /// Sets the message. If the response is not successful,
/// it should be a human-readable explanation specific to this occurrence of the problem. /// it should be a human-readable explanation specific to this occurrence of the problem.
pub fn set_message(&mut self, message: impl Into<String>) { pub fn set_message(&mut self, message: impl Into<SharedString>) {
if self.is_success() { if self.is_success() {
self.detail = None; self.detail = None;
self.message = Some(message.into()); self.message = Some(message.into());
@ -227,7 +228,7 @@ impl<S: ResponseCode> Response<S> {
/// Sets the content type. /// Sets the content type.
#[inline] #[inline]
pub fn set_content_type(&mut self, content_type: impl Into<String>) { pub fn set_content_type(&mut self, content_type: impl Into<SharedString>) {
self.content_type = Some(content_type.into()); self.content_type = Some(content_type.into());
} }
@ -280,7 +281,7 @@ impl ResponseCode for http::StatusCode {
} }
#[inline] #[inline]
fn error_code(&self) -> Option<String> { fn error_code(&self) -> Option<SharedString> {
None None
} }
@ -290,23 +291,23 @@ impl ResponseCode for http::StatusCode {
} }
#[inline] #[inline]
fn type_uri(&self) -> Option<String> { fn type_uri(&self) -> Option<SharedString> {
None None
} }
#[inline] #[inline]
fn title(&self) -> Option<String> { fn title(&self) -> Option<SharedString> {
if self.is_success() { if self.is_success() {
None None
} else { } else {
self.canonical_reason().map(|s| s.to_string()) self.canonical_reason().map(Cow::Borrowed)
} }
} }
#[inline] #[inline]
fn message(&self) -> Option<String> { fn message(&self) -> Option<SharedString> {
if self.is_success() { if self.is_success() {
self.canonical_reason().map(|s| s.to_string()) self.canonical_reason().map(Cow::Borrowed)
} else { } else {
None None
} }
@ -340,7 +341,7 @@ impl From<Response<http::StatusCode>> for http::Response<Full<Bytes>> {
.status(response.status_code) .status(response.status_code)
.header( .header(
header::CONTENT_TYPE, header::CONTENT_TYPE,
HeaderValue::from_str(content_type.as_str()).unwrap(), HeaderValue::from_str(content_type.borrow()).unwrap(),
) )
.body(Full::from(bytes)) .body(Full::from(bytes))
.unwrap_or_default(), .unwrap_or_default(),
@ -405,7 +406,7 @@ impl ResponseCode for http_types::StatusCode {
} }
#[inline] #[inline]
fn error_code(&self) -> Option<String> { fn error_code(&self) -> Option<SharedString> {
None None
} }
@ -415,19 +416,18 @@ impl ResponseCode for http_types::StatusCode {
} }
#[inline] #[inline]
fn type_uri(&self) -> Option<String> { fn type_uri(&self) -> Option<SharedString> {
None None
} }
#[inline] #[inline]
fn title(&self) -> Option<String> { fn title(&self) -> Option<SharedString> {
(!self.is_success()).then(|| self.canonical_reason().to_string()) (!self.is_success()).then(|| self.canonical_reason().into())
} }
#[inline] #[inline]
fn message(&self) -> Option<String> { fn message(&self) -> Option<SharedString> {
self.is_success() self.is_success().then(|| self.canonical_reason().into())
.then(|| self.canonical_reason().to_string())
} }
} }

View File

@ -1,12 +1,9 @@
use crate::{Response, Validation}; use crate::{BoxError, Response, Validation};
use bytes::Bytes; use bytes::Bytes;
use http_body::Full; use http_body::Full;
use std::{error, fmt}; use std::{error, fmt};
use Rejection::*; use Rejection::*;
/// A type-erased error type.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// A rejection response type. /// A rejection response type.
#[derive(Debug)] #[derive(Debug)]
#[non_exhaustive] #[non_exhaustive]

View File

@ -34,9 +34,9 @@ impl State {
project_dir.join(format!("../config/config.{}.toml", self.env)) project_dir.join(format!("../config/config.{}.toml", self.env))
}; };
let config: Value = fs::read_to_string(&path) let config: Value = fs::read_to_string(&path)
.unwrap_or_else(|_| panic!("fail to read config file `{:#?}`", &path)) .unwrap_or_else(|_| panic!("failed to read config file `{:#?}`", &path))
.parse() .parse()
.expect("fail to parse toml value"); .expect("failed to parse toml value");
match config { match config {
Value::Table(table) => self.config = table, Value::Table(table) => self.config = table,
_ => panic!("toml config file should be a table"), _ => panic!("toml config file should be a table"),

View File

@ -34,7 +34,7 @@ impl Application for AxumCluster {
/// Creates a new application. /// Creates a new application.
fn new() -> Self { fn new() -> Self {
Self::init_tracing_subscriber(); Self::init();
Self { Self {
routes: HashMap::new(), routes: HashMap::new(),
} }
@ -64,7 +64,7 @@ impl Application for AxumCluster {
.global_queue_interval(61) .global_queue_interval(61)
.enable_all() .enable_all()
.build() .build()
.expect("fail to build Tokio runtime with the multi thread scheduler selected"); .expect("failed to build Tokio runtime with the multi thread scheduler selected");
let mut scheduler = JobScheduler::new(); let mut scheduler = JobScheduler::new();
for (cron_expr, exec) in async_jobs { for (cron_expr, exec) in async_jobs {
scheduler.add(Job::new_async(cron_expr, exec)); scheduler.add(Job::new_async(cron_expr, exec));