mirror of https://github.com/smithy-lang/smithy-rs
Establish `ConnectorError` for errors from SmithyConnector (#744)
* Introduce ClientError to differentiate between different error types coming from HTTP connectors * Add test * Fix tests * Fix the DVR connections * Fix event-stream client * clippy cleanups * Fix poorly named variables in downcast error * Update docs * Rename ClientError to ConnectorError * Fix more incorrectly named errors * Update rust-runtime/smithy-http/src/result.rs Co-authored-by: John DiSanti <jdisanti@amazon.com> * Update changelog * Update SDK_CHANGELOG.md Co-authored-by: John DiSanti <jdisanti@amazon.com>
This commit is contained in:
parent
6cc1c514d1
commit
bef5382650
|
@ -12,12 +12,15 @@ v0.25 (October 7th, 2021)
|
|||
to rely on composition instead of inheritance
|
||||
- `HttpProtocolTestGenerator` became `ProtocolTestGenerator`
|
||||
- `Protocol` moved into `software.amazon.smithy.rust.codegen.smithy.protocols`
|
||||
- `SmithyConnector` and `DynConnector` now return `ConnectorError` instead of `Box<dyn Error>`. If you have written a custom connector, it will need to be updated to return the new error type. (#744)
|
||||
- The `DispatchError` variant of `SdkError` now contains `ConnectorError` instead of `Box<dyn Error>` (#744).
|
||||
|
||||
**New this week**
|
||||
- :bug: Fix an issue where `smithy-xml` may have generated invalid XML (smithy-rs#719)
|
||||
- :bug: Fix error when receiving empty event stream messages (smithy-rs#736)
|
||||
- :bug: Fix bug in event stream receiver that could cause the last events in the response stream to be lost (smithy-rs#736)
|
||||
|
||||
- Add connect & HTTP read timeouts to IMDS, defaulting to 1 second
|
||||
- IO and timeout errors from Hyper can now be retried (#744)
|
||||
|
||||
**Contributors**
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@ vNext (Month Day, Year)
|
|||
|
||||
**Breaking changes**
|
||||
- :warning: MSRV increased from 1.52.1 to 1.53.0 per our 3-behind MSRV policy.
|
||||
- `SmithyConnector` and `DynConnector` now return `ConnectorError` instead of `Box<dyn Error>`. If you have written a custom connector, it will need to be updated to return the new error type. (#744)
|
||||
- The `DispatchError` variant of `SdkError` now contains `ConnectorError` instead of `Box<dyn Error>` (#744).
|
||||
|
||||
**Tasks to cut release**
|
||||
- [ ] Bump MSRV on aws-sdk-rust, then delete this line.
|
||||
|
@ -11,6 +13,8 @@ vNext (Month Day, Year)
|
|||
|
||||
- :tada: Add presigned request support and examples for S3 GetObject and PutObject (smithy-rs#731, aws-sdk-rust#139)
|
||||
- :tada: Add presigned request support and example for Polly SynthesizeSpeech (smithy-rs#735, aws-sdk-rust#139)
|
||||
- Add connect & HTTP read timeouts to IMDS, defaulting to 1 second
|
||||
- IO and timeout errors from Hyper can now be retried (#744)
|
||||
- :bug: Fix error when receiving `Cont` event from S3 SelectObjectContent (smithy-rs#736)
|
||||
- :bug: Fix bug in event stream receiver that could cause the last events in the response stream to be lost when using S3 SelectObjectContent (smithy-rs#736)
|
||||
- Updated EC2 code examples to include readme; refactored operations from main into separate functions.
|
||||
|
|
|
@ -61,6 +61,15 @@ where
|
|||
let (err, response) = match err {
|
||||
Ok(_) => return RetryKind::NotRetryable,
|
||||
Err(SdkError::ServiceError { err, raw }) => (err, raw),
|
||||
Err(SdkError::DispatchFailure(err)) => {
|
||||
return if err.is_timeout() || err.is_io() {
|
||||
RetryKind::Error(ErrorKind::TransientError)
|
||||
} else if let Some(ek) = err.is_other() {
|
||||
RetryKind::Error(ek)
|
||||
} else {
|
||||
RetryKind::NotRetryable
|
||||
}
|
||||
}
|
||||
Err(_) => return RetryKind::NotRetryable,
|
||||
};
|
||||
if let Some(retry_after_delay) = response
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
//! [do not need to be repeated]: https://github.com/rust-lang/rust/issues/20671#issuecomment-529752828
|
||||
|
||||
use crate::*;
|
||||
use smithy_http::result::ConnectorError;
|
||||
|
||||
/// A service that has parsed a raw Smithy response.
|
||||
pub type Parsed<S, O, Retry> = smithy_http_tower::parse_response::ParseResponseService<S, O, Retry>;
|
||||
|
@ -38,7 +39,7 @@ pub trait SmithyConnector:
|
|||
/// Forwarding type to `<Self as Service>::Error` for bound inference.
|
||||
///
|
||||
/// See module-level docs for details.
|
||||
type Error: Into<BoxError> + Send + Sync + 'static;
|
||||
type Error: Into<ConnectorError> + Send + Sync + 'static;
|
||||
|
||||
/// Forwarding type to `<Self as Service>::Future` for bound inference.
|
||||
///
|
||||
|
@ -53,7 +54,7 @@ where
|
|||
+ Sync
|
||||
+ Clone
|
||||
+ 'static,
|
||||
T::Error: Into<BoxError> + Send + Sync + 'static,
|
||||
T::Error: Into<ConnectorError> + Send + Sync + 'static,
|
||||
T::Future: Send + 'static,
|
||||
{
|
||||
type Error = T::Error;
|
||||
|
|
|
@ -3,8 +3,9 @@
|
|||
* SPDX-License-Identifier: Apache-2.0.
|
||||
*/
|
||||
|
||||
use crate::{bounds, erase, retry, BoxError, Client};
|
||||
use crate::{bounds, erase, retry, Client};
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
|
||||
/// A builder that provides more customization options when constructing a [`Client`].
|
||||
///
|
||||
|
@ -81,7 +82,7 @@ impl<M, R> Builder<(), M, R> {
|
|||
pub fn connector_fn<F, FF>(self, map: F) -> Builder<tower::util::ServiceFn<F>, M, R>
|
||||
where
|
||||
F: Fn(http::Request<SdkBody>) -> FF + Send,
|
||||
FF: std::future::Future<Output = Result<http::Response<SdkBody>, BoxError>>,
|
||||
FF: std::future::Future<Output = Result<http::Response<SdkBody>, ConnectorError>>,
|
||||
// NOTE: The extra bound here is to help the type checker give better errors earlier.
|
||||
tower::util::ServiceFn<F>: bounds::SmithyConnector,
|
||||
{
|
||||
|
|
|
@ -11,13 +11,14 @@ use std::task::{Context, Poll};
|
|||
|
||||
use http_body::Body;
|
||||
use tokio::task::JoinHandle;
|
||||
use tower::{BoxError, Service};
|
||||
use tower::Service;
|
||||
|
||||
use smithy_http::body::SdkBody;
|
||||
|
||||
use crate::dvr::{self, Action, BodyData, ConnectionId, Direction, Error, NetworkTraffic, Version};
|
||||
|
||||
use super::Event;
|
||||
use std::fmt::Display;
|
||||
|
||||
/// Recording Connection Wrapper
|
||||
///
|
||||
|
@ -138,18 +139,18 @@ where
|
|||
+ Send
|
||||
+ Clone
|
||||
+ 'static,
|
||||
S::Error: Into<BoxError> + Send + Sync + 'static,
|
||||
S::Error: Display + Send + Sync + 'static,
|
||||
S::Future: Send + 'static,
|
||||
ResponseBody: Into<SdkBody>,
|
||||
{
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = BoxError;
|
||||
type Error = S::Error;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future =
|
||||
Pin<Box<dyn Future<Output = Result<http::Response<SdkBody>, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx).map_err(|err| err.into())
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, mut req: http::Request<SdkBody>) -> Self::Future {
|
||||
|
@ -200,7 +201,6 @@ where
|
|||
Ok(resp)
|
||||
}
|
||||
Err(e) => {
|
||||
let e = e.into();
|
||||
events.lock().unwrap().push(Event {
|
||||
connection_id: event_id,
|
||||
action: Action::Response {
|
||||
|
|
|
@ -8,6 +8,7 @@ use bytes::{Bytes, BytesMut};
|
|||
use http::{Request, Version};
|
||||
use http_body::Body;
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::error::Error;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
@ -211,7 +212,7 @@ fn convert_version(version: &str) -> Version {
|
|||
|
||||
impl tower::Service<http::Request<SdkBody>> for ReplayingConnection {
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = Box<dyn Error + Send + Sync + 'static>;
|
||||
type Error = ConnectorError;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = std::pin::Pin<
|
||||
|
@ -227,11 +228,10 @@ impl tower::Service<http::Request<SdkBody>> for ReplayingConnection {
|
|||
let mut events = match self.live_events.lock().unwrap().remove(&event_id) {
|
||||
Some(traffic) => traffic,
|
||||
None => {
|
||||
return Box::pin(std::future::ready(Err(format!(
|
||||
"no data for event {}. req: {:?}",
|
||||
event_id.0, req
|
||||
)
|
||||
.into())))
|
||||
return Box::pin(std::future::ready(Err(ConnectorError::other(
|
||||
format!("no data for event {}. req: {:?}", event_id.0, req).into(),
|
||||
None,
|
||||
))))
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -265,7 +265,7 @@ impl tower::Service<http::Request<SdkBody>> for ReplayingConnection {
|
|||
Action::Request { .. } => panic!("invalid"),
|
||||
Action::Response {
|
||||
response: Err(error),
|
||||
} => break Err(error.0.into()),
|
||||
} => break Err(ConnectorError::other(error.0.into(), None)),
|
||||
Action::Response {
|
||||
response: Ok(response),
|
||||
} => {
|
||||
|
|
|
@ -11,8 +11,9 @@
|
|||
pub mod boxclone;
|
||||
use boxclone::*;
|
||||
|
||||
use crate::{bounds, retry, BoxError, Client};
|
||||
use crate::{bounds, retry, Client};
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use std::fmt;
|
||||
use tower::{Layer, Service, ServiceExt};
|
||||
|
||||
|
@ -135,7 +136,9 @@ where
|
|||
/// to matter in all but the highest-performance settings.
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone)]
|
||||
pub struct DynConnector(BoxCloneService<http::Request<SdkBody>, http::Response<SdkBody>, BoxError>);
|
||||
pub struct DynConnector(
|
||||
BoxCloneService<http::Request<SdkBody>, http::Response<SdkBody>, ConnectorError>,
|
||||
);
|
||||
|
||||
impl fmt::Debug for DynConnector {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
|
@ -148,7 +151,7 @@ impl DynConnector {
|
|||
pub fn new<E, C>(connector: C) -> Self
|
||||
where
|
||||
C: bounds::SmithyConnector<Error = E> + Send + 'static,
|
||||
E: Into<BoxError>,
|
||||
E: Into<ConnectorError>,
|
||||
{
|
||||
Self(BoxCloneService::new(connector.map_err(|e| e.into())))
|
||||
}
|
||||
|
@ -156,7 +159,7 @@ impl DynConnector {
|
|||
|
||||
impl Service<http::Request<SdkBody>> for DynConnector {
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = BoxError;
|
||||
type Error = ConnectorError;
|
||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(
|
||||
|
|
|
@ -9,14 +9,17 @@ use http::Uri;
|
|||
use hyper::client::connect::Connection;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tower::Service;
|
||||
use tower::{BoxError, Service};
|
||||
|
||||
use smithy_async::rt::sleep::{default_async_sleep, AsyncSleep};
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
pub use smithy_http::result::{SdkError, SdkSuccess};
|
||||
use std::error::Error;
|
||||
|
||||
use crate::hyper_impls::timeout_middleware::{ConnectTimeout, HttpReadTimeout};
|
||||
use crate::{timeout, BoxError, Builder as ClientBuilder};
|
||||
use crate::hyper_impls::timeout_middleware::{ConnectTimeout, HttpReadTimeout, TimeoutError};
|
||||
use crate::{timeout, Builder as ClientBuilder};
|
||||
use smithy_async::future::timeout::TimedOutError;
|
||||
|
||||
/// Adapter from a [`hyper::Client`] to a connector usable by a [`Client`](crate::Client).
|
||||
///
|
||||
|
@ -34,7 +37,7 @@ where
|
|||
C::Error: Into<BoxError>,
|
||||
{
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = BoxError;
|
||||
type Error = ConnectorError;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = std::pin::Pin<
|
||||
|
@ -45,12 +48,12 @@ where
|
|||
&mut self,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(cx)
|
||||
self.0.poll_ready(cx).map_err(downcast_error)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: http::Request<SdkBody>) -> Self::Future {
|
||||
let fut = self.0.call(req);
|
||||
Box::pin(async move { Ok(fut.await?.map(SdkBody::from)) })
|
||||
Box::pin(async move { Ok(fut.await.map_err(downcast_error)?.map(SdkBody::from)) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,6 +67,53 @@ impl HyperAdapter<()> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Downcast errors coming out of hyper into an appropriate `ConnectorError`
|
||||
fn downcast_error(err: BoxError) -> ConnectorError {
|
||||
// is a `TimedOutError` (from smithy_async::timeout) in the chain? if it is, this is a timeout
|
||||
if find_source::<TimedOutError>(err.as_ref()).is_some() {
|
||||
return ConnectorError::timeout(err);
|
||||
}
|
||||
// is the top of chain error actually already a `ConnectorError`? return that directly
|
||||
let err = match err.downcast::<ConnectorError>() {
|
||||
Ok(connector_error) => return *connector_error,
|
||||
Err(box_error) => box_error,
|
||||
};
|
||||
// generally, the top of chain will probably be a hyper error. Go through a set of hyper specific
|
||||
// error classifications
|
||||
let err = match err.downcast::<hyper::Error>() {
|
||||
Ok(hyper_error) => return to_connector_error(*hyper_error),
|
||||
Err(box_error) => box_error,
|
||||
};
|
||||
|
||||
// otherwise, we have no idea!
|
||||
ConnectorError::other(err, None)
|
||||
}
|
||||
|
||||
/// Convert a [`hyper::Error`] into a [`ConnectorError`]
|
||||
fn to_connector_error(err: hyper::Error) -> ConnectorError {
|
||||
if err.is_timeout() || find_source::<TimeoutError>(&err).is_some() {
|
||||
ConnectorError::timeout(err.into())
|
||||
} else if err.is_user() {
|
||||
ConnectorError::user(err.into())
|
||||
} else if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some()
|
||||
{
|
||||
ConnectorError::io(err.into())
|
||||
} else {
|
||||
ConnectorError::other(err.into(), None)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
|
||||
let mut next = Some(err);
|
||||
while let Some(err) = next {
|
||||
if let Some(matching_err) = err.downcast_ref::<E>() {
|
||||
return Some(matching_err);
|
||||
}
|
||||
next = err.source();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
/// Builder for [`HyperAdapter`]
|
||||
pub struct Builder {
|
||||
|
@ -205,11 +255,31 @@ mod timeout_middleware {
|
|||
use pin_project_lite::pin_project;
|
||||
|
||||
use smithy_async::future;
|
||||
use smithy_async::future::timeout::Timeout;
|
||||
use smithy_async::future::timeout::{TimedOutError, Timeout};
|
||||
use smithy_async::rt::sleep::AsyncSleep;
|
||||
use smithy_async::rt::sleep::Sleep;
|
||||
use std::error::Error;
|
||||
use std::fmt::Formatter;
|
||||
use tower::BoxError;
|
||||
|
||||
use crate::BoxError;
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TimeoutError {
|
||||
operation: &'static str,
|
||||
duration: Duration,
|
||||
cause: TimedOutError,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TimeoutError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "timed out after {:?}", self.duration)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for TimeoutError {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
Some(&self.cause)
|
||||
}
|
||||
}
|
||||
|
||||
/// Timeout wrapper that will timeout on the initial TCP connection
|
||||
///
|
||||
|
@ -274,7 +344,9 @@ mod timeout_middleware {
|
|||
pub enum MaybeTimeoutFuture<F> {
|
||||
Timeout {
|
||||
#[pin]
|
||||
timeout: Timeout<F, Sleep>
|
||||
timeout: Timeout<F, Sleep>,
|
||||
error_type: &'static str,
|
||||
duration: Duration,
|
||||
},
|
||||
NoTimeout {
|
||||
#[pin]
|
||||
|
@ -291,15 +363,24 @@ mod timeout_middleware {
|
|||
type Output = Result<T, BoxError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let timeout_future = match self.project() {
|
||||
let (timeout_future, timeout_type, dur) = match self.project() {
|
||||
MaybeTimeoutFutureProj::NoTimeout { future } => {
|
||||
return future.poll(cx).map_err(|err| err.into())
|
||||
}
|
||||
MaybeTimeoutFutureProj::Timeout { timeout } => timeout,
|
||||
MaybeTimeoutFutureProj::Timeout {
|
||||
timeout,
|
||||
error_type,
|
||||
duration,
|
||||
} => (timeout, error_type, duration),
|
||||
};
|
||||
match timeout_future.poll(cx) {
|
||||
Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
|
||||
Poll::Ready(Err(timeout)) => Poll::Ready(Err(timeout.into())),
|
||||
Poll::Ready(Err(_timeout)) => Poll::Ready(Err(TimeoutError {
|
||||
operation: timeout_type,
|
||||
duration: *dur,
|
||||
cause: TimedOutError,
|
||||
}
|
||||
.into())),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
@ -324,6 +405,8 @@ mod timeout_middleware {
|
|||
let sleep = sleep.sleep(*duration);
|
||||
MaybeTimeoutFuture::Timeout {
|
||||
timeout: future::timeout::Timeout::new(self.inner.call(req), sleep),
|
||||
error_type: "connect",
|
||||
duration: *duration,
|
||||
}
|
||||
}
|
||||
None => MaybeTimeoutFuture::NoTimeout {
|
||||
|
@ -335,8 +418,7 @@ mod timeout_middleware {
|
|||
|
||||
impl<I, B> tower::Service<http::Request<B>> for HttpReadTimeout<I>
|
||||
where
|
||||
I: tower::Service<http::Request<B>>,
|
||||
I::Error: Into<BoxError>,
|
||||
I: tower::Service<http::Request<B>, Error = hyper::Error>,
|
||||
{
|
||||
type Response = I::Response;
|
||||
type Error = BoxError;
|
||||
|
@ -352,6 +434,9 @@ mod timeout_middleware {
|
|||
let sleep = sleep.sleep(*duration);
|
||||
MaybeTimeoutFuture::Timeout {
|
||||
timeout: future::timeout::Timeout::new(self.inner.call(req), sleep),
|
||||
|
||||
error_type: "HTTP read",
|
||||
duration: *duration,
|
||||
}
|
||||
}
|
||||
None => MaybeTimeoutFuture::NoTimeout {
|
||||
|
@ -414,7 +499,11 @@ mod timeout_middleware {
|
|||
)
|
||||
.await
|
||||
.expect_err("timeout");
|
||||
assert_eq!(format!("{}", resp), "error trying to connect: timed out");
|
||||
assert!(resp.is_timeout(), "{:?}", resp);
|
||||
assert_eq!(
|
||||
format!("{}", resp),
|
||||
"timeout: error trying to connect: timed out after 1s"
|
||||
);
|
||||
assert_elapsed!(now, Duration::from_secs(1));
|
||||
}
|
||||
|
||||
|
@ -430,7 +519,7 @@ mod timeout_middleware {
|
|||
.build(inner);
|
||||
let now = tokio::time::Instant::now();
|
||||
tokio::time::pause();
|
||||
let _resp = hyper
|
||||
let resp = hyper
|
||||
.call(
|
||||
http::Request::builder()
|
||||
.uri("http://foo.com")
|
||||
|
@ -439,7 +528,105 @@ mod timeout_middleware {
|
|||
)
|
||||
.await
|
||||
.expect_err("timeout");
|
||||
assert!(resp.is_timeout(), "{:?}", resp);
|
||||
assert_elapsed!(now, Duration::from_secs(2));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::hyper_impls::HyperAdapter;
|
||||
use http::Uri;
|
||||
use hyper::client::connect::{Connected, Connection};
|
||||
|
||||
use smithy_http::body::SdkBody;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
|
||||
use tower::BoxError;
|
||||
|
||||
#[tokio::test]
|
||||
async fn hyper_io_error() {
|
||||
let connector = TestConnection {
|
||||
inner: HangupStream,
|
||||
};
|
||||
let mut adapter = HyperAdapter::builder().build(connector);
|
||||
use tower::Service;
|
||||
let err = adapter
|
||||
.call(
|
||||
http::Request::builder()
|
||||
.uri("http://amazon.com")
|
||||
.body(SdkBody::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.expect_err("socket hangup");
|
||||
assert!(err.is_io(), "{:?}", err);
|
||||
}
|
||||
|
||||
// ---- machinery to make a Hyper connector that responds with an IO Error
|
||||
#[derive(Clone)]
|
||||
struct HangupStream;
|
||||
|
||||
impl Connection for HangupStream {
|
||||
fn connected(&self) -> Connected {
|
||||
Connected::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for HangupStream {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
_buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
Poll::Ready(Err(std::io::Error::new(
|
||||
ErrorKind::ConnectionReset,
|
||||
"connection reset",
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for HangupStream {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context<'_>,
|
||||
_buf: &[u8],
|
||||
) -> Poll<Result<usize, Error>> {
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct TestConnection<T> {
|
||||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> tower::Service<Uri> for TestConnection<T>
|
||||
where
|
||||
T: Clone + hyper::client::connect::Connection,
|
||||
{
|
||||
type Response = T;
|
||||
type Error = BoxError;
|
||||
type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, _req: Uri) -> Self::Future {
|
||||
std::future::ready(Ok(self.inner.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,8 +91,6 @@ use std::error::Error;
|
|||
|
||||
use tower::{Layer, Service, ServiceBuilder, ServiceExt};
|
||||
|
||||
type BoxError = Box<dyn Error + Send + Sync + 'static>;
|
||||
|
||||
/// Smithy service client.
|
||||
///
|
||||
/// The service client is customizeable in a number of ways (see [`Builder`]), but most customers
|
||||
|
|
|
@ -15,6 +15,7 @@ use std::task::{Context, Poll};
|
|||
use tokio::net::TcpStream;
|
||||
|
||||
use crate::erase::boxclone::BoxFuture;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use tower::BoxError;
|
||||
|
||||
/// A service that will never return whatever it is you want
|
||||
|
@ -104,7 +105,7 @@ mod stream {
|
|||
}
|
||||
}
|
||||
|
||||
/// A service where the underyling TCP connection never connects
|
||||
/// A service where the underlying TCP connection never connects
|
||||
pub type NeverConnected = NeverService<TcpStream>;
|
||||
|
||||
/// A service that will connect but never send any data
|
||||
|
@ -133,7 +134,7 @@ impl tower::Service<Uri> for NeverReplies {
|
|||
|
||||
impl<Req, Resp> tower::Service<Req> for NeverService<Resp> {
|
||||
type Response = Resp;
|
||||
type Error = BoxError;
|
||||
type Error = ConnectorError;
|
||||
type Future = BoxFuture<Self::Response, Self::Error>;
|
||||
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
|
|
|
@ -13,6 +13,7 @@ use http::Request;
|
|||
use protocol_test_helpers::{assert_ok, validate_body, MediaType};
|
||||
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use std::future::Ready;
|
||||
|
||||
use std::ops::Deref;
|
||||
|
@ -21,7 +22,6 @@ use std::sync::{Arc, Mutex};
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use tokio::sync::oneshot;
|
||||
use tower::BoxError;
|
||||
|
||||
/// Test Connection to capture a single request
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -50,7 +50,7 @@ pub use crate::never;
|
|||
|
||||
impl tower::Service<http::Request<SdkBody>> for CaptureRequestHandler {
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = BoxError;
|
||||
type Error = ConnectorError;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
|
@ -220,7 +220,7 @@ where
|
|||
SdkBody: From<B>,
|
||||
{
|
||||
type Response = http::Response<SdkBody>;
|
||||
type Error = BoxError;
|
||||
type Error = ConnectorError;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
|
@ -236,7 +236,7 @@ where
|
|||
.push(ValidateRequest { expected, actual });
|
||||
std::future::ready(Ok(resp.map(SdkBody::from)))
|
||||
} else {
|
||||
std::future::ready(Err("No more data".into()))
|
||||
std::future::ready(Err(ConnectorError::other("No more data".into(), None)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -256,10 +256,12 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::bounds::SmithyConnector;
|
||||
use crate::test_connection::{capture_request, never::NeverService, TestConnection};
|
||||
use crate::{BoxError, Client};
|
||||
use crate::Client;
|
||||
use hyper::service::Service;
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::result::ConnectorError;
|
||||
|
||||
fn is_send_sync<T: Send + Sync>(_: T) {}
|
||||
|
||||
|
@ -270,14 +272,19 @@ mod tests {
|
|||
is_send_sync(client);
|
||||
}
|
||||
|
||||
fn is_valid_smithy_connector<T>(_: T)
|
||||
fn is_a_connector<T>(_: &T)
|
||||
where
|
||||
T: SmithyConnector,
|
||||
{
|
||||
}
|
||||
fn quacks_like_a_connector<T>(_: &T)
|
||||
where
|
||||
T: Service<http::Request<SdkBody>, Response = http::Response<SdkBody>>
|
||||
+ Send
|
||||
+ Sync
|
||||
+ Clone
|
||||
+ 'static,
|
||||
T::Error: Into<BoxError> + Send + Sync + 'static,
|
||||
T::Error: Into<ConnectorError> + Send + Sync + 'static,
|
||||
T::Future: Send + 'static,
|
||||
{
|
||||
}
|
||||
|
@ -285,11 +292,12 @@ mod tests {
|
|||
#[test]
|
||||
fn oneshot_client() {
|
||||
let (tx, _rx) = capture_request(None);
|
||||
is_valid_smithy_connector(tx);
|
||||
quacks_like_a_connector(&tx);
|
||||
is_a_connector(&tx)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn never_test() {
|
||||
is_valid_smithy_connector(NeverService::new())
|
||||
is_a_connector(&NeverService::new())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,10 +6,11 @@
|
|||
use crate::SendOperationError;
|
||||
use smithy_http::body::SdkBody;
|
||||
use smithy_http::operation;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use tower::{BoxError, Layer, Service};
|
||||
use tower::{Layer, Service};
|
||||
use tracing::trace;
|
||||
|
||||
/// Connects Operation driven middleware to an HTTP implementation.
|
||||
|
@ -26,7 +27,7 @@ type BoxedResultFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>
|
|||
impl<S> Service<operation::Request> for DispatchService<S>
|
||||
where
|
||||
S: Service<http::Request<SdkBody>, Response = http::Response<SdkBody>> + Clone + Send + 'static,
|
||||
S::Error: Into<BoxError>,
|
||||
S::Error: Into<ConnectorError>,
|
||||
S::Future: Send + 'static,
|
||||
{
|
||||
type Response = operation::Response;
|
||||
|
|
|
@ -7,7 +7,7 @@ pub mod dispatch;
|
|||
pub mod map_request;
|
||||
pub mod parse_response;
|
||||
|
||||
use smithy_http::result::SdkError;
|
||||
use smithy_http::result::{ConnectorError, SdkError};
|
||||
use tower::BoxError;
|
||||
|
||||
/// An Error Occurred During the process of sending an Operation
|
||||
|
@ -34,7 +34,7 @@ pub enum SendOperationError {
|
|||
RequestConstructionError(BoxError),
|
||||
|
||||
/// The request could not be dispatched
|
||||
RequestDispatchError(BoxError),
|
||||
RequestDispatchError(ConnectorError),
|
||||
}
|
||||
|
||||
/// Convert a `SendOperationError` into an `SdkError`
|
||||
|
@ -63,6 +63,7 @@ mod tests {
|
|||
use smithy_http::operation;
|
||||
use smithy_http::operation::{Operation, Request};
|
||||
use smithy_http::response::ParseStrictResponse;
|
||||
use smithy_http::result::ConnectorError;
|
||||
use std::convert::{Infallible, TryInto};
|
||||
use tower::{service_fn, Service, ServiceBuilder};
|
||||
|
||||
|
@ -96,7 +97,7 @@ mod tests {
|
|||
if _request.headers().contains_key("X-Test") {
|
||||
Ok(http::Response::new(SdkBody::from("ok")))
|
||||
} else {
|
||||
Err("header not set")
|
||||
Err(ConnectorError::user("header not set".into()))
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
*/
|
||||
|
||||
use crate::body::SdkBody;
|
||||
use crate::result::SdkError;
|
||||
use crate::result::{ConnectorError, SdkError};
|
||||
use bytes::Buf;
|
||||
use bytes::Bytes;
|
||||
use bytes_utils::SegmentedBuf;
|
||||
|
@ -170,7 +170,7 @@ impl<T, E> Receiver<T, E> {
|
|||
.data()
|
||||
.await
|
||||
.transpose()
|
||||
.map_err(|err| SdkError::DispatchFailure(err))?;
|
||||
.map_err(|err| SdkError::DispatchFailure(ConnectorError::io(err)))?;
|
||||
let buffer = mem::replace(&mut self.buffer, RecvBuf::Empty);
|
||||
if let Some(chunk) = next_chunk {
|
||||
self.buffer = buffer.with_partial(chunk);
|
||||
|
|
|
@ -3,7 +3,17 @@
|
|||
* SPDX-License-Identifier: Apache-2.0.
|
||||
*/
|
||||
|
||||
#![warn(
|
||||
missing_debug_implementations,
|
||||
missing_docs,
|
||||
rustdoc::all,
|
||||
unreachable_pub
|
||||
)]
|
||||
|
||||
//! `Result` wrapper types for [success](SdkSuccess) and [failure](SdkError) responses.
|
||||
|
||||
use crate::operation;
|
||||
use smithy_types::retry::ErrorKind;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
|
@ -13,7 +23,10 @@ type BoxError = Box<dyn Error + Send + Sync>;
|
|||
/// Successful SDK Result
|
||||
#[derive(Debug)]
|
||||
pub struct SdkSuccess<O> {
|
||||
/// Raw Response from the service. (eg. Http Response)
|
||||
pub raw: operation::Response,
|
||||
|
||||
/// Parsed response from the service
|
||||
pub parsed: O,
|
||||
}
|
||||
|
||||
|
@ -25,14 +38,136 @@ pub enum SdkError<E, R = operation::Response> {
|
|||
|
||||
/// The request failed during dispatch. An HTTP response was not received. The request MAY
|
||||
/// have been sent.
|
||||
DispatchFailure(BoxError),
|
||||
DispatchFailure(ConnectorError),
|
||||
|
||||
/// A response was received but it was not parseable according the the protocol (for example
|
||||
/// the server hung up while the body was being read)
|
||||
ResponseError { err: BoxError, raw: R },
|
||||
ResponseError {
|
||||
/// Error encountered while parsing the response
|
||||
err: BoxError,
|
||||
/// Raw response that was available
|
||||
raw: R,
|
||||
},
|
||||
|
||||
/// An error response was received from the service
|
||||
ServiceError { err: E, raw: R },
|
||||
ServiceError {
|
||||
/// Modeled service error
|
||||
err: E,
|
||||
/// Raw response from the service
|
||||
raw: R,
|
||||
},
|
||||
}
|
||||
|
||||
/// Error from the underlying Connector
|
||||
///
|
||||
/// Connector exists to attach a `ConnectorErrorKind` to what would otherwise be an opaque `Box<dyn Error>`
|
||||
/// that comes off a potentially generic or dynamic connector.
|
||||
/// The attached `kind` is used to determine what retry behavior should occur (if any) based on the
|
||||
/// connector error.
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectorError {
|
||||
err: BoxError,
|
||||
kind: ConnectorErrorKind,
|
||||
}
|
||||
|
||||
impl Display for ConnectorError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}: {}", self.kind, self.err)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for ConnectorError {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
Some(self.err.as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectorError {
|
||||
/// Construct a [`ConnectorError`] from an error caused by a timeout
|
||||
///
|
||||
/// Timeout errors are typically retried on a new connection.
|
||||
pub fn timeout(err: BoxError) -> Self {
|
||||
Self {
|
||||
err,
|
||||
kind: ConnectorErrorKind::Timeout,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a [`ConnectorError`] from an error caused by the user (eg. invalid HTTP request)
|
||||
pub fn user(err: BoxError) -> Self {
|
||||
Self {
|
||||
err,
|
||||
kind: ConnectorErrorKind::User,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a [`ConnectorError`] from an IO related error (eg. socket hangup)
|
||||
pub fn io(err: BoxError) -> Self {
|
||||
Self {
|
||||
err,
|
||||
kind: ConnectorErrorKind::Io,
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct a [`ConnectorError`] from an different unclassified error.
|
||||
///
|
||||
/// Optionally, an explicit `Kind` may be passed.
|
||||
pub fn other(err: BoxError, kind: Option<ErrorKind>) -> Self {
|
||||
Self {
|
||||
err,
|
||||
kind: ConnectorErrorKind::Other(kind),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if the error is an IO error
|
||||
pub fn is_io(&self) -> bool {
|
||||
matches!(self.kind, ConnectorErrorKind::Io)
|
||||
}
|
||||
|
||||
/// Returns true if the error is an timeout error
|
||||
pub fn is_timeout(&self) -> bool {
|
||||
matches!(self.kind, ConnectorErrorKind::Timeout)
|
||||
}
|
||||
|
||||
/// Returns true if the error is a user error
|
||||
pub fn is_user(&self) -> bool {
|
||||
matches!(self.kind, ConnectorErrorKind::User)
|
||||
}
|
||||
|
||||
/// Returns the optional error kind associated with an unclassified error
|
||||
pub fn is_other(&self) -> Option<ErrorKind> {
|
||||
match &self.kind {
|
||||
ConnectorErrorKind::Other(ek) => *ek,
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum ConnectorErrorKind {
|
||||
/// A timeout occurred while processing the request
|
||||
Timeout,
|
||||
|
||||
/// A user-caused error (eg. invalid HTTP request)
|
||||
User,
|
||||
|
||||
/// Socket/IO error
|
||||
Io,
|
||||
|
||||
/// An unclassified Error with an explicit error kind
|
||||
Other(Option<ErrorKind>),
|
||||
}
|
||||
|
||||
impl Display for ConnectorErrorKind {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
ConnectorErrorKind::Timeout => write!(f, "timeout"),
|
||||
ConnectorErrorKind::User => write!(f, "user error"),
|
||||
ConnectorErrorKind::Io => write!(f, "io error"),
|
||||
ConnectorErrorKind::Other(Some(kind)) => write!(f, "{:?}", kind),
|
||||
ConnectorErrorKind::Other(None) => write!(f, "other"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, R> Display for SdkError<E, R>
|
||||
|
@ -56,9 +191,10 @@ where
|
|||
{
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
SdkError::ConstructionFailure(err)
|
||||
| SdkError::DispatchFailure(err)
|
||||
| SdkError::ResponseError { err, .. } => Some(err.as_ref()),
|
||||
SdkError::ConstructionFailure(err) | SdkError::ResponseError { err, .. } => {
|
||||
Some(err.as_ref())
|
||||
}
|
||||
SdkError::DispatchFailure(err) => Some(err),
|
||||
SdkError::ServiceError { err, .. } => Some(err),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue