Add prototype for distributing Spin applications using OCI

This commit adds experimental support for distributing Spin applications
using OCI registries.

Specifically, it uses the OCI Artifacts specification
(https://github.com/opencontainers/artifacts) to package and distribute Spin
applications.

This PR implements `spin oci push`, `spin oci pull`, and `spin oci run`
to interact with a supporting container registry - for example:

```bash
$ spin oci push ghcr.io/<username>/my-spin-application:v1
INFO spin_publish::oci::client: Pushed "https://ghcr.io/v2/<username>/my-spin-application/manifests/sha256:9f4e7eebb27c0174fe6654ef5e0f908f1edc8f625324a9f49967ccde44a6516b"
$ spin oci pull ghcr.io/<username>/my-spin-application:v1
INFO spin_publish::oci::client: Pulled ghcr.io/<username>/my-spin-application:v1@sha256:9f4e7eebb27c0174fe6654ef5e0f908f1edc8f625324a9f49967ccde44a6516b
$ spin oci run ghcr.io/<username>/my-spin-application:v1
INFO spin_publish::oci::client: Pulled ghcr.io/<username>/my-spin-application:v1@sha256:9f4e7eebb27c0174fe6654ef5e0f908f1edc8f625324a9f49967ccde44a6516b
```

Following the SIP (https://github.com/fermyon/spin/pull/1033), this PR
defines a new `config.mediaType` for a Spin application,
`application/vnd.fermyon.spin.application.v1+config`, and two media
types for the potential content that can be found in such an artifact:
`application/vnd.wasm.content.layer.v1+wasm` for a Wasm module, and
`application/vnd.wasm.content.layer.v1+data` for a static file.
(Note that the media types *can* change in a future iteration of this
experimental work if a community consensus on the media type used to
represent Wasm is reached.)
Following the SIP, this PR distributes the Spin lockfile for a given
application as the OCI configuration object.

This PR also introduces a global cache for layers and
manifests pulled from the registry. This is a content addressable cache,
and its use will be extended in the future for Wasm modules pulled from
HTTP sources.

Currently, `spin oci pull` (or `spin oci run`) will always make at least
an initial request to the registry to fetch the manifest from the
registry. After the manifest is fetched, already pulled layers will not
be pulled again.
In a future commit, the behavior of the initial manifest fetch
regardless of its existence in the cache will be controllable by a flag.

Signed-off-by: Radu Matei <radu.matei@fermyon.com>
This commit is contained in:
Radu Matei 2022-12-20 20:18:57 +00:00
parent 4954c34dee
commit b63e513fc9
No known key found for this signature in database
GPG Key ID: 53A4E7168B7782C2
27 changed files with 1494 additions and 553 deletions

2
.gitignore vendored
View File

@ -1,6 +1,6 @@
target
.husky
cache
# cache
node_modules
ignored-assets
main.wasm

1098
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,7 @@ rust-version = "1.64"
[workspace.package]
version = "0.7.1"
authors = [ "Fermyon Engineering <engineering@fermyon.com>" ]
authors = ["Fermyon Engineering <engineering@fermyon.com>"]
edition = "2021"
[dependencies]
@ -39,9 +39,10 @@ regex = "1.5.5"
reqwest = { version = "0.11", features = ["stream"] }
rpassword = "7.0"
semver = "1.0"
serde = { version = "1.0", features = [ "derive" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.82"
sha2 = "0.10.2"
spin-app = { path = "crates/app" }
spin-build = { path = "crates/build" }
spin-config = { path = "crates/config" }
spin-http = { path = "crates/http" }
@ -53,10 +54,10 @@ spin-redis-engine = { path = "crates/redis" }
spin-templates = { path = "crates/templates" }
spin-trigger = { path = "crates/trigger" }
tempfile = "3.3.0"
tokio = { version = "1.23", features = [ "full" ] }
tokio = { version = "1.23", features = ["full"] }
toml = "0.6"
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.7", features = [ "env-filter" ] }
tracing-subscriber = { version = "0.3.7", features = ["env-filter"] }
url = "2.2.2"
uuid = "^1.0"
wasmtime = { workspace = true }
@ -67,13 +68,16 @@ wasmtime = { workspace = true }
openssl = { version = "0.10" }
[dev-dependencies]
hyper = { version = "0.14", features = [ "full" ] }
hyper = { version = "0.14", features = ["full"] }
sha2 = "0.10.1"
which = "4.2.5"
[build-dependencies]
cargo-target-dep = { git = "https://github.com/fermyon/cargo-target-dep", rev = "b7b1989fe0984c0f7c4966398304c6538e52fe49" }
vergen = { version = "7", default-features = false, features = [ "build", "git" ] }
vergen = { version = "7", default-features = false, features = [
"build",
"git",
] }
[features]
default = []
@ -85,23 +89,23 @@ outbound-mysql-tests = []
[workspace]
members = [
"crates/abi-conformance",
"crates/app",
"crates/build",
"crates/config",
"crates/core",
"crates/http",
"crates/loader",
"crates/manifest",
"crates/outbound-http",
"crates/outbound-redis",
"crates/plugins",
"crates/redis",
"crates/templates",
"crates/testing",
"crates/trigger",
"sdk/rust",
"sdk/rust/macro"
"crates/abi-conformance",
"crates/app",
"crates/build",
"crates/config",
"crates/core",
"crates/http",
"crates/loader",
"crates/manifest",
"crates/outbound-http",
"crates/outbound-redis",
"crates/plugins",
"crates/redis",
"crates/templates",
"crates/testing",
"crates/trigger",
"sdk/rust",
"sdk/rust/macro",
]
[workspace.dependencies]

View File

@ -9,17 +9,23 @@ anyhow = "1"
async-trait = "0.1.52"
bindle = { workspace = true }
bytes = "1.1.0"
dirs = "4.0"
dunce = "1.0"
futures = "0.3.17"
glob = "0.3.0"
itertools = "0.10.3"
lazy_static = "1.4.0"
mime_guess = { version = "2.0" }
oci-distribution = { git = "https://github.com/radu-matei/oci-distribution", branch = "update-dockerhub-default-registry" }
outbound-http = { path = "../outbound-http" }
path-absolutize = "3.0.11"
regex = "1.5.4"
reqwest = "0.11.9"
sha2 = "0.10.1"
serde = { version = "1.0", features = [ "derive" ] }
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
spin-manifest = { path = "../manifest" }
sha2 = "0.10.1"
tempfile = "3.3.0"
tokio = { version = "1.23", features = [ "full" ] }
tokio-util = "0.6"

View File

@ -16,7 +16,7 @@ pub(crate) async fn create_dir(base: impl AsRef<Path>, id: &str) -> Result<PathB
}
/// Get the path of a file relative to a given directory.
pub(crate) fn to_relative(path: impl AsRef<Path>, relative_to: impl AsRef<Path>) -> Result<String> {
pub fn to_relative(path: impl AsRef<Path>, relative_to: impl AsRef<Path>) -> Result<String> {
let rel = path.as_ref().strip_prefix(&relative_to).with_context(|| {
format!(
"Copied path '{}' did not belong with expected prefix '{}'",

View File

@ -1,7 +1,7 @@
//! Loaders for Spin applications.
//! This crate implements the possible application sources for Spin applications,
//! and includes functionality to convert the specific configuration (for example
//! local configuration files, or pulled from a Bindle) into Spin configuration that
//! local configuration files, pulled from Bindle, or from OCI) into Spin configuration that
//! can be consumed by the Spin execution context.
//!
//! This crate can be extended (or replaced entirely) to support additional loaders,
@ -15,6 +15,7 @@ pub mod bindle;
mod common;
pub mod digest;
pub mod local;
pub mod oci;
/// Load a Spin application configuration from a spin.toml manifest file.
pub use local::from_file;
@ -24,3 +25,5 @@ pub use crate::bindle::from_bindle;
/// Maximum number of assets to process in parallel
pub(crate) const MAX_PARALLEL_ASSET_PROCESSING: usize = 16;
pub use assets::to_relative;

View File

@ -0,0 +1,166 @@
//! Cache for OCI registry entities.
use anyhow::{bail, Context, Result};
use oci_distribution::Reference;
use tokio::fs;
use std::path::{Path, PathBuf};
const CONFIG_DIR: &str = "spin";
const REGISTRY_CACHE_DIR: &str = "registry";
const MANIFESTS_DIR: &str = "manifests";
const WASM_DIR: &str = "wasm";
const DATA_DIR: &str = "data";
const MANIFEST_FILE: &str = "manifest.json";
const CONFIG_FILE: &str = "config.json";
const LATEST_TAG: &str = "latest";
/// Cache for registry entities.
pub struct Cache {
/// Root directory for the cache instance.
pub root: PathBuf,
}
impl Cache {
/// Create a new cache given an optional root directory.
pub async fn new(root: Option<PathBuf>) -> Result<Self> {
let root = match root {
Some(root) => root,
None => dirs::cache_dir()
.context("cannot get cache directory")?
.join(CONFIG_DIR),
};
let root = root.join(REGISTRY_CACHE_DIR);
Self::ensure_dirs(&root).await?;
Ok(Self { root })
}
/// The manifests directory for the current cache.
fn manifests_dir(&self) -> PathBuf {
self.root.join(MANIFESTS_DIR)
}
/// The Wasm bytes directory for the current cache.
fn wasm_dir(&self) -> PathBuf {
self.root.join(WASM_DIR)
}
/// The data directory for the current cache.
fn data_dir(&self) -> PathBuf {
self.root.join(DATA_DIR)
}
/// Return the path to a wasm file given its digest.
pub fn wasm_file(&self, digest: impl AsRef<str>) -> Result<PathBuf> {
let path = &self.wasm_dir().join(digest.as_ref());
match path.exists() {
true => Ok(path.into()),
false => bail!(format!(
"cannot find wasm file for digest {}",
digest.as_ref()
)),
}
}
/// Return the path to a data file given its digest.
pub fn data_file(&self, digest: impl AsRef<str>) -> Result<PathBuf> {
let path = &self.data_dir().join(digest.as_ref());
match path.exists() {
true => Ok(path.into()),
false => bail!(format!(
"cannot find data file for digest {}",
digest.as_ref()
)),
}
}
/// Get the file path to an OCI manifest given a reference.
/// If the directory for the manifest does not exist, this will create it.
pub async fn oci_manifest_path(&self, reference: impl AsRef<str>) -> Result<PathBuf> {
let reference: Reference = reference
.as_ref()
.parse()
.context("cannot parse OCI reference")?;
let p = self
.manifests_dir()
.join(reference.registry())
.join(reference.repository())
.join(reference.tag().unwrap_or(LATEST_TAG));
if !p.is_dir() {
fs::create_dir_all(&p)
.await
.context("cannot find directory for OCI manifest")?;
}
Ok(p.join(MANIFEST_FILE))
}
/// Get the file path to the OCI configuration object given a reference.
pub async fn lockfile_path(&self, reference: impl AsRef<str>) -> Result<PathBuf> {
let reference: Reference = reference
.as_ref()
.parse()
.context("cannot parse reference")?;
let p = self
.manifests_dir()
.join(reference.registry())
.join(reference.repository())
.join(reference.tag().unwrap_or(LATEST_TAG));
if !p.is_dir() {
fs::create_dir_all(&p)
.await
.context("cannot find configuration object for reference")?;
}
Ok(p.join(CONFIG_FILE))
}
/// Write the contents in the cache's wasm directory.
pub async fn write_wasm(&self, bytes: impl AsRef<[u8]>, digest: impl AsRef<str>) -> Result<()> {
fs::write(self.wasm_dir().join(digest.as_ref()), bytes.as_ref()).await?;
Ok(())
}
/// Write the contents in the cache's data directory.
pub async fn write_data(&self, bytes: impl AsRef<[u8]>, digest: impl AsRef<str>) -> Result<()> {
fs::write(self.data_dir().join(digest.as_ref()), bytes.as_ref()).await?;
Ok(())
}
/// Ensure the expected configuration directories are found in the root.
/// └── <configuration-root>
/// └── registry
/// └──manifests
/// └──wasm
/// └─-data
async fn ensure_dirs(root: &Path) -> Result<()> {
tracing::debug!("using cache root directory {}", root.display());
let p = root.join(MANIFESTS_DIR);
if !p.is_dir() {
fs::create_dir_all(&p).await.with_context(|| {
format!("failed to create manifests directory `{}`", p.display())
})?;
}
let p = root.join(WASM_DIR);
if !p.is_dir() {
fs::create_dir_all(&p)
.await
.with_context(|| format!("failed to create wasm directory `{}`", p.display()))?;
}
let p = root.join(DATA_DIR);
if !p.is_dir() {
fs::create_dir_all(&p)
.await
.with_context(|| format!("failed to create assets directory `{}`", p.display()))?;
}
Ok(())
}
}

View File

@ -0,0 +1,5 @@
//! Functionality to get a prepared Spin application from an OCI registry.
#![deny(missing_docs)]
pub mod cache;

View File

@ -7,17 +7,26 @@ edition = { workspace = true }
[dependencies]
anyhow = "1.0"
bindle = { workspace = true }
docker_credential = "1.0"
dirs = "4.0"
dunce = "1.0"
futures = "0.3.14"
itertools = "0.10.3"
lazy_static = "1.4.0"
mime_guess = { version = "2.0" }
oci-distribution = { git = "https://github.com/radu-matei/oci-distribution", branch = "update-dockerhub-default-registry" }
regex = "1.5.4"
reqwest = "0.11"
semver = "1.0"
serde = { version = "1.0", features = [ "derive" ] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
spin-app = { path = "../app" }
spin-loader = { path = "../loader" }
spin-manifest = { path = "../manifest" }
spin-trigger = { path = "../trigger" }
tempfile = "3.3.0"
thiserror = "1.0.37"
tokio = "1.16.1"
tokio = { version = "1.20", features = ["full"] }
tracing = { workspace = true }
toml = "0.5"
walkdir = "2.3.2"

View File

@ -1,6 +1,6 @@
#![deny(missing_docs)]
use crate::{PublishError, PublishResult};
use super::{PublishError, PublishResult};
use bindle::{standalone::StandaloneRead, Id};
use std::path::Path;

View File

@ -1,6 +1,6 @@
#![deny(missing_docs)]
use crate::{expander::expand_manifest, PublishError, PublishResult};
use super::{expander::expand_manifest, PublishError, PublishResult};
use bindle::{Invoice, Parcel};
use spin_loader::local::parent_dir;
use std::{

View File

@ -1,7 +1,9 @@
#![deny(missing_docs)]
use crate::bindle_writer::{self, ParcelSources};
use crate::{PublishError, PublishResult};
use super::{
bindle_writer::{self, ParcelSources},
PublishError, PublishResult,
};
use bindle::{BindleSpec, Condition, Group, Invoice, Label, Parcel};
use semver::BuildMetadata;
use spin_loader::{

View File

@ -0,0 +1,11 @@
//! Functions for publishing Spin applications to Bindle.
mod bindle_pusher;
mod bindle_writer;
mod error;
mod expander;
pub use bindle_pusher::push_all;
pub use bindle_writer::{prepare_bindle, write};
pub use error::{PublishError, PublishResult};
pub use expander::expand_manifest;

View File

@ -1,13 +1,6 @@
#![deny(missing_docs)]
//! Functions for publishing Spin applications to Bindle.
//! Functions for publishing Spin applications to remote registries.
mod bindle_pusher;
mod bindle_writer;
mod error;
mod expander;
pub use bindle_pusher::push_all;
pub use bindle_writer::{prepare_bindle, write};
pub use error::{PublishError, PublishResult};
pub use expander::expand_manifest;
pub mod bindle;
pub mod oci;

View File

@ -0,0 +1,250 @@
//! Client for distributing Spin applications using OCI registries.
use anyhow::{bail, Context, Result};
use docker_credential::{CredentialRetrievalError, DockerCredential};
use oci_distribution::{
client::{Config, ImageLayer},
manifest::OciImageManifest,
secrets::RegistryAuth,
Reference,
};
use spin_app::locked::{ContentPath, ContentRef};
use spin_loader::oci::cache::Cache;
use spin_manifest::Application;
use tokio::fs;
use walkdir::WalkDir;
use std::path::{Path, PathBuf};
// TODO: the media types for application, wasm module, and data layer are not final.
const SPIN_APPLICATION_MEDIA_TYPE: &str = "application/vnd.fermyon.spin.application.v1+config";
const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.wasm.content.layer.v1+wasm";
const DATA_MEDIATYPE: &str = "application/vnd.wasm.content.layer.v1+data";
/// Client for interacting with an OCI registry for Spin applications.
pub struct Client {
/// Global cache for the metadata, Wasm modules, and static assets pulled from OCI registries.
pub cache: Cache,
oci: oci_distribution::Client,
}
impl Client {
/// Create a new instance of an OCI client for distributing Spin applications.
pub async fn new(insecure: bool, cache_root: Option<PathBuf>) -> Result<Self> {
let client = oci_distribution::Client::new(Self::build_config(insecure));
let cache = Cache::new(cache_root).await?;
Ok(Self { oci: client, cache })
}
/// Push a Spin application to an OCI registry.
pub async fn push(&mut self, app: &Application, reference: impl AsRef<str>) -> Result<()> {
let reference: Reference = reference
.as_ref()
.parse()
.with_context(|| format!("cannot parse reference {}", reference.as_ref()))?;
let auth = Self::auth(&reference)?;
let working_dir = tempfile::tempdir()?;
// Create a locked application from the application manifest.
// TODO: We don't need an extra copy here for each asset to prepare the application.
// We should be able to use assets::collect instead when constructing the locked app.
let locked = spin_trigger::locked::build_locked_app(app.clone(), working_dir.path())
.context("cannot create locked app")?;
let mut locked = locked.clone();
// For each component in the application, add layers for the wasm module and
// all static assets and update the locked application with the file digests.
let mut layers = Vec::new();
let mut components = Vec::new();
for mut c in locked.components {
// Add the wasm module for the component as layers.
let source = c
.clone()
.source
.content
.source
.context("component loaded from disk should contain a file source")?;
let source = spin_trigger::parse_file_url(source.as_str())?;
let layer = Self::wasm_layer(&source).await?;
let digest = &layer.sha256_digest();
layers.push(layer);
// Update the module source with the content digest of the layer.
c.source.content = ContentRef {
source: None,
digest: Some(digest.clone()),
};
// Add a layer for each file referenced in the mount directory.
// Note that this is in fact a directory, and not a single file, so we need to
// recursively traverse it and add layers for each file.
let mut files = Vec::new();
for f in c.files {
let source = f
.content
.source
.context("file mount loaded from disk should contain a file source")?;
let source = spin_trigger::parse_file_url(source.as_str())?;
// Traverse each mount directory, add all static assets as layers, then update the
// locked application file with the file digest.
for entry in WalkDir::new(&source) {
let entry = entry?;
if entry.file_type().is_file() && !entry.file_type().is_dir() {
tracing::trace!(
"Adding new layer for asset {:?}",
spin_loader::to_relative(entry.path(), &source)?
);
let layer = Self::data_layer(entry.path()).await?;
let digest = &layer.sha256_digest();
layers.push(layer);
files.push(ContentPath {
content: ContentRef {
source: None,
digest: Some(digest.clone()),
},
path: PathBuf::from(spin_loader::to_relative(entry.path(), &source)?),
});
}
}
}
c.files = files;
components.push(c);
}
locked.components = components;
locked.metadata.remove(&"origin".to_string());
let oci_config = Config {
data: serde_json::to_vec(&locked)?,
media_type: SPIN_APPLICATION_MEDIA_TYPE.to_string(),
annotations: None,
};
let manifest = OciImageManifest::build(&layers, &oci_config, None);
let response = self
.oci
.push(&reference, &layers, oci_config, &auth, Some(manifest))
.await
.map(|push_response| push_response.manifest_url)
.context("cannot push Spin application")?;
tracing::info!("Pushed {:?}", response);
Ok(())
}
/// Pull a Spin application from an OCI registry.
pub async fn pull(&mut self, reference: &str) -> Result<()> {
let reference: Reference = reference.parse().context("cannot parse reference")?;
let auth = Self::auth(&reference)?;
// Pull the manifest from the registry.
let (manifest, digest) = self.oci.pull_image_manifest(&reference, &auth).await?;
let manifest_json = serde_json::to_string(&manifest)?;
tracing::debug!("Pulled manifest: {}", manifest_json);
// Write the manifest in `<cache_root>/registry/oci/manifests/repository:<tag_or_latest>/manifest.json`
let m = self.cache.oci_manifest_path(&reference.to_string()).await?;
fs::write(&m, &manifest_json).await?;
let mut cfg_bytes = Vec::new();
self.oci
.pull_blob(&reference, &manifest.config.digest, &mut cfg_bytes)
.await?;
let cfg = std::str::from_utf8(&cfg_bytes)?;
tracing::debug!("Pulled config: {}", cfg);
// Write the config object in `<cache_root>/registry/oci/manifests/repository:<tag_or_latest>/config.json`
let c = self.cache.lockfile_path(&reference.to_string()).await?;
fs::write(&c, &cfg).await?;
// If a layer is a Wasm module, write it in the Wasm directory.
// Otherwise, write it in the data directory.
for layer in manifest.layers {
// Skip pulling if the digest already exists in the wasm or data directories.
if self.cache.wasm_file(&layer.digest).is_ok()
|| self.cache.data_file(&layer.digest).is_ok()
{
tracing::debug!("Layer {} already exists in cache", &layer.digest);
continue;
}
tracing::debug!("Pulling layer {}", &layer.digest);
let mut bytes = Vec::new();
self.oci
.pull_blob(&reference, &layer.digest, &mut bytes)
.await?;
match layer.media_type.as_str() {
WASM_LAYER_MEDIA_TYPE => self.cache.write_wasm(&bytes, &layer.digest).await?,
_ => self.cache.write_data(&bytes, &layer.digest).await?,
}
}
tracing::info!("Pulled {}@{}", reference, digest);
Ok(())
}
/// Create a new wasm layer based on a file.
pub async fn wasm_layer(file: &Path) -> Result<ImageLayer> {
tracing::log::trace!("Reading wasm module from {:?}", file);
Ok(ImageLayer::new(
fs::read(file).await.context("cannot read wasm module")?,
WASM_LAYER_MEDIA_TYPE.to_string(),
None,
))
}
/// Create a new data layer based on a file.
pub async fn data_layer(file: &Path) -> Result<ImageLayer> {
tracing::log::trace!("Reading data file from {:?}", file);
Ok(ImageLayer::new(
fs::read(&file).await?,
DATA_MEDIATYPE.to_string(),
None,
))
}
/// Construct the registry authentication based on the reference.
fn auth(reference: &Reference) -> Result<RegistryAuth> {
let server = reference
.resolve_registry()
.strip_suffix('/')
.unwrap_or_else(|| reference.resolve_registry());
let creds = docker_credential::get_credential(server);
match creds {
Err(CredentialRetrievalError::ConfigNotFound) => Ok(RegistryAuth::Anonymous),
Err(CredentialRetrievalError::NoCredentialConfigured) => Ok(RegistryAuth::Anonymous),
Err(CredentialRetrievalError::ConfigReadError) => Ok(RegistryAuth::Anonymous),
Err(e) => bail!("Error handling docker configuration file: {}", e),
Ok(DockerCredential::UsernamePassword(username, password)) => {
tracing::trace!("Found docker credentials");
Ok(RegistryAuth::Basic(username, password))
}
Ok(DockerCredential::IdentityToken(_)) => {
println!("Cannot use contents of docker config, identity token not supported. Using anonymous auth");
Ok(RegistryAuth::Anonymous)
}
}
}
/// Build the OCI client configuration given the insecure option.
fn build_config(insecure: bool) -> oci_distribution::client::ClientConfig {
let protocol = if insecure {
oci_distribution::client::ClientProtocol::Http
} else {
oci_distribution::client::ClientProtocol::Https
};
oci_distribution::client::ClientConfig {
protocol,
..Default::default()
}
}
}

View File

@ -0,0 +1,3 @@
//! Functions for publishing Spin applications to OCI.
pub mod client;

View File

@ -11,6 +11,7 @@ clap = { version = "3.1.15", features = ["derive", "env"] }
ctrlc = { version = "3.2", features = ["termination"] }
dirs = "4"
futures = "0.3"
oci-distribution = { git = "https://github.com/radu-matei/oci-distribution", branch = "update-dockerhub-default-registry" }
outbound-http = { path = "../outbound-http" }
outbound-redis = { path = "../outbound-redis" }
outbound-pg = { path = "../outbound-pg" }

View File

@ -8,8 +8,8 @@ use tokio::{
time::{sleep, Duration},
};
use crate::stdio::StdioLoggingTriggerHooks;
use crate::{config::TriggerExecutorBuilderConfig, loader::TriggerLoader, stdio::FollowComponents};
use crate::{loader::OciTriggerLoader, stdio::StdioLoggingTriggerHooks};
use crate::{TriggerExecutor, TriggerExecutorBuilder};
pub const APP_LOG_DIR: &str = "APP_LOG_DIR";
@ -88,6 +88,9 @@ where
#[clap(long = "help-args-only", hide = true)]
pub help_args_only: bool,
#[clap(long = "oci")]
pub oci: bool,
}
/// An empty implementation of clap::Args to be used as TriggerExecutor::RunConfig
@ -114,21 +117,44 @@ where
let working_dir = std::env::var(SPIN_WORKING_DIR).context(SPIN_WORKING_DIR)?;
let locked_url = std::env::var(SPIN_LOCKED_URL).context(SPIN_LOCKED_URL)?;
let loader = TriggerLoader::new(working_dir, self.allow_transient_write);
// TODO: I assume there is a way to do this with a single let mut loader: Box<dyn Loader>
// variable instead of the entire executor.
let executor: Executor = match self.oci {
true => {
let loader =
OciTriggerLoader::new(working_dir, self.allow_transient_write, None).await?;
let trigger_config =
TriggerExecutorBuilderConfig::load_from_file(self.runtime_config_file.clone())?;
let trigger_config =
TriggerExecutorBuilderConfig::load_from_file(self.runtime_config_file.clone())?;
let executor: Executor = {
let _sloth_warning = warn_if_wasm_build_slothful();
let _sloth_warning = warn_if_wasm_build_slothful();
let mut builder = TriggerExecutorBuilder::new(loader);
self.update_wasmtime_config(builder.wasmtime_config_mut())?;
let mut builder = TriggerExecutorBuilder::new(loader);
self.update_wasmtime_config(builder.wasmtime_config_mut())?;
let logging_hooks = StdioLoggingTriggerHooks::new(self.follow_components(), self.log);
builder.hooks(logging_hooks);
let logging_hooks =
StdioLoggingTriggerHooks::new(self.follow_components(), self.log);
builder.hooks(logging_hooks);
builder.build(locked_url, trigger_config).await?
builder.build(locked_url, trigger_config).await?
}
false => {
let loader = TriggerLoader::new(working_dir, self.allow_transient_write);
let trigger_config =
TriggerExecutorBuilderConfig::load_from_file(self.runtime_config_file.clone())?;
let _sloth_warning = warn_if_wasm_build_slothful();
let mut builder = TriggerExecutorBuilder::new(loader);
self.update_wasmtime_config(builder.wasmtime_config_mut())?;
let logging_hooks =
StdioLoggingTriggerHooks::new(self.follow_components(), self.log);
builder.hooks(logging_hooks);
builder.build(locked_url, trigger_config).await?
}
};
let run_fut = executor.run(self.run_config);

View File

@ -308,7 +308,7 @@ pub trait TriggerHooks: Send + Sync {
impl TriggerHooks for () {}
pub(crate) fn parse_file_url(url: &str) -> Result<PathBuf> {
pub fn parse_file_url(url: &str) -> Result<PathBuf> {
url::Url::parse(url)
.with_context(|| format!("Invalid URL: {url:?}"))?
.to_file_path()

View File

@ -5,10 +5,12 @@ use std::path::PathBuf;
use anyhow::{ensure, Context, Result};
use async_trait::async_trait;
use spin_app::{
locked::{LockedApp, LockedComponentSource},
locked::{ContentRef, LockedApp, LockedComponentSource},
AppComponent, Loader,
};
use spin_core::StoreBuilder;
use spin_loader::oci::cache::Cache;
use url::Url;
use crate::parse_file_url;
@ -78,3 +80,119 @@ impl Loader for TriggerLoader {
Ok(())
}
}
pub struct OciTriggerLoader {
working_dir: PathBuf,
allow_transient_write: bool,
cache: Cache,
}
impl OciTriggerLoader {
// TODO: support a different cache root directory
pub async fn new(
working_dir: impl Into<PathBuf>,
allow_transient_write: bool,
cache_root: Option<PathBuf>,
) -> Result<Self> {
Ok(Self {
working_dir: working_dir.into(),
allow_transient_write,
cache: Cache::new(cache_root).await?,
})
}
}
#[async_trait]
impl Loader for OciTriggerLoader {
// Read the locked app from the OCI cache and update the module source for each
// component with the path to the Wasm modules from the OCI cache.
async fn load_app(&self, url: &str) -> Result<LockedApp> {
let path = parse_file_url(url)?;
let contents =
std::fs::read(&path).with_context(|| format!("failed to read manifest at {path:?}"))?;
let app: LockedApp =
serde_json::from_slice(&contents).context("failed to parse app lock file JSON")?;
let mut res = app;
let mut components = Vec::new();
for mut c in res.components {
let digest =
c.clone().source.content.digest.expect(
"locked application from OCI cache should have a digest for the source",
);
let url = Url::from_file_path(self.cache.wasm_file(digest)?.to_str().unwrap())
.expect("cannot crate file url from path for module source");
c.source.content = ContentRef {
digest: None,
source: Some(url.to_string()),
};
components.push(c);
}
res.components = components;
Ok(res)
}
async fn load_module(
&self,
engine: &spin_core::wasmtime::Engine,
source: &LockedComponentSource,
) -> Result<spin_core::Module> {
let source = source
.content
.source
.as_ref()
.context("LockedComponentSource missing source field")?;
let path = parse_file_url(source)?;
spin_core::Module::from_file(engine, &path)
.with_context(|| format!("loading module {path:?}"))
}
// Copy static assets from the locked application into a temporary mount directory.
async fn mount_files(
&self,
store_builder: &mut StoreBuilder,
component: &AppComponent,
) -> Result<()> {
let temp_mount = self.working_dir.join("files");
tokio::fs::create_dir_all(&temp_mount)
.await
.context("cannot create temporary mount directory")?;
for f in component.files() {
let src = self
.cache
.data_file(f.clone().content.digest.context(format!(
"static asset {:?} from OCI cache must have a digest",
f
))?)?;
let dst = temp_mount.join(&f.path);
let parent = dst.parent().context(format!(
"path for static asset mount path {:?} must have a parent directory",
dst
))?;
tokio::fs::create_dir_all(&parent)
.await
.context("cannot create directory structure for temporary mounts")?;
tracing::trace!("Attempting to copy {:?}->{:?}", src, dst);
tokio::fs::copy(&src, &dst)
.await
.context("cannot copy file mount")?;
}
if self.allow_transient_write {
store_builder.read_write_preopened_dir(temp_mount, "/".into())?;
} else {
store_builder.read_only_preopened_dir(temp_mount, "/".into())?
}
Ok(())
}
}

View File

@ -9,6 +9,7 @@ use spin_cli::commands::{
external::execute_external_subcommand,
login::LoginCommand,
new::{AddCommand, NewCommand},
oci::OciCommands,
plugins::PluginCommands,
templates::TemplateCommands,
up::UpCommand,
@ -50,6 +51,8 @@ enum SpinApp {
Up(UpCommand),
#[clap(subcommand)]
Bindle(BindleCommands),
#[clap(subcommand)]
Oci(OciCommands),
Deploy(DeployCommand),
Build(BuildCommand),
Login(LoginCommand),
@ -76,6 +79,7 @@ impl SpinApp {
Self::New(cmd) => cmd.run().await,
Self::Add(cmd) => cmd.run().await,
Self::Bindle(cmd) => cmd.run().await,
Self::Oci(cmd) => cmd.run().await,
Self::Deploy(cmd) => cmd.run().await,
Self::Build(cmd) => cmd.run().await,
Self::Trigger(TriggerCommands::Http(cmd)) => cmd.run().await,

View File

@ -12,6 +12,8 @@ pub mod external;
pub mod login;
/// Command for creating a new application.
pub mod new;
/// Commands for working with OCI registries.
pub mod oci;
/// Command for adding a plugin to Spin
pub mod plugins;
/// Commands for working with templates.

View File

@ -126,7 +126,7 @@ impl Prepare {
.unwrap_or_else(|| DEFAULT_MANIFEST_FILE.as_ref());
let dest_dir = &self.staging_dir;
let bindle_id = spin_publish::prepare_bindle(app_file, self.buildinfo, dest_dir)
let bindle_id = spin_publish::bindle::prepare_bindle(app_file, self.buildinfo, dest_dir)
.await
.map_err(crate::wrap_prepare_bindle_error)?;
@ -162,7 +162,7 @@ impl Push {
Some(path) => path.as_path(),
};
let bindle_id = spin_publish::prepare_bindle(app_file, self.buildinfo, dest_dir)
let bindle_id = spin_publish::bindle::prepare_bindle(app_file, self.buildinfo, dest_dir)
.await
.map_err(crate::wrap_prepare_bindle_error)?;
@ -171,7 +171,7 @@ impl Push {
self.bindle_server_url
));
spin_publish::push_all(&dest_dir, &bindle_id, bindle_connection_info.clone())
spin_publish::bindle::push_all(&dest_dir, &bindle_id, bindle_connection_info.clone())
.await
.with_context(|| {
crate::push_all_failed_msg(dest_dir, bindle_connection_info.base_url())

View File

@ -597,7 +597,7 @@ impl DeployCommand {
Some(path) => path.as_path(),
};
let bindle_id = spin_publish::prepare_bindle(&self.app, buildinfo, dest_dir)
let bindle_id = spin_publish::bindle::prepare_bindle(&self.app, buildinfo, dest_dir)
.await
.map_err(crate::wrap_prepare_bindle_error)?;
@ -607,8 +607,10 @@ impl DeployCommand {
bindle_id.version()
);
match spin_publish::push_all(dest_dir, &bindle_id, bindle_connection_info.clone()).await {
Err(spin_publish::PublishError::BindleAlreadyExists(err_msg)) => {
match spin_publish::bindle::push_all(dest_dir, &bindle_id, bindle_connection_info.clone())
.await
{
Err(spin_publish::bindle::PublishError::BindleAlreadyExists(err_msg)) => {
if self.redeploy {
Ok(bindle_id.clone())
} else {
@ -618,9 +620,9 @@ impl DeployCommand {
))
}
}
Err(spin_publish::PublishError::BindleClient(bindle::client::ClientError::Other(
e,
))) if e.to_string().contains("application exceeds") => {
Err(spin_publish::bindle::PublishError::BindleClient(
bindle::client::ClientError::Other(e),
)) if e.to_string().contains("application exceeds") => {
Err(anyhow!(e.trim_start_matches("Unknown error: ").to_owned()))
}
Err(err) => Err(err).with_context(|| {

210
src/commands/oci.rs Normal file
View File

@ -0,0 +1,210 @@
use anyhow::{bail, Context, Result};
use clap::{Parser, Subcommand};
use reqwest::Url;
use spin_app::locked::LockedApp;
use spin_trigger::cli::{SPIN_LOCKED_URL, SPIN_WORKING_DIR};
use std::{
ffi::OsString,
path::{Path, PathBuf},
};
use crate::opts::*;
/// Commands for working with OCI registries to distribute applications.
/// The set of commands for OCI is EXPERIMENTAL, and may change in future versions of Spin.
/// Currently, the OCI commands are reusing the credentials from ~/.docker/config.json to
/// authenticate to registries.
#[derive(Subcommand, Debug)]
pub enum OciCommands {
/// Push a Spin application to an OCI registry.
Push(Push),
/// Pull a Spin application from an OCI registry.
Pull(Pull),
/// Run a Spin application from an OCI registry.
Run(Run),
}
impl OciCommands {
pub async fn run(self) -> Result<()> {
match self {
OciCommands::Push(cmd) => cmd.run().await,
OciCommands::Pull(cmd) => cmd.run().await,
OciCommands::Run(cmd) => cmd.run().await,
}
}
}
#[derive(Parser, Debug)]
pub struct Push {
/// Path to spin.toml
#[clap(
name = APP_CONFIG_FILE_OPT,
short = 'f',
long = "file",
)]
pub app: Option<PathBuf>,
/// Ignore server certificate errors
#[clap(
name = INSECURE_OPT,
short = 'k',
long = "insecure",
takes_value = false,
)]
pub insecure: bool,
/// Reference of the Spin application
#[clap()]
pub reference: String,
}
impl Push {
pub async fn run(self) -> Result<()> {
let app_file = self
.app
.as_deref()
.unwrap_or_else(|| DEFAULT_MANIFEST_FILE.as_ref());
let dir = tempfile::tempdir()?;
let app = spin_loader::local::from_file(&app_file, Some(dir.path()), &None).await?;
let mut client = spin_publish::oci::client::Client::new(self.insecure, None).await?;
client.push(&app, &self.reference).await?;
Ok(())
}
}
#[derive(Parser, Debug)]
pub struct Pull {
/// Ignore server certificate errors
#[clap(
name = INSECURE_OPT,
short = 'k',
long = "insecure",
takes_value = false,
)]
pub insecure: bool,
/// Reference of the Spin application
#[clap()]
pub reference: String,
}
impl Pull {
/// Pull a Spin application from an OCI registry
pub async fn run(self) -> Result<()> {
let mut client = spin_publish::oci::client::Client::new(self.insecure, None).await?;
client.pull(&self.reference).await?;
Ok(())
}
}
#[derive(Parser, Debug)]
pub struct Run {
/// Connect to the registry endpoint over HTTP, not HTTPS.
#[clap(
name = INSECURE_OPT,
short = 'k',
long = "insecure",
takes_value = false,
)]
pub insecure: bool,
/// Pass an environment variable (key=value) to all components of the application.
#[clap(short = 'e', long = "env", parse(try_from_str = parse_env_var))]
pub env: Vec<(String, String)>,
/// Reference of the Spin application
#[clap()]
pub reference: String,
/// All other args, to be passed through to the trigger
/// TODO: The arguments have to be passed like `-- --follow-all` for now.
#[clap(hide = true)]
pub trigger_args: Vec<OsString>,
}
impl Run {
/// Run a Spin application from an OCI registry
pub async fn run(self) -> Result<()> {
let mut client = spin_publish::oci::client::Client::new(self.insecure, None).await?;
client.pull(&self.reference).await?;
let app_path = client.cache.lockfile_path(&self.reference).await?;
let working_dir = tempfile::tempdir()?;
// Read the lockfile from the registry cache and mutate it to add environment variables.
let mut app: LockedApp = serde_json::from_slice(&tokio::fs::read(&app_path).await?)?;
// Apply --env to component environments
if !self.env.is_empty() {
for component in app.components.iter_mut() {
component.env.extend(self.env.iter().cloned());
}
}
let trigger_type = &app
.triggers
.first()
.context("application expected to have at least one trigger")?
.trigger_type;
let mut cmd = std::process::Command::new(std::env::current_exe().unwrap());
cmd.arg("trigger")
.arg(trigger_type)
// TODO: This should be inferred from the lockfile.
.arg("--oci")
// TODO: Once we figure out how to handle the flags for triggers, i.e. `-- --follow-all`, remove this.
.arg("--follow-all")
.args(&self.trigger_args)
.env(SPIN_WORKING_DIR, working_dir.path());
let app_path = Self::write_locked_app(&app, working_dir.path()).await?;
let url = Url::from_file_path(app_path)
.expect("cannot parse URL from locked app file")
.to_string();
cmd.env(SPIN_LOCKED_URL, &url);
tracing::trace!("Running trigger executor: {:?}", cmd);
let mut child = cmd.spawn().context("Failed to execute trigger")?;
// Terminate trigger executor if `spin up` itself receives a termination signal
#[cfg(not(windows))]
{
// https://github.com/nix-rust/nix/issues/656
let pid = nix::unistd::Pid::from_raw(child.id() as i32);
ctrlc::set_handler(move || {
if let Err(err) = nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM) {
tracing::warn!("Failed to kill trigger handler process: {:?}", err)
}
})?;
}
let status = child.wait()?;
if status.success() {
Ok(())
} else {
bail!(status);
}
}
async fn write_locked_app(app: &LockedApp, working_dir: &Path) -> Result<PathBuf> {
let path = working_dir.join("spin.lock");
let contents = serde_json::to_vec(&app)?;
tokio::fs::write(&path, contents).await?;
Ok(path)
}
}
// Parse the environment variables passed in `key=value` pairs.
fn parse_env_var(s: &str) -> Result<(String, String)> {
let parts: Vec<_> = s.splitn(2, '=').collect();
if parts.len() != 2 {
bail!("Environment variable must be of the form `key=value`");
}
Ok((parts[0].to_owned(), parts[1].to_owned()))
}

View File

@ -4,7 +4,7 @@ mod sloth;
use anyhow::{anyhow, Result};
use semver::BuildMetadata;
use spin_publish::PublishError;
use spin_publish::bindle::PublishError;
use std::path::Path;
pub(crate) fn push_all_failed_msg(path: &Path, server_url: &str) -> String {