Add schedule

This commit is contained in:
photino 2023-01-07 01:08:41 +08:00
parent af815b0ea7
commit ca7a4746af
33 changed files with 494 additions and 187 deletions

View File

@ -1,5 +1,6 @@
use crate::{Request, RequestContext, Response};
use serde_json::json; use serde_json::json;
use zino::Request;
use zino_core::{RequestContext, Response};
pub(crate) async fn index(req: Request) -> zino::Result { pub(crate) async fn index(req: Request) -> zino::Result {
let mut res = Response::default(); let mut res = Response::default();

View File

@ -1,5 +1,7 @@
use crate::{Model, Query, Rejection, Request, RequestContext, Response, Schema, User, Uuid};
use serde_json::json; use serde_json::json;
use zino::Request;
use zino_core::{Model, Query, Rejection, RequestContext, Response, Schema, Uuid};
use zino_model::User;
pub(crate) async fn new(mut req: Request) -> zino::Result { pub(crate) async fn new(mut req: Request) -> zino::Result {
let mut user = User::new(); let mut user = User::new();

View File

@ -1,11 +1,12 @@
mod controller; mod controller;
mod router; mod router;
mod schedule;
/// Reexports. use zino_core::Application;
use zino::{AxumCluster, Request};
use zino_core::{Application, Model, Query, Rejection, RequestContext, Response, Schema, Uuid};
use zino_model::User;
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
AxumCluster::new().register(router::init()).run() zino::AxumCluster::new()
.register(router::init_routes())
.spawn(schedule::init_jobs())
.run(schedule::init_async_jobs())
} }

View File

@ -5,7 +5,7 @@ use axum::{
}; };
use std::collections::HashMap; use std::collections::HashMap;
pub(crate) fn init() -> HashMap<&'static str, Router> { pub(crate) fn init_routes() -> HashMap<&'static str, Router> {
let mut routes = HashMap::new(); let mut routes = HashMap::new();
// User controller. // User controller.

View File

@ -0,0 +1,38 @@
use zino_core::{BoxFuture, DateTime, Map, Query, Schema, Uuid};
use zino_model::User;
pub(super) fn every_15s(job_id: Uuid, job_data: &mut Map) {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("Job {job_id} is executed every 15 seconds: {job_data:?}");
}
pub(super) fn every_20s(job_id: Uuid, job_data: &mut Map) {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("Job {job_id} is executed every 20 seconds: {job_data:?}");
}
pub(super) fn every_30s(job_id: Uuid, job_data: &mut Map) -> BoxFuture {
let counter = job_data
.get("counter")
.map(|c| c.as_u64().unwrap_or_default() + 1)
.unwrap_or_default();
job_data.insert("current".to_string(), DateTime::now().to_string().into());
job_data.insert("counter".to_string(), counter.into());
println!("Job {job_id} is executed every 45 seconds: {job_data:?}");
Box::pin(async {
let query = Query::new();
let users = User::find(query).await.unwrap();
job_data.insert("users".to_string(), users.len().into());
})
}

View File

@ -0,0 +1,24 @@
use std::collections::HashMap;
use zino_core::{AsyncCronJob, CronJob};
mod job;
pub(crate) fn init_jobs() -> HashMap<&'static str, CronJob> {
let mut jobs = HashMap::new();
let run_every_15s: CronJob = job::every_15s;
let run_every_20s: CronJob = job::every_20s;
jobs.insert("0/15 * * * * *", run_every_15s);
jobs.insert("0/20 * * * * *", run_every_20s);
jobs
}
pub(crate) fn init_async_jobs() -> HashMap<&'static str, AsyncCronJob> {
let mut async_jobs = HashMap::new();
let run_every_30s: AsyncCronJob = job::every_30s;
async_jobs.insert("0/30 * * * * *", run_every_30s);
async_jobs
}

View File

@ -27,4 +27,4 @@ user = "postgres"
password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B" password = "G76hTg8T5Aa+SZQFc+0QnsRLo1UOjqpkp/jUQ+lySc8QCt4B"
[tracing] [tracing]
filter = "sqlx=warn,tower_http=info,zino=info,zino_core=info" filter = "sqlx=warn,tower_http=warn,zino=info,zino_core=info"

View File

