Add system monitor

This commit is contained in:
photino 2023-01-22 23:41:11 +08:00
parent df5cd2ad81
commit 3be48f685d
25 changed files with 398 additions and 133 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "axum-app"
version = "0.4.4"
version = "0.4.5"
rust-version = "1.68"
edition = "2021"
publish = false
@ -12,9 +12,9 @@ tracing = "0.1.37"
[dependencies.zino]
path = "../../zino"
version = "0.4.4"
version = "0.5.0"
features = ["axum"]
[dependencies.zino-model]
path = "../../zino-model"
version = "0.3.6"
version = "0.3.7"

View File

@ -1,6 +1,6 @@
name = "data-cube"
version = "0.4.4"
version = "0.4.5"
[main]
host = "127.0.0.1"

View File

@ -1,6 +1,6 @@
name = "data-cube"
version = "0.4.4"
version = "0.4.5"
[main]
host = "127.0.0.1"

View File

@ -1,12 +1,13 @@
use serde_json::json;
use zino::{Request, RequestContext, Response};
use zino::{Application, AxumCluster, Request, RequestContext, Response};
pub(crate) async fn index(req: Request) -> zino::Result {
let mut res = Response::default().provide_context(&req);
res.set_data(json!({
"method": "GET",
"path": "/stats",
"config": req.config(),
"app_state_data": AxumCluster::state_data(),
"app_sysinfo": AxumCluster::sysinfo(),
}));
Ok(res.into())
}

View File

