Add Store::set_deadline
This implements execution deadlines with wasmtime's "epoch interruption" feature. Signed-off-by: Lann Martin <lann.martin@fermyon.com>
This commit is contained in:
parent
767509e6de
commit
0f573730ca
|
@ -4135,6 +4135,7 @@ version = "0.6.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"crossbeam-channel",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
|
|
@ -6,8 +6,9 @@ edition = { workspace = true }
|
|||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
tracing = "0.1"
|
||||
async-trait = "0.1"
|
||||
crossbeam-channel = "0.5"
|
||||
tracing = "0.1"
|
||||
wasi-cap-std-sync = { workspace = true }
|
||||
wasi-common = { workspace = true }
|
||||
wasmtime = { workspace = true }
|
||||
|
|
|
@ -12,13 +12,16 @@ mod io;
|
|||
mod limits;
|
||||
mod store;
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use crossbeam_channel::Sender;
|
||||
use tracing::instrument;
|
||||
use wasmtime_wasi::WasiCtx;
|
||||
|
||||
pub use wasmtime::{self, Instance, Module, Trap};
|
||||
use wasmtime_wasi::WasiCtx;
|
||||
|
||||
use self::host_component::{HostComponents, HostComponentsBuilder};
|
||||
|
||||
|
@ -26,6 +29,9 @@ pub use host_component::{HostComponent, HostComponentDataHandle, HostComponentsD
|
|||
pub use io::OutputBuffer;
|
||||
pub use store::{Store, StoreBuilder};
|
||||
|
||||
/// The default [`Config::epoch_tick_interval`].
|
||||
pub const DEFAULT_EPOCH_TICK_INTERVAL: Duration = Duration::from_millis(10);
|
||||
|
||||
/// Global configuration for `EngineBuilder`.
|
||||
///
|
||||
/// This is currently only used for advanced (undocumented) use cases.
|
||||
|
@ -46,6 +52,7 @@ impl Default for Config {
|
|||
fn default() -> Self {
|
||||
let mut inner = wasmtime::Config::new();
|
||||
inner.async_support(true);
|
||||
inner.epoch_interruption(true);
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
@ -80,6 +87,8 @@ pub struct EngineBuilder<T> {
|
|||
engine: wasmtime::Engine,
|
||||
linker: Linker<T>,
|
||||
host_components_builder: HostComponentsBuilder,
|
||||
epoch_tick_interval: Duration,
|
||||
epoch_ticker_thread: bool,
|
||||
}
|
||||
|
||||
impl<T: Send + Sync> EngineBuilder<T> {
|
||||
|
@ -93,6 +102,8 @@ impl<T: Send + Sync> EngineBuilder<T> {
|
|||
engine,
|
||||
linker,
|
||||
host_components_builder: HostComponents::builder(),
|
||||
epoch_tick_interval: DEFAULT_EPOCH_TICK_INTERVAL,
|
||||
epoch_ticker_thread: true,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -128,16 +139,58 @@ impl<T: Send + Sync> EngineBuilder<T> {
|
|||
.add_host_component(&mut self.linker, host_component)
|
||||
}
|
||||
|
||||
/// Sets the epoch tick internal for the built [`Engine`].
|
||||
///
|
||||
/// This is used by [`Store::set_deadline`] to calculate the number of
|
||||
/// "ticks" for epoch interruption, and by the default epoch ticker thread.
|
||||
/// The default is [`DEFAULT_EPOCH_TICK_INTERVAL`].
|
||||
///
|
||||
/// See [`EngineBuilder::epoch_ticker_thread`] and
|
||||
/// [`wasmtime::Config::epoch_interruption`](https://docs.rs/wasmtime/latest/wasmtime/struct.Config.html#method.epoch_interruption).
|
||||
pub fn epoch_tick_interval(&mut self, interval: Duration) {
|
||||
self.epoch_tick_interval = interval;
|
||||
}
|
||||
|
||||
/// Configures whether the epoch ticker thread will be spawned when this
|
||||
/// [`Engine`] is built.
|
||||
///
|
||||
/// Enabled by default; if disabled, the user must arrange to call
|
||||
/// `engine.as_ref().increment_epoch()` every `epoch_tick_interval` or
|
||||
/// interrupt-based features like `Store::set_deadline` will not work.
|
||||
pub fn epoch_ticker_thread(&mut self, enable: bool) {
|
||||
self.epoch_ticker_thread = enable;
|
||||
}
|
||||
|
||||
fn maybe_spawn_epoch_ticker(&self) -> Option<Sender<()>> {
|
||||
if !self.epoch_ticker_thread {
|
||||
return None;
|
||||
}
|
||||
let engine = self.engine.clone();
|
||||
let interval = self.epoch_tick_interval;
|
||||
let (send, recv) = crossbeam_channel::bounded(0);
|
||||
std::thread::spawn(move || loop {
|
||||
match recv.recv_timeout(interval) {
|
||||
Err(crossbeam_channel::RecvTimeoutError::Timeout) => (),
|
||||
Err(crossbeam_channel::RecvTimeoutError::Disconnected) => break,
|
||||
res => panic!("unexpected epoch_ticker_signal: {res:?}"),
|
||||
}
|
||||
engine.increment_epoch();
|
||||
});
|
||||
Some(send)
|
||||
}
|
||||
|
||||
/// Builds an [`Engine`] from this builder with the given host state data.
|
||||
///
|
||||
/// Note that this data will generally go entirely unused, but is needed
|
||||
/// by the implementation of [`Engine::instantiate_pre`]. If `T: Default`,
|
||||
/// it is probably preferable to use [`EngineBuilder::build`].
|
||||
pub fn build_with_data(self, instance_pre_data: T) -> Engine<T> {
|
||||
let epoch_ticker_signal = self.maybe_spawn_epoch_ticker();
|
||||
|
||||
let host_components = self.host_components_builder.build();
|
||||
|
||||
let instance_pre_store = Arc::new(Mutex::new(
|
||||
StoreBuilder::new(self.engine.clone(), &host_components)
|
||||
StoreBuilder::new(self.engine.clone(), Duration::ZERO, &host_components)
|
||||
.build_with_data(instance_pre_data)
|
||||
.expect("instance_pre_store build should not fail"),
|
||||
));
|
||||
|
@ -147,6 +200,8 @@ impl<T: Send + Sync> EngineBuilder<T> {
|
|||
linker: self.linker,
|
||||
host_components,
|
||||
instance_pre_store,
|
||||
epoch_tick_interval: self.epoch_tick_interval,
|
||||
_epoch_ticker_signal: epoch_ticker_signal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -165,6 +220,9 @@ pub struct Engine<T> {
|
|||
linker: Linker<T>,
|
||||
host_components: HostComponents,
|
||||
instance_pre_store: Arc<Mutex<Store<T>>>,
|
||||
epoch_tick_interval: Duration,
|
||||
// Matching receiver closes on drop
|
||||
_epoch_ticker_signal: Option<Sender<()>>,
|
||||
}
|
||||
|
||||
impl<T: Send + Sync> Engine<T> {
|
||||
|
@ -175,7 +233,11 @@ impl<T: Send + Sync> Engine<T> {
|
|||
|
||||
/// Creates a new [`StoreBuilder`].
|
||||
pub fn store_builder(&self) -> StoreBuilder {
|
||||
StoreBuilder::new(self.inner.clone(), &self.host_components)
|
||||
StoreBuilder::new(
|
||||
self.inner.clone(),
|
||||
self.epoch_tick_interval,
|
||||
&self.host_components,
|
||||
)
|
||||
}
|
||||
|
||||
/// Creates a new [`InstancePre`] for the given [`Module`].
|
||||
|
|
|
@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
|
|||
use std::{
|
||||
io::{Read, Write},
|
||||
path::{Path, PathBuf},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use wasi_cap_std_sync::{ambient_authority, Dir};
|
||||
use wasi_common::{dir::DirCaps, pipe::WritePipe, WasiFile};
|
||||
|
@ -24,6 +25,7 @@ use super::{
|
|||
/// A `Store` can be built with a [`StoreBuilder`].
|
||||
pub struct Store<T> {
|
||||
inner: wasmtime::Store<Data<T>>,
|
||||
epoch_tick_interval: Duration,
|
||||
}
|
||||
|
||||
impl<T> Store<T> {
|
||||
|
@ -31,6 +33,27 @@ impl<T> Store<T> {
|
|||
pub fn host_components_data(&mut self) -> &mut HostComponentsData {
|
||||
&mut self.inner.data_mut().host_components_data
|
||||
}
|
||||
|
||||
/// Sets the execution deadline.
|
||||
///
|
||||
/// This is a rough deadline; an instance will trap some time after this
|
||||
/// deadline, determined by [`EngineBuilder::epoch_tick_interval`] and
|
||||
/// details of the system's thread scheduler.
|
||||
///
|
||||
/// See [`wasmtime::Store::set_epoch_deadline`](https://docs.rs/wasmtime/latest/wasmtime/struct.Store.html#method.set_epoch_deadline).
|
||||
pub fn set_deadline(&mut self, deadline: Instant) {
|
||||
let now = Instant::now();
|
||||
let duration = deadline - now;
|
||||
let ticks = if duration.is_zero() {
|
||||
tracing::warn!("Execution deadline set in past: {deadline:?} < {now:?}");
|
||||
0
|
||||
} else {
|
||||
let ticks = duration.as_micros() / self.epoch_tick_interval.as_micros();
|
||||
let ticks = ticks.min(u64::MAX as u128) as u64;
|
||||
ticks + 1 // Add one to allow for current partially-completed tick
|
||||
};
|
||||
self.inner.set_epoch_deadline(ticks);
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsRef<wasmtime::Store<Data<T>>> for Store<T> {
|
||||
|
@ -82,6 +105,7 @@ const READ_ONLY_FILE_CAPS: FileCaps = FileCaps::from_bits_truncate(
|
|||
/// A new [`StoreBuilder`] can be obtained with [`crate::Engine::store_builder`].
|
||||
pub struct StoreBuilder {
|
||||
engine: wasmtime::Engine,
|
||||
epoch_tick_interval: Duration,
|
||||
wasi: std::result::Result<Option<WasiCtxBuilder>, String>,
|
||||
read_only_preopened_dirs: Vec<(Dir, PathBuf)>,
|
||||
host_components_data: HostComponentsData,
|
||||
|
@ -90,9 +114,14 @@ pub struct StoreBuilder {
|
|||
|
||||
impl StoreBuilder {
|
||||
// Called by Engine::store_builder.
|
||||
pub(crate) fn new(engine: wasmtime::Engine, host_components: &HostComponents) -> Self {
|
||||
pub(crate) fn new(
|
||||
engine: wasmtime::Engine,
|
||||
epoch_tick_interval: Duration,
|
||||
host_components: &HostComponents,
|
||||
) -> Self {
|
||||
Self {
|
||||
engine,
|
||||
epoch_tick_interval,
|
||||
wasi: Ok(Some(WasiCtxBuilder::new())),
|
||||
read_only_preopened_dirs: Vec::new(),
|
||||
host_components_data: host_components.new_data(),
|
||||
|
@ -243,8 +272,19 @@ impl StoreBuilder {
|
|||
store_limits: self.store_limits,
|
||||
},
|
||||
);
|
||||
|
||||
inner.limiter_async(move |data| &mut data.store_limits);
|
||||
Ok(Store { inner })
|
||||
|
||||
// With epoch interruption enabled, there must be _some_ deadline set
|
||||
// or execution will trap immediately. Since this is a delta, we need
|
||||
// to avoid overflow so we'll use 2^63 which is still "practically
|
||||
// forever" for any plausible tick interval.
|
||||
inner.set_epoch_deadline(u64::MAX / 2);
|
||||
|
||||
Ok(Store {
|
||||
inner,
|
||||
epoch_tick_interval: self.epoch_tick_interval,
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a [`Store`] from this builder with `Default` host state data.
|
||||
|
|
|
@ -3,6 +3,8 @@
|
|||
//! failure (which is sometimes expected in a test), and some other code on
|
||||
//! invalid argument(s).
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
#[link(wasm_import_module = "multiplier")]
|
||||
extern "C" {
|
||||
fn multiply(n: i32) -> i32;
|
||||
|
@ -48,6 +50,12 @@ fn main() -> Result {
|
|||
let output = unsafe { multiply(input) };
|
||||
println!("{output}");
|
||||
}
|
||||
"sleep" => {
|
||||
let duration =
|
||||
Duration::from_millis(args.next().expect("duration_ms").parse().expect("u64"));
|
||||
eprintln!("sleep {duration:?}");
|
||||
std::thread::sleep(duration);
|
||||
}
|
||||
"panic" => {
|
||||
eprintln!("panic");
|
||||
panic!("intentional panic");
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
use std::{io::Cursor, path::PathBuf};
|
||||
use std::{
|
||||
io::Cursor,
|
||||
path::PathBuf,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use spin_core::{Config, Engine, HostComponent, Module, StoreBuilder, Trap};
|
||||
use spin_core::{Config, Engine, HostComponent, Module, Store, StoreBuilder, Trap};
|
||||
use tempfile::TempDir;
|
||||
use wasmtime::TrapCode;
|
||||
|
||||
|
@ -88,6 +92,36 @@ async fn test_max_memory_size_violated() {
|
|||
assert_eq!(trap.i32_exit_status(), Some(1));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_set_deadline_obeyed() {
|
||||
run_core_wasi_test_engine(
|
||||
&test_engine(),
|
||||
["sleep", "20"],
|
||||
|_| {},
|
||||
|store| {
|
||||
store.set_deadline(Instant::now() + Duration::from_millis(1000));
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_set_deadline_violated() {
|
||||
let err = run_core_wasi_test_engine(
|
||||
&test_engine(),
|
||||
["sleep", "100"],
|
||||
|_| {},
|
||||
|store| {
|
||||
store.set_deadline(Instant::now() + Duration::from_millis(10));
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
let trap = err.downcast::<Trap>().expect("trap");
|
||||
assert_eq!(trap.trap_code(), Some(TrapCode::Interrupt));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_host_component() {
|
||||
let stdout = run_core_wasi_test(["multiply", "5"], |_| {}).await.unwrap();
|
||||
|
@ -103,11 +137,16 @@ async fn test_host_component_data_update() {
|
|||
.unwrap();
|
||||
let engine: Engine<()> = engine_builder.build();
|
||||
|
||||
let stdout = run_core_wasi_test_engine(&engine, ["multiply", "5"], |store_builder| {
|
||||
store_builder
|
||||
.host_components_data()
|
||||
.set(factor_data_handle, 100);
|
||||
})
|
||||
let stdout = run_core_wasi_test_engine(
|
||||
&engine,
|
||||
["multiply", "5"],
|
||||
|store_builder| {
|
||||
store_builder
|
||||
.host_components_data()
|
||||
.set(factor_data_handle, 100);
|
||||
},
|
||||
|_| {},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(stdout, "500");
|
||||
|
@ -129,29 +168,31 @@ fn test_config() -> Config {
|
|||
config
|
||||
}
|
||||
|
||||
fn test_engine() -> Engine<()> {
|
||||
let mut builder = Engine::builder(&test_config()).unwrap();
|
||||
builder.add_host_component(MultiplierHostComponent).unwrap();
|
||||
builder.build()
|
||||
}
|
||||
|
||||
async fn run_core_wasi_test<'a>(
|
||||
args: impl IntoIterator<Item = &'a str>,
|
||||
f: impl FnOnce(&mut StoreBuilder),
|
||||
) -> anyhow::Result<String> {
|
||||
let mut engine_builder = Engine::builder(&test_config()).unwrap();
|
||||
engine_builder
|
||||
.add_host_component(MultiplierHostComponent)
|
||||
.unwrap();
|
||||
let engine: Engine<()> = engine_builder.build();
|
||||
run_core_wasi_test_engine(&engine, args, f).await
|
||||
run_core_wasi_test_engine(&test_engine(), args, f, |_| {}).await
|
||||
}
|
||||
|
||||
async fn run_core_wasi_test_engine<'a>(
|
||||
engine: &Engine<()>,
|
||||
args: impl IntoIterator<Item = &'a str>,
|
||||
f: impl FnOnce(&mut StoreBuilder),
|
||||
update_store_builder: impl FnOnce(&mut StoreBuilder),
|
||||
update_store: impl FnOnce(&mut Store<()>),
|
||||
) -> anyhow::Result<String> {
|
||||
let mut store_builder: StoreBuilder = engine.store_builder();
|
||||
let mut stdout_buf = store_builder.stdout_buffered();
|
||||
store_builder.stderr_pipe(TestWriter);
|
||||
store_builder.args(args).unwrap();
|
||||
|
||||
f(&mut store_builder);
|
||||
update_store_builder(&mut store_builder);
|
||||
|
||||
let mut store = store_builder.build().unwrap();
|
||||
let module_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
|
@ -161,6 +202,8 @@ async fn run_core_wasi_test_engine<'a>(
|
|||
let instance = instance_pre.instantiate_async(&mut store).await.unwrap();
|
||||
let func = instance.get_func(&mut store, "_start").unwrap();
|
||||
|
||||
update_store(&mut store);
|
||||
|
||||
func.call_async(&mut store, &[], &mut []).await?;
|
||||
|
||||
let stdout = String::from_utf8(stdout_buf.take())?.trim_end().into();
|
||||
|
|
Loading…
Reference in New Issue