@ -17,6 +17,8 @@ aes-gcm-siv = { version = "0.11.1" }
async-trait = { version = "0.1.60" } async-trait = { version = "0.1.60" }
base64 = { version = "0.20.0" } base64 = { version = "0.20.0" }
bytes = { version = "1.3.0" } bytes = { version = "1.3.0" }
chrono = { version = "0.4.23", features = ["serde"] }
cron = { version = "0.12.0" }
futures = { version = "0.3.25" } futures = { version = "0.3.25" }
hmac = { version = "0.12.1" } hmac = { version = "0.12.1" }
http = { version = "0.2.8" } http = { version = "0.2.8" }
@ -26,8 +28,7 @@ rand = { version = "0.8.5" }
serde = { version = "1.0.152", features = ["derive"] } serde = { version = "1.0.152", features = ["derive"] }
serde_json = { version = "1.0.91" } serde_json = { version = "1.0.91" }
sha2 = { version = "0.10.6" } sha2 = { version = "0.10.6" }
sqlx = { version = "0.6.2", features = ["runtime-tokio-native-tls", "postgres", "uuid", "time", "json"] } sqlx = { version = "0.6.2", features = ["runtime-tokio-native-tls", "postgres", "uuid", "chrono", "json"] }
time = { version = "0.3.17", features = ["local-offset", "parsing", "serde"] }
tracing = { version = "0.1.37" } tracing = { version = "0.1.37" }
toml = { version = "0.5.10" } toml = { version = "0.5.10" }
url = { version = "2.3.1" } url = { version = "2.3.1" }

View File