@ -1,9 +1,6 @@
use serde_json::json;
use std::time::Instant;
use zino::{
Application, AxumCluster, Model, Query, Rejection, Request, RequestContext, Response, Schema,
Uuid,
};
use zino::{Model, Query, Rejection, Request, RequestContext, Response, Schema, Uuid};
use zino_model::User;
pub(crate) async fn new(mut req: Request) -> zino::Result {
@ -30,11 +27,7 @@ pub(crate) async fn update(mut req: Request) -> zino::Result {
pub(crate) async fn list(req: Request) -> zino::Result {
let mut query = Query::new();
let mut res = req.query_validation(&mut query)?;
let db_query_start_time = Instant::now();
let users = User::find(query).await.map_err(Rejection::from)?;
res.record_server_timing("db", None, db_query_start_time.elapsed());
let data = json!({
"users": users,
});
@ -54,22 +47,12 @@ pub(crate) async fn view(mut req: Request) -> zino::Result {
let event = req.cloud_event("message", message);
req.try_send(event)?;
let db_query_start_time = Instant::now();
let user = User::find_one(query).await.map_err(Rejection::from)?;
req.fetch("http://localhost:6081/stats", None).await.map_err(Rejection::from)?;
req.fetch("http://localhost:6081/user/list", None).await.map_err(Rejection::from)?;
let state_data = req.state_data_mut();
let counter = state_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
state_data.insert("counter".to_owned(), counter.into());
res.record_server_timing("db", None, db_query_start_time.elapsed());
let data = json!({
"user": user,
"app_state_data": AxumCluster::state_data(),
"state_data": state_data,
"config": req.config(),
});
res.set_data(data);
Ok(res.into())

View File

@ -1,7 +1,7 @@
[package]
name = "zino-core"
description = "Core types and traits for zino."
version = "0.4.4"
version = "0.5.0"
rust-version = "1.68"
edition = "2021"
license = "MIT"
@ -12,10 +12,16 @@ repository = "https://github.com/photino/zino"
documentation = "https://docs.rs/zino-core"
readme = "README.md"
[package.metadata.docs.rs]
all-features = true
[features]
accessor = ["dep:backon", "dep:opendal"]
cache = ["dep:lru"]
[dependencies]
aes-gcm-siv = "0.11.1"
async-trait = "0.1.60"
backon = "0.2.0"
base64 = "0.21.0"
bytes = "1.3.0"
cron = "0.12.0"
@ -24,7 +30,6 @@ hmac = "0.12.1"
http = "0.2.8"
http-body = "0.4.5"
hyper = "0.14.23"
lru = "0.9.0"
metrics = "0.20.1"
metrics-exporter-prometheus = "0.11.0"
metrics-exporter-tcp = "0.7.0"
@ -36,16 +41,25 @@ reqwest-tracing = "0.4.0"
serde_json = "1.0.91"
serde_qs = "0.11.0"
serde_urlencoded = "0.7.1"
sysinfo = "0.27.7"
task-local-extensions = "0.1.3"
toml = "0.5.10"
tracing = "0.1.37"
tracing-appender = "0.2.2"
url = "2.3.1"
[dependencies.backon]
version = "0.2.0"
optional = true
[dependencies.chrono]
version = "0.4.23"
features = ["serde"]
[dependencies.lru]
version = "0.9.0"
optional = true
[dependencies.opendal]
version = "0.25.0"
features = [
@ -58,6 +72,7 @@ features = [
"services-moka",
"services-redis",
]
optional = true
[dependencies.serde]
version = "1.0.152"

View File

@ -1,15 +1,16 @@
//! Unified data access to databases and storage backends.
//! Unified data access to different storage services.
use crate::state::State;
use backon::ExponentialBackoff;
use opendal::{
layers::{MetricsLayer, RetryLayer, TracingLayer},
services::{
azblob, azdfs, fs, ftp, gcs, ipfs, ipmfs, memcached, memory, moka, obs, oss, redis, s3,
azblob, azdfs, fs, ftp, gcs, ghac, ipfs, ipmfs, memcached, memory, moka, obs, oss, redis,
s3,
},
Error,
ErrorKind::{Unexpected, Unsupported},
Operator, Result,
Operator,
};
use std::time::Duration;
@ -18,22 +19,20 @@ use std::time::Duration;
pub struct StorageAccessor {}
impl StorageAccessor {
/// Creates a new operator for the specific storage backend.
pub fn new_operator(scheme: &'static str, name: Option<&'static str>) -> Result<Operator> {
/// Creates a new operator for the specific storage service.
pub fn new_operator(
scheme: &'static str,
name: Option<&'static str>,
) -> Result<Operator, Error> {
let config = State::shared().config();
let operator = if scheme == "memory" {
let mut builder = memory::Builder::default();
Ok(Operator::new(builder.build()?))
} else if let Some(accessors) = config.get("accessor").and_then(|v| v.as_array()) {
if let Some(accessor) = accessors
.iter()
.filter_map(|v| v.as_table())
.filter(|t| {
if let Some(accessor) = accessors.iter().filter_map(|v| v.as_table()).find(|t| {
t.get("scheme").and_then(|v| v.as_str()).contains(&scheme)
&& t.get("name").and_then(|v| v.as_str()) == name
})
.next()
{
}) {
match scheme {
"azblob" => {
let mut builder = azblob::Builder::default();
@ -144,6 +143,16 @@ impl StorageAccessor {
}
Ok(Operator::new(builder.build()?))
}
"ghac" => {
let mut builder = ghac::Builder::default();
if let Some(root) = accessor.get("root").and_then(|v| v.as_str()) {
builder.root(root);
}
if let Some(version) = accessor.get("version").and_then(|v| v.as_str()) {
builder.version(version);
}
Ok(Operator::new(builder.build()?))
}
"ipfs" => {
let mut builder = ipfs::Builder::default();
if let Some(root) = accessor.get("root").and_then(|v| v.as_str()) {
@ -336,7 +345,7 @@ impl StorageAccessor {
_ => Err(Error::new(Unsupported, "scheme is unsupported")),
}
} else {
Err(Error::new(Unexpected, "failed to find the storage backend"))
Err(Error::new(Unexpected, "failed to find the storage service"))
}
} else if name.is_none() {
scheme.parse().and_then(Operator::from_env)

View File

@ -1,7 +1,7 @@
use crate::{application::Application, trace::TraceContext, BoxError, Map, Uuid};
use reqwest::{
header::{self, HeaderMap, HeaderName},
Client, IntoUrl, Method, Request, Response, Url,
Certificate, Client, IntoUrl, Method, Request, Response, Url,
};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, Error, RequestBuilder};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
@ -9,6 +9,7 @@ use reqwest_tracing::{ReqwestOtelSpanBackend, TracingMiddleware};
use serde_json::Value;
use std::{
borrow::Cow,
fs,
net::IpAddr,
str::FromStr,
sync::OnceLock,
@ -34,6 +35,20 @@ pub(super) fn init<APP: Application + ?Sized>() {
{
client_builder = client_builder.timeout(Duration::from_secs(timeout));
}
if let Some(timeout) = http_client
.get("pool-idle-timeout")
.and_then(|v| v.as_integer())
.and_then(|i| u64::try_from(i).ok())
{
client_builder = client_builder.pool_idle_timeout(Duration::from_secs(timeout));
}
if let Some(max_idle_per_host) = http_client
.get("pool-max-idle-per-host")
.and_then(|v| v.as_integer())
.and_then(|i| usize::try_from(i).ok())
{
client_builder = client_builder.pool_max_idle_per_host(max_idle_per_host);
}
if let Some(addr) = http_client
.get("local-address")
.and_then(|v| v.as_str())
@ -41,6 +56,37 @@ pub(super) fn init<APP: Application + ?Sized>() {
{
client_builder = client_builder.local_address(addr);
}
if let Some(tcp_keepalive) = http_client
.get("tcp-keepalive")
.and_then(|v| v.as_integer())
.and_then(|i| u64::try_from(i).ok())
{
client_builder = client_builder.tcp_keepalive(Duration::from_secs(tcp_keepalive));
}
if let Some(root_certs) = http_client.get("root-certs").and_then(|v| v.as_array()) {
for root_cert in root_certs.iter().filter_map(|cert| cert.as_str()) {
match fs::read(root_cert) {
Ok(bytes) => {
if root_cert.ends_with(".der") {
match Certificate::from_der(&bytes) {
Ok(cert) => {
client_builder = client_builder.add_root_certificate(cert);
}
Err(err) => panic!("failed to read a DER encoded cert: {err}"),
}
} else if root_cert.ends_with(".pem") {
match Certificate::from_pem(&bytes) {
Ok(cert) => {
client_builder = client_builder.add_root_certificate(cert);
}
Err(err) => panic!("failed to read a PEM encoded cert: {err}"),
}
}
}
Err(err) => panic!("failed to read cert file: {err}"),
}
}
}
}
let reqwest_client = client_builder

View File

@ -14,7 +14,7 @@ pub(super) fn init<APP: Application + ?Sized>() {
Some(endpoint) => {
let interval = metrics
.get("interval")
.and_then(|v| v.as_integer().and_then(|i| i.try_into().ok()))
.and_then(|v| v.as_integer().and_then(|i| u64::try_from(i).ok()))
.unwrap_or(60);
PrometheusBuilder::new()
.with_push_gateway(endpoint, Duration::from_secs(interval))

View File

@ -11,6 +11,7 @@ use std::{collections::HashMap, env, path::PathBuf, sync::LazyLock, thread};
use toml::value::Table;
mod metrics_exporter;
mod system_monitor;
mod tracing_subscriber;
pub(crate) mod http_client;
@ -32,39 +33,17 @@ pub trait Application {
/// Runs the application.
fn run(self, async_jobs: HashMap<&'static str, AsyncCronJob>);
/// Spawns a new thread to run cron jobs.
fn spawn(self, jobs: HashMap<&'static str, CronJob>) -> Self
where
Self: Sized,
{
let mut scheduler = JobScheduler::new();
for (cron_expr, exec) in jobs {
scheduler.add(Job::new(cron_expr, exec));
}
thread::spawn(move || loop {
scheduler.tick();
thread::sleep(scheduler.time_till_next_job());
});
self
/// Initializes the application. It setups the tracing subscriber, the metrics exporter
/// and a global HTTP client.
fn init() {
tracing_subscriber::init::<Self>();
metrics_exporter::init::<Self>();
http_client::init::<Self>();
}
/// Makes an HTTP request to the provided resource
/// using [`reqwest`](https://crates.io/crates/reqwest).
async fn fetch(
resource: impl IntoUrl,
options: impl Into<Option<Map>>,
) -> Result<Response, BoxError> {
let mut trace_context = TraceContext::new();
let span_id = trace_context.span_id();
trace_context
.trace_state_mut()
.push("zino", format!("{span_id:x}"));
http_client::request_builder(resource, options)?
.header("traceparent", trace_context.traceparent())
.header("tracestate", trace_context.tracestate())
.send()
.await
.map_err(BoxError::from)
/// Gets the systems information.
fn sysinfo() -> Map {
system_monitor::refresh_and_retrieve()
}
/// Returns the application env.
@ -109,12 +88,39 @@ pub trait Application {
LazyLock::force(&PROJECT_DIR)
}
/// Initializes the application. It setups the tracing subscriber, the metrics exporter
/// and a global HTTP client.
fn init() {
tracing_subscriber::init::<Self>();
metrics_exporter::init::<Self>();
http_client::init::<Self>();
/// Spawns a new thread to run cron jobs.
fn spawn(self, jobs: HashMap<&'static str, CronJob>) -> Self
where
Self: Sized,
{
let mut scheduler = JobScheduler::new();
for (cron_expr, exec) in jobs {
scheduler.add(Job::new(cron_expr, exec));
}
thread::spawn(move || loop {
scheduler.tick();
thread::sleep(scheduler.time_till_next_job());
});
self
}
/// Makes an HTTP request to the provided resource
/// using [`reqwest`](https://crates.io/crates/reqwest).
async fn fetch(
resource: impl IntoUrl,
options: impl Into<Option<Map>>,
) -> Result<Response, BoxError> {
let mut trace_context = TraceContext::new();
let span_id = trace_context.span_id();
trace_context
.trace_state_mut()
.push("zino", format!("{span_id:x}"));
http_client::request_builder(resource, options)?
.header("traceparent", trace_context.traceparent())
.header("tracestate", trace_context.tracestate())
.send()
.await
.map_err(BoxError::from)
}
}

View File

@ -0,0 +1,169 @@
use crate::{datetime::DateTime, Map};
use parking_lot::RwLock;
use std::sync::LazyLock;
use sysinfo::{DiskExt, NetworkExt, NetworksExt, System, SystemExt};
/// Refreshes the system and retrieves the information.
pub(super) fn refresh_and_retrieve() -> Map {
// Refreshes the system first.
refresh_system();
// Reads the system.
let sys = GLOBAL_MONITOR.read();
let mut map = SYSTEM_INFO.clone();
// Retrieves OS information.
map.insert("os.uptime".to_owned(), sys.uptime().into());
// Retrieves the system load average value.
if sys
.name()
.is_some_and(|sys_name| !sys_name.eq_ignore_ascii_case("windows"))
{
let load_avg = sys.load_average();
let load_avg_values = vec![load_avg.one, load_avg.five, load_avg.fifteen];
map.insert("os.load_average".to_owned(), load_avg_values.into());
}
// Retrieves RAM and SWAP usage.
map.insert("mem.free_memory".to_owned(), sys.free_memory().into());
map.insert(
"mem.available_memory".to_owned(),
sys.available_memory().into(),
);
map.insert("mem.used_memory".to_owned(), sys.used_memory().into());
map.insert("mem.free_swap".to_owned(), sys.free_swap().into());
map.insert("mem.used_swap".to_owned(), sys.used_swap().into());
// Retrieves the disks list.
map.insert(
"disk.available_space".to_owned(),
sys.disks()
.iter()
.fold(0, |sum, disk| sum + disk.available_space())
.into(),
);
// Retrieves the networks list.
let mut network_received = 0;
let mut network_total_received = 0;
let mut network_transmitted = 0;
let mut network_total_transmitted = 0;
let mut network_packets_received = 0;
let mut network_total_packets_received = 0;
let mut network_packets_transmitted = 0;
let mut network_total_packets_transmitted = 0;
let mut network_errors_on_received = 0;
let mut network_total_errors_on_received = 0;
let mut network_errors_on_transmitted = 0;
let mut network_total_errors_on_transmitted = 0;
for (_name, network) in sys.networks() {
network_received += network.received();
network_total_received += network.total_received();
network_transmitted += network.transmitted();
network_total_transmitted += network.total_transmitted();
network_packets_received += network.packets_received();
network_total_packets_received += network.total_packets_received();
network_packets_transmitted += network.packets_transmitted();
network_total_packets_transmitted += network.total_packets_transmitted();
network_errors_on_received += network.errors_on_received();
network_total_errors_on_received += network.total_errors_on_received();
network_errors_on_transmitted += network.errors_on_transmitted();
network_total_errors_on_transmitted += network.total_errors_on_transmitted();
}
map.insert("net.received".to_owned(), network_received.into());
map.insert(
"net.total_received".to_owned(),
network_total_received.into(),
);
map.insert("net.transmitted".to_owned(), network_transmitted.into());
map.insert(
"net.total_transmitted".to_owned(),
network_total_transmitted.into(),
);
map.insert(
"net.packets_received".to_owned(),
network_packets_received.into(),
);
map.insert(
"net.total_packets_received".to_owned(),
network_total_packets_received.into(),
);
map.insert(
"net.packets_transmitted".to_owned(),
network_packets_transmitted.into(),
);
map.insert(
"net.total_packets_transmitted".to_owned(),
network_total_packets_transmitted.into(),
);
map.insert(
"net.errors_on_received".to_owned(),
network_errors_on_received.into(),
);
map.insert(
"net.total_errors_on_received".to_owned(),
network_total_errors_on_received.into(),
);
map.insert(
"net.errors_on_transmitted".to_owned(),
network_errors_on_transmitted.into(),
);
map.insert(
"net.total_errors_on_transmitted".to_owned(),
network_total_errors_on_transmitted.into(),
);
map
}
/// Refreshes the system.
fn refresh_system() {
let mut sys = GLOBAL_MONITOR.write();
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_disks_list();
sys.networks_mut().refresh_networks_list();
}
/// Static system information.
static SYSTEM_INFO: LazyLock<Map> = LazyLock::new(|| {
let mut map = Map::new();
let mut sys = System::new();
sys.refresh_cpu();
sys.refresh_memory();
sys.refresh_disks_list();
// Retrieves OS information.
map.insert("os.name".to_owned(), sys.name().into());
map.insert("os.version".to_owned(), sys.os_version().into());
if let Ok(boot_time) = i64::try_from(sys.boot_time()) {
let booted_at = DateTime::from_timestamp(boot_time);
map.insert("os.booted_at".to_owned(), booted_at.to_string().into());
}
// Retrieves CPUs information.
map.insert("cpu.num_cpus".to_owned(), sys.cpus().len().into());
map.insert(
"cpu.physical_core_count".to_owned(),
sys.physical_core_count().into(),
);
// Retrieves RAM and SWAP information.
map.insert("mem.total_memory".to_owned(), sys.total_memory().into());
map.insert("mem.total_swap".to_owned(), sys.total_swap().into());
// Retrieves the disks list.
map.insert(
"disk.total_space".to_owned(),
sys.disks()
.iter()
.fold(0, |sum, disk| sum + disk.total_space())
.into(),
);
map
});
/// Global system monitor.
static GLOBAL_MONITOR: LazyLock<RwLock<System>> = LazyLock::new(|| RwLock::new(System::new()));

View File

@ -65,6 +65,7 @@ impl SecurityToken {
/// Encrypts the plaintext using AES-GCM-SIV.
pub fn encrypt(key: impl AsRef<[u8]>, plaintext: impl AsRef<[u8]>) -> Option<String> {
crypto::encrypt(key.as_ref(), plaintext.as_ref())
.inspect_err(|_| tracing::error!("failed to encrypt the plaintext"))
.ok()
.map(|bytes| STANDARD_NO_PAD.encode(bytes))
}
@ -73,8 +74,13 @@ impl SecurityToken {
pub fn decrypt(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> Option<String> {
STANDARD_NO_PAD
.decode(data)
.inspect_err(|_| tracing::error!("failed to encode the data with base64"))
.ok()
.and_then(|cipher| crypto::decrypt(key.as_ref(), &cipher).ok())
.and_then(|cipher| {
crypto::decrypt(key.as_ref(), &cipher)
.inspect_err(|_| tracing::error!("failed to decrypt the data"))
.ok()
})
}
/// Parses the token with the encryption key.

View File

@ -1,6 +1,6 @@
//! Connection pool and ORM.
use crate::{crypto, state::State, SharedString};
use crate::{crypto, state::State};
use base64::{engine::general_purpose::STANDARD_NO_PAD, Engine};
use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions},
@ -27,7 +27,7 @@ pub struct ConnectionPool {
/// Name.
name: &'static str,
/// Database.
database: SharedString,
database: &'static str,
/// Pool.
pool: PgPool,
}
@ -52,6 +52,7 @@ impl ConnectionPool {
.expect("the `postgres.password` field should be a str");
let key = format!("{username}@{database}");
crypto::encrypt(key.as_bytes(), password.as_bytes())
.inspect_err(|_| tracing::error!("failed to encrypt the database password"))
.ok()
.map(|bytes| STANDARD_NO_PAD.encode(bytes))
}
@ -106,7 +107,7 @@ impl ConnectionPool {
.get_database()
.unwrap_or_default()
.to_owned()
.into();
.leak();
// Pool options.
let max_connections = config
@ -158,8 +159,8 @@ impl ConnectionPool {
/// Returns the database.
#[inline]
pub fn database(&self) -> &str {
self.database.as_ref()
pub fn database(&self) -> &'static str {
self.database
}
/// Returns a reference to the pool.

View File

@ -11,7 +11,8 @@ pub trait Model: Default + Serialize + DeserializeOwned {
#[must_use]
fn read_map(&mut self, data: Map) -> Validation;
/// Attempts to constructs a model from a json object.
/// Attempts to construct a model from a json object.
#[inline]
fn try_from_map(data: Map) -> Result<Self, Error> {
serde_json::from_value(Value::from(data))
}

View File

@ -13,7 +13,7 @@ use std::{
time::Duration,
};
/// A wrapper type for `chrono::DateTime<Local>`.
/// A wrapper type for [`chrono::DateTime<Local>`](chrono::DateTime).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DateTime(chrono::DateTime<Local>);
@ -80,6 +80,13 @@ impl DateTime {
let datetime = self.0.with_timezone(&Utc);
datetime.to_rfc3339_opts(SecondsFormat::Millis, true)
}
/// Formats the combined date and time with the specified format string.
/// See [`format::strftime`](chrono::format::strftime) for the supported escape sequences.
#[inline]
pub fn format(&self, fmt: &str) -> String {
format!("{}", self.0.format(fmt))
}
}
impl fmt::Display for DateTime {
@ -93,7 +100,7 @@ impl fmt::Display for DateTime {
}
impl Default for DateTime {
/// Returns an instance which corresponds to the current date.
/// Returns an instance which corresponds to **the current date**.
fn default() -> Self {
Self::now()
}

View File

@ -3,22 +3,29 @@
//! [`zino`]: https://github.com/photino/zino
#![feature(async_fn_in_trait)]
#![feature(doc_auto_cfg)]
#![feature(io_error_other)]
#![feature(is_some_and)]
#![feature(iter_intersperse)]
#![feature(let_chains)]
#![feature(nonzero_min_max)]
#![feature(once_cell)]
#![feature(option_result_contains)]
#![feature(result_option_inspect)]
#![feature(string_leak)]
#![feature(type_alias_impl_trait)]
#![forbid(unsafe_code)]
mod crypto;
#[cfg(feature = "accessor")]
pub mod accessor;
#[cfg(feature = "cache")]
pub mod cache;
pub mod application;
pub mod authentication;
pub mod cache;
pub mod channel;
pub mod database;
pub mod datetime;
@ -31,7 +38,7 @@ pub mod trace;
/// A JSON key/value type.
pub type Map = serde_json::Map<String, serde_json::Value>;
/// A UUID is a unique 128-bit number, stored as 16 octets.
/// A Universally Unique Identifier (UUID).
pub type Uuid = uuid::Uuid;
/// An allocation-optimized string.

View File

@ -3,7 +3,6 @@ use bytes::Bytes;
use http_body::Full;
use serde_json::Value;
use std::{
collections::HashMap,
net::{AddrParseError, IpAddr, Ipv4Addr, Ipv6Addr},
num::{ParseFloatError, ParseIntError},
str::{FromStr, ParseBoolError},
@ -14,14 +13,14 @@ use uuid::Uuid;
/// A record of validation results.
#[derive(Debug)]
pub struct Validation {
failed_entries: HashMap<SharedString, BoxError>,
failed_entries: Vec<(SharedString, BoxError)>,
}
impl Validation {
/// Creates a new validation record.
pub fn new() -> Self {
Self {
failed_entries: HashMap::new(),
failed_entries: Vec::new(),
}
}
@ -163,7 +162,7 @@ impl Validation {
/// Records a failed entry.
#[inline]
pub fn record_fail(&mut self, key: impl Into<SharedString>, err: impl Into<BoxError>) {
self.failed_entries.insert(key.into(), err.into());
self.failed_entries.push((key.into(), err.into()));
}
/// Consumes the validation and returns as a json object.

View File

@ -83,14 +83,31 @@ impl From<BoxError> for Rejection {
}
}
impl From<serde_json::Error> for Rejection {
/// Converts to a rejection from the input type [`serde_json::Error`](serde_json::Error).
#[inline]
fn from(err: serde_json::Error) -> Self {
InternalServerError(Box::new(err))
}
}
impl From<sqlx::Error> for Rejection {
/// Converts to this type from the input type `sqlx::Error`.
/// Converts to a rejection from the input type [`sqlx::Error`](sqlx::Error).
#[inline]
fn from(err: sqlx::Error) -> Self {
InternalServerError(Box::new(err))
}
}
#[cfg(feature = "accessor")]
impl From<opendal::Error> for Rejection {
/// Converts to a rejection from the input type [`opendal::Error`](opendal::Error).
#[inline]
fn from(err: opendal::Error) -> Self {
InternalServerError(Box::new(err))
}
}
impl From<Rejection> for Response<StatusCode> {
fn from(rejection: Rejection) -> Self {
use Rejection::*;

View File

@ -1,19 +1,18 @@
//! Application state.
//! Application or request scoped state.
use crate::{Map, SharedString};
use crate::Map;
use std::{
borrow::Cow,
env, fs,
net::{IpAddr, SocketAddr},
sync::LazyLock,
};
use toml::value::{Table, Value};
/// Application state.
/// A state is a record of the env, config and associated data.
#[derive(Debug, Clone)]
pub struct State {
/// Environment.
env: SharedString,
env: &'static str,
/// Configuration.
config: Table,
/// Associated data.
@ -23,9 +22,9 @@ pub struct State {
impl State {
/// Creates a new instance.
#[inline]
pub fn new(env: impl Into<SharedString>) -> Self {
pub fn new(env: &'static str) -> Self {
Self {
env: env.into(),
env,
config: Table::new(),
data: Map::new(),
}
@ -33,7 +32,7 @@ impl State {
/// Loads the config file according to the specific env.
pub fn load_config(&mut self) {
let env = self.env.as_ref();
let env = self.env;
let project_dir = env::current_dir()
.expect("the project directory does not exist or permissions are insufficient");
let config_file = project_dir.join(format!("./config/config.{env}.toml"));
@ -58,8 +57,8 @@ impl State {
/// Returns the env as `&str`.
#[inline]
pub fn env(&self) -> &str {
self.env.as_ref()
pub fn env(&self) -> &'static str {
self.env
}
/// Returns a reference to the config.
@ -150,10 +149,10 @@ impl Default for State {
/// Shared application state.
pub(crate) static SHARED_STATE: LazyLock<State> = LazyLock::new(|| {
let mut app_env = Cow::from("dev");
let mut app_env = "dev";
for arg in env::args() {
if let Some(value) = arg.strip_prefix("--env=") {
app_env = value.to_owned().into();
app_env = value.to_owned().leak();
}
}

View File

@ -1,7 +1,7 @@
[package]
name = "zino-derive"
description = "Derived traits for zino."
version = "0.3.6"
version = "0.3.7"
rust-version = "1.68"
edition = "2021"
license = "MIT"
@ -23,4 +23,4 @@ features = ["full", "extra-traits"]
[dependencies.zino-core]
path = "../zino-core"
version = "0.4.4"
version = "0.5.0"

View File

@ -1,7 +1,7 @@
[package]
name = "zino-model"
description = "Model types for zino."
version = "0.3.6"
version = "0.3.7"
rust-version = "1.68"
edition = "2021"
license = "MIT"
@ -16,8 +16,8 @@ features = ["derive"]
[dependencies.zino-core]
path = "../zino-core"
version = "0.4.4"
version = "0.5.0"
[dependencies.zino-derive]
path = "../zino-derive"
version = "0.3.6"
version = "0.3.7"

View File

@ -1,7 +1,7 @@
[package]
name = "zino"
description = "Full featured web application framework for Rust."
version = "0.4.4"
description = "Full-featured web application framework for Rust."
version = "0.5.0"
rust-version = "1.68"
edition = "2021"
license = "MIT"
@ -66,4 +66,4 @@ optional = true
[dependencies.zino-core]
path = "../zino-core"
version = "0.4.4"
version = "0.5.0"

View File

@ -8,7 +8,7 @@ use axum::{
use futures::future;
use std::{
collections::HashMap, convert::Infallible, io, net::SocketAddr, path::PathBuf, sync::LazyLock,
thread, time::Duration,
time::Duration,
};
use tokio::runtime::Builder;
use tower::{
@ -207,18 +207,11 @@ static SHARED_CLUSTER_STATE: LazyLock<State> = LazyLock::new(|| {
.get("version")
.and_then(|v| v.as_str())
.expect("the `version` field should be specified");
let available_parallelism = thread::available_parallelism()
.map(usize::from)
.unwrap_or_default();
let mut data = Map::new();
data.insert("app_name".to_owned(), app_name.into());
data.insert("app_version".to_owned(), app_version.into());
data.insert("cluster_start_at".to_owned(), DateTime::now().into());
data.insert(
"available_parallelism".to_owned(),
available_parallelism.into(),
);
data.insert("app.name".to_owned(), app_name.into());
data.insert("app.version".to_owned(), app_version.into());
data.insert("app.start_at".to_owned(), DateTime::now().into());
state.set_data(data);
state
});

View File

@ -27,7 +27,7 @@ pub(crate) async fn websocket_handler(
if source.filter(|&source| source != event_source).is_none() {
let event_topic = event.topic();
if topic.filter(|&topic| topic != event_topic).is_none() {
let message = Message::Text(data.to_string());
let message = Message::Text(data.to_owned());
if let Err(err) = socket.send(message).await {
tracing::error!("{err}");
}

View File

@ -73,7 +73,13 @@ impl RequestContext for AxumExtractor<Request<Body>> {
#[inline]
fn get_header(&self, key: &str) -> Option<&str> {
self.headers().get(key)?.to_str().ok()
self.headers()
.get(key)?
.to_str()
.inspect_err(|err| {
tracing::error!("{err}");
})
.ok()
}
#[inline]