Feature-gate public use of `http-body` and `hyper` within `aws-smithy-types` (#3088)

## Motivation and Context
Implements https://github.com/awslabs/smithy-rs/issues/3033

## Description
This PR hides behind cargo features the third-party types from
`http-body` and `hyper` crates that are used in`aws-smithy-types`'
public API. Customers need to opt-in by enabling a cargo feature
`http-body-0-4-x` in `aws-smithy-types` to create an `SdkBody` or
`ByteStream` using those third-party types. For more details, please see
[the upgrade
guide](https://github.com/awslabs/smithy-rs/discussions/3089).

As can been seen from code changes, to reduce the surface area where we
need to feature-gate things, we have fused the
`aws_smithy_types::body::Inner::Streaming` enum variant into
`aws_smithy_types::body::Inner::Dyn` variant, thereby removing
`SdkBody::from_dyn`.

## Testing
Relied on existing tests in CI

## Checklist
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: Zelda Hessler <zhessler@amazon.com>
This commit is contained in:
ysaito1001 2023-10-25 11:45:22 -05:00 committed by GitHub
parent 08a533ffd3
commit 2a51e0bceb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 537 additions and 383 deletions

View File

@ -474,3 +474,15 @@ message = "Enable custom auth schemes to work by changing the code generated aut
references = ["smithy-rs#3034", "smithy-rs#3087"]
meta = { "breaking" = false, "tada" = false, "bug" = true, "target" = "client" }
author = "rcoh"
[[smithy-rs]]
message = "Publicly exposed types from `http-body` and `hyper` crates within `aws-smithy-types` are now feature-gated. See the [upgrade guidance](https://github.com/awslabs/smithy-rs/discussions/3089) for details."
references = ["smithy-rs#3033", "smithy-rs#3088"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"
[[smithy-rs]]
message = "`ByteStream::poll_next` is now feature-gated. You can turn on a cargo feature `byte-stream-poll-next` in `aws-smithy-types` to use it."
references = ["smithy-rs#3033", "smithy-rs#3088"]
meta = { "breaking" = true, "tada" = false, "bug" = false, "target" = "all" }
author = "ysaito1001"

View File

@ -21,7 +21,7 @@ aws-smithy-checksums = { path = "../../../rust-runtime/aws-smithy-checksums" }
aws-smithy-http = { path = "../../../rust-runtime/aws-smithy-http" }
aws-smithy-runtime = { path = "../../../rust-runtime/aws-smithy-runtime", features = ["client"] }
aws-smithy-runtime-api = { path = "../../../rust-runtime/aws-smithy-runtime-api", features = ["client"] }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types", features = ["http-body-0-4-x"] }
bytes = "1"
hex = "0.4.3"
http = "0.2.9"

View File

@ -19,7 +19,7 @@ use aws_smithy_runtime_api::client::interceptors::context::{
use aws_smithy_runtime_api::client::interceptors::Intercept;
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::body::{BoxBody, SdkBody};
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
use aws_smithy_types::error::operation::BuildError;
use http::HeaderValue;
@ -173,7 +173,7 @@ fn wrap_streaming_request_body_in_checksum_calculating_body(
let body = AwsChunkedBody::new(body, aws_chunked_body_options);
SdkBody::from_dyn(BoxBody::new(body))
SdkBody::from_body_0_4(body)
})
};
@ -269,7 +269,7 @@ mod tests {
let crc32c_checksum = crc32c_checksum.finalize();
let mut request = HttpRequest::new(
ByteStream::read_from()
ByteStream::read_with_body_0_4_from()
.path(&file)
.buffer_size(1024)
.build()

View File

@ -14,7 +14,7 @@ use aws_smithy_runtime_api::client::interceptors::context::{
};
use aws_smithy_runtime_api::client::interceptors::Intercept;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::body::{BoxBody, SdkBody};
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
use http::HeaderValue;
use std::{fmt, mem};
@ -119,11 +119,11 @@ pub(crate) fn wrap_body_with_checksum_validator(
use aws_smithy_checksums::body::validate;
body.map(move |body| {
SdkBody::from_dyn(BoxBody::new(validate::ChecksumBody::new(
SdkBody::from_body_0_4(validate::ChecksumBody::new(
body,
checksum_algorithm.into_impl(),
precalculated_checksum.clone(),
)))
))
})
}

View File

@ -21,7 +21,11 @@ async fn set_correct_headers() {
let _resp = client
.upload_archive()
.vault_name("vault")
.body(ByteStream::from_path("tests/test-file.txt").await.unwrap())
.body(
ByteStream::from_path_body_0_4("tests/test-file.txt")
.await
.unwrap(),
)
.send()
.await;
let req = handler.expect_request();

View File

@ -177,7 +177,7 @@ async fn test_checksum_on_streaming_request<'a>(
use std::io::Write;
file.write_all(body).unwrap();
let body = aws_sdk_s3::primitives::ByteStream::read_from()
let body = aws_sdk_s3::primitives::ByteStream::read_with_body_0_4_from()
.path(file.path())
.buffer_size(1024)
.build()

View File

@ -71,12 +71,12 @@ class RequiredCustomizations : ClientCodegenDecorator {
override fun extras(codegenContext: ClientCodegenContext, rustCrate: RustCrate) {
val rc = codegenContext.runtimeConfig
// Add rt-tokio feature for `ByteStream::from_path`
// Add rt-tokio and http-body-0-4-x features for `ByteStream::from_path_0_4`
rustCrate.mergeFeature(
Feature(
"rt-tokio",
true,
listOf("aws-smithy-async/rt-tokio", "aws-smithy-http/rt-tokio"),
listOf("aws-smithy-async/rt-tokio", "aws-smithy-types/rt-tokio", "aws-smithy-types/http-body-0-4-x"),
),
)

View File

@ -29,12 +29,12 @@ class ClientHttpBoundProtocolPayloadGenerator(
_cfg.interceptor_state().store_put(signer_sender);
let adapter: #{aws_smithy_http}::event_stream::MessageStreamAdapter<_, _> =
${params.outerName}.${params.memberName}.into_body_stream(marshaller, error_marshaller, signer);
let body: #{SdkBody} = #{hyper}::Body::wrap_stream(adapter).into();
body
#{SdkBody}::from_body_0_4(#{hyper}::Body::wrap_stream(adapter))
}
""",
"hyper" to CargoDependency.HyperWithStream.toType(),
"SdkBody" to RuntimeType.sdkBody(codegenContext.runtimeConfig),
"SdkBody" to CargoDependency.smithyTypes(codegenContext.runtimeConfig).withFeature("http-body-0-4-x")
.toType().resolve("body::SdkBody"),
"aws_smithy_http" to RuntimeType.smithyHttp(codegenContext.runtimeConfig),
"DeferredSigner" to RuntimeType.smithyEventStream(codegenContext.runtimeConfig).resolve("frame::DeferredSigner"),
"marshallerConstructorFn" to params.marshallerConstructorFn,

View File

@ -37,8 +37,14 @@ class ServerRequiredCustomizations : ServerCodegenDecorator {
override fun extras(codegenContext: ServerCodegenContext, rustCrate: RustCrate) {
val rc = codegenContext.runtimeConfig
// Add rt-tokio feature for `ByteStream::from_path`
rustCrate.mergeFeature(Feature("rt-tokio", true, listOf("aws-smithy-http/rt-tokio")))
// Add rt-tokio and http-body-0-4-x features for `ByteStream::from_path_body_0_4`
rustCrate.mergeFeature(
Feature(
"rt-tokio",
true,
listOf("aws-smithy-types/rt-tokio", "aws-smithy-types/http-body-0-4-x"),
),
)
rustCrate.withModule(ServerRustModule.Types) {
pubUseSmithyPrimitives(codegenContext, codegenContext.model)(this)

View File

@ -16,7 +16,7 @@ publish = true
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-http-server = { path = "../aws-smithy-http-server", features = ["aws-lambda"] }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-types = { path = "../aws-smithy-types", features = ["byte-stream-poll-next", "http-body-0-4-x"] }
aws-smithy-xml = { path = "../aws-smithy-xml" }
bytes = "1.2"
futures = "0.3"

View File

@ -147,5 +147,5 @@ async def handler(bytestream):
fn streaming_bytestream_from_vec(chunks: Vec<&'static str>) -> ByteStream {
let stream = stream::iter(chunks.into_iter().map(Ok::<_, io::Error>));
let body = Body::wrap_stream(stream);
ByteStream::new(SdkBody::from(body))
ByteStream::new(SdkBody::from_body_0_4(body))
}

View File

@ -407,7 +407,7 @@ impl ByteStream {
#[staticmethod]
pub fn from_path_blocking(py: Python, path: String) -> PyResult<Py<PyAny>> {
let byte_stream = Handle::current().block_on(async {
aws_smithy_types::byte_stream::ByteStream::from_path(path)
aws_smithy_types::byte_stream::ByteStream::from_path_body_0_4(path)
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})?;
@ -423,7 +423,7 @@ impl ByteStream {
#[staticmethod]
pub fn from_path(py: Python, path: String) -> PyResult<&PyAny> {
pyo3_asyncio::tokio::future_into_py(py, async move {
let byte_stream = aws_smithy_types::byte_stream::ByteStream::from_path(path)
let byte_stream = aws_smithy_types::byte_stream::ByteStream::from_path_body_0_4(path)
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
Ok(Self(Arc::new(Mutex::new(byte_stream))))

View File

@ -21,7 +21,7 @@ request-id = ["dep:uuid"]
async-trait = "0.1"
aws-smithy-http = { path = "../aws-smithy-http", features = ["rt-tokio"] }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x", "hyper-0-14-x"] }
aws-smithy-xml = { path = "../aws-smithy-xml" }
bytes = "1.1"
futures-util = { version = "0.3.16", default-features = false }

View File

@ -16,7 +16,7 @@ rt-tokio = ["aws-smithy-types/rt-tokio"]
[dependencies]
aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true }
aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-types = { path = "../aws-smithy-types", features = ["byte-stream-poll-next", "http-body-0-4-x"] }
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"

View File

@ -8,10 +8,6 @@
//! Types for representing the body of an HTTP request or response
/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`](bytes::Bytes) or an [`Error`](aws_smithy_types::body::Error).
#[deprecated(note = "Moved to `aws_smithy_types::body::BoxBody`.")]
pub type BoxBody = aws_smithy_types::body::BoxBody;
/// A generic, boxed error that's `Send` and `Sync`
#[deprecated(note = "`Moved to `aws_smithy_types::body::Error`.")]
pub type Error = aws_smithy_types::body::Error;

View File

@ -342,7 +342,7 @@ mod tests {
let chunks: Vec<Result<_, IOError>> =
vec![Ok(encode_message("one")), Ok(encode_message("two"))];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -363,7 +363,7 @@ mod tests {
Ok(Bytes::from_static(&[])),
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -384,7 +384,7 @@ mod tests {
Ok(encode_message("three").split_to(10)),
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -410,7 +410,7 @@ mod tests {
)),
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -463,7 +463,7 @@ mod tests {
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
for payload in &["one", "two", "three", "four", "five", "six", "seven", "eight"] {
assert_eq!(
@ -483,7 +483,7 @@ mod tests {
Err(IOError::new(ErrorKind::ConnectionReset, FakeError)),
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -504,7 +504,7 @@ mod tests {
Ok(Bytes::from_static(&[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),
];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert_eq!(
TestMessage("one".into()),
@ -521,7 +521,7 @@ mod tests {
let chunks: Vec<Result<_, IOError>> =
vec![Ok(encode_initial_response()), Ok(encode_message("one"))];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert!(receiver.try_recv_initial().await.unwrap().is_some());
assert_eq!(
@ -535,7 +535,7 @@ mod tests {
let chunks: Vec<Result<_, IOError>> =
vec![Ok(encode_message("one")), Ok(encode_message("two"))];
let chunk_stream = futures_util::stream::iter(chunks);
let body = SdkBody::from(Body::wrap_stream(chunk_stream));
let body = SdkBody::from_body_0_4(Body::wrap_stream(chunk_stream));
let mut receiver = Receiver::<TestMessage, EventStreamError>::new(Unmarshaller, body);
assert!(receiver.try_recv_initial().await.unwrap().is_none());
assert_eq!(

View File

@ -25,7 +25,7 @@ aws-smithy-async = { path = "../aws-smithy-async" }
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" }
aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x"] }
bytes = "1"
fastrand = "2.0.0"
http = "0.2.8"

View File

@ -358,7 +358,10 @@ where
let mut client = self.client.clone();
let fut = client.call(request);
HttpConnectorFuture::new(async move {
Ok(fut.await.map_err(downcast_error)?.map(SdkBody::from))
Ok(fut
.await
.map_err(downcast_error)?
.map(SdkBody::from_body_0_4))
})
}
}

View File

@ -88,7 +88,7 @@ fn record_body(
event_bus: Arc<Mutex<Vec<Event>>>,
) -> JoinHandle<()> {
let (sender, output_body) = hyper::Body::channel();
let real_body = std::mem::replace(body, SdkBody::from(output_body));
let real_body = std::mem::replace(body, SdkBody::from_body_0_4(output_body));
tokio::spawn(async move {
let mut real_body = real_body;
let mut sender = sender;

View File

@ -294,7 +294,7 @@ impl HttpConnector for ReplayingClient {
let _initial_request = events.pop_front().unwrap();
let (sender, response_body) = hyper::Body::channel();
let body = SdkBody::from(response_body);
let body = SdkBody::from_body_0_4(response_body);
let recording = self.recorded_requests.clone();
let recorded_request = tokio::spawn(async move {
let mut data_read = vec![];

View File

@ -23,7 +23,7 @@ use aws_smithy_runtime::{ev, match_events};
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
use aws_smithy_runtime_api::client::retries::classifiers::{ClassifyRetry, RetryAction};
use aws_smithy_types::body::{BoxBody, SdkBody};
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind, ReconnectMode, RetryConfig};
use aws_smithy_types::timeout::TimeoutConfig;
use hyper::client::Builder as HyperBuilder;
@ -150,7 +150,7 @@ async fn wire_level_test(
let request = http::Request::builder()
.uri(endpoint_url.clone())
// Make the body non-replayable since we don't actually want to retry
.body(SdkBody::from_dyn(BoxBody::new(SdkBody::from("body"))))
.body(SdkBody::from_body_0_4(SdkBody::from("body")))
.unwrap()
.try_into()
.unwrap();

View File

@ -11,6 +11,9 @@ license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"
[features]
byte-stream-poll-next = []
http-body-0-4-x = ["dep:http-body-0-4"]
hyper-0-14-x = ["dep:hyper-0-14"]
rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"]
test-util = []
serde-serialize = []
@ -21,8 +24,8 @@ base64-simd = "0.8"
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"
http-body = "0.4.4"
hyper = "0.14.26"
http-body-0-4 = { package = "http-body", version = "0.4.4", optional = true }
hyper-0-14 = { package = "hyper", version = "0.14.26", optional = true }
itoa = "1.0.0"
num-integer = "0.1.44"
pin-project-lite = "0.2.9"

View File

@ -2,14 +2,13 @@ allowed_external_types = [
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",
# TODO(https://github.com/awslabs/smithy-rs/issues/3033): Feature gate based on unstable versions
# TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types
"http_body::Body",
"http_body::combinators::box_body::BoxBody",
"hyper::body::body::Body",
# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead`
# TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types
"tokio::io::async_read::AsyncRead",
# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File`
# TODO(https://github.com/awslabs/smithy-rs/issues/2412): Support cargo-features for cargo-check-external-types
"tokio::fs::file::File",
]

View File

@ -6,15 +6,20 @@
//! Types for representing the body of an HTTP request or response
use bytes::Bytes;
use http::{HeaderMap, HeaderValue};
use http_body::{Body, SizeHint};
use pin_project_lite::pin_project;
use std::error::Error as StdError;
use std::fmt::{self, Debug, Formatter};
use std::future::poll_fn;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
/// This module is named after the `http-body` version number since we anticipate
/// needing to provide equivalent functionality for 1.x of that crate in the future.
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
#[cfg(feature = "http-body-0-4-x")]
pub mod http_body_0_4_x;
/// A generic, boxed error that's `Send` and `Sync`
pub type Error = Box<dyn StdError + Send + Sync>;
@ -49,7 +54,10 @@ impl Debug for SdkBody {
}
/// A boxed generic HTTP body that, when consumed, will result in [`Bytes`] or an [`Error`].
pub type BoxBody = http_body::combinators::BoxBody<Bytes, Error>;
enum BoxBody {
#[cfg(feature = "http-body-0-4-x")]
HttpBody04(http_body_0_4::combinators::BoxBody<Bytes, Error>),
}
pin_project! {
#[project = InnerProj]
@ -59,14 +67,9 @@ pin_project! {
inner: Option<Bytes>
},
// A streaming body
Streaming {
#[pin]
inner: hyper::Body
},
// Also a streaming body
Dyn {
#[pin]
inner: BoxBody
inner: BoxBody,
},
/// When a streaming body is transferred out to a stream parser, the body is replaced with
@ -80,33 +83,22 @@ impl Debug for Inner {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match &self {
Inner::Once { inner: once } => f.debug_tuple("Once").field(once).finish(),
Inner::Streaming { inner: streaming } => {
f.debug_tuple("Streaming").field(streaming).finish()
}
Inner::Taken => f.debug_tuple("Taken").finish(),
Inner::Dyn { .. } => write!(f, "BoxBody"),
Inner::Taken => f.debug_tuple("Taken").finish(),
}
}
}
impl SdkBody {
/// Construct an SdkBody from a Boxed implementation of http::Body
pub fn from_dyn(body: BoxBody) -> Self {
Self {
inner: Inner::Dyn { inner: body },
rebuild: None,
bytes_contents: None,
}
}
/// Construct an explicitly retryable SDK body
///
/// _Note: This is probably not what you want_
///
/// All bodies constructed from in-memory data (`String`, `Vec<u8>`, `Bytes`, etc.) will be
/// retryable out of the box. If you want to read data from a file, you should use
/// [`ByteStream::from_path`](crate::byte_stream::ByteStream::from_path). This function
/// is only necessary when you need to enable retries for your own streaming container.
/// retryable out of the box. If you want to read data from a file, you should turn on a feature
/// `http-body-0-4-x` and use `ByteStream::from_path_body_0_4`.
///
/// This function is only necessary when you need to enable retries for your own streaming container.
pub fn retryable(f: impl Fn() -> SdkBody + Send + Sync + 'static) -> Self {
let initial = f();
SdkBody {
@ -135,9 +127,14 @@ impl SdkBody {
}
}
fn poll_inner(
pub(crate) async fn next(&mut self) -> Option<Result<Bytes, Error>> {
let mut me = Pin::new(self);
poll_fn(|cx| me.as_mut().poll_next(cx)).await
}
pub(crate) fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
#[allow(unused)] cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let this = self.project();
match this.inner.project() {
@ -149,8 +146,17 @@ impl SdkBody {
None => Poll::Ready(None),
}
}
InnerProj::Streaming { inner: body } => body.poll_data(cx).map_err(|e| e.into()),
InnerProj::Dyn { inner: box_body } => box_body.poll_data(cx),
InnerProj::Dyn { inner: body } => match body.get_mut() {
#[cfg(feature = "http-body-0-4-x")]
BoxBody::HttpBody04(box_body) => {
use http_body_0_4::Body;
Pin::new(box_body).poll_data(cx)
}
#[allow(unreachable_patterns)]
_ => unreachable!(
"enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
),
},
InnerProj::Taken => {
Poll::Ready(Some(Err("A `Taken` body should never be polled".into())))
}
@ -184,7 +190,53 @@ impl SdkBody {
/// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
/// have a known length.
pub fn content_length(&self) -> Option<u64> {
http_body::Body::size_hint(self).exact()
match self.bounds_on_remaining_length() {
(lo, Some(hi)) if lo == hi => Some(lo),
_ => None,
}
}
#[allow(dead_code)] // used by a feature-gated `http-body`'s trait method
pub(crate) fn is_end_stream(&self) -> bool {
match &self.inner {
Inner::Once { inner: None } => true,
Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
Inner::Dyn { inner: box_body } => match box_body {
#[cfg(feature = "http-body-0-4-x")]
BoxBody::HttpBody04(box_body) => {
use http_body_0_4::Body;
box_body.is_end_stream()
}
#[allow(unreachable_patterns)]
_ => unreachable!(
"enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
),
},
Inner::Taken => true,
}
}
pub(crate) fn bounds_on_remaining_length(&self) -> (u64, Option<u64>) {
match &self.inner {
Inner::Once { inner: None } => (0, Some(0)),
Inner::Once { inner: Some(bytes) } => {
let len = bytes.len() as u64;
(len, Some(len))
}
Inner::Dyn { inner: box_body } => match box_body {
#[cfg(feature = "http-body-0-4-x")]
BoxBody::HttpBody04(box_body) => {
use http_body_0_4::Body;
let hint = box_body.size_hint();
(hint.lower(), hint.upper())
}
#[allow(unreachable_patterns)]
_ => unreachable!(
"enabling `http-body-0-4-x` is the only way to create the `Dyn` variant"
),
},
Inner::Taken => (0, Some(0)),
}
}
/// Given a function to modify an `SdkBody`, run that function against this `SdkBody` before
@ -238,16 +290,6 @@ impl From<Bytes> for SdkBody {
}
}
impl From<hyper::Body> for SdkBody {
fn from(body: hyper::Body) -> Self {
SdkBody {
inner: Inner::Streaming { inner: body },
rebuild: None,
bytes_contents: None,
}
}
}
impl From<Vec<u8>> for SdkBody {
fn from(data: Vec<u8>) -> Self {
Self::from(Bytes::from(data))
@ -266,64 +308,15 @@ impl From<&[u8]> for SdkBody {
}
}
impl http_body::Body for SdkBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.poll_inner(cx)
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap<HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
}
fn is_end_stream(&self) -> bool {
match &self.inner {
Inner::Once { inner: None } => true,
Inner::Once { inner: Some(bytes) } => bytes.is_empty(),
Inner::Streaming { inner: hyper_body } => hyper_body.is_end_stream(),
Inner::Dyn { inner: box_body } => box_body.is_end_stream(),
Inner::Taken => true,
}
}
fn size_hint(&self) -> SizeHint {
match &self.inner {
Inner::Once { inner: None } => SizeHint::with_exact(0),
Inner::Once { inner: Some(bytes) } => SizeHint::with_exact(bytes.len() as u64),
Inner::Streaming { inner: hyper_body } => hyper_body.size_hint(),
Inner::Dyn { inner: box_body } => box_body.size_hint(),
Inner::Taken => SizeHint::new(),
}
}
}
#[cfg(test)]
mod test {
use crate::body::{BoxBody, SdkBody};
use http_body::Body;
use crate::body::SdkBody;
use std::pin::Pin;
#[test]
fn valid_size_hint() {
assert_eq!(SdkBody::from("hello").size_hint().exact(), Some(5));
assert_eq!(SdkBody::from("").size_hint().exact(), Some(0));
}
#[test]
fn map_preserve_preserves_bytes_hint() {
let initial = SdkBody::from("hello!");
assert_eq!(initial.bytes(), Some(b"hello!".as_slice()));
let new_body = initial.map_preserve_contents(|body| SdkBody::from_dyn(BoxBody::new(body)));
assert_eq!(new_body.bytes(), Some(b"hello!".as_slice()));
assert_eq!(SdkBody::from("hello").content_length(), Some(5));
assert_eq!(SdkBody::from("").content_length(), Some(0));
}
#[allow(clippy::bool_assert_comparison)]
@ -337,9 +330,9 @@ mod test {
async fn http_body_consumes_data() {
let mut body = SdkBody::from("hello!");
let mut body = Pin::new(&mut body);
let data = body.data().await;
let data = body.next().await;
assert!(data.is_some());
let data = body.data().await;
let data = body.next().await;
assert!(data.is_none());
}
@ -348,31 +341,14 @@ mod test {
// Its important to avoid sending empty chunks of data to avoid H2 data frame problems
let mut body = SdkBody::from("");
let mut body = Pin::new(&mut body);
let data = body.data().await;
let data = body.next().await;
assert!(data.is_none());
}
#[test]
fn sdkbody_debug_once() {
let body = SdkBody::from("123");
// actually don't really care what the debug impl is, just that it doesn't crash
let _ = format!("{:?}", body);
}
#[test]
fn sdkbody_debug_dyn() {
let hyper_body = hyper::Body::channel().1;
let body = SdkBody::from_dyn(BoxBody::new(hyper_body.map_err(|e| e.into())));
// actually don't really care what the debug impl is, just that it doesn't crash
let _ = format!("{:?}", body);
}
#[test]
fn sdkbody_debug_hyper() {
let hyper_body = hyper::Body::channel().1;
let body = SdkBody::from(hyper_body);
// actually don't really care what the debug impl is, just that it doesn't crash
let _ = format!("{:?}", body);
assert!(format!("{:?}", body).contains("Once"));
}
#[test]

View File

@ -0,0 +1,91 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::body::{BoxBody, Error, Inner, SdkBody};
use bytes::Bytes;
use std::pin::Pin;
use std::task::{Context, Poll};
impl SdkBody {
/// Construct an `SdkBody` from a type that implements [`http_body_0_4::Body<Data = Bytes>`](http_body_0_4::Body).
///
/// _Note: This is only available with `http-body-0-4-x` enabled._
pub fn from_body_0_4<T, E>(body: T) -> Self
where
T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<Error> + 'static,
{
Self {
inner: Inner::Dyn {
inner: BoxBody::HttpBody04(http_body_0_4::combinators::BoxBody::new(
body.map_err(Into::into),
)),
},
rebuild: None,
bytes_contents: None,
}
}
}
#[cfg(feature = "hyper-0-14-x")]
impl From<hyper_0_14::Body> for SdkBody {
fn from(body: hyper_0_14::Body) -> Self {
SdkBody::from_body_0_4(body)
}
}
impl http_body_0_4::Body for SdkBody {
type Data = Bytes;
type Error = Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
self.poll_next(cx)
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap<http::HeaderValue>>, Self::Error>> {
Poll::Ready(Ok(None))
}
fn is_end_stream(&self) -> bool {
self.is_end_stream()
}
fn size_hint(&self) -> http_body_0_4::SizeHint {
let mut result = http_body_0_4::SizeHint::default();
let (lower, upper) = self.bounds_on_remaining_length();
result.set_lower(lower);
if let Some(u) = upper {
result.set_upper(u)
}
result
}
}
#[cfg(test)]
mod tests {
use crate::body::SdkBody;
#[test]
fn map_preserve_preserves_bytes_hint() {
let initial = SdkBody::from("hello!");
assert_eq!(initial.bytes(), Some(b"hello!".as_slice()));
let new_body = initial.map_preserve_contents(|body| SdkBody::from_body_0_4(body));
assert_eq!(new_body.bytes(), Some(b"hello!".as_slice()));
}
#[test]
fn sdkbody_debug_dyn() {
let hyper_body = hyper_0_14::Body::channel().1;
let body = SdkBody::from_body_0_4(hyper_body);
assert!(format!("{:?}", body).contains("BoxBody"));
}
}

View File

@ -78,10 +78,10 @@
//!
//! ### Create a ByteStream from a file
//!
//! _Note: This is only available with `rt-tokio` enabled._
//! _Note: This is only available with `rt-tokio` and `http-body-0-4-x` enabled._
//!
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))]
//! # {
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::path::Path;
@ -90,7 +90,7 @@
//! }
//!
//! async fn bytestream_from_file() -> GetObjectInput {
//! let bytestream = ByteStream::from_path("docs/some-large-file.csv")
//! let bytestream = ByteStream::from_path_body_0_4("docs/some-large-file.csv")
//! .await
//! .expect("valid path");
//! GetObjectInput { body: bytestream }
@ -99,10 +99,10 @@
//! ```
//!
//! If you want more control over how the file is read, such as specifying the size of the buffer used to read the file
//! or the length of the file, use an [`FsBuilder`](crate::byte_stream::FsBuilder).
//! or the length of the file, use an `FsBuilder`.
//!
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))]
//! # {
//! use aws_smithy_types::byte_stream::{ByteStream, Length};
//! use std::path::Path;
@ -111,7 +111,7 @@
//! }
//!
//! async fn bytestream_from_file() -> GetObjectInput {
//! let bytestream = ByteStream::read_from().path("docs/some-large-file.csv")
//! let bytestream = ByteStream::read_with_body_0_4_from().path("docs/some-large-file.csv")
//! .buffer_size(32_784)
//! .length(Length::Exact(123_456))
//! .build()
@ -127,7 +127,6 @@ use crate::byte_stream::error::Error;
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
use http_body::Body;
use pin_project_lite::pin_project;
use std::future::poll_fn;
use std::io::IoSlice;
@ -144,6 +143,12 @@ pub mod error;
#[cfg(feature = "rt-tokio")]
pub use self::bytestream_util::FsBuilder;
/// This module is named after the `http-body` version number since we anticipate
/// needing to provide equivalent functionality for 1.x of that crate in the future.
/// The name has a suffix `_x` to avoid name collision with a third-party `http-body-0-4`.
#[cfg(feature = "http-body-0-4-x")]
pub mod http_body_0_4_x;
pin_project! {
/// Stream of binary data
///
@ -230,33 +235,43 @@ pin_project! {
/// ```
///
/// 2. **From a file**: ByteStreams created from a path can be retried. A new file descriptor will be opened if a retry occurs.
///
/// _Note: The `http-body-0-4-x` feature must be active to call `ByteStream::from_body_0_4`._
///
/// ```no_run
/// #[cfg(feature = "tokio-rt")]
/// #[cfg(all(feature = "tokio-rt", feature = "http-body-0-4-x"))]
/// # {
/// use aws_smithy_types::byte_stream::ByteStream;
/// let stream = ByteStream::from_path("big_file.csv");
/// let stream = ByteStream::from_path_body_0_4("big_file.csv");
/// # }
/// ```
///
/// 3. **From an `SdkBody` directly**: For more advanced / custom use cases, a ByteStream can be created directly
/// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
/// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
///
/// _Note: The `http-body-0-4-x` feature must be active to construct an `SdkBody` with `from_body_0_4`._
///
/// ```no_run
/// # #[cfg(feature = "http-body-0-4-x")]
/// # {
/// # use hyper_0_14 as hyper;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
/// use bytes::Bytes;
/// let (mut tx, channel_body) = hyper::Body::channel();
/// // this will not be retryable because the SDK has no way to replay this stream
/// let stream = ByteStream::new(SdkBody::from(channel_body));
/// let stream = ByteStream::new(SdkBody::from_body_0_4(channel_body));
/// tx.send_data(Bytes::from_static(b"hello world!"));
/// tx.send_data(Bytes::from_static(b"hello again!"));
/// // NOTE! You must ensure that `tx` is dropped to ensure that EOF is sent
/// # }
/// ```
///
#[derive(Debug)]
pub struct ByteStream {
#[pin]
inner: Inner<SdkBody>
inner: Inner,
}
}
@ -291,8 +306,12 @@ impl ByteStream {
Some(self.inner.next().await?.map_err(Error::streaming))
}
#[cfg(feature = "byte-stream-poll-next")]
/// Attempt to pull out the next value of this stream, returning `None` if the stream is
/// exhausted.
// This should only be used when one needs to implement a trait method like
// `futures_core::stream::Stream::poll_next` on a new-type wrapping a `ByteStream`.
// In general, use the `next` method instead.
pub fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -334,77 +353,6 @@ impl ByteStream {
self.inner.collect().await.map_err(Error::streaming)
}
/// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with
/// full control over how the file is read (eg. specifying the length of the file or the size of the buffer used to read the file).
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
///
/// async fn bytestream_from_file() -> ByteStream {
/// let bytestream = ByteStream::read_from()
/// .path("docs/some-large-file.csv")
/// // Specify the size of the buffer used to read the file (in bytes, default is 4096)
/// .buffer_size(32_784)
/// // Specify the length of the file used (skips an additional call to retrieve the size)
/// .length(Length::Exact(123_456))
/// .build()
/// .await
/// .expect("valid path");
/// bytestream
/// }
/// # }
/// ```
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub fn read_from() -> FsBuilder {
FsBuilder::new()
}
/// Create a ByteStream that streams data from the filesystem
///
/// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
/// will provide a size hint when used as an HTTP body. If the request fails, the read will
/// begin again by reloading the file handle.
///
/// ## Warning
/// The contents of the file MUST not change during retries. The length & checksum of the file
/// will be cached. If the contents of the file change, the operation will almost certainly fail.
///
/// Furthermore, a partial write MAY seek in the file and resume from the previous location.
///
/// Note: If you want more control, such as specifying the size of the buffer used to read the file
/// or the length of the file, use a [`FsBuilder`](crate::byte_stream::FsBuilder) as returned
/// from `ByteStream::read_from`
///
/// # Examples
/// ```no_run
/// use aws_smithy_types::byte_stream::ByteStream;
/// use std::path::Path;
/// async fn make_bytestream() -> ByteStream {
/// ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
/// }
/// ```
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_path(path: impl AsRef<std::path::Path>) -> Result<Self, Error> {
FsBuilder::new().path(path).build().await
}
/// Create a ByteStream from a file
///
/// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
/// upstream failures, use [`ByteStream::from_path`](ByteStream::from_path)
#[deprecated(
since = "0.40.0",
note = "Prefer the more extensible ByteStream::read_from() API"
)]
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_file(file: tokio::fs::File) -> Result<Self, Error> {
FsBuilder::new().file(file).build().await
}
#[cfg(feature = "rt-tokio")]
/// Convert this `ByteStream` into a struct that implements [`AsyncRead`](tokio::io::AsyncRead).
///
@ -433,7 +381,9 @@ impl ByteStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
Pin::new(&mut self.0.inner)
.poll_next(cx)
.map_err(Error::streaming)
}
}
tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self))
@ -479,12 +429,6 @@ impl From<Vec<u8>> for ByteStream {
}
}
impl From<hyper::Body> for ByteStream {
fn from(input: hyper::Body) -> Self {
ByteStream::new(SdkBody::from(input))
}
}
/// Non-contiguous Binary Data Storage
///
/// When data is read from the network, it is read in a sequence of chunks that are not in
@ -543,23 +487,19 @@ impl Buf for AggregatedBytes {
}
pin_project! {
#[derive(Debug, Clone, PartialEq, Eq)]
struct Inner<B> {
#[derive(Debug)]
struct Inner {
#[pin]
body: B,
body: SdkBody,
}
}
impl<B> Inner<B> {
fn new(body: B) -> Self {
impl Inner {
fn new(body: SdkBody) -> Self {
Self { body }
}
async fn next(&mut self) -> Option<Result<Bytes, B::Error>>
where
Self: Unpin,
B: http_body::Body<Data = Bytes>,
{
async fn next(&mut self) -> Option<Result<Bytes, crate::body::Error>> {
let mut me = Pin::new(self);
poll_fn(|cx| me.as_mut().poll_next(cx)).await
}
@ -567,43 +507,34 @@ impl<B> Inner<B> {
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, B::Error>>>
where
B: http_body::Body<Data = Bytes>,
{
self.project().body.poll_data(cx)
) -> Poll<Option<Result<Bytes, crate::body::Error>>> {
self.project().body.poll_next(cx)
}
async fn collect(self) -> Result<AggregatedBytes, B::Error>
where
B: http_body::Body<Data = Bytes>,
{
async fn collect(self) -> Result<AggregatedBytes, crate::body::Error> {
let mut output = SegmentedBuf::new();
let body = self.body;
pin_utils::pin_mut!(body);
while let Some(buf) = body.data().await {
while let Some(buf) = body.next().await {
output.push(buf?);
}
Ok(AggregatedBytes(output))
}
fn size_hint(&self) -> (u64, Option<u64>)
where
B: http_body::Body<Data = Bytes>,
{
let size_hint = http_body::Body::size_hint(&self.body);
(size_hint.lower(), size_hint.upper())
fn size_hint(&self) -> (u64, Option<u64>) {
self.body.bounds_on_remaining_length()
}
}
#[cfg(test)]
mod tests {
use crate::body::SdkBody;
use crate::byte_stream::Inner;
use bytes::Bytes;
#[tokio::test]
async fn read_from_string_body() {
let body = hyper::Body::from("a simple body");
let body = SdkBody::from("a simple body");
assert_eq!(
Inner::new(body)
.collect()
@ -614,63 +545,6 @@ mod tests {
);
}
#[tokio::test]
async fn read_from_channel_body() {
let (mut sender, body) = hyper::Body::channel();
let byte_stream = Inner::new(body);
tokio::spawn(async move {
sender.send_data(Bytes::from("data 1")).await.unwrap();
sender.send_data(Bytes::from("data 2")).await.unwrap();
sender.send_data(Bytes::from("data 3")).await.unwrap();
});
assert_eq!(
byte_stream.collect().await.expect("no errors").into_bytes(),
Bytes::from("data 1data 2data 3")
);
}
#[cfg(feature = "rt-tokio")]
#[tokio::test]
async fn path_based_bytestreams() -> Result<(), Box<dyn std::error::Error>> {
use super::ByteStream;
use bytes::Buf;
use http_body::Body;
use std::io::Write;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new()?;
for i in 0..10000 {
writeln!(file, "Brian was here. Briefly. {}", i)?;
}
let body = ByteStream::from_path(&file).await?.into_inner();
// assert that a valid size hint is immediately ready
assert_eq!(body.size_hint().exact(), Some(298890));
let mut body1 = body.try_clone().expect("retryable bodies are cloneable");
// read a little bit from one of the clones
let some_data = body1
.data()
.await
.expect("should have some data")
.expect("read should not fail");
assert!(!some_data.is_empty());
// make some more clones
let body2 = body.try_clone().expect("retryable bodies are cloneable");
let body3 = body.try_clone().expect("retryable bodies are cloneable");
let body2 = ByteStream::new(body2).collect().await?.into_bytes();
let body3 = ByteStream::new(body3).collect().await?.into_bytes();
assert_eq!(body2, body3);
assert!(body2.starts_with(b"Brian was here."));
assert!(body2.ends_with(b"9999\n"));
assert_eq!(body2.len(), 298890);
assert_eq!(
ByteStream::new(body1).collect().await?.remaining(),
298890 - some_data.len()
);
Ok(())
}
#[cfg(feature = "rt-tokio")]
#[tokio::test]
async fn bytestream_into_async_read() {

View File

@ -5,14 +5,10 @@
use crate::body::SdkBody;
use crate::byte_stream::{error::Error, error::ErrorKind, ByteStream};
use bytes::Bytes;
use futures_core::ready;
use http::HeaderMap;
use http_body::{Body, SizeHint};
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt, AsyncSeekExt};
use tokio_util::io::ReaderStream;
@ -60,9 +56,11 @@ impl PathBody {
/// Builder for creating [`ByteStreams`](ByteStream) from a file/path, with full control over advanced options.
///
/// _Note: A cargo feature `http-body-0-4-x` should be active to call `ByteStream::read_with_body_0_4_from` in the following example._
///
/// Example usage:
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))]
/// # {
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
/// use std::path::Path;
@ -71,7 +69,7 @@ impl PathBody {
/// }
///
/// async fn bytestream_from_file() -> GetObjectInput {
/// let bytestream = ByteStream::read_from()
/// let bytestream = ByteStream::read_with_body_0_4_from()
/// .path("docs/some-large-file.csv")
/// // Specify the size of the buffer used to read the file (in bytes, default is 4096)
/// .buffer_size(32_784)
@ -91,12 +89,12 @@ pub struct FsBuilder {
length: Option<Length>,
buffer_size: usize,
offset: Option<u64>,
sdk_body_creator: SdkBodyCreator,
}
impl Default for FsBuilder {
fn default() -> Self {
Self::new()
}
enum SdkBodyCreator {
#[cfg(feature = "http-body-0-4-x")]
HttpBody04(Arc<dyn Fn(PathBody) -> SdkBody + Send + Sync + 'static>),
}
/// The length (in bytes) to read. Determines whether or not a short read counts as an error.
@ -110,16 +108,22 @@ pub enum Length {
}
impl FsBuilder {
#[cfg(feature = "http-body-0-4-x")]
/// Create a new [`FsBuilder`] (using a default read buffer of 4096 bytes).
///
/// You must then call either [`file`](FsBuilder::file) or [`path`](FsBuilder::path) to specify what to read from.
pub fn new() -> Self {
FsBuilder {
///
/// _Note: This is only available with `http-body-0-4-x` enabled._
pub fn new_with_body_0_4() -> Self {
Self {
buffer_size: DEFAULT_BUFFER_SIZE,
file: None,
length: None,
offset: None,
path: None,
sdk_body_creator: SdkBodyCreator::HttpBody04(Arc::new(|body: PathBody| {
SdkBody::from_body_0_4(body)
})),
}
}
@ -199,12 +203,19 @@ impl FsBuilder {
let body_loader = move || {
// If an offset was provided, seeking will be handled in `PathBody::poll_data` each
// time the file is loaded.
SdkBody::from_dyn(http_body::combinators::BoxBody::new(PathBody::from_path(
path.clone(),
length,
buffer_size,
self.offset,
)))
match &self.sdk_body_creator {
#[cfg(feature = "http-body-0-4-x")]
SdkBodyCreator::HttpBody04(f) => f(PathBody::from_path(
path.clone(),
length,
buffer_size,
self.offset,
)),
#[allow(unreachable_patterns)]
_ => unreachable!(
"`http-body-0-4-x` should've been enabled to create an `FsBuilder`"
),
}
};
Ok(ByteStream::new(SdkBody::retryable(body_loader)))
@ -214,9 +225,14 @@ impl FsBuilder {
let _s = file.seek(io::SeekFrom::Start(offset)).await?;
}
let body = SdkBody::from_dyn(http_body::combinators::BoxBody::new(
PathBody::from_file(file, length, buffer_size),
));
let body = match self.sdk_body_creator {
#[cfg(feature = "http-body-0-4-x")]
SdkBodyCreator::HttpBody04(f) => f(PathBody::from_file(file, length, buffer_size)),
#[allow(unreachable_patterns)]
_ => unreachable!(
"`http-body-0-4-x` should've been enabled to create an `FsBuilder`"
),
};
Ok(ByteStream::new(body))
} else {
@ -240,14 +256,16 @@ enum State {
Loaded(ReaderStream<io::Take<File>>),
}
impl Body for PathBody {
type Data = Bytes;
#[cfg(feature = "http-body-0-4-x")]
impl http_body_0_4::Body for PathBody {
type Data = bytes::Bytes;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
fn poll_data(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Self::Data, Self::Error>>> {
use std::task::Poll;
let offset = self.offset.unwrap_or(DEFAULT_OFFSET);
loop {
match self.state {
@ -264,7 +282,7 @@ impl Body for PathBody {
}));
}
State::Loading(ref mut future) => {
match ready!(Pin::new(future).poll(cx)) {
match futures_core::ready!(Pin::new(future).poll(cx)) {
Ok(file) => {
self.state = State::Loaded(ReaderStream::with_capacity(
file.take(self.length),
@ -276,7 +294,7 @@ impl Body for PathBody {
}
State::Loaded(ref mut stream) => {
use futures_core::Stream;
return match ready!(Pin::new(stream).poll_next(cx)) {
return match futures_core::ready!(std::pin::Pin::new(stream).poll_next(cx)) {
Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
None => Poll::Ready(None),
Some(Err(e)) => Poll::Ready(Some(Err(e.into()))),
@ -287,10 +305,10 @@ impl Body for PathBody {
}
fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<Option<http::HeaderMap>, Self::Error>> {
std::task::Poll::Ready(Ok(None))
}
fn is_end_stream(&self) -> bool {
@ -298,17 +316,17 @@ impl Body for PathBody {
self.length == 0
}
fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(self.length)
fn size_hint(&self) -> http_body_0_4::SizeHint {
http_body_0_4::SizeHint::with_exact(self.length)
}
}
#[cfg(feature = "http-body-0-4-x")]
#[cfg(test)]
mod test {
use super::FsBuilder;
use crate::byte_stream::{ByteStream, Length};
use bytes::Buf;
use http_body::Body;
use std::io::Write;
use tempfile::NamedTempFile;
@ -325,7 +343,7 @@ mod test {
.expect("file metadata is accessible")
.len();
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
.buffer_size(16384)
.length(Length::Exact(file_length))
@ -335,12 +353,12 @@ mod test {
.into_inner();
// assert that the specified length is used as size hint
assert_eq!(body.size_hint().exact(), Some(file_length));
assert_eq!(body.content_length(), Some(file_length));
let mut body1 = body.try_clone().expect("retryable bodies are cloneable");
// read a little bit from one of the clones
let some_data = body1
.data()
.next()
.await
.expect("should have some data")
.expect("read should not fail");
@ -364,7 +382,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
// The file is longer than 1 byte, let's see if this is used to generate the size hint
.length(Length::Exact(1))
@ -373,7 +391,7 @@ mod test {
.unwrap()
.into_inner();
assert_eq!(body.size_hint().exact(), Some(1));
assert_eq!(body.content_length(), Some(1));
}
#[tokio::test]
@ -388,7 +406,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
// We're going to read line 0 only
.length(Length::Exact(line_0.len() as u64))
@ -412,7 +430,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
assert!(FsBuilder::new()
assert!(FsBuilder::new_with_body_0_4()
.path(&file)
// The file is 30 bytes so this is fine
.length(Length::Exact(29))
@ -420,7 +438,7 @@ mod test {
.await
.is_ok());
assert!(FsBuilder::new()
assert!(FsBuilder::new_with_body_0_4()
.path(&file)
// The file is 30 bytes so this is fine
.length(Length::Exact(30))
@ -428,7 +446,7 @@ mod test {
.await
.is_ok());
assert!(FsBuilder::new()
assert!(FsBuilder::new_with_body_0_4()
.path(&file)
// Larger than 30 bytes, this will cause an error
.length(Length::Exact(31))
@ -449,7 +467,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
// We're going to skip the first line by using offset
.offset(line_0.len() as u64)
@ -477,7 +495,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
// We're going to skip line 0 by using offset
.offset(line_0.len() as u64)
@ -506,7 +524,7 @@ mod test {
file.flush().expect("flushing is OK");
assert_eq!(
FsBuilder::new()
FsBuilder::new_with_body_0_4()
.path(&file)
// We're going to skip all file contents by setting an offset
// much larger than the file size
@ -531,7 +549,7 @@ mod test {
// Ensure that the file was written to
file.flush().expect("flushing is OK");
let body = FsBuilder::new()
let body = FsBuilder::new_with_body_0_4()
.path(&file)
.length(Length::UpTo(9000))
.build()
@ -583,7 +601,7 @@ mod test {
chunk_size
};
let byte_stream = FsBuilder::new()
let byte_stream = FsBuilder::new_with_body_0_4()
.path(&file_path)
.offset(i * chunk_size)
.length(Length::Exact(length))

View File

@ -0,0 +1,172 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use crate::body::SdkBody;
use crate::byte_stream::ByteStream;
use bytes::Bytes;
impl ByteStream {
/// Construct a `ByteStream` from a type that implements [`http_body_0_4::Body<Data = Bytes>`](http_body_0_4::Body).
///
/// _Note: This is only available with `http-body-0-4-x` enabled._
pub fn from_body_0_4<T, E>(body: T) -> Self
where
T: http_body_0_4::Body<Data = Bytes, Error = E> + Send + Sync + 'static,
E: Into<crate::body::Error> + 'static,
{
ByteStream::new(SdkBody::from_body_0_4(body))
}
/// Returns a [`FsBuilder`](crate::byte_stream::FsBuilder), allowing you to build a `ByteStream` with
/// full control over how the file is read (eg. specifying the length of the file or the size of the buffer used to read the file).
/// ```no_run
/// # #[cfg(all(feature = "rt-tokio", feature = "http-body-0-4-x"))]
/// # {
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
///
/// async fn bytestream_from_file() -> ByteStream {
/// let bytestream = ByteStream::read_with_body_0_4_from()
/// .path("docs/some-large-file.csv")
/// // Specify the size of the buffer used to read the file (in bytes, default is 4096)
/// .buffer_size(32_784)
/// // Specify the length of the file used (skips an additional call to retrieve the size)
/// .length(Length::Exact(123_456))
/// .build()
/// .await
/// .expect("valid path");
/// bytestream
/// }
/// # }
/// ```
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub fn read_with_body_0_4_from() -> crate::byte_stream::FsBuilder {
crate::byte_stream::FsBuilder::new_with_body_0_4()
}
/// Create a ByteStream that streams data from the filesystem
///
/// This function creates a retryable ByteStream for a given `path`. The returned ByteStream
/// will provide a size hint when used as an HTTP body. If the request fails, the read will
/// begin again by reloading the file handle.
///
/// ## Warning
/// The contents of the file MUST not change during retries. The length & checksum of the file
/// will be cached. If the contents of the file change, the operation will almost certainly fail.
///
/// Furthermore, a partial write MAY seek in the file and resume from the previous location.
///
/// Note: If you want more control, such as specifying the size of the buffer used to read the file
/// or the length of the file, use a [`FsBuilder`](crate::byte_stream::FsBuilder) as returned
/// from `ByteStream::read_with_body_0_4_from`
///
/// # Examples
/// ```no_run
/// use aws_smithy_types::byte_stream::ByteStream;
/// use std::path::Path;
/// async fn make_bytestream() -> ByteStream {
/// ByteStream::from_path_body_0_4("docs/rows.csv").await.expect("file should be readable")
/// }
/// ```
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_path_body_0_4(
path: impl AsRef<std::path::Path>,
) -> Result<Self, crate::byte_stream::error::Error> {
crate::byte_stream::FsBuilder::new_with_body_0_4()
.path(path)
.build()
.await
}
/// Create a ByteStream from a file
///
/// NOTE: This will NOT result in a retryable ByteStream. For a ByteStream that can be retried in the case of
/// upstream failures, use [`ByteStream::from_path_body_0_4`](ByteStream::from_path_body_0_4)
#[deprecated(
since = "0.40.0",
note = "Prefer the more extensible ByteStream::read_from() API"
)]
#[cfg(feature = "rt-tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))]
pub async fn from_file_body_0_4(
file: tokio::fs::File,
) -> Result<Self, crate::byte_stream::error::Error> {
crate::byte_stream::FsBuilder::new_with_body_0_4()
.file(file)
.build()
.await
}
}
#[cfg(feature = "hyper-0-14-x")]
impl From<hyper_0_14::Body> for ByteStream {
fn from(input: hyper_0_14::Body) -> Self {
ByteStream::new(SdkBody::from_body_0_4(input))
}
}
#[cfg(test)]
mod tests {
use crate::body::SdkBody;
use crate::byte_stream::Inner;
use bytes::Bytes;
#[tokio::test]
async fn read_from_channel_body() {
let (mut sender, body) = hyper_0_14::Body::channel();
let byte_stream = Inner::new(SdkBody::from_body_0_4(body));
tokio::spawn(async move {
sender.send_data(Bytes::from("data 1")).await.unwrap();
sender.send_data(Bytes::from("data 2")).await.unwrap();
sender.send_data(Bytes::from("data 3")).await.unwrap();
});
assert_eq!(
byte_stream.collect().await.expect("no errors").into_bytes(),
Bytes::from("data 1data 2data 3")
);
}
#[cfg(feature = "rt-tokio")]
#[tokio::test]
async fn path_based_bytestreams() -> Result<(), Box<dyn std::error::Error>> {
use super::ByteStream;
use bytes::Buf;
use std::io::Write;
use tempfile::NamedTempFile;
let mut file = NamedTempFile::new()?;
for i in 0..10000 {
writeln!(file, "Brian was here. Briefly. {}", i)?;
}
let body = ByteStream::from_path_body_0_4(&file).await?.into_inner();
// assert that a valid size hint is immediately ready
assert_eq!(body.content_length(), Some(298890));
let mut body1 = body.try_clone().expect("retryable bodies are cloneable");
// read a little bit from one of the clones
let some_data = body1
.next()
.await
.expect("should have some data")
.expect("read should not fail");
assert!(!some_data.is_empty());
// make some more clones
let body2 = body.try_clone().expect("retryable bodies are cloneable");
let body3 = body.try_clone().expect("retryable bodies are cloneable");
let body2 = ByteStream::new(body2).collect().await?.into_bytes();
let body3 = ByteStream::new(body3).collect().await?.into_bytes();
assert_eq!(body2, body3);
assert!(body2.starts_with(b"Brian was here."));
assert!(body2.ends_with(b"9999\n"));
assert_eq!(body2.len(), 298890);
assert_eq!(
ByteStream::new(body1).collect().await?.remaining(),
298890 - some_data.len()
);
Ok(())
}
}