Remove boxing from `StreamBody` (#241)

I just had a thought: Why should `response::Headers` be generic, but
`body::StreamBody` should not? `StreamBody` previously boxed the stream
to erase the generics. So we had `response::Headers<T>` but
`body::StreamBody`, without generics.

After thinking about it I think it actually makes sense for responses to
remain generic because you're able to use `impl IntoResponse` so you
don't have to name the generics.

Whereas in the case of `BodyStream` (an extractor) you cannot use `impl Trait`
so it makes sense to box the inner body to make the type easier to name. Besides,
`BodyStream` is mostly useful when the request body isn't `hyper::Body`, as
that already implements `Stream`.
This commit is contained in:
David Pedersen 2021-08-22 22:03:56 +02:00 committed by GitHub
parent b75c34b821
commit a753eac23f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 113 additions and 58 deletions

View File

@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **added:** Add `OriginalUri` for extracting original request URI in nested services ([#197](https://github.com/tokio-rs/axum/pull/197))
- **added:** Implement `FromRequest` for `http::Extensions` ([#169](https://github.com/tokio-rs/axum/pull/169))
- **added:** Make `RequestParts::{new, try_into_request}` public so extractors can be used outside axum ([#194](https://github.com/tokio-rs/axum/pull/194))
- **added:** Implement `FromRequest` for `axum::body::Body` ([#241](https://github.com/tokio-rs/axum/pull/241))
- **changed:** Removed `extract::UrlParams` and `extract::UrlParamsMap`. Use `extract::Path` instead ([#154](https://github.com/tokio-rs/axum/pull/154))
- **changed:** `extractor_middleware` now requires `RequestBody: Default` ([#167](https://github.com/tokio-rs/axum/pull/167))
- **changed:** Convert `RequestAlreadyExtracted` to an enum with each possible error variant ([#167](https://github.com/tokio-rs/axum/pull/167))

View File

@ -1,9 +1,12 @@
use crate::{BoxError, Error};
use crate::{response::IntoResponse, BoxError, Error};
use bytes::Bytes;
use futures_util::stream::{self, Stream, TryStreamExt};
use http::HeaderMap;
use futures_util::{
ready,
stream::{self, TryStream},
};
use http::{HeaderMap, Response};
use http_body::Body;
use std::convert::Infallible;
use pin_project_lite::pin_project;
use std::{
fmt,
pin::Pin,
@ -11,80 +14,108 @@ use std::{
};
use sync_wrapper::SyncWrapper;
/// An [`http_body::Body`] created from a [`Stream`].
///
/// # Example
///
/// ```
/// use axum::{
/// Router,
/// handler::get,
/// body::StreamBody,
/// };
/// use futures::stream;
///
/// async fn handler() -> StreamBody {
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("Hello,"),
/// Ok(" "),
/// Ok("world!"),
/// ];
/// let stream = stream::iter(chunks);
/// StreamBody::new(stream)
/// }
///
/// let app = Router::new().route("/", get(handler));
/// # async {
/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
/// # };
/// ```
///
/// [`Stream`]: futures_util::stream::Stream
// this should probably be extracted to `http_body`, eventually...
pub struct StreamBody {
stream: SyncWrapper<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>>,
pin_project! {
/// An [`http_body::Body`] created from a [`Stream`].
///
/// If purpose of this type is to be used in responses. If you want to
/// extract the request body as a stream consider using
/// [`extract::BodyStream`].
///
/// # Example
///
/// ```
/// use axum::{
/// Router,
/// handler::get,
/// body::StreamBody,
/// response::IntoResponse,
/// };
/// use futures::stream;
///
/// async fn handler() -> impl IntoResponse {
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
/// Ok("Hello,"),
/// Ok(" "),
/// Ok("world!"),
/// ];
/// let stream = stream::iter(chunks);
/// StreamBody::new(stream)
/// }
///
/// let app = Router::new().route("/", get(handler));
/// # async {
/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
/// # };
/// ```
///
/// [`Stream`]: futures_util::stream::Stream
pub struct StreamBody<S> {
#[pin]
stream: SyncWrapper<S>,
}
}
impl StreamBody {
impl<S> StreamBody<S> {
/// Create a new `StreamBody` from a [`Stream`].
///
/// [`Stream`]: futures_util::stream::Stream
pub fn new<S, T, E>(stream: S) -> Self
pub fn new(stream: S) -> Self
where
S: Stream<Item = Result<T, E>> + Send + 'static,
T: Into<Bytes> + 'static,
E: Into<BoxError> + 'static,
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
let stream = stream
.map_ok(Into::into)
.map_err(|err| Error::new(err.into()));
Self {
stream: SyncWrapper::new(Box::pin(stream)),
stream: SyncWrapper::new(stream),
}
}
}
impl Default for StreamBody {
fn default() -> Self {
Self::new(stream::empty::<Result<Bytes, Infallible>>())
impl<S> IntoResponse for StreamBody<S>
where
S: TryStream + Send + 'static,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
type Body = Self;
type BodyError = Error;
fn into_response(self) -> Response<Self> {
Response::new(self)
}
}
impl fmt::Debug for StreamBody {
impl Default for StreamBody<futures_util::stream::Empty<Result<Bytes, Error>>> {
fn default() -> Self {
Self::new(stream::empty())
}
}
impl<S> fmt::Debug for StreamBody<S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("StreamBody").finish()
}
}
impl Body for StreamBody {
impl<S> Body for StreamBody<S>
where
S: TryStream,
S::Ok: Into<Bytes>,
S::Error: Into<BoxError>,
{
type Data = Bytes;
type Error = Error;
fn poll_data(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
Pin::new(self.stream.get_mut()).poll_next(cx)
let stream = self.project().stream.get_pin_mut();
match ready!(stream.try_poll_next(cx)) {
Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk.into()))),
Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
None => Poll::Ready(None),
}
}
fn poll_trailers(
@ -97,7 +128,11 @@ impl Body for StreamBody {
#[test]
fn stream_body_traits() {
crate::tests::assert_send::<StreamBody>();
crate::tests::assert_sync::<StreamBody>();
crate::tests::assert_unpin::<StreamBody>();
use futures_util::stream::Empty;
type EmptyStream = StreamBody<Empty<Result<Bytes, BoxError>>>;
crate::tests::assert_send::<EmptyStream>();
crate::tests::assert_sync::<EmptyStream>();
crate::tests::assert_unpin::<EmptyStream>();
}

View File

@ -1,5 +1,5 @@
use super::{rejection::*, take_body, Extension, FromRequest, RequestParts};
use crate::{BoxError, Error};
use crate::{body::Body, BoxError, Error};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::Stream;
@ -171,6 +171,11 @@ where
/// Extractor that extracts the request body as a [`Stream`].
///
/// Note if your request body is [`body::Body`] you can extract that directly
/// and since it already implements [`Stream`] you don't need this type. The
/// purpose of this type is to extract other types of request bodies as a
/// [`Stream`].
///
/// # Example
///
/// ```rust,no_run
@ -194,6 +199,7 @@ where
/// ```
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
/// [`body::Body`]: crate::body::Body
pub struct BodyStream(
SyncWrapper<Pin<Box<dyn http_body::Body<Data = Bytes, Error = Error> + Send + 'static>>>,
);
@ -238,6 +244,9 @@ fn body_stream_traits() {
/// Extractor that extracts the raw request body.
///
/// Note that [`body::Body`] can be extracted directly. This purpose of this
/// type is to extract other types of request bodies.
///
/// # Example
///
/// ```rust,no_run
@ -257,8 +266,10 @@ fn body_stream_traits() {
/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
/// # };
/// ```
///
/// [`body::Body`]: crate::body::Body
#[derive(Debug, Default, Clone)]
pub struct RawBody<B = crate::body::Body>(pub B);
pub struct RawBody<B = Body>(pub B);
#[async_trait]
impl<B> FromRequest<B> for Bytes
@ -280,6 +291,15 @@ where
}
}
#[async_trait]
impl FromRequest<Body> for Body {
type Rejection = BodyAlreadyExtracted;
async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
req.take_body().ok_or(BodyAlreadyExtracted)
}
}
#[async_trait]
impl<B> FromRequest<B> for String
where

View File

@ -198,7 +198,6 @@ macro_rules! impl_into_response_for_body {
impl_into_response_for_body!(hyper::Body);
impl_into_response_for_body!(Full<Bytes>);
impl_into_response_for_body!(Empty<Bytes>);
impl_into_response_for_body!(crate::body::StreamBody);
impl<E> IntoResponse for http_body::combinators::BoxBody<Bytes, E>
where