@ -1,4 +1,5 @@
use std::{io, time::Instant}; use crate::{AsyncCronJob, CronJob, Job, JobScheduler};
use std::{collections::HashMap, io, thread, time::Instant};
/// Application. /// Application.
pub trait Application { pub trait Application {
@ -8,12 +9,28 @@ pub trait Application {
/// Creates a new application. /// Creates a new application.
fn new() -> Self; fn new() -> Self;
/// Registers the router.
fn register(self, routes: Self::Router) -> Self;
/// Returns the start time. /// Returns the start time.
fn start_time(&self) -> Instant; fn start_time(&self) -> Instant;
/// Registers routes.
fn register(self, routes: HashMap<&'static str, Self::Router>) -> Self;
/// Spawns a new thread to run jobs.
fn spawn(self, jobs: HashMap<&'static str, CronJob>) -> Self
where
Self: Sized,
{
let mut scheduler = JobScheduler::new();
for (cron_expr, exec) in jobs {
scheduler.add(Job::new(cron_expr, exec));
}
thread::spawn(move || loop {
scheduler.tick();
thread::sleep(scheduler.time_till_next_job());
});
self
}
/// Runs the application. /// Runs the application.
fn run(self) -> io::Result<()>; fn run(self, async_jobs: HashMap<&'static str, AsyncCronJob>) -> io::Result<()>;
} }

View File

@ -121,6 +121,20 @@ impl SecurityToken {
pub fn as_str(&self) -> &str { pub fn as_str(&self) -> &str {
self.token.as_str() self.token.as_str()
} }
/// Encrypts the plaintext using AES-GCM-SIV.
pub fn encrypt(key: impl AsRef<[u8]>, plaintext: impl AsRef<[u8]>) -> Option<String> {
crypto::encrypt(key.as_ref(), plaintext.as_ref())
.ok()
.map(base64::encode)
}
/// Decrypts the data using AES-GCM-SIV.
pub fn decrypt(key: impl AsRef<[u8]>, data: impl AsRef<[u8]>) -> Option<String> {
base64::decode(data)
.ok()
.and_then(|cipher| crypto::decrypt(key.as_ref(), &cipher).ok())
}
} }
impl fmt::Display for SecurityToken { impl fmt::Display for SecurityToken {

View File

@ -2,7 +2,8 @@ use crate::{DateTime, Map};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
/// Cloud event. See [the spec](https://github.com/cloudevents/spec/blob/v1.0.1/spec.md). /// Cloud event.
/// See [the spec](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md).
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[serde(default)] #[serde(default)]

View File

@ -1,8 +1,8 @@
use crate::{Map, Uuid}; use crate::{Map, Uuid};
use chrono::{DateTime, Local, SecondsFormat};
use serde::Serialize; use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use sqlx::{postgres::PgRow, Column as _, Error, Row, TypeInfo}; use sqlx::{postgres::PgRow, Column as _, Error, Row, TypeInfo};
use time::OffsetDateTime;
/// A column is a model field with associated metadata. /// A column is a model field with associated metadata.
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@ -140,8 +140,10 @@ impl<'a> Column<'a> {
"bool" => row.try_get_unchecked::<bool, _>(key)?.into(), "bool" => row.try_get_unchecked::<bool, _>(key)?.into(),
"String" => row.try_get_unchecked::<String, _>(key)?.into(), "String" => row.try_get_unchecked::<String, _>(key)?.into(),
"DateTime" => { "DateTime" => {
let datetime = row.try_get_unchecked::<OffsetDateTime, _>(key)?; let datetime = row.try_get_unchecked::<DateTime<Local>, _>(key)?;
datetime.to_string().into() datetime
.to_rfc3339_opts(SecondsFormat::Micros, false)
.into()
} }
"Uuid" | "Option<Uuid>" => row.try_get_unchecked::<Uuid, _>(key)?.to_string().into(), "Uuid" | "Option<Uuid>" => row.try_get_unchecked::<Uuid, _>(key)?.to_string().into(),
"Vec<u8>" => row.try_get_unchecked::<Vec<u8>, _>(key)?.into(), "Vec<u8>" => row.try_get_unchecked::<Vec<u8>, _>(key)?.into(),
@ -175,8 +177,10 @@ impl<'a> Column<'a> {
"BOOL" => row.try_get_unchecked::<bool, _>(key)?.into(), "BOOL" => row.try_get_unchecked::<bool, _>(key)?.into(),
"TEXT" | "VARCHAR" => row.try_get_unchecked::<String, _>(key)?.into(), "TEXT" | "VARCHAR" => row.try_get_unchecked::<String, _>(key)?.into(),
"TIMESTAMPTZ" => { "TIMESTAMPTZ" => {
let datetime = row.try_get_unchecked::<OffsetDateTime, _>(key)?; let datetime = row.try_get_unchecked::<DateTime<Local>, _>(key)?;
datetime.to_string().into() datetime
.to_rfc3339_opts(SecondsFormat::Micros, false)
.into()
} }
"UUID" => row.try_get_unchecked::<Uuid, _>(key)?.to_string().into(), "UUID" => row.try_get_unchecked::<Uuid, _>(key)?.to_string().into(),
"BYTEA" => row.try_get_unchecked::<Vec<u8>, _>(key)?.into(), "BYTEA" => row.try_get_unchecked::<Vec<u8>, _>(key)?.into(),

View File

@ -1,3 +1,4 @@
use crate::crypto;
use sqlx::{postgres::PgPoolOptions, Error, PgPool}; use sqlx::{postgres::PgPoolOptions, Error, PgPool};
use toml::value::Table; use toml::value::Table;
@ -44,7 +45,7 @@ impl ConnectionPool {
.as_str() .as_str()
.expect("the `postgres.password` field should be a str"); .expect("the `postgres.password` field should be a str");
let key = format!("{user}@{database}"); let key = format!("{user}@{database}");
crate::crypto::encrypt(key.as_bytes(), password.as_bytes()) crypto::encrypt(key.as_bytes(), password.as_bytes())
.ok() .ok()
.map(base64::encode) .map(base64::encode)
} }
@ -78,7 +79,7 @@ impl ConnectionPool {
.expect("the `postgres.password` field should be a str"); .expect("the `postgres.password` field should be a str");
if let Ok(data) = base64::decode(password) { if let Ok(data) = base64::decode(password) {
let key = format!("{user}@{database}"); let key = format!("{user}@{database}");
if let Ok(plaintext) = crate::crypto::decrypt(key.as_bytes(), &data) { if let Ok(plaintext) = crypto::decrypt(key.as_bytes(), &data) {
password = plaintext.leak(); password = plaintext.leak();
} }
} }

View File

@ -1,3 +1,7 @@
use chrono::{
format::{ParseError, ParseResult},
Local, SecondsFormat, TimeZone, Utc,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
fmt, fmt,
@ -5,74 +9,107 @@ use std::{
str::FromStr, str::FromStr,
time::Duration, time::Duration,
}; };
use time::{
error::Parse,
format_description::well_known::{Rfc2822, Rfc3339},
OffsetDateTime, UtcOffset,
};
/// ISO 8601 combined date and time with local time zone. /// ISO 8601 combined date and time with local time zone.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DateTime(OffsetDateTime); pub struct DateTime(chrono::DateTime<Local>);
impl DateTime { impl DateTime {
/// Returns a new instance which corresponds to the current date. /// Returns a new instance which corresponds to the current date.
#[inline] #[inline]
pub fn now() -> Self { pub fn now() -> Self {
Self(OffsetDateTime::now_utc()) Self(Local::now())
} }
/// Returns a new instance corresponding to a UTC date and time, /// Returns a new instance corresponding to a UTC date and time,
/// from the number of non-leap seconds since the midnight UTC on January 1, 1970. /// from the number of non-leap seconds since the midnight UTC on January 1, 1970.
#[inline] #[inline]
pub fn from_timestamp(secs: i64) -> Self { pub fn from_timestamp(secs: i64) -> Self {
Self(OffsetDateTime::from_unix_timestamp(secs).unwrap()) Self(Local.timestamp_opt(secs, 0).unwrap())
}
/// Returns a new instance corresponding to a UTC date and time,
/// from the number of non-leap milliseconds since the midnight UTC on January 1, 1970.
#[inline]
pub fn from_timestamp_millis(mills: i64) -> Self {
Self(Local.timestamp_millis_opt(mills).unwrap())
} }
/// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC. /// Returns the number of non-leap seconds since January 1, 1970 0:00:00 UTC.
#[inline] #[inline]
pub fn timestamp(&self) -> i64 { pub fn timestamp(&self) -> i64 {
self.0.unix_timestamp() self.0.timestamp()
}
/// Returns the number of non-leap-milliseconds since January 1, 1970 UTC.
#[inline]
pub fn timestamp_millis(&self) -> i64 {
self.0.timestamp_millis()
} }
/// Parses an RFC 2822 date and time. /// Parses an RFC 2822 date and time.
#[inline] #[inline]
pub fn parse_utc_str(s: &str) -> Result<Self, Parse> { pub fn parse_utc_str(s: &str) -> ParseResult<Self> {
let datetime = OffsetDateTime::parse(s, &Rfc2822)?; let datetime = chrono::DateTime::parse_from_rfc2822(s)?;
Ok(Self(datetime)) Ok(Self(datetime.with_timezone(&Local)))
}
/// Parses an RFC 3339 and ISO 8601 date and time.
#[inline]
pub fn parse_iso_str(s: &str) -> ParseResult<Self> {
let datetime = chrono::DateTime::parse_from_rfc3339(s)?;
Ok(Self(datetime.with_timezone(&Local)))
} }
/// Returns an RFC 2822 date and time string. /// Returns an RFC 2822 date and time string.
#[inline] #[inline]
pub fn to_utc_string(&self) -> String { pub fn to_utc_string(&self) -> String {
let datetime = self.0.to_offset(UtcOffset::UTC).format(&Rfc2822).unwrap(); let datetime = self.0.with_timezone(&Utc).to_rfc2822();
format!("{} GMT", datetime.trim_end_matches(" +0000")) format!("{} GMT", datetime.trim_end_matches(" +0000"))
} }
/// Return an RFC 3339 and ISO 8601 date and time string with subseconds
/// formatted as [`SecondsFormat::Millis`](chrono::SecondsFormat::Millis).
#[inline]
pub fn to_iso_string(&self) -> String {
self.0.to_rfc3339_opts(SecondsFormat::Millis, true)
}
} }
impl Default for DateTime { impl Default for DateTime {
/// Returns an instance which corresponds to the current date.
fn default() -> Self { fn default() -> Self {
Self(OffsetDateTime::now_utc()) Self::now()
} }
} }
impl From<DateTime> for OffsetDateTime { impl From<DateTime> for chrono::DateTime<Local> {
fn from(t: DateTime) -> Self { fn from(dt: DateTime) -> Self {
t.0 dt.0
}
}
impl From<chrono::DateTime<Local>> for DateTime {
fn from(dt: chrono::DateTime<Local>) -> Self {
Self(dt)
} }
} }
impl FromStr for DateTime { impl FromStr for DateTime {
type Err = Parse; type Err = ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> { fn from_str(s: &str) -> Result<Self, Self::Err> {
OffsetDateTime::parse(s, &Rfc3339).map(Self) chrono::DateTime::<Local>::from_str(s).map(Self)
} }
} }
impl fmt::Display for DateTime { impl fmt::Display for DateTime {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0) write!(
f,
"{}",
self.0.to_rfc3339_opts(SecondsFormat::Micros, false)
)
} }
} }
@ -81,7 +118,12 @@ impl Add<Duration> for DateTime {
#[inline] #[inline]
fn add(self, rhs: Duration) -> Self { fn add(self, rhs: Duration) -> Self {
Self(self.0 + rhs) let duration = chrono::Duration::from_std(rhs).expect("Duration value is out of range");
let datetime = self
.0
.checked_add_signed(duration)
.expect("`DateTime + Duration` overflowed");
Self(datetime)
} }
} }
@ -97,7 +139,12 @@ impl Sub<Duration> for DateTime {
#[inline] #[inline]
fn sub(self, rhs: Duration) -> Self { fn sub(self, rhs: Duration) -> Self {
Self(self.0 - rhs) let duration = chrono::Duration::from_std(rhs).expect("Duration value is out of range");
let datetime = self
.0
.checked_sub_signed(duration)
.expect("`DateTime - Duration` overflowed");
Self(datetime)
} }
} }

View File

@ -4,6 +4,7 @@
#![feature(iter_intersperse)] #![feature(iter_intersperse)]
#![feature(once_cell)] #![feature(once_cell)]
#![feature(string_leak)] #![feature(string_leak)]
#![feature(type_alias_impl_trait)]
mod application; mod application;
mod authentication; mod authentication;
@ -13,6 +14,7 @@ mod database;
mod datetime; mod datetime;
mod request; mod request;
mod response; mod response;
mod schedule;
mod state; mod state;
// Reexports. // Reexports.
@ -23,6 +25,7 @@ pub use database::{Column, ConnectionPool, Model, Mutation, Query, Schema};
pub use datetime::DateTime; pub use datetime::DateTime;
pub use request::{Context, RequestContext, Validation}; pub use request::{Context, RequestContext, Validation};
pub use response::{Rejection, Response, ResponseCode}; pub use response::{Rejection, Response, ResponseCode};
pub use schedule::{AsyncCronJob, CronJob, Job, JobScheduler};
pub use state::State; pub use state::State;
/// A JSON key/value type. /// A JSON key/value type.
@ -30,3 +33,6 @@ pub type Map = serde_json::Map<String, serde_json::Value>;
/// A UUID is a unique 128-bit number, stored as 16 octets. /// A UUID is a unique 128-bit number, stored as 16 octets.
pub type Uuid = uuid::Uuid; pub type Uuid = uuid::Uuid;
/// An owned dynamically typed Future.
pub type BoxFuture<'a, T = ()> = futures::future::BoxFuture<'a, T>;

View File

@ -122,7 +122,7 @@ impl Validation {
/// Parses a json value as `DateTime`. /// Parses a json value as `DateTime`.
pub fn parse_datetime<'a>( pub fn parse_datetime<'a>(
value: impl Into<Option<&'a Value>>, value: impl Into<Option<&'a Value>>,
) -> Option<Result<DateTime, time::error::Parse>> { ) -> Option<Result<DateTime, chrono::format::ParseError>> {
value.into().and_then(|v| v.as_str()).map(|s| s.parse()) value.into().and_then(|v| v.as_str()).map(|s| s.parse())
} }

View File

@ -0,0 +1,177 @@
use crate::{BoxFuture, DateTime, Map, Uuid};
use chrono::Local;
use cron::Schedule;
use std::{str::FromStr, time::Duration};
/// Cron job.
pub type CronJob = fn(Uuid, &mut Map);
/// Async cron job.
pub type AsyncCronJob = for<'a> fn(Uuid, &'a mut Map) -> BoxFuture<'a>;
/// Exectuable job.
enum ExecutableJob {
Fn(CronJob),
AsyncFn(AsyncCronJob),
}
/// A schedulable `Job`.
pub struct Job {
id: Uuid,
data: Map,
schedule: Schedule,
run: ExecutableJob,
last_tick: Option<chrono::DateTime<Local>>,
}
impl Job {
/// Creates a new `Job`.
#[inline]
pub fn new(cron_expr: &str, exec: CronJob) -> Self {
let schedule = Schedule::from_str(cron_expr).unwrap();
Job {
id: Uuid::new_v4(),
data: Map::new(),
schedule,
run: ExecutableJob::Fn(exec),
last_tick: None,
}
}
/// Creates a new async `Job`.
#[inline]
pub fn new_async(cron_expr: &str, exec: AsyncCronJob) -> Self {
let schedule = Schedule::from_str(cron_expr).unwrap();
Job {
id: Uuid::new_v4(),
data: Map::new(),
schedule,
run: ExecutableJob::AsyncFn(exec),
last_tick: None,
}
}
/// Returns the job ID.
#[inline]
pub fn job_id(&self) -> Uuid {
self.id
}
/// Returns a reference to the job data.
#[inline]
pub fn job_data(&self) -> &Map {
&self.data
}
/// Sets last tick.
#[inline]
pub fn set_last_tick(&mut self, last_tick: impl Into<Option<DateTime>>) {
self.last_tick = last_tick.into().map(|dt| dt.into());
}
/// Executes missed runs.
pub fn tick(&mut self) {
let now = Local::now();
if let Some(ref last_tick) = self.last_tick {
for event in self.schedule.after(last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(exec) => exec(self.id, &mut self.data),
ExecutableJob::AsyncFn(_exec) => tracing::warn!("job {} is async", self.id),
}
}
}
self.last_tick = Some(now);
}
/// Executes missed runs asynchronously.
pub async fn tick_async(&mut self) {
let now = Local::now();
if let Some(ref last_tick) = self.last_tick {
for event in self.schedule.after(last_tick) {
if event > now {
break;
}
match self.run {
ExecutableJob::Fn(_exec) => tracing::warn!("job {} is not async", self.id),
ExecutableJob::AsyncFn(exec) => exec(self.id, &mut self.data).await,
}
}
}
self.last_tick = Some(now);
}
}
/// A type contains and executes the scheduled jobs.
#[derive(Default)]
pub struct JobScheduler {
jobs: Vec<Job>,
}
impl JobScheduler {
/// Creates a new `JobScheduler`.
#[inline]
pub fn new() -> Self {
Self { jobs: Vec::new() }
}
/// Adds a job to the `JobScheduler` and returns the job ID.
pub fn add(&mut self, job: Job) -> Uuid {
let job_id = job.id;
self.jobs.push(job);
job_id
}
/// Removes a job by ID from the `JobScheduler`.
pub fn remove(&mut self, job_id: Uuid) -> bool {
let position = self.jobs.iter().position(|job| job.id == job_id);
match position {
Some(index) => {
self.jobs.remove(index);
true
}
None => false,
}
}
/// The `tick` method increments time for the `JobScheduler` and executes
/// any pending jobs. It is recommended to sleep for at least 500
/// milliseconds between invocations of this method.
pub fn tick(&mut self) {
for job in &mut self.jobs {
job.tick();
}
}
/// The `tick_async` method increments time for the `JobScheduler` and executes
/// any pending jobs asynchronously. It is recommended to sleep for at least 500
/// milliseconds between invocations of this method.
pub async fn tick_async(&mut self) {
for job in &mut self.jobs {
job.tick_async().await;
}
}
/// The `time_till_next_job` method returns the duration till the next job
/// is supposed to run. This can be used to sleep until then without waking
/// up at a fixed interval.
pub fn time_till_next_job(&self) -> Duration {
if self.jobs.is_empty() {
Duration::from_millis(500)
} else {
let mut duration = chrono::Duration::zero();
let now = Local::now();
for job in self.jobs.iter() {
for event in job.schedule.after(&now).take(1) {
let d = event - now;
if duration.is_zero() || d < duration {
duration = d;
}
}
}
duration.to_std().unwrap_or(Duration::from_millis(500))
}
}
}

