Version 0.2.3

This commit is contained in:
photino 2023-01-07 23:29:34 +08:00
parent 914cc89a5e
commit bba6de6cf5
13 changed files with 205 additions and 164 deletions

View File

@ -8,6 +8,7 @@ publish = false
[dependencies]
axum = { version = "0.6.1" }
serde_json = { version = "1.0.91" }
tracing = { version = "0.1.37" }
[dependencies.zino]
path = "../../../zino"

View File

@ -1,38 +1,48 @@
use zino_core::{BoxFuture, DateTime, Map, Query, Schema, Uuid};
use zino_model::User;
pub(super) fn every_15s(job_id: Uuid, job_data: &mut Map) {
pub(super) fn every_15s(job_id: Uuid, job_data: &mut Map, _last_tick: DateTime) {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("job {job_id} is executed every 15 seconds: {job_data:?}");
tracing::info!(
job_data = format!("{job_data:?}"),
"job {job_id} is executed every 15 seconds"
);
}
pub(super) fn every_20s(job_id: Uuid, job_data: &mut Map) {
pub(super) fn every_20s(job_id: Uuid, job_data: &mut Map, _last_tick: DateTime) {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("job {job_id} is executed every 20 seconds: {job_data:?}");
tracing::info!(
job_data = format!("{job_data:?}"),
"job {job_id} is executed every 20 seconds"
);
}
pub(super) fn every_30s(job_id: Uuid, job_data: &mut Map) -> BoxFuture {
pub(super) fn every_30s(job_id: Uuid, job_data: &mut Map, _last_tick: DateTime) -> BoxFuture {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("async job {job_id} is executed every 30 seconds: {job_data:?}");
tracing::info!(
job_data = format!("{job_data:?}"),
"async job {job_id} is executed every 30 seconds"
);
Box::pin(async {
let query = Query::new();
let users = User::find(query).await.unwrap();
job_data.insert("users".to_string(), users.len().into());
let columns = [("*", true), ("roles", true)];
let mut map = User::count(query, columns).await.unwrap();
job_data.append(&mut map);
})
}

View File

@ -27,4 +27,4 @@ user = "postgres"
password = "QAx01wnh1i5ER713zfHmZi6dIUYn/Iq9ag+iUGtvKzEFJFYW"
[tracing]
filter = "sqlx=trace,tower_http=trace,zino=trace,zino_core=trace"
filter = "info,sqlx=trace,tower_http=trace,zino=trace,zino_core=trace"

View File

@ -27,4 +27,4 @@ user = "postgres"
password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B"
[tracing]
filter = "sqlx=warn,tower_http=warn,zino=info,zino_core=info"
filter = "info,sqlx=warn,tower_http=warn"

View File

@ -47,10 +47,10 @@ impl Mutation {
/// Retains the editable fields in the allow list of columns.
/// If the editable fields are empty, it will be set to the allow list.
#[inline]
pub fn allow_fields(&mut self, columns: &[String]) {
pub fn allow_fields<const N: usize>(&mut self, columns: [&str; N]) {
let fields = &mut self.fields;
if fields.is_empty() {
fields.extend_from_slice(columns);
self.fields = columns.map(|col| col.to_string()).to_vec();
} else {
fields.retain(|field| {
columns
@ -62,7 +62,7 @@ impl Mutation {
/// Removes the editable fields in the deny list of columns.
#[inline]
pub fn deny_fields(&mut self, columns: &[String]) {
pub fn deny_fields<const N: usize>(&mut self, columns: [&str; N]) {
self.fields.retain(|field| {
!columns
.iter()

View File

@ -124,10 +124,10 @@ impl Query {
/// Retains the projection fields in the allow list of columns.
/// If the projection fields are empty, it will be set to the allow list.
#[inline]
pub fn allow_fields(&mut self, columns: &[String]) {
pub fn allow_fields<const N: usize>(&mut self, columns: [&str; N]) {
let fields = &mut self.fields;
if fields.is_empty() {
fields.extend_from_slice(columns);
self.fields = columns.map(|col| col.to_string()).to_vec();
} else {
fields.retain(|field| {
columns
@ -139,7 +139,7 @@ impl Query {
/// Removes the projection fields in the deny list of columns.
#[inline]
pub fn deny_fields(&mut self, columns: &[String]) {
pub fn deny_fields<const N: usize>(&mut self, columns: [&str; N]) {
self.fields.retain(|field| {
!columns
.iter()

View File

@ -86,16 +86,15 @@ pub trait Schema: 'static + Send + Sync + Model {
}
columns.push(column);
}
let columns = columns.join(",\n");
let sql = format!(
"
CREATE TABLE IF NOT EXISTS {0} (
{1},
CONSTRAINT {0}_pkey PRIMARY KEY ({2})
CREATE TABLE IF NOT EXISTS {table_name} (
{columns},
CONSTRAINT {table_name}_pkey PRIMARY KEY ({primary_key_name})
);
",
table_name,
columns.join(",\n"),
primary_key_name
"
);
let query_result = sqlx::query(&sql).execute(pool).await?;
Ok(query_result.rows_affected())
@ -159,20 +158,18 @@ pub trait Schema: 'static + Send + Sync + Model {
let pool = Self::init_writer().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
let map = self.into_map();
let mut keys = Vec::new();
let mut columns = Vec::new();
let mut values = Vec::new();
for col in Self::columns() {
let key = col.name();
let value = col.encode_postgres_value(map.get(key));
keys.push(key);
let column = col.name();
let value = col.encode_postgres_value(map.get(column));
columns.push(column);
values.push(value);
}
let sql = format!(
"INSERT INTO {0} ({1}) VALUES ({2});",
table_name,
keys.join(","),
values.join(",")
);
let columns = columns.join(",");
let values = values.join(",");
let sql = format!("INSERT INTO {table_name} ({columns}) VALUES ({values});");
let query_result = sqlx::query(&sql).execute(pool).await?;
Ok(query_result.rows_affected())
}
@ -181,25 +178,23 @@ pub trait Schema: 'static + Send + Sync + Model {
async fn insert_many(models: Vec<Self>) -> Result<u64, Error> {
let pool = Self::init_writer().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
let mut keys = Vec::new();
let mut columns = Vec::new();
let mut values = Vec::new();
for model in models.into_iter() {
let map = model.into_map();
let mut entries = Vec::new();
for col in Self::columns() {
let key = col.name();
let value = col.encode_postgres_value(map.get(key));
keys.push(key);
let column = col.name();
let value = col.encode_postgres_value(map.get(column));
columns.push(column);
entries.push(value);
}
values.push(format!("({})", entries.join(",")));
}
let sql = format!(
"INSERT INTO {0} ({1}) VALUES {2};",
table_name,
keys.join(","),
values.join(",")
);
let columns = columns.join(",");
let values = values.join(",");
let sql = format!("INSERT INTO {table_name} ({columns}) VALUES ({values});");
let query_result = sqlx::query(&sql).execute(pool).await?;
Ok(query_result.rows_affected())
}
@ -213,18 +208,16 @@ pub trait Schema: 'static + Send + Sync + Model {
let map = self.into_map();
let mut mutations = Vec::new();
for col in Self::columns() {
let key = col.name();
if key != primary_key_name {
let value = col.encode_postgres_value(map.get(key));
mutations.push(format!("{key} = {value}"));
let column = col.name();
if column != primary_key_name {
let value = col.encode_postgres_value(map.get(column));
mutations.push(format!("{column} = {value}"));
}
}
let mutations = mutations.join(",");
let sql = format!(
"UPDATE {0} SET {1} WHERE {2} = '{3}';",
table_name,
mutations.join(","),
primary_key_name,
primary_key
"UPDATE {table_name} SET {mutations} WHERE {primary_key_name} = '{primary_key}';"
);
let query_result = sqlx::query(&sql).execute(pool).await?;
Ok(query_result.rows_affected())
@ -265,28 +258,27 @@ pub trait Schema: 'static + Send + Sync + Model {
let table_name = Self::table_name();
let primary_key_name = Self::PRIMARY_KEY_NAME;
let map = self.into_map();
let mut keys = Vec::new();
let mut columns = Vec::new();
let mut values = Vec::new();
let mut mutations = Vec::new();
for col in Self::columns() {
let key = col.name();
let value = col.encode_postgres_value(map.get(key));
if key != primary_key_name {
mutations.push(format!("{key} = {value}"));
let column = col.name();
let value = col.encode_postgres_value(map.get(column));
if column != primary_key_name {
mutations.push(format!("{column} = {value}"));
}
keys.push(key);
columns.push(column);
values.push(value);
}
let columns = columns.join(",");
let values = values.join(",");
let mutations = mutations.join(",");
let sql = format!(
"
INSERT INTO {0} ({1}) VALUES ({2})
ON CONFLICT ({3}) DO UPDATE SET {4};
",
table_name,
keys.join(","),
values.join(","),
primary_key_name,
mutations.join(",")
INSERT INTO {table_name} ({columns}) VALUES ({values})
ON CONFLICT ({primary_key_name}) DO UPDATE SET {mutations};
"
);
let query_result = sqlx::query(&sql).execute(pool).await?;
Ok(query_result.rows_affected())
@ -409,10 +401,10 @@ pub trait Schema: 'static + Send + Sync + Model {
/// Fetches the associated data in the corresponding `columns` for `Vec<Map>` using
/// a merged select on the primary key, which solves the `N+1` problem.
async fn fetch(
async fn fetch<const N: usize>(
mut query: Query,
data: &mut Vec<Map>,
columns: &[String],
columns: [&str; N],
) -> Result<u64, Error> {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
@ -485,7 +477,11 @@ pub trait Schema: 'static + Send + Sync + Model {
/// Fetches the associated data in the corresponding `columns` for `Map` using
/// a merged select on the primary key, which solves the `N+1` problem.
async fn fetch_one(mut query: Query, data: &mut Map, columns: &[String]) -> Result<u64, Error> {
async fn fetch_one<const N: usize>(
mut query: Query,
data: &mut Map,
columns: [&str; N],
) -> Result<u64, Error> {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
let primary_key_name = Self::PRIMARY_KEY_NAME;
@ -551,8 +547,35 @@ pub trait Schema: 'static + Send + Sync + Model {
u64::try_from(associations.len()).map_err(|err| Error::Decode(Box::new(err)))
}
/// Counts the number of rows selected by the query in the table.
/// The boolean value `true` denotes that it only counts distinct values in the column.
async fn count<const N: usize>(query: Query, columns: [(&str, bool); N]) -> Result<Map, Error> {
let pool = Self::init_writer().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
let filter = query.format_filter::<Self>();
let projection = columns
.into_iter()
.map(|(key, distinct)| {
if key != "*" {
if distinct {
format!("count(distinct {key}) as count_distinct_{key}")
} else {
format!("count({key}) as count_{key}")
}
} else {
"count(*)".to_string()
}
})
.intersperse(",".to_string())
.collect::<String>();
let sql = format!("SELECT {projection} FROM {table_name} {filter};");
let row = sqlx::query(&sql).fetch_one(pool).await?;
let map = Column::parse_postgres_row(&row)?;
Ok(map)
}
/// Executes the query in the table, and returns the total number of rows affected.
async fn execute(sql: &str, params: Option<&[String]>) -> Result<u64, Error> {
async fn execute<const N: usize>(sql: &str, params: Option<[&str; N]>) -> Result<u64, Error> {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let mut query = sqlx::query(sql);
if let Some(params) = params {
@ -565,7 +588,10 @@ pub trait Schema: 'static + Send + Sync + Model {
}
/// Executes the query in the table, and parses it as `Vec<Map>`.
async fn query(sql: &str, params: Option<&[String]>) -> Result<Vec<Map>, Error> {
async fn query<const N: usize>(
sql: &str,
params: Option<[&str; N]>,
) -> Result<Vec<Map>, Error> {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let mut query = sqlx::query(sql);
if let Some(params) = params {
@ -583,16 +609,19 @@ pub trait Schema: 'static + Send + Sync + Model {
}
/// Executes the query in the table, and parses it as `Vec<T>`.
async fn query_as<T: DeserializeOwned>(
async fn query_as<T: DeserializeOwned, const N: usize>(
sql: &str,
params: Option<&[String]>,
params: Option<[&str; N]>,
) -> Result<Vec<T>, Error> {
let data = Self::query(sql, params).await?;
serde_json::from_value(data.into()).map_err(|err| Error::Decode(Box::new(err)))
}
/// Executes the query in the table, and parses it as a `Map`.
async fn query_one(sql: &str, params: Option<&[String]>) -> Result<Option<Map>, Error> {
async fn query_one<const N: usize>(
sql: &str,
params: Option<[&str; N]>,
) -> Result<Option<Map>, Error> {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let mut query = sqlx::query(sql);
if let Some(params) = params {
@ -611,9 +640,9 @@ pub trait Schema: 'static + Send + Sync + Model {
}
/// Executes the query in the table, and parses it as an instance of type `T`.
async fn query_one_as<T: DeserializeOwned>(
async fn query_one_as<T: DeserializeOwned, const N: usize>(
sql: &str,
params: Option<&[String]>,
params: Option<[&str; N]>,
) -> Result<Option<T>, Error> {
match Self::query_one(sql, params).await? {
Some(data) => {
@ -628,11 +657,11 @@ pub trait Schema: 'static + Send + Sync + Model {
let pool = Self::init_reader().await.ok_or(Error::PoolClosed)?.pool();
let table_name = Self::table_name();
let primary_key_name = Self::PRIMARY_KEY_NAME;
let primary_key = Column::format_postgres_string(primary_key);
let sql = format!(
"SELECT * FROM {0} WHERE {1} = {2};",
table_name,
primary_key_name,
Column::format_postgres_string(primary_key)
"
SELECT * FROM {table_name} WHERE {primary_key_name} = {primary_key};
"
);
match sqlx::query(&sql).fetch_optional(pool).await? {
Some(row) => {

View File

@ -4,10 +4,10 @@ use cron::Schedule;
use std::{str::FromStr, time::Duration};
/// Cron job.
pub type CronJob = fn(Uuid, &mut Map);
pub type CronJob = fn(Uuid, &mut Map, DateTime);
/// Async cron job.
pub type AsyncCronJob = for<'a> fn(Uuid, &'a mut Map) -> BoxFuture<'a>;
pub type AsyncCronJob = for<'a> fn(Uuid, &'a mut Map, DateTime) -> BoxFuture<'a>;
/// Exectuable job.
enum ExecutableJob {
@ -72,13 +72,13 @@ impl Job {
/// Executes missed runs.
pub fn tick(&mut self) {
let now = Local::now();
if let Some(ref last_tick) = self.last_tick {
for event in self.schedule.after(last_tick) {
if let Some(last_tick) = self.last_tick {
for event in self.schedule.after(&last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(exec) => exec(self.id, &mut self.data),
ExecutableJob::Fn(exec) => exec(self.id, &mut self.data, last_tick.into()),
ExecutableJob::AsyncFn(_) => tracing::warn!("job {} is async", self.id),
}
}
@ -89,14 +89,16 @@ impl Job {
/// Executes missed runs asynchronously.
pub async fn tick_async(&mut self) {
let now = Local::now();
if let Some(ref last_tick) = self.last_tick {
for event in self.schedule.after(last_tick) {
if let Some(last_tick) = self.last_tick {
for event in self.schedule.after(&last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(_) => tracing::warn!("job {} is not async", self.id),
ExecutableJob::AsyncFn(exec) => exec(self.id, &mut self.data).await,
ExecutableJob::AsyncFn(exec) => {
exec(self.id, &mut self.data, last_tick.into()).await
}
}
}
}

View File

@ -130,21 +130,22 @@ impl State {
impl Default for State {
#[inline]
fn default() -> Self {
SHARED_STATE.clone()
let mut app_env = "dev".to_string();
for arg in env::args() {
if arg.starts_with("--env=") {
app_env = arg.strip_prefix("--env=").unwrap().to_string();
}
}
let mut state = State::new(app_env);
state.load_config();
state
}
}
/// Shared server state.
pub(crate) static SHARED_STATE: LazyLock<State> = LazyLock::new(|| {
let mut app_env = "dev".to_string();
for arg in env::args() {
if arg.starts_with("--env=") {
app_env = arg.strip_prefix("--env=").unwrap().to_string();
}
}
let mut state = State::new(app_env);
state.load_config();
let mut state = State::default();
// Database connection pools.
let mut pools = Vec::new();

View File

@ -111,11 +111,12 @@ pub fn schema_macro(item: TokenStream) -> TokenStream {
let schema_columns = format_ident!("{}_COLUMNS", type_name_uppercase);
let schema_reader = format_ident!("{}_READER", type_name_uppercase);
let schema_writer = format_ident!("{}_WRITER", type_name_uppercase);
let columns_len = columns.len();
let output = quote! {
use std::sync::{LazyLock, OnceLock};
static #schema_columns: LazyLock<Vec<zino_core::Column>> = LazyLock::new(|| {
vec![#(#columns),*]
static #schema_columns: LazyLock<[zino_core::Column; #columns_len]> = LazyLock::new(|| {
[#(#columns),*]
});
static #schema_reader: OnceLock<&zino_core::ConnectionPool> = OnceLock::new();
static #schema_writer: OnceLock<&zino_core::ConnectionPool> = OnceLock::new();
@ -132,7 +133,7 @@ pub fn schema_macro(item: TokenStream) -> TokenStream {
/// Returns a reference to the columns.
#[inline]
fn columns() -> &'static[zino_core::Column<'static>] {
fn columns() -> &'static [zino_core::Column<'static>] {
std::sync::LazyLock::force(&#schema_columns).as_slice()
}

View File

@ -25,6 +25,8 @@ use tower_http::{
compression::CompressionLayer,
services::{ServeDir, ServeFile},
};
use tracing::Level;
use tracing_subscriber::fmt::{time, writer::MakeWriterExt};
use zino_core::{Application, AsyncCronJob, Job, JobScheduler, Response, State};
/// An HTTP server cluster for `axum`.
@ -41,6 +43,61 @@ impl Application for AxumCluster {
/// Creates a new application.
fn new() -> Self {
let state = State::default();
let app_env = state.env();
let is_dev = app_env == "dev";
let mut env_filter = if is_dev {
"sqlx=trace,tower_http=trace,zino=trace,zino_core=trace"
} else {
"sqlx=warn,tower_http=info,zino=info,zino_core=info"
};
let mut display_target = is_dev;
let mut display_filename = false;
let mut display_line_number = false;
let mut display_thread_names = false;
let mut display_span_list = false;
let display_current_span = true;
if let Some(tracing) = state.config().get("tracing").and_then(|t| t.as_table()) {
if let Some(filter) = tracing.get("filter").and_then(|t| t.as_str()) {
env_filter = filter;
}
display_target = tracing
.get("display-target")
.and_then(|t| t.as_bool())
.unwrap_or(is_dev);
display_filename = tracing
.get("display-filename")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_line_number = tracing
.get("display-line-number")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_thread_names = tracing
.get("display-thread-names")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_span_list = tracing
.get("display-span-list")
.and_then(|t| t.as_bool())
.unwrap_or(false);
}
let stderr = std::io::stderr.with_max_level(Level::WARN);
tracing_subscriber::fmt()
.json()
.with_env_filter(env_filter)
.with_target(display_target)
.with_file(display_filename)
.with_line_number(display_line_number)
.with_thread_names(display_thread_names)
.with_span_list(display_span_list)
.with_current_span(display_current_span)
.with_timer(time::LocalTime::rfc_3339())
.map_writer(move |w| stderr.or_else(w))
.init();
Self {
start_time: Instant::now(),
routes: HashMap::new(),

View File

@ -4,8 +4,8 @@ use zino_core::State;
// CORS middleware.
pub(crate) static CORS_MIDDLEWARE: LazyLock<CorsLayer> = LazyLock::new(|| {
let config = State::shared().config();
match config.get("cors").and_then(|t| t.as_table()) {
let shared_state = State::shared();
match shared_state.config().get("cors").and_then(|t| t.as_table()) {
Some(cors) => {
let allow_credentials = cors
.get("allow-credentials")

View File

@ -8,77 +8,17 @@ use tower_http::{
LatencyUnit,
};
use tracing::Level;
use tracing_subscriber::fmt::{time, writer::MakeWriterExt};
use zino_core::State;
// Tracing middleware.
pub(crate) static TRACING_MIDDLEWARE: LazyLock<
TraceLayer<SharedClassifier<StatusInRangeAsFailures>>,
> = LazyLock::new(|| {
let shared_state = State::shared();
let app_env = shared_state.env();
let is_dev = app_env == "dev";
let mut env_filter = if is_dev {
"sqlx=trace,tower_http=trace,zino=trace,zino_core=trace"
} else {
"sqlx=warn,tower_http=info,zino=info,zino_core=info"
};
let mut display_target = is_dev;
let mut display_filename = false;
let mut display_line_number = false;
let mut display_thread_names = false;
let mut display_span_list = false;
let display_current_span = true;
let include_headers = true;
let config = shared_state.config();
if let Some(tracing) = config.get("tracing").and_then(|t| t.as_table()) {
if let Some(filter) = tracing.get("filter").and_then(|t| t.as_str()) {
env_filter = filter;
}
display_target = tracing
.get("display-target")
.and_then(|t| t.as_bool())
.unwrap_or(is_dev);
display_filename = tracing
.get("display-filename")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_line_number = tracing
.get("display-line-number")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_thread_names = tracing
.get("display-thread-names")
.and_then(|t| t.as_bool())
.unwrap_or(false);
display_span_list = tracing
.get("display-span-list")
.and_then(|t| t.as_bool())
.unwrap_or(false);
}
let stderr = std::io::stderr.with_max_level(Level::WARN);
tracing_subscriber::fmt()
.json()
.with_env_filter(env_filter)
.with_target(display_target)
.with_file(display_filename)
.with_line_number(display_line_number)
.with_thread_names(display_thread_names)
.with_span_list(display_span_list)
.with_current_span(display_current_span)
.with_timer(time::LocalTime::rfc_3339())
.map_writer(move |w| stderr.or_else(w))
.init();
let classifier = StatusInRangeAsFailures::new_for_client_and_server_errors();
TraceLayer::new(classifier.into_make_classifier())
.make_span_with(
DefaultMakeSpan::new()
.level(Level::INFO)
.include_headers(include_headers),
.include_headers(true),
)
.on_request(DefaultOnRequest::new().level(Level::DEBUG))
.on_response(