From ca7a4746af6945912057641caa5a40e9412444dc Mon Sep 17 00:00:00 2001 From: photino Date: Sat, 7 Jan 2023 01:08:41 +0800 Subject: [PATCH] Add schedule --- examples/axum-app/app/src/controller/stats.rs | 3 +- examples/axum-app/app/src/controller/user.rs | 4 +- examples/axum-app/app/src/main.rs | 11 +- examples/axum-app/app/src/router.rs | 2 +- examples/axum-app/app/src/schedule/job.rs | 38 ++++ examples/axum-app/app/src/schedule/mod.rs | 24 +++ examples/axum-app/config/config.prod.toml | 2 +- zino-core/Cargo.toml | 5 +- zino-core/src/application.rs | 27 ++- .../src/authentication/security_token.rs | 14 ++ zino-core/src/channel/cloud_event.rs | 3 +- zino-core/src/database/column.rs | 14 +- zino-core/src/database/mod.rs | 5 +- zino-core/src/datetime.rs | 91 ++++++--- zino-core/src/lib.rs | 6 + zino-core/src/request/validation.rs | 2 +- zino-core/src/schedule/mod.rs | 177 ++++++++++++++++++ zino-derive/src/parser.rs | 6 +- zino-model/src/collection.rs | 6 +- zino-model/src/dataset.rs | 10 +- zino-model/src/group.rs | 6 +- zino-model/src/log.rs | 3 +- zino-model/src/message.rs | 6 +- zino-model/src/order.rs | 6 +- zino-model/src/policy.rs | 8 +- zino-model/src/record.rs | 3 +- zino-model/src/resource.rs | 6 +- zino-model/src/source.rs | 6 +- zino-model/src/tag.rs | 6 +- zino-model/src/task.rs | 12 +- zino-model/src/user.rs | 6 +- zino/Cargo.toml | 2 +- zino/src/cluster/axum_cluster.rs | 161 ++++++++-------- 33 files changed, 494 insertions(+), 187 deletions(-) create mode 100644 examples/axum-app/app/src/schedule/job.rs create mode 100644 examples/axum-app/app/src/schedule/mod.rs create mode 100644 zino-core/src/schedule/mod.rs diff --git a/examples/axum-app/app/src/controller/stats.rs b/examples/axum-app/app/src/controller/stats.rs index 5bd418f..10ac619 100644 --- a/examples/axum-app/app/src/controller/stats.rs +++ b/examples/axum-app/app/src/controller/stats.rs @@ -1,5 +1,6 @@ -use crate::{Request, RequestContext, Response}; use serde_json::json; +use zino::Request; +use zino_core::{RequestContext, Response}; pub(crate) async fn index(req: Request) -> zino::Result { let mut res = Response::default(); diff --git a/examples/axum-app/app/src/controller/user.rs b/examples/axum-app/app/src/controller/user.rs index 6367c10..a81075e 100644 --- a/examples/axum-app/app/src/controller/user.rs +++ b/examples/axum-app/app/src/controller/user.rs @@ -1,5 +1,7 @@ -use crate::{Model, Query, Rejection, Request, RequestContext, Response, Schema, User, Uuid}; use serde_json::json; +use zino::Request; +use zino_core::{Model, Query, Rejection, RequestContext, Response, Schema, Uuid}; +use zino_model::User; pub(crate) async fn new(mut req: Request) -> zino::Result { let mut user = User::new(); diff --git a/examples/axum-app/app/src/main.rs b/examples/axum-app/app/src/main.rs index 59444e1..7e06875 100644 --- a/examples/axum-app/app/src/main.rs +++ b/examples/axum-app/app/src/main.rs @@ -1,11 +1,12 @@ mod controller; mod router; +mod schedule; -/// Reexports. -use zino::{AxumCluster, Request}; -use zino_core::{Application, Model, Query, Rejection, RequestContext, Response, Schema, Uuid}; -use zino_model::User; +use zino_core::Application; fn main() -> std::io::Result<()> { - AxumCluster::new().register(router::init()).run() + zino::AxumCluster::new() + .register(router::init_routes()) + .spawn(schedule::init_jobs()) + .run(schedule::init_async_jobs()) } diff --git a/examples/axum-app/app/src/router.rs b/examples/axum-app/app/src/router.rs index a1a8915..44b6b1d 100644 --- a/examples/axum-app/app/src/router.rs +++ b/examples/axum-app/app/src/router.rs @@ -5,7 +5,7 @@ use axum::{ }; use std::collections::HashMap; -pub(crate) fn init() -> HashMap<&'static str, Router> { +pub(crate) fn init_routes() -> HashMap<&'static str, Router> { let mut routes = HashMap::new(); // User controller. diff --git a/examples/axum-app/app/src/schedule/job.rs b/examples/axum-app/app/src/schedule/job.rs new file mode 100644 index 0000000..c5c775b --- /dev/null +++ b/examples/axum-app/app/src/schedule/job.rs @@ -0,0 +1,38 @@ +use zino_core::{BoxFuture, DateTime, Map, Query, Schema, Uuid}; +use zino_model::User; + +pub(super) fn every_15s(job_id: Uuid, job_data: &mut Map) { + let counter = job_data + .get("counter") + .map(|c| c.as_u64().unwrap_or_default() + 1) + .unwrap_or_default(); + job_data.insert("current".to_string(), DateTime::now().to_string().into()); + job_data.insert("counter".to_string(), counter.into()); + println!("Job {job_id} is executed every 15 seconds: {job_data:?}"); +} + +pub(super) fn every_20s(job_id: Uuid, job_data: &mut Map) { + let counter = job_data + .get("counter") + .map(|c| c.as_u64().unwrap_or_default() + 1) + .unwrap_or_default(); + job_data.insert("current".to_string(), DateTime::now().to_string().into()); + job_data.insert("counter".to_string(), counter.into()); + println!("Job {job_id} is executed every 20 seconds: {job_data:?}"); +} + +pub(super) fn every_30s(job_id: Uuid, job_data: &mut Map) -> BoxFuture { + let counter = job_data + .get("counter") + .map(|c| c.as_u64().unwrap_or_default() + 1) + .unwrap_or_default(); + job_data.insert("current".to_string(), DateTime::now().to_string().into()); + job_data.insert("counter".to_string(), counter.into()); + println!("Job {job_id} is executed every 45 seconds: {job_data:?}"); + + Box::pin(async { + let query = Query::new(); + let users = User::find(query).await.unwrap(); + job_data.insert("users".to_string(), users.len().into()); + }) +} diff --git a/examples/axum-app/app/src/schedule/mod.rs b/examples/axum-app/app/src/schedule/mod.rs new file mode 100644 index 0000000..c2165c1 --- /dev/null +++ b/examples/axum-app/app/src/schedule/mod.rs @@ -0,0 +1,24 @@ +use std::collections::HashMap; +use zino_core::{AsyncCronJob, CronJob}; + +mod job; + +pub(crate) fn init_jobs() -> HashMap<&'static str, CronJob> { + let mut jobs = HashMap::new(); + + let run_every_15s: CronJob = job::every_15s; + let run_every_20s: CronJob = job::every_20s; + jobs.insert("0/15 * * * * *", run_every_15s); + jobs.insert("0/20 * * * * *", run_every_20s); + + jobs +} + +pub(crate) fn init_async_jobs() -> HashMap<&'static str, AsyncCronJob> { + let mut async_jobs = HashMap::new(); + + let run_every_30s: AsyncCronJob = job::every_30s; + async_jobs.insert("0/30 * * * * *", run_every_30s); + + async_jobs +} diff --git a/examples/axum-app/config/config.prod.toml b/examples/axum-app/config/config.prod.toml index c637254..1493268 100644 --- a/examples/axum-app/config/config.prod.toml +++ b/examples/axum-app/config/config.prod.toml @@ -27,4 +27,4 @@ user = "postgres" password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B" [tracing] -filter = "sqlx=warn,tower_http=info,zino=info,zino_core=info" \ No newline at end of file +filter = "sqlx=warn,tower_http=warn,zino=info,zino_core=info" \ No newline at end of file diff --git a/zino-core/Cargo.toml b/zino-core/Cargo.toml index 69aafc3..c54f20d 100644 --- a/zino-core/Cargo.toml +++ b/zino-core/Cargo.toml @@ -17,6 +17,8 @@ aes-gcm-siv = { version = "0.11.1" } async-trait = { version = "0.1.60" } base64 = { version = "0.20.0" } bytes = { version = "1.3.0" } +chrono = { version = "0.4.23", features = ["serde"] } +cron = { version = "0.12.0" } futures = { version = "0.3.25" } hmac = { version = "0.12.1" } http = { version = "0.2.8" } @@ -26,8 +28,7 @@ rand = { version = "0.8.5" } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.91" } sha2 = { version = "0.10.6" } -sqlx = { version = "0.6.2", features = ["runtime-tokio-native-tls", "postgres", "uuid", "time", "json"] } -time = { version = "0.3.17", features = ["local-offset", "parsing", "serde"] } +sqlx = { version = "0.6.2", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] } tracing = { version = "0.1.37" } toml = { version = "0.5.10" } url = { version = "2.3.1" } diff --git a/zino-core/src/application.rs b/zino-core/src/application.rs index 6fd98db..f9333b9 100644 --- a/zino-core/src/application.rs +++ b/zino-core/src/application.rs @@ -1,4 +1,5 @@ -use std::{io, time::Instant}; +use crate::{AsyncCronJob, CronJob, Job, JobScheduler}; +use std::{collections::HashMap, io, thread, time::Instant}; /// Application. pub trait Application { @@ -8,12 +9,28 @@ pub trait Application { /// Creates a new application. fn new() -> Self; - /// Registers the router. - fn register(self, routes: Self::Router) -> Self; - /// Returns the start time. fn start_time(&self) -> Instant; + /// Registers routes. + fn register(self, routes: HashMap<&'static str, Self::Router>) -> Self; + + /// Spawns a new thread to run jobs. + fn spawn(self, jobs: HashMap<&'static str, CronJob>) -> Self + where + Self: Sized, + { + let mut scheduler = JobScheduler::new(); + for (cron_expr, exec) in jobs { + scheduler.add(Job::new(cron_expr, exec)); + } + thread::spawn(move || loop { + scheduler.tick(); + thread::sleep(scheduler.time_till_next_job()); + }); + self + } + /// Runs the application. - fn run(self) -> io::Result<()>; + fn run(self, async_jobs: HashMap<&'static str, AsyncCronJob>) -> io::Result<()>; } diff --git a/zino-core/src/authentication/security_token.rs b/zino-core/src/authentication/security_token.rs index 3f63876..58ae1e3 100644 --- a/zino-core/src/authentication/security_token.rs +++ b/zino-core/src/authentication/security_token.rs @@ -121,6 +121,20 @@ impl SecurityToken { pub fn as_str(&self) -> &str { self.token.as_str() } + + /// Encrypts the plaintext using AES-GCM-SIV. + pub fn encrypt(key: impl AsRef<[u8]>, plaintext: impl AsRef<[u8]>) -> Option { + crypto::encrypt(key.as_ref(), plaintext.as_ref()) + .ok() + .map(base64::encode) + } + + /// Decrypts the data using AES-GCM-SIV. + pub fn decrypt(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> Option { + base64::decode(data) + .ok() + .and_then(|cipher| crypto::decrypt(key.as_ref(), &cipher).ok()) + } } impl fmt::Display for SecurityToken { diff --git a/zino-core/src/channel/cloud_event.rs b/zino-core/src/channel/cloud_event.rs index 2389db9..3aec61f 100644 --- a/zino-core/src/channel/cloud_event.rs +++ b/zino-core/src/channel/cloud_event.rs @@ -2,7 +2,8 @@ use crate::{DateTime, Map}; use serde::{Deserialize, Serialize}; use serde_json::Value; -/// Cloud event. See [the spec](https://github.com/cloudevents/spec/blob/v1.0.1/spec.md). +/// Cloud event. +/// See [the spec](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md). #[derive(Debug, Clone, Default, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] #[serde(default)] diff --git a/zino-core/src/database/column.rs b/zino-core/src/database/column.rs index d9f354a..c08d5f2 100644 --- a/zino-core/src/database/column.rs +++ b/zino-core/src/database/column.rs @@ -1,8 +1,8 @@ use crate::{Map, Uuid}; +use chrono::{DateTime, Local, SecondsFormat}; use serde::Serialize; use serde_json::Value; use sqlx::{postgres::PgRow, Column as _, Error, Row, TypeInfo}; -use time::OffsetDateTime; /// A column is a model field with associated metadata. #[derive(Debug, Clone, Serialize)] @@ -140,8 +140,10 @@ impl<'a> Column<'a> { "bool" => row.try_get_unchecked::(key)?.into(), "String" => row.try_get_unchecked::(key)?.into(), "DateTime" => { - let datetime = row.try_get_unchecked::(key)?; - datetime.to_string().into() + let datetime = row.try_get_unchecked::, _>(key)?; + datetime + .to_rfc3339_opts(SecondsFormat::Micros, false) + .into() } "Uuid" | "Option" => row.try_get_unchecked::(key)?.to_string().into(), "Vec" => row.try_get_unchecked::, _>(key)?.into(), @@ -175,8 +177,10 @@ impl<'a> Column<'a> { "BOOL" => row.try_get_unchecked::(key)?.into(), "TEXT" | "VARCHAR" => row.try_get_unchecked::(key)?.into(), "TIMESTAMPTZ" => { - let datetime = row.try_get_unchecked::(key)?; - datetime.to_string().into() + let datetime = row.try_get_unchecked::, _>(key)?; + datetime + .to_rfc3339_opts(SecondsFormat::Micros, false) + .into() } "UUID" => row.try_get_unchecked::(key)?.to_string().into(), "BYTEA" => row.try_get_unchecked::, _>(key)?.into(), diff --git a/zino-core/src/database/mod.rs b/zino-core/src/database/mod.rs index c5ca6f6..ad7f9cd 100644 --- a/zino-core/src/database/mod.rs +++ b/zino-core/src/database/mod.rs @@ -1,3 +1,4 @@ +use crate::crypto; use sqlx::{postgres::PgPoolOptions, Error, PgPool}; use toml::value::Table; @@ -44,7 +45,7 @@ impl ConnectionPool { .as_str() .expect("the `postgres.password` field should be a str"); let key = format!("{user}@{database}"); - crate::crypto::encrypt(key.as_bytes(), password.as_bytes()) + crypto::encrypt(key.as_bytes(), password.as_bytes()) .ok() .map(base64::encode) } @@ -78,7 +79,7 @@ impl ConnectionPool { .expect("the `postgres.password` field should be a str"); if let Ok(data) = base64::decode(password) { let key = format!("{user}@{database}"); - if let Ok(plaintext) = crate::crypto::decrypt(key.as_bytes(), &data) { + if let Ok(plaintext) = crypto::decrypt(key.as_bytes(), &data) { password = plaintext.leak(); } } diff --git a/zino-core/src/datetime.rs b/zino-core/src/datetime.rs index 78e825c..943db0b 100644 --- a/zino-core/src/datetime.rs +++ b/zino-core/src/datetime.rs @@ -1,3 +1,7 @@ +use chrono::{ + format::{ParseError, ParseResult}, + Local, SecondsFormat, TimeZone, Utc, +}; use serde::{Deserialize, Serialize}; use std::{ fmt, @@ -5,74 +9,107 @@ use std::{ str::FromStr, time::Duration, }; -use time::{ - error::Parse, - format_description::well_known::{Rfc2822, Rfc3339}, - OffsetDateTime, UtcOffset, -}; /// ISO 8601 combined date and time with local time zone. #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -pub struct DateTime(OffsetDateTime); +pub struct DateTime(chrono::DateTime); impl DateTime { /// Returns a new instance which corresponds to the current date. #[inline] pub fn now() -> Self { - Self(OffsetDateTime::now_utc()) + Self(Local::now()) } /// Returns a new instance corresponding to a UTC date and time, /// from the number of non-leap seconds since the midnight UTC on January 1, 1970. #[inline] pub fn from_timestamp(secs: i64) -> Self { - Self(OffsetDateTime::from_unix_timestamp(secs).unwrap()) + Self(Local.timestamp_opt(secs, 0).unwrap()) + } + + /// Returns a new instance corresponding to a UTC date and time, + /// from the number of non-leap milliseconds since the midnight UTC on January 1, 1970. + #[inline] + pub fn from_timestamp_millis(mills: i64) -> Self { + Self(Local.timestamp_millis_opt(mills).unwrap()) } /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC. #[inline] pub fn timestamp(&self) -> i64 { - self.0.unix_timestamp() + self.0.timestamp() + } + + /// Returns the number of non-leap-milliseconds since January 1, 1970 UTC. + #[inline] + pub fn timestamp_millis(&self) -> i64 { + self.0.timestamp_millis() } /// Parses an RFC 2822 date and time. #[inline] - pub fn parse_utc_str(s: &str) -> Result { - let datetime = OffsetDateTime::parse(s, &Rfc2822)?; - Ok(Self(datetime)) + pub fn parse_utc_str(s: &str) -> ParseResult { + let datetime = chrono::DateTime::parse_from_rfc2822(s)?; + Ok(Self(datetime.with_timezone(&Local))) + } + + /// Parses an RFC 3339 and ISO 8601 date and time. + #[inline] + pub fn parse_iso_str(s: &str) -> ParseResult { + let datetime = chrono::DateTime::parse_from_rfc3339(s)?; + Ok(Self(datetime.with_timezone(&Local))) } /// Returns an RFC 2822 date and time string. #[inline] pub fn to_utc_string(&self) -> String { - let datetime = self.0.to_offset(UtcOffset::UTC).format(&Rfc2822).unwrap(); + let datetime = self.0.with_timezone(&Utc).to_rfc2822(); format!("{} GMT", datetime.trim_end_matches(" +0000")) } + + /// Return an RFC 3339 and ISO 8601 date and time string with subseconds + /// formatted as [`SecondsFormat::Millis`](chrono::SecondsFormat::Millis). + #[inline] + pub fn to_iso_string(&self) -> String { + self.0.to_rfc3339_opts(SecondsFormat::Millis, true) + } } impl Default for DateTime { + /// Returns an instance which corresponds to the current date. fn default() -> Self { - Self(OffsetDateTime::now_utc()) + Self::now() } } -impl From for OffsetDateTime { - fn from(t: DateTime) -> Self { - t.0 +impl From for chrono::DateTime { + fn from(dt: DateTime) -> Self { + dt.0 + } +} + +impl From> for DateTime { + fn from(dt: chrono::DateTime) -> Self { + Self(dt) } } impl FromStr for DateTime { - type Err = Parse; + type Err = ParseError; fn from_str(s: &str) -> Result { - OffsetDateTime::parse(s, &Rfc3339).map(Self) + chrono::DateTime::::from_str(s).map(Self) } } impl fmt::Display for DateTime { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + write!( + f, + "{}", + self.0.to_rfc3339_opts(SecondsFormat::Micros, false) + ) } } @@ -81,7 +118,12 @@ impl Add for DateTime { #[inline] fn add(self, rhs: Duration) -> Self { - Self(self.0 + rhs) + let duration = chrono::Duration::from_std(rhs).expect("Duration value is out of range"); + let datetime = self + .0 + .checked_add_signed(duration) + .expect("`DateTime + Duration` overflowed"); + Self(datetime) } } @@ -97,7 +139,12 @@ impl Sub for DateTime { #[inline] fn sub(self, rhs: Duration) -> Self { - Self(self.0 - rhs) + let duration = chrono::Duration::from_std(rhs).expect("Duration value is out of range"); + let datetime = self + .0 + .checked_sub_signed(duration) + .expect("`DateTime - Duration` overflowed"); + Self(datetime) } } diff --git a/zino-core/src/lib.rs b/zino-core/src/lib.rs index 35d3c26..214a22a 100644 --- a/zino-core/src/lib.rs +++ b/zino-core/src/lib.rs @@ -4,6 +4,7 @@ #![feature(iter_intersperse)] #![feature(once_cell)] #![feature(string_leak)] +#![feature(type_alias_impl_trait)] mod application; mod authentication; @@ -13,6 +14,7 @@ mod database; mod datetime; mod request; mod response; +mod schedule; mod state; // Reexports. @@ -23,6 +25,7 @@ pub use database::{Column, ConnectionPool, Model, Mutation, Query, Schema}; pub use datetime::DateTime; pub use request::{Context, RequestContext, Validation}; pub use response::{Rejection, Response, ResponseCode}; +pub use schedule::{AsyncCronJob, CronJob, Job, JobScheduler}; pub use state::State; /// A JSON key/value type. @@ -30,3 +33,6 @@ pub type Map = serde_json::Map; /// A UUID is a unique 128-bit number, stored as 16 octets. pub type Uuid = uuid::Uuid; + +/// An owned dynamically typed Future. +pub type BoxFuture<'a, T = ()> = futures::future::BoxFuture<'a, T>; diff --git a/zino-core/src/request/validation.rs b/zino-core/src/request/validation.rs index 54d935d..7bd9b60 100644 --- a/zino-core/src/request/validation.rs +++ b/zino-core/src/request/validation.rs @@ -122,7 +122,7 @@ impl Validation { /// Parses a json value as `DateTime`. pub fn parse_datetime<'a>( value: impl Into>, - ) -> Option> { + ) -> Option> { value.into().and_then(|v| v.as_str()).map(|s| s.parse()) } diff --git a/zino-core/src/schedule/mod.rs b/zino-core/src/schedule/mod.rs new file mode 100644 index 0000000..ff7f495 --- /dev/null +++ b/zino-core/src/schedule/mod.rs @@ -0,0 +1,177 @@ +use crate::{BoxFuture, DateTime, Map, Uuid}; +use chrono::Local; +use cron::Schedule; +use std::{str::FromStr, time::Duration}; + +/// Cron job. +pub type CronJob = fn(Uuid, &mut Map); + +/// Async cron job. +pub type AsyncCronJob = for<'a> fn(Uuid, &'a mut Map) -> BoxFuture<'a>; + +/// Exectuable job. +enum ExecutableJob { + Fn(CronJob), + AsyncFn(AsyncCronJob), +} + +/// A schedulable `Job`. +pub struct Job { + id: Uuid, + data: Map, + schedule: Schedule, + run: ExecutableJob, + last_tick: Option>, +} + +impl Job { + /// Creates a new `Job`. + #[inline] + pub fn new(cron_expr: &str, exec: CronJob) -> Self { + let schedule = Schedule::from_str(cron_expr).unwrap(); + Job { + id: Uuid::new_v4(), + data: Map::new(), + schedule, + run: ExecutableJob::Fn(exec), + last_tick: None, + } + } + + /// Creates a new async `Job`. + #[inline] + pub fn new_async(cron_expr: &str, exec: AsyncCronJob) -> Self { + let schedule = Schedule::from_str(cron_expr).unwrap(); + Job { + id: Uuid::new_v4(), + data: Map::new(), + schedule, + run: ExecutableJob::AsyncFn(exec), + last_tick: None, + } + } + + /// Returns the job ID. + #[inline] + pub fn job_id(&self) -> Uuid { + self.id + } + + /// Returns a reference to the job data. + #[inline] + pub fn job_data(&self) -> &Map { + &self.data + } + + /// Sets last tick. + #[inline] + pub fn set_last_tick(&mut self, last_tick: impl Into>) { + self.last_tick = last_tick.into().map(|dt| dt.into()); + } + + /// Executes missed runs. + pub fn tick(&mut self) { + let now = Local::now(); + if let Some(ref last_tick) = self.last_tick { + for event in self.schedule.after(last_tick) { + if event > now { + break; + } + match self.run { + ExecutableJob::Fn(exec) => exec(self.id, &mut self.data), + ExecutableJob::AsyncFn(_exec) => tracing::warn!("job {} is async", self.id), + } + } + } + self.last_tick = Some(now); + } + + /// Executes missed runs asynchronously. + pub async fn tick_async(&mut self) { + let now = Local::now(); + if let Some(ref last_tick) = self.last_tick { + for event in self.schedule.after(last_tick) { + if event > now { + break; + } + match self.run { + ExecutableJob::Fn(_exec) => tracing::warn!("job {} is not async", self.id), + ExecutableJob::AsyncFn(exec) => exec(self.id, &mut self.data).await, + } + } + } + self.last_tick = Some(now); + } +} + +/// A type contains and executes the scheduled jobs. +#[derive(Default)] +pub struct JobScheduler { + jobs: Vec, +} + +impl JobScheduler { + /// Creates a new `JobScheduler`. + #[inline] + pub fn new() -> Self { + Self { jobs: Vec::new() } + } + + /// Adds a job to the `JobScheduler` and returns the job ID. + pub fn add(&mut self, job: Job) -> Uuid { + let job_id = job.id; + self.jobs.push(job); + job_id + } + + /// Removes a job by ID from the `JobScheduler`. + pub fn remove(&mut self, job_id: Uuid) -> bool { + let position = self.jobs.iter().position(|job| job.id == job_id); + match position { + Some(index) => { + self.jobs.remove(index); + true + } + None => false, + } + } + + /// The `tick` method increments time for the `JobScheduler` and executes + /// any pending jobs. It is recommended to sleep for at least 500 + /// milliseconds between invocations of this method. + pub fn tick(&mut self) { + for job in &mut self.jobs { + job.tick(); + } + } + + /// The `tick_async` method increments time for the `JobScheduler` and executes + /// any pending jobs asynchronously. It is recommended to sleep for at least 500 + /// milliseconds between invocations of this method. + pub async fn tick_async(&mut self) { + for job in &mut self.jobs { + job.tick_async().await; + } + } + + /// The `time_till_next_job` method returns the duration till the next job + /// is supposed to run. This can be used to sleep until then without waking + /// up at a fixed interval. + pub fn time_till_next_job(&self) -> Duration { + if self.jobs.is_empty() { + Duration::from_millis(500) + } else { + let mut duration = chrono::Duration::zero(); + let now = Local::now(); + for job in self.jobs.iter() { + for event in job.schedule.after(&now).take(1) { + let d = event - now; + if duration.is_zero() || d < duration { + duration = d; + } + } + } + duration.to_std().unwrap_or(Duration::from_millis(500)) + } + } +} diff --git a/zino-derive/src/parser.rs b/zino-derive/src/parser.rs index 8886416..342d07b 100644 --- a/zino-derive/src/parser.rs +++ b/zino-derive/src/parser.rs @@ -19,10 +19,8 @@ pub(crate) fn get_type_name(ty: &Type) -> String { /// Parses an attribute and returns a list of arguments pub(crate) fn parse_attr(attr: &Attribute) -> Vec<(String, Option)> { if let Ok(meta) = attr.parse_meta() { - if let Some(ident) = meta.path().get_ident() { - if *ident != "schema" { - return Vec::new(); - } + if let Some(ident) = meta.path().get_ident() && *ident != "schema" { + return Vec::new(); } if let Meta::List(list) = meta { let mut arguments = Vec::new(); diff --git a/zino-model/src/collection.rs b/zino-model/src/collection.rs index 6a4c7a5..6e2e269 100644 --- a/zino-model/src/collection.rs +++ b/zino-model/src/collection.rs @@ -34,9 +34,9 @@ pub struct Collection { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -46,8 +46,6 @@ impl Model for Collection { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/dataset.rs b/zino-model/src/dataset.rs index d0d46b7..322802f 100644 --- a/zino-model/src/dataset.rs +++ b/zino-model/src/dataset.rs @@ -23,9 +23,7 @@ pub struct Dataset { // Info fields. project_id: Uuid, // group.id, group.namespace = "*:project", group.subject = "user" task_id: Option, // task.id - #[schema(default = "now")] valid_from: DateTime, - #[schema(default = "now")] expires_at: DateTime, #[schema(index = "gin")] tags: Vec, // tag.id, tag.namespace = "*:dataset" @@ -38,9 +36,9 @@ pub struct Dataset { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -50,10 +48,6 @@ impl Model for Dataset { fn new() -> Self { Self { id: Uuid::new_v4(), - valid_from: DateTime::now(), - expires_at: DateTime::now(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/group.rs b/zino-model/src/group.rs index eff05a4..172a6ac 100644 --- a/zino-model/src/group.rs +++ b/zino-model/src/group.rs @@ -37,9 +37,9 @@ pub struct Group { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -49,8 +49,6 @@ impl Model for Group { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/log.rs b/zino-model/src/log.rs index 0752d92..f8f2d9c 100644 --- a/zino-model/src/log.rs +++ b/zino-model/src/log.rs @@ -30,7 +30,7 @@ pub struct Log { #[schema(index = "text")] message: String, source: String, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] recorded_at: DateTime, // Extensions. @@ -43,7 +43,6 @@ impl Model for Log { fn new() -> Self { Self { id: Uuid::new_v4(), - recorded_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/message.rs b/zino-model/src/message.rs index f18e86e..b205c97 100644 --- a/zino-model/src/message.rs +++ b/zino-model/src/message.rs @@ -37,9 +37,9 @@ pub struct Message { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -49,8 +49,6 @@ impl Model for Message { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/order.rs b/zino-model/src/order.rs index 46d559d..524966d 100644 --- a/zino-model/src/order.rs +++ b/zino-model/src/order.rs @@ -38,9 +38,9 @@ pub struct Order { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -50,8 +50,6 @@ impl Model for Order { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/policy.rs b/zino-model/src/policy.rs index e4e342b..99c2ec5 100644 --- a/zino-model/src/policy.rs +++ b/zino-model/src/policy.rs @@ -26,9 +26,7 @@ pub struct Policy { resource: String, actions: Vec, effect: String, - #[schema(default = "now")] valid_from: DateTime, - #[schema(default = "now")] expires_at: DateTime, #[schema(index = "gin")] tags: Vec, // tag.id, tag.namespace = "*:policy" @@ -41,9 +39,9 @@ pub struct Policy { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -53,8 +51,6 @@ impl Model for Policy { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/record.rs b/zino-model/src/record.rs index 5a038e6..57d82af 100644 --- a/zino-model/src/record.rs +++ b/zino-model/src/record.rs @@ -23,7 +23,7 @@ pub struct Record { // Info fields. integrity: String, signature: String, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] recorded_at: DateTime, // Extensions. @@ -36,7 +36,6 @@ impl Model for Record { fn new() -> Self { Self { id: Uuid::new_v4(), - recorded_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/resource.rs b/zino-model/src/resource.rs index 3ff6857..eecb217 100644 --- a/zino-model/src/resource.rs +++ b/zino-model/src/resource.rs @@ -38,9 +38,9 @@ pub struct Resource { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -50,8 +50,6 @@ impl Model for Resource { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/source.rs b/zino-model/src/source.rs index 9a4c3de..8e46b6e 100644 --- a/zino-model/src/source.rs +++ b/zino-model/src/source.rs @@ -32,9 +32,9 @@ pub struct Source { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -44,8 +44,6 @@ impl Model for Source { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/tag.rs b/zino-model/src/tag.rs index efcda3e..e7d52e6 100644 --- a/zino-model/src/tag.rs +++ b/zino-model/src/tag.rs @@ -33,9 +33,9 @@ pub struct Tag { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -45,8 +45,6 @@ impl Model for Tag { fn new() -> Self { Self { id: Uuid::new_v4(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/task.rs b/zino-model/src/task.rs index 0054aea..6a19e2f 100644 --- a/zino-model/src/task.rs +++ b/zino-model/src/task.rs @@ -26,14 +26,10 @@ pub struct Task { output_id: Option, // source.id #[schema(index = "gin")] dependencies: Vec, // task.id - #[schema(default = "now")] valid_from: DateTime, - #[schema(default = "now")] expires_at: DateTime, schedule: String, - #[schema(default = "epoch")] last_time: DateTime, - #[schema(default = "epoch")] next_time: DateTime, priority: u16, #[schema(index = "gin")] @@ -47,9 +43,9 @@ pub struct Task { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -59,10 +55,6 @@ impl Model for Task { fn new() -> Self { Self { id: Uuid::new_v4(), - valid_from: DateTime::now(), - expires_at: DateTime::now(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino-model/src/user.rs b/zino-model/src/user.rs index 57131ac..8f9d9d8 100644 --- a/zino-model/src/user.rs +++ b/zino-model/src/user.rs @@ -42,9 +42,9 @@ pub struct User { // Revisions. manager_id: Uuid, // user.id maintainer_id: Uuid, // user.id - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] created_at: DateTime, - #[schema(default = "now", index = "btree")] + #[schema(index = "btree")] updated_at: DateTime, version: u64, edition: u32, @@ -55,8 +55,6 @@ impl Model for User { Self { id: Uuid::new_v4(), access_key_id: AccessKeyId::new().to_string(), - created_at: DateTime::now(), - updated_at: DateTime::now(), ..Self::default() } } diff --git a/zino/Cargo.toml b/zino/Cargo.toml index 16a9c6b..0dd8c4f 100644 --- a/zino/Cargo.toml +++ b/zino/Cargo.toml @@ -26,7 +26,7 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.91" } serde_qs = { version = "0.10.1" } serde_urlencoded = { version = "0.7.1" } -tokio = { version = "1.23.0", features = ["rt-multi-thread", "sync"], optional = true } +tokio = { version = "1.24.1", features = ["rt-multi-thread", "sync"], optional = true } tokio-stream = { version = "0.1.11", features = ["sync"], optional = true } toml = { version = "0.5.10" } tower = { version = "0.4.13", features = ["timeout"], optional = true } diff --git a/zino/src/cluster/axum_cluster.rs b/zino/src/cluster/axum_cluster.rs index 3fb02ce..422b7fb 100644 --- a/zino/src/cluster/axum_cluster.rs +++ b/zino/src/cluster/axum_cluster.rs @@ -13,6 +13,7 @@ use std::{ net::SocketAddr, path::Path, sync::{Arc, LazyLock}, + thread, time::{Duration, Instant}, }; use tokio::runtime::Builder; @@ -25,7 +26,7 @@ use tower_http::{ compression::CompressionLayer, services::{ServeDir, ServeFile}, }; -use zino_core::{Application, Response, State}; +use zino_core::{Application, AsyncCronJob, Job, JobScheduler, Response, State}; /// An HTTP server cluster for `axum`. pub struct AxumCluster { @@ -37,7 +38,7 @@ pub struct AxumCluster { impl Application for AxumCluster { /// Router. - type Router = HashMap<&'static str, Router>; + type Router = Router; /// Creates a new application. fn new() -> Self { @@ -47,20 +48,33 @@ impl Application for AxumCluster { } } - /// Registers the router. - fn register(mut self, routes: Self::Router) -> Self { - self.routes = routes; - self - } - /// Returns the start time. #[inline] fn start_time(&self) -> Instant { self.start_time } + /// Registers routes. + fn register(mut self, routes: HashMap<&'static str, Self::Router>) -> Self { + self.routes = routes; + self + } + /// Runs the application. - fn run(self) -> io::Result<()> { + fn run(self, async_jobs: HashMap<&'static str, AsyncCronJob>) -> io::Result<()> { + let mut scheduler = JobScheduler::new(); + for (cron_expr, exec) in async_jobs { + scheduler.add(Job::new_async(cron_expr, exec)); + } + + let runtime = Builder::new_multi_thread().enable_all().build()?; + runtime.spawn(async move { + loop { + scheduler.tick_async().await; + thread::sleep(scheduler.time_till_next_job()); + } + }); + let current_dir = env::current_dir().unwrap(); let project_dir = Path::new(¤t_dir); let public_dir = project_dir.join("./public"); @@ -86,76 +100,73 @@ impl Application for AxumCluster { ) }, ); - Builder::new_multi_thread() - .enable_all() - .build()? - .block_on(async { - let routes = self.routes; - let shared_state = State::shared(); - let app_env = shared_state.env(); - tracing::info!("load config.{app_env}.toml"); - let listeners = shared_state.listeners(); - let servers = listeners.iter().map(|listener| { - let mut app = Router::new() - .route_service("/", serve_file_service.clone()) - .nest_service("/public", serve_dir_service.clone()) - .route("/sse", routing::get(crate::endpoint::axum_sse::sse_handler)) - .route( - "/websocket", - routing::get(crate::endpoint::axum_websocket::websocket_handler), - ); - for (path, route) in &routes { - app = app.nest(path, route.clone()); - } + runtime.block_on(async { + let routes = self.routes; + let shared_state = State::shared(); + let app_env = shared_state.env(); + tracing::info!("load config.{app_env}.toml"); - let state = Arc::new(State::default()); - app = app - .fallback_service(tower::service_fn(|_| async { - let res = Response::new(StatusCode::NOT_FOUND); - Ok::>, Infallible>(res.into()) - })) - .layer( - ServiceBuilder::new() - .layer(LazyLock::force( - &crate::middleware::tower_tracing::TRACING_MIDDLEWARE, - )) - .layer(LazyLock::force( - &crate::middleware::tower_cors::CORS_MIDDLEWARE, - )) - .layer(middleware::from_fn( - crate::middleware::axum_context::request_context, - )) - .layer(DefaultBodyLimit::disable()) - .layer(AddExtensionLayer::new(state)) - .layer(CompressionLayer::new()) - .layer(HandleErrorLayer::new(|err: BoxError| async move { - let status_code = if err.is::() { - StatusCode::REQUEST_TIMEOUT - } else if err.is::() { - StatusCode::PAYLOAD_TOO_LARGE - } else { - StatusCode::INTERNAL_SERVER_ERROR - }; - let res = Response::new(status_code); - Ok::>, Infallible>(res.into()) - })) - .layer(TimeoutLayer::new(Duration::from_secs(10))), - ); - - let addr = listener - .parse() - .inspect(|addr| tracing::info!(env = app_env, "listen on {addr}")) - .unwrap_or_else(|_| panic!("invalid socket address: {listener}")); - Server::bind(&addr) - .serve(app.into_make_service_with_connect_info::()) - }); - for result in future::join_all(servers).await { - if let Err(err) = result { - tracing::error!("server error: {err}"); - } + let listeners = shared_state.listeners(); + let servers = listeners.iter().map(|listener| { + let mut app = Router::new() + .route_service("/", serve_file_service.clone()) + .nest_service("/public", serve_dir_service.clone()) + .route("/sse", routing::get(crate::endpoint::axum_sse::sse_handler)) + .route( + "/websocket", + routing::get(crate::endpoint::axum_websocket::websocket_handler), + ); + for (path, route) in &routes { + app = app.nest(path, route.clone()); } + + let state = Arc::new(State::default()); + app = app + .fallback_service(tower::service_fn(|_| async { + let res = Response::new(StatusCode::NOT_FOUND); + Ok::>, Infallible>(res.into()) + })) + .layer( + ServiceBuilder::new() + .layer(LazyLock::force( + &crate::middleware::tower_tracing::TRACING_MIDDLEWARE, + )) + .layer(LazyLock::force( + &crate::middleware::tower_cors::CORS_MIDDLEWARE, + )) + .layer(middleware::from_fn( + crate::middleware::axum_context::request_context, + )) + .layer(DefaultBodyLimit::disable()) + .layer(AddExtensionLayer::new(state)) + .layer(CompressionLayer::new()) + .layer(HandleErrorLayer::new(|err: BoxError| async move { + let status_code = if err.is::() { + StatusCode::REQUEST_TIMEOUT + } else if err.is::() { + StatusCode::PAYLOAD_TOO_LARGE + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + let res = Response::new(status_code); + Ok::>, Infallible>(res.into()) + })) + .layer(TimeoutLayer::new(Duration::from_secs(10))), + ); + + let addr = listener + .parse() + .inspect(|addr| tracing::info!(env = app_env, "listen on {addr}")) + .unwrap_or_else(|_| panic!("invalid socket address: {listener}")); + Server::bind(&addr).serve(app.into_make_service_with_connect_info::()) }); + for result in future::join_all(servers).await { + if let Err(err) = result { + tracing::error!("server error: {err}"); + } + } + }); Ok(()) } }