View File

@ -19,10 +19,8 @@ pub(crate) fn get_type_name(ty: &Type) -> String {
/// Parses an attribute and returns a list of arguments /// Parses an attribute and returns a list of arguments
pub(crate) fn parse_attr(attr: &Attribute) -> Vec<(String, Option<String>)> { pub(crate) fn parse_attr(attr: &Attribute) -> Vec<(String, Option<String>)> {
if let Ok(meta) = attr.parse_meta() { if let Ok(meta) = attr.parse_meta() {
if let Some(ident) = meta.path().get_ident() { if let Some(ident) = meta.path().get_ident() && *ident != "schema" {
if *ident != "schema" { return Vec::new();
return Vec::new();
}
} }
if let Meta::List(list) = meta { if let Meta::List(list) = meta {
let mut arguments = Vec::new(); let mut arguments = Vec::new();

View File

@ -34,9 +34,9 @@ pub struct Collection {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -46,8 +46,6 @@ impl Model for Collection {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -23,9 +23,7 @@ pub struct Dataset {
// Info fields. // Info fields.
project_id: Uuid, // group.id, group.namespace = "*:project", group.subject = "user" project_id: Uuid, // group.id, group.namespace = "*:project", group.subject = "user"
task_id: Option<Uuid>, // task.id task_id: Option<Uuid>, // task.id
#[schema(default = "now")]
valid_from: DateTime, valid_from: DateTime,
#[schema(default = "now")]
expires_at: DateTime, expires_at: DateTime,
#[schema(index = "gin")] #[schema(index = "gin")]
tags: Vec<Uuid>, // tag.id, tag.namespace = "*:dataset" tags: Vec<Uuid>, // tag.id, tag.namespace = "*:dataset"
@ -38,9 +36,9 @@ pub struct Dataset {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -50,10 +48,6 @@ impl Model for Dataset {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
valid_from: DateTime::now(),
expires_at: DateTime::now(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -37,9 +37,9 @@ pub struct Group {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -49,8 +49,6 @@ impl Model for Group {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -30,7 +30,7 @@ pub struct Log {
#[schema(index = "text")] #[schema(index = "text")]
message: String, message: String,
source: String, source: String,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
recorded_at: DateTime, recorded_at: DateTime,
// Extensions. // Extensions.
@ -43,7 +43,6 @@ impl Model for Log {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
recorded_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -37,9 +37,9 @@ pub struct Message {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -49,8 +49,6 @@ impl Model for Message {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -38,9 +38,9 @@ pub struct Order {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -50,8 +50,6 @@ impl Model for Order {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -26,9 +26,7 @@ pub struct Policy {
resource: String, resource: String,
actions: Vec<String>, actions: Vec<String>,
effect: String, effect: String,
#[schema(default = "now")]
valid_from: DateTime, valid_from: DateTime,
#[schema(default = "now")]
expires_at: DateTime, expires_at: DateTime,
#[schema(index = "gin")] #[schema(index = "gin")]
tags: Vec<Uuid>, // tag.id, tag.namespace = "*:policy" tags: Vec<Uuid>, // tag.id, tag.namespace = "*:policy"
@ -41,9 +39,9 @@ pub struct Policy {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -53,8 +51,6 @@ impl Model for Policy {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -23,7 +23,7 @@ pub struct Record {
// Info fields. // Info fields.
integrity: String, integrity: String,
signature: String, signature: String,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
recorded_at: DateTime, recorded_at: DateTime,
// Extensions. // Extensions.
@ -36,7 +36,6 @@ impl Model for Record {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
recorded_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -38,9 +38,9 @@ pub struct Resource {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -50,8 +50,6 @@ impl Model for Resource {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -32,9 +32,9 @@ pub struct Source {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -44,8 +44,6 @@ impl Model for Source {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -33,9 +33,9 @@ pub struct Tag {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -45,8 +45,6 @@ impl Model for Tag {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -26,14 +26,10 @@ pub struct Task {
output_id: Option<Uuid>, // source.id output_id: Option<Uuid>, // source.id
#[schema(index = "gin")] #[schema(index = "gin")]
dependencies: Vec<Uuid>, // task.id dependencies: Vec<Uuid>, // task.id
#[schema(default = "now")]
valid_from: DateTime, valid_from: DateTime,
#[schema(default = "now")]
expires_at: DateTime, expires_at: DateTime,
schedule: String, schedule: String,
#[schema(default = "epoch")]
last_time: DateTime, last_time: DateTime,
#[schema(default = "epoch")]
next_time: DateTime, next_time: DateTime,
priority: u16, priority: u16,
#[schema(index = "gin")] #[schema(index = "gin")]
@ -47,9 +43,9 @@ pub struct Task {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -59,10 +55,6 @@ impl Model for Task {
fn new() -> Self { fn new() -> Self {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
valid_from: DateTime::now(),
expires_at: DateTime::now(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -42,9 +42,9 @@ pub struct User {
// Revisions. // Revisions.
manager_id: Uuid, // user.id manager_id: Uuid, // user.id
maintainer_id: Uuid, // user.id maintainer_id: Uuid, // user.id
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
created_at: DateTime, created_at: DateTime,
#[schema(default = "now", index = "btree")] #[schema(index = "btree")]
updated_at: DateTime, updated_at: DateTime,
version: u64, version: u64,
edition: u32, edition: u32,
@ -55,8 +55,6 @@ impl Model for User {
Self { Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
access_key_id: AccessKeyId::new().to_string(), access_key_id: AccessKeyId::new().to_string(),
created_at: DateTime::now(),
updated_at: DateTime::now(),
..Self::default() ..Self::default()
} }
} }

View File

@ -26,7 +26,7 @@ serde = { version = "1.0.152", features = ["derive"] }
serde_json = { version = "1.0.91" } serde_json = { version = "1.0.91" }
serde_qs = { version = "0.10.1" } serde_qs = { version = "0.10.1" }
serde_urlencoded = { version = "0.7.1" } serde_urlencoded = { version = "0.7.1" }
tokio = { version = "1.23.0", features = ["rt-multi-thread", "sync"], optional = true } tokio = { version = "1.24.1", features = ["rt-multi-thread", "sync"], optional = true }
tokio-stream = { version = "0.1.11", features = ["sync"], optional = true } tokio-stream = { version = "0.1.11", features = ["sync"], optional = true }
toml = { version = "0.5.10" } toml = { version = "0.5.10" }
tower = { version = "0.4.13", features = ["timeout"], optional = true } tower = { version = "0.4.13", features = ["timeout"], optional = true }

View File

@ -13,6 +13,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
path::Path, path::Path,
sync::{Arc, LazyLock}, sync::{Arc, LazyLock},
thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::runtime::Builder; use tokio::runtime::Builder;
@ -25,7 +26,7 @@ use tower_http::{
compression::CompressionLayer, compression::CompressionLayer,
services::{ServeDir, ServeFile}, services::{ServeDir, ServeFile},
}; };
use zino_core::{Application, Response, State}; use zino_core::{Application, AsyncCronJob, Job, JobScheduler, Response, State};
/// An HTTP server cluster for `axum`. /// An HTTP server cluster for `axum`.
pub struct AxumCluster { pub struct AxumCluster {
@ -37,7 +38,7 @@ pub struct AxumCluster {
impl Application for AxumCluster { impl Application for AxumCluster {
/// Router. /// Router.
type Router = HashMap<&'static str, Router>; type Router = Router;
/// Creates a new application. /// Creates a new application.
fn new() -> Self { fn new() -> Self {
@ -47,20 +48,33 @@ impl Application for AxumCluster {
} }
} }
/// Registers the router.
fn register(mut self, routes: Self::Router) -> Self {
self.routes = routes;
self
}
/// Returns the start time. /// Returns the start time.
#[inline] #[inline]
fn start_time(&self) -> Instant { fn start_time(&self) -> Instant {
self.start_time self.start_time
} }
/// Registers routes.
fn register(mut self, routes: HashMap<&'static str, Self::Router>) -> Self {
self.routes = routes;
self
}
/// Runs the application. /// Runs the application.
fn run(self) -> io::Result<()> { fn run(self, async_jobs: HashMap<&'static str, AsyncCronJob>) -> io::Result<()> {
let mut scheduler = JobScheduler::new();
for (cron_expr, exec) in async_jobs {
scheduler.add(Job::new_async(cron_expr, exec));
}
let runtime = Builder::new_multi_thread().enable_all().build()?;
runtime.spawn(async move {
loop {
scheduler.tick_async().await;
thread::sleep(scheduler.time_till_next_job());
}
});
let current_dir = env::current_dir().unwrap(); let current_dir = env::current_dir().unwrap();
let project_dir = Path::new(&current_dir); let project_dir = Path::new(&current_dir);
let public_dir = project_dir.join("./public"); let public_dir = project_dir.join("./public");
@ -86,76 +100,73 @@ impl Application for AxumCluster {
) )
}, },
); );
Builder::new_multi_thread()
.enable_all()
.build()?
.block_on(async {
let routes = self.routes;
let shared_state = State::shared();
let app_env = shared_state.env();
tracing::info!("load config.{app_env}.toml");
let listeners = shared_state.listeners(); runtime.block_on(async {
let servers = listeners.iter().map(|listener| { let routes = self.routes;
let mut app = Router::new() let shared_state = State::shared();
.route_service("/", serve_file_service.clone()) let app_env = shared_state.env();
.nest_service("/public", serve_dir_service.clone()) tracing::info!("load config.{app_env}.toml");
.route("/sse", routing::get(crate::endpoint::axum_sse::sse_handler))
.route(
"/websocket",
routing::get(crate::endpoint::axum_websocket::websocket_handler),
);
for (path, route) in &routes {
app = app.nest(path, route.clone());
}
let state = Arc::new(State::default()); let listeners = shared_state.listeners();
app = app let servers = listeners.iter().map(|listener| {
.fallback_service(tower::service_fn(|_| async { let mut app = Router::new()
let res = Response::new(StatusCode::NOT_FOUND); .route_service("/", serve_file_service.clone())
Ok::<http::Response<Full<Bytes>>, Infallible>(res.into()) .nest_service("/public", serve_dir_service.clone())
})) .route("/sse", routing::get(crate::endpoint::axum_sse::sse_handler))
.layer( .route(
ServiceBuilder::new() "/websocket",
.layer(LazyLock::force( routing::get(crate::endpoint::axum_websocket::websocket_handler),
&crate::middleware::tower_tracing::TRACING_MIDDLEWARE, );
)) for (path, route) in &routes {
.layer(LazyLock::force( app = app.nest(path, route.clone());
&crate::middleware::tower_cors::CORS_MIDDLEWARE,
))
.layer(middleware::from_fn(
crate::middleware::axum_context::request_context,
))
.layer(DefaultBodyLimit::disable())
.layer(AddExtensionLayer::new(state))
.layer(CompressionLayer::new())
.layer(HandleErrorLayer::new(|err: BoxError| async move {
let status_code = if err.is::<Elapsed>() {
StatusCode::REQUEST_TIMEOUT
} else if err.is::<LengthLimitError>() {
StatusCode::PAYLOAD_TOO_LARGE
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
let res = Response::new(status_code);
Ok::<http::Response<Full<Bytes>>, Infallible>(res.into())
}))
.layer(TimeoutLayer::new(Duration::from_secs(10))),
);
let addr = listener
.parse()
.inspect(|addr| tracing::info!(env = app_env, "listen on {addr}"))
.unwrap_or_else(|_| panic!("invalid socket address: {listener}"));
Server::bind(&addr)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
});
for result in future::join_all(servers).await {
if let Err(err) = result {
tracing::error!("server error: {err}");
}
} }
let state = Arc::new(State::default());
app = app
.fallback_service(tower::service_fn(|_| async {
let res = Response::new(StatusCode::NOT_FOUND);
Ok::<http::Response<Full<Bytes>>, Infallible>(res.into())
}))
.layer(
ServiceBuilder::new()
.layer(LazyLock::force(
&crate::middleware::tower_tracing::TRACING_MIDDLEWARE,
))
.layer(LazyLock::force(
&crate::middleware::tower_cors::CORS_MIDDLEWARE,
))
.layer(middleware::from_fn(
crate::middleware::axum_context::request_context,
))
.layer(DefaultBodyLimit::disable())
.layer(AddExtensionLayer::new(state))
.layer(CompressionLayer::new())
.layer(HandleErrorLayer::new(|err: BoxError| async move {
let status_code = if err.is::<Elapsed>() {
StatusCode::REQUEST_TIMEOUT
} else if err.is::<LengthLimitError>() {
StatusCode::PAYLOAD_TOO_LARGE
} else {
StatusCode::INTERNAL_SERVER_ERROR
};
let res = Response::new(status_code);
Ok::<http::Response<Full<Bytes>>, Infallible>(res.into())
}))
.layer(TimeoutLayer::new(Duration::from_secs(10))),
);
let addr = listener
.parse()
.inspect(|addr| tracing::info!(env = app_env, "listen on {addr}"))
.unwrap_or_else(|_| panic!("invalid socket address: {listener}"));
Server::bind(&addr).serve(app.into_make_service_with_connect_info::<SocketAddr>())
}); });
for result in future::join_all(servers).await {
if let Err(err) = result {
tracing::error!("server error: {err}");
}
}
});
Ok(()) Ok(())
} }
} }