More tidying up of `aws-smithy-runtime-api` (#2869)

This PR finishes documenting `aws-smithy-runtime-api` and also:

- Adds `Send + Sync` bounds to `Interceptor`.
- Moves `Interceptors` into `aws-smithy-runtime`.
- Expands the more complicated macros in the interceptor context
wrappers.
- Makes the `OrchestratorError` an informative error according to
[RFC-22](https://github.com/awslabs/smithy-rs/blob/main/design/src/rfcs/rfc0022_error_context_and_compatibility.md).
- Renames `ConnectorError::is_other` to `as_other` and adds an
equivalent `is_other` that returns a boolean.

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
This commit is contained in:
John DiSanti 2023-07-25 12:59:17 -07:00 committed by GitHub
parent a5465b79a5
commit 2922f561b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 801 additions and 709 deletions

View File

@ -77,7 +77,7 @@ impl<AP> RequestChecksumInterceptor<AP> {
impl<AP> Interceptor for RequestChecksumInterceptor<AP>
where
AP: Fn(&Input) -> Result<Option<ChecksumAlgorithm>, BoxError>,
AP: Fn(&Input) -> Result<Option<ChecksumAlgorithm>, BoxError> + Send + Sync,
{
fn read_before_serialization(
&self,

View File

@ -54,7 +54,7 @@ impl<VE> ResponseChecksumInterceptor<VE> {
impl<VE> Interceptor for ResponseChecksumInterceptor<VE>
where
VE: Fn(&Input) -> bool,
VE: Fn(&Input) -> bool + Send + Sync,
{
fn read_before_serialization(
&self,

View File

@ -69,7 +69,7 @@ where
impl<G, T> Interceptor for Route53ResourceIdInterceptor<G, T>
where
G: for<'a> Fn(&'a mut T) -> &'a mut Option<String>,
G: for<'a> Fn(&'a mut T) -> &'a mut Option<String> + Send + Sync,
T: fmt::Debug + Send + Sync + 'static,
{
fn modify_before_serialization(

View File

@ -91,7 +91,7 @@ class InterceptorConfigCustomization(codegenContext: ClientCodegenContext) : Con
/// ## }
/// ## }
/// ```
pub fn interceptor(mut self, interceptor: impl #{Interceptor} + Send + Sync + 'static) -> Self {
pub fn interceptor(mut self, interceptor: impl #{Interceptor} + 'static) -> Self {
self.push_interceptor(#{SharedInterceptor}::new(interceptor));
self
}

View File

@ -199,7 +199,7 @@ class CustomizableOperationGenerator(
/// `map_request`, and `mutate_request` (the last two are implemented via interceptors under the hood).
/// The order in which those user-specified operation interceptors are invoked should not be relied upon
/// as it is an implementation detail.
pub fn interceptor(mut self, interceptor: impl #{Interceptor} + #{Send} + #{Sync} + 'static) -> Self {
pub fn interceptor(mut self, interceptor: impl #{Interceptor} + 'static) -> Self {
self.interceptors.push(#{SharedInterceptor}::new(interceptor));
self
}

View File

@ -232,11 +232,16 @@ impl DispatchFailure {
self.source.is_user()
}
/// Returns the optional error kind associated with an unclassified error
pub fn is_other(&self) -> Option<ErrorKind> {
/// Returns true if the error is an unclassified error.
pub fn is_other(&self) -> bool {
self.source.is_other()
}
/// Returns the optional error kind associated with an unclassified error
pub fn as_other(&self) -> Option<ErrorKind> {
self.source.as_other()
}
/// Returns the inner error if it is a connector error
pub fn as_connector_error(&self) -> Option<&ConnectorError> {
Some(&self.source)
@ -633,8 +638,13 @@ impl ConnectorError {
matches!(self.kind, ConnectorErrorKind::User)
}
/// Returns true if the error is an unclassified error.
pub fn is_other(&self) -> bool {
matches!(self.kind, ConnectorErrorKind::Other(..))
}
/// Returns the optional error kind associated with an unclassified error
pub fn is_other(&self) -> Option<ErrorKind> {
pub fn as_other(&self) -> Option<ErrorKind> {
match &self.kind {
ConnectorErrorKind::Other(ek) => *ek,
_ => None,

View File

@ -45,7 +45,7 @@ impl DefaultResponseRetryClassifier {
Err(SdkError::DispatchFailure(err)) => {
if err.is_timeout() || err.is_io() {
Err(RetryKind::Error(ErrorKind::TransientError))
} else if let Some(ek) = err.is_other() {
} else if let Some(ek) = err.as_other() {
Err(RetryKind::Error(ek))
} else {
Err(RetryKind::UnretryableFailure)

View File

@ -13,12 +13,10 @@ use crate::client::interceptors::context::{
BeforeDeserializationInterceptorContextRef, BeforeSerializationInterceptorContextMut,
BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
BeforeTransmitInterceptorContextRef, FinalizerInterceptorContextMut,
FinalizerInterceptorContextRef, InterceptorContext,
FinalizerInterceptorContextRef,
};
use crate::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::error::display::DisplayErrorContext;
use context::{Error, Input, Output};
use std::fmt;
use std::marker::PhantomData;
use std::ops::Deref;
@ -38,9 +36,7 @@ macro_rules! interceptor_trait_fn {
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
};
@ -52,9 +48,7 @@ macro_rules! interceptor_trait_fn {
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
};
@ -70,7 +64,7 @@ macro_rules! interceptor_trait_fn {
/// of the SDK s request execution pipeline. Hooks are either "read" hooks, which make it possible
/// to read in-flight request or response messages, or "read/write" hooks, which make it possible
/// to modify in-flight request or output messages.
pub trait Interceptor: fmt::Debug {
pub trait Interceptor: fmt::Debug + Send + Sync {
/// A hook called at the start of an execution, before the SDK
/// does anything else.
///
@ -78,14 +72,14 @@ pub trait Interceptor: fmt::Debug {
/// between invocation of this hook and `after_execution` is very close
/// to full duration of the execution.
///
/// **Available Information:** The [InterceptorContext::input()] is
/// **ALWAYS** available. Other information **WILL NOT** be available.
/// **Available Information:** The [`InterceptorContext::input`](context::InterceptorContext::input)
/// is **ALWAYS** available. Other information **WILL NOT** be available.
///
/// **Error Behavior:** Errors raised by this hook will be stored
/// until all interceptors have had their `before_execution` invoked.
/// Other hooks will then be skipped and execution will jump to
/// `modify_before_completion` with the raised error as the
/// [InterceptorContext::output_or_error()]. If multiple
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error). If multiple
/// `before_execution` methods raise errors, the latest
/// will be used and earlier ones will be logged and dropped.
fn read_before_execution(
@ -93,8 +87,7 @@ pub trait Interceptor: fmt::Debug {
context: &BeforeSerializationInterceptorContextRef<'_>,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _cfg = cfg;
let (_ctx, _cfg) = (context, cfg);
Ok(())
}
@ -110,14 +103,14 @@ pub trait Interceptor: fmt::Debug {
**When:** This will **ALWAYS** be called once per execution, except when a
failure occurs earlier in the request pipeline.
**Available Information:** The [InterceptorContext::input()] is
**Available Information:** The [`InterceptorContext::input`](context::InterceptorContext::input) is
**ALWAYS** available. This request may have been modified by earlier
`modify_before_serialization` hooks, and may be modified further by
later hooks. Other information **WILL NOT** be available.
**Error Behavior:** If errors are raised by this hook,
execution will jump to `modify_before_completion` with the raised
error as the [InterceptorContext::output_or_error()].
error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
**Return Constraints:** The input message returned by this hook
MUST be the same type of input message passed into this hook.
@ -138,12 +131,12 @@ pub trait Interceptor: fmt::Debug {
duration between invocation of this hook and `after_serialization` is
very close to the amount of time spent marshalling the request.
**Available Information:** The [InterceptorContext::input()] is
**Available Information:** The [`InterceptorContext::input`](context::InterceptorContext::input) is
**ALWAYS** available. Other information **WILL NOT** be available.
**Error Behavior:** If errors are raised by this hook,
execution will jump to `modify_before_completion` with the raised
error as the [InterceptorContext::output_or_error()].
error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -151,21 +144,20 @@ pub trait Interceptor: fmt::Debug {
read_after_serialization,
BeforeTransmitInterceptorContextRef,
"
/// A hook called after the input message is marshalled into
/// a transport message.
///
/// **When:** This will **ALWAYS** be called once per execution, except when a
/// failure occurs earlier in the request pipeline. The duration
/// between invocation of this hook and `before_serialization` is very
/// close to the amount of time spent marshalling the request.
///
/// **Available Information:** The [InterceptorContext::input()]
/// and [InterceptorContext::request()] are **ALWAYS** available.
/// Other information **WILL NOT** be available.
///
/// **Error Behavior:** If errors are raised by this hook,
/// execution will jump to `modify_before_completion` with the raised
/// error as the [InterceptorContext::output_or_error()].
A hook called after the input message is marshalled into
a transport message.
**When:** This will **ALWAYS** be called once per execution, except when a
failure occurs earlier in the request pipeline. The duration
between invocation of this hook and `before_serialization` is very
close to the amount of time spent marshalling the request.
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. Other information **WILL NOT** be available.
**Error Behavior:** If errors are raised by this hook,
execution will jump to `modify_before_completion` with the raised
error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -177,13 +169,12 @@ pub trait Interceptor: fmt::Debug {
has the ability to modify and return a new transport request
message of the same type, except when a failure occurs earlier in the request pipeline.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
Other information **WILL NOT** be available.
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. Other information **WILL NOT** be available.
**Error Behavior:** If errors are raised by this hook,
execution will jump to `modify_before_completion` with the raised
error as the [InterceptorContext::output_or_error()].
error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
**Return Constraints:** The transport request message returned by this
hook MUST be the same type of request message passed into this hook
@ -202,9 +193,8 @@ pub trait Interceptor: fmt::Debug {
failure occurs earlier in the request pipeline. This method will be
called multiple times in the event of retries.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
@ -212,7 +202,7 @@ pub trait Interceptor: fmt::Debug {
until all interceptors have had their `before_attempt` invoked.
Other hooks will then be skipped and execution will jump to
`modify_before_attempt_completion` with the raised error as the
[InterceptorContext::output_or_error()]. If multiple
[`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error). If multiple
`before_attempt` methods raise errors, the latest will be used
and earlier ones will be logged and dropped.
"
@ -230,18 +220,16 @@ pub trait Interceptor: fmt::Debug {
failure occurs earlier in the request pipeline. This method may be
called multiple times in the event of retries.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
The `http::Request` may have been modified by earlier
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. The `http::Request` may have been modified by earlier
`modify_before_signing` hooks, and may be modified further by later
hooks. Other information **WILL NOT** be available. In the event of
retries, the `InterceptorContext` will not include changes made
in previous attempts
(e.g. by request signers or other interceptors).
in previous attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
**Return Constraints:** The transport request message returned by this
hook MUST be the same type of request message passed into this hook
@ -262,15 +250,14 @@ pub trait Interceptor: fmt::Debug {
invocation of this hook and `after_signing` is very close to
the amount of time spent signing the request.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request) is **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -286,15 +273,14 @@ pub trait Interceptor: fmt::Debug {
invocation of this hook and `before_signing` is very close to
the amount of time spent signing the request.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request) is **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -302,26 +288,25 @@ pub trait Interceptor: fmt::Debug {
mut modify_before_transmit,
BeforeTransmitInterceptorContextMut,
"
/// A hook called before the transport request message is sent to the
/// service. This method has the ability to modify and return
/// a new transport request message of the same type.
///
/// **When:** This will **ALWAYS** be called once per attempt, except when a
/// failure occurs earlier in the request pipeline. This method may be
/// called multiple times in the event of retries.
///
/// **Available Information:** The [InterceptorContext::input()]
/// and [InterceptorContext::request()] are **ALWAYS** available.
/// The `http::Request` may have been modified by earlier
/// `modify_before_transmit` hooks, and may be modified further by later
/// hooks. Other information **WILL NOT** be available.
/// In the event of retries, the `InterceptorContext` will not include
/// changes made in previous attempts (e.g. by request signers or
A hook called before the transport request message is sent to the
service. This method has the ability to modify and return
a new transport request message of the same type.
**When:** This will **ALWAYS** be called once per attempt, except when a
failure occurs earlier in the request pipeline. This method may be
called multiple times in the event of retries.
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. The `http::Request` may have been modified by earlier
`modify_before_transmit` hooks, and may be modified further by later
hooks. Other information **WILL NOT** be available.
In the event of retries, the `InterceptorContext` will not include
changes made in previous attempts (e.g. by request signers or
other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
**Return Constraints:** The transport request message returned by this
hook MUST be the same type of request message passed into this hook
@ -345,16 +330,15 @@ pub trait Interceptor: fmt::Debug {
Depending on the protocol, the duration may not include the
time spent reading the response data.
**Available Information:** The [InterceptorContext::input()]
and [InterceptorContext::request()] are **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
**Available Information:** The [`InterceptorContext::request`](context::InterceptorContext::request)
is **ALWAYS** available. Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -373,16 +357,14 @@ pub trait Interceptor: fmt::Debug {
Depending on the protocol, the duration may not include the time
spent reading the response data.
**Available Information:** The [InterceptorContext::input()],
[InterceptorContext::request()] and
[InterceptorContext::response()] are **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
**Available Information:** The [`InterceptorContext::response`](context::InterceptorContext::response)
is **ALWAYS** available. Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -398,10 +380,8 @@ pub trait Interceptor: fmt::Debug {
failure occurs earlier in the request pipeline. This method may be
called multiple times in the event of retries.
**Available Information:** The [InterceptorContext::input()],
[InterceptorContext::request()] and
[InterceptorContext::response()] are **ALWAYS** available.
The transmit_response may have been modified by earlier
**Available Information:** The [`InterceptorContext::response`](context::InterceptorContext::response)
is **ALWAYS** available. The transmit_response may have been modified by earlier
`modify_before_deserialization` hooks, and may be modified further by
later hooks. Other information **WILL NOT** be available. In the event of
retries, the `InterceptorContext` will not include changes made in
@ -410,7 +390,7 @@ pub trait Interceptor: fmt::Debug {
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the
[InterceptorContext::output_or_error()].
[`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
**Return Constraints:** The transport response message returned by this
hook MUST be the same type of response message passed into
@ -432,16 +412,14 @@ pub trait Interceptor: fmt::Debug {
Depending on the protocol and operation, the duration may include
the time spent downloading the response data.
**Available Information:** The [InterceptorContext::input()],
[InterceptorContext::request()] and
[InterceptorContext::response()] are **ALWAYS** available.
Other information **WILL NOT** be available. In the event of retries,
**Available Information:** The [`InterceptorContext::response`](context::InterceptorContext::response)
is **ALWAYS** available. Other information **WILL NOT** be available. In the event of retries,
the `InterceptorContext` will not include changes made in previous
attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion`
with the raised error as the [InterceptorContext::output_or_error()].
with the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -459,16 +437,14 @@ pub trait Interceptor: fmt::Debug {
the duration may include the time spent downloading
the response data.
**Available Information:** The [InterceptorContext::input()],
[InterceptorContext::request()],
[InterceptorContext::response()] and
[InterceptorContext::output_or_error()] are **ALWAYS** available. In the event
of retries, the `InterceptorContext` will not include changes made
**Available Information:** The [`InterceptorContext::response`](context::InterceptorContext::response)
and [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error)
are **ALWAYS** available. In the event of retries, the `InterceptorContext` will not include changes made
in previous attempts (e.g. by request signers or other interceptors).
**Error Behavior:** If errors are raised by this
hook, execution will jump to `modify_before_attempt_completion` with
the raised error as the [InterceptorContext::output_or_error()].
the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
"
);
@ -480,16 +456,19 @@ pub trait Interceptor: fmt::Debug {
/// failure occurs before `before_attempt`. This method may
/// be called multiple times in the event of retries.
///
/// **Available Information:** The [InterceptorContext::input()],
/// [InterceptorContext::request()],
/// [InterceptorContext::response()] and
/// [InterceptorContext::output_or_error()] are **ALWAYS** available. In the event
/// of retries, the `InterceptorContext` will not include changes made
/// **Available Information:**
/// The [`InterceptorContext::input`](context::InterceptorContext::input),
/// [`InterceptorContext::request`](context::InterceptorContext::request),
/// [`InterceptorContext::response`](context::InterceptorContext::response), or
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error) **MAY** be available.
/// If the operation succeeded, the `output` will be available. Otherwise, any of the other
/// pieces of information may be available depending on where in the operation lifecycle it failed.
/// In the event of retries, the `InterceptorContext` will not include changes made
/// in previous attempts (e.g. by request signers or other interceptors).
///
/// **Error Behavior:** If errors are raised by this
/// hook, execution will jump to `after_attempt` with
/// the raised error as the [InterceptorContext::output_or_error()].
/// the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
///
/// **Return Constraints:** Any output message returned by this
/// hook MUST match the operation being invoked. Any error type can be
@ -500,9 +479,7 @@ pub trait Interceptor: fmt::Debug {
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
@ -511,14 +488,15 @@ pub trait Interceptor: fmt::Debug {
/// **When:** This will **ALWAYS** be called once per attempt, as long as
/// `before_attempt` has been executed.
///
/// **Available Information:** The [InterceptorContext::input()],
/// [InterceptorContext::request()] and
/// [InterceptorContext::output_or_error()] are **ALWAYS** available.
/// The [InterceptorContext::response()] is available if a
/// response was received by the service for this attempt.
/// In the event of retries, the `InterceptorContext` will not include
/// changes made in previous attempts (e.g. by request signers or other
/// interceptors).
/// **Available Information:**
/// The [`InterceptorContext::input`](context::InterceptorContext::input),
/// [`InterceptorContext::request`](context::InterceptorContext::request),
/// [`InterceptorContext::response`](context::InterceptorContext::response), or
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error) **MAY** be available.
/// If the operation succeeded, the `output` will be available. Otherwise, any of the other
/// pieces of information may be available depending on where in the operation lifecycle it failed.
/// In the event of retries, the `InterceptorContext` will not include changes made
/// in previous attempts (e.g. by request signers or other interceptors).
///
/// **Error Behavior:** Errors raised by this hook will be stored
/// until all interceptors have had their `after_attempt` invoked.
@ -527,16 +505,14 @@ pub trait Interceptor: fmt::Debug {
/// retry strategy determines that the execution is retryable,
/// execution will then jump to `before_attempt`. Otherwise,
/// execution will jump to `modify_before_attempt_completion` with the
/// raised error as the [InterceptorContext::output_or_error()].
/// raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
fn read_after_attempt(
&self,
context: &FinalizerInterceptorContextRef<'_>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
@ -547,15 +523,19 @@ pub trait Interceptor: fmt::Debug {
///
/// **When:** This will **ALWAYS** be called once per execution.
///
/// **Available Information:** The [InterceptorContext::input()]
/// and [InterceptorContext::output_or_error()] are **ALWAYS** available. The
/// [InterceptorContext::request()]
/// and [InterceptorContext::response()] are available if the
/// execution proceeded far enough for them to be generated.
/// **Available Information:**
/// The [`InterceptorContext::input`](context::InterceptorContext::input),
/// [`InterceptorContext::request`](context::InterceptorContext::request),
/// [`InterceptorContext::response`](context::InterceptorContext::response), or
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error) **MAY** be available.
/// If the operation succeeded, the `output` will be available. Otherwise, any of the other
/// pieces of information may be available depending on where in the operation lifecycle it failed.
/// In the event of retries, the `InterceptorContext` will not include changes made
/// in previous attempts (e.g. by request signers or other interceptors).
///
/// **Error Behavior:** If errors are raised by this
/// hook , execution will jump to `after_attempt` with
/// the raised error as the [InterceptorContext::output_or_error()].
/// the raised error as the [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error).
///
/// **Return Constraints:** Any output message returned by this
/// hook MUST match the operation being invoked. Any error type can be
@ -566,9 +546,7 @@ pub trait Interceptor: fmt::Debug {
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
@ -578,17 +556,21 @@ pub trait Interceptor: fmt::Debug {
/// between invocation of this hook and `before_execution` is very
/// close to the full duration of the execution.
///
/// **Available Information:** The [InterceptorContext::input()]
/// and [InterceptorContext::output_or_error()] are **ALWAYS** available. The
/// [InterceptorContext::request()] and
/// [InterceptorContext::response()] are available if the
/// execution proceeded far enough for them to be generated.
/// **Available Information:**
/// The [`InterceptorContext::input`](context::InterceptorContext::input),
/// [`InterceptorContext::request`](context::InterceptorContext::request),
/// [`InterceptorContext::response`](context::InterceptorContext::response), or
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error) **MAY** be available.
/// If the operation succeeded, the `output` will be available. Otherwise, any of the other
/// pieces of information may be available depending on where in the operation lifecycle it failed.
/// In the event of retries, the `InterceptorContext` will not include changes made
/// in previous attempts (e.g. by request signers or other interceptors).
///
/// **Error Behavior:** Errors raised by this hook will be stored
/// until all interceptors have had their `after_execution` invoked.
/// The error will then be treated as the
/// [InterceptorContext::output_or_error()] to the customer. If multiple
/// `after_execution` methods raise errors , the latest will be
/// [`InterceptorContext::output_or_error`](context::InterceptorContext::output_or_error)
/// to the customer. If multiple `after_execution` methods raise errors , the latest will be
/// used and earlier ones will be logged and dropped.
fn read_after_execution(
&self,
@ -596,9 +578,7 @@ pub trait Interceptor: fmt::Debug {
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
let _ctx = context;
let _rc = runtime_components;
let _cfg = cfg;
let (_ctx, _rc, _cfg) = (context, runtime_components, cfg);
Ok(())
}
}
@ -606,7 +586,7 @@ pub trait Interceptor: fmt::Debug {
/// Interceptor wrapper that may be shared
#[derive(Clone)]
pub struct SharedInterceptor {
interceptor: Arc<dyn Interceptor + Send + Sync>,
interceptor: Arc<dyn Interceptor>,
check_enabled: Arc<dyn Fn(&ConfigBag) -> bool + Send + Sync>,
}
@ -619,8 +599,8 @@ impl fmt::Debug for SharedInterceptor {
}
impl SharedInterceptor {
/// Create a new `SharedInterceptor` from `Interceptor`
pub fn new<T: Interceptor + Send + Sync + 'static>(interceptor: T) -> Self {
/// Create a new `SharedInterceptor` from `Interceptor`.
pub fn new<T: Interceptor + 'static>(interceptor: T) -> Self {
Self {
interceptor: Arc::new(interceptor),
check_enabled: Arc::new(|conf: &ConfigBag| {
@ -629,7 +609,8 @@ impl SharedInterceptor {
}
}
fn enabled(&self, conf: &ConfigBag) -> bool {
/// Checks if this interceptor is enabled in the given config.
pub fn enabled(&self, conf: &ConfigBag) -> bool {
(self.check_enabled)(conf)
}
}
@ -641,84 +622,12 @@ impl AsRef<dyn Interceptor> for SharedInterceptor {
}
impl Deref for SharedInterceptor {
type Target = Arc<dyn Interceptor + Send + Sync>;
type Target = Arc<dyn Interceptor>;
fn deref(&self) -> &Self::Target {
&self.interceptor
}
}
/// A interceptor wrapper to conditionally enable the interceptor based on [`DisableInterceptor`]
struct ConditionallyEnabledInterceptor(SharedInterceptor);
impl ConditionallyEnabledInterceptor {
fn if_enabled(&self, cfg: &ConfigBag) -> Option<&dyn Interceptor> {
if self.0.enabled(cfg) {
Some(self.0.as_ref())
} else {
None
}
}
}
#[derive(Debug)]
pub struct Interceptors<I> {
interceptors: I,
}
macro_rules! interceptor_impl_fn {
(mut $interceptor:ident) => {
pub fn $interceptor(
self,
ctx: &mut InterceptorContext,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!(concat!(
"running `",
stringify!($interceptor),
"` interceptors"
));
let mut result: Result<(), BoxError> = Ok(());
let mut ctx = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.$interceptor(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::$interceptor)
}
};
(ref $interceptor:ident) => {
pub fn $interceptor(
self,
ctx: &InterceptorContext,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
let mut result: Result<(), BoxError> = Ok(());
let ctx = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) = interceptor.$interceptor(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::$interceptor)
}
};
}
/// Generalized interceptor disabling interface
///
/// RuntimePlugins can disable interceptors by inserting [`DisableInterceptor<T>`](DisableInterceptor) into the config bag
@ -744,215 +653,3 @@ pub fn disable_interceptor<T: Interceptor>(cause: &'static str) -> DisableInterc
cause,
}
}
impl<I> Interceptors<I>
where
I: Iterator<Item = SharedInterceptor>,
{
pub fn new(interceptors: I) -> Self {
Self { interceptors }
}
fn into_iter(self) -> impl Iterator<Item = ConditionallyEnabledInterceptor> {
self.interceptors.map(ConditionallyEnabledInterceptor)
}
pub fn read_before_execution(
self,
operation: bool,
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!(
"running {} `read_before_execution` interceptors",
if operation { "operation" } else { "client" }
);
let mut result: Result<(), BoxError> = Ok(());
let ctx: BeforeSerializationInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) = interceptor.read_before_execution(&ctx, cfg) {
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_before_execution)
}
interceptor_impl_fn!(mut modify_before_serialization);
interceptor_impl_fn!(ref read_before_serialization);
interceptor_impl_fn!(ref read_after_serialization);
interceptor_impl_fn!(mut modify_before_retry_loop);
interceptor_impl_fn!(ref read_before_attempt);
interceptor_impl_fn!(mut modify_before_signing);
interceptor_impl_fn!(ref read_before_signing);
interceptor_impl_fn!(ref read_after_signing);
interceptor_impl_fn!(mut modify_before_transmit);
interceptor_impl_fn!(ref read_before_transmit);
interceptor_impl_fn!(ref read_after_transmit);
interceptor_impl_fn!(mut modify_before_deserialization);
interceptor_impl_fn!(ref read_before_deserialization);
interceptor_impl_fn!(ref read_after_deserialization);
pub fn modify_before_attempt_completion(
self,
ctx: &mut InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_attempt_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.modify_before_attempt_completion(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::modify_before_attempt_completion)
}
pub fn read_after_attempt(
self,
ctx: &InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_attempt` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.read_after_attempt(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_after_attempt)
}
pub fn modify_before_completion(
self,
ctx: &mut InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.modify_before_completion(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::modify_before_completion)
}
pub fn read_after_execution(
self,
ctx: &InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_execution` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.read_after_execution(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_after_execution)
}
}
#[cfg(all(test, feature = "test-util"))]
mod tests {
use crate::client::interceptors::context::Input;
use crate::client::interceptors::{
disable_interceptor, BeforeTransmitInterceptorContextRef, BoxError, Interceptor,
InterceptorContext, Interceptors, SharedInterceptor,
};
use crate::client::runtime_components::{RuntimeComponents, RuntimeComponentsBuilder};
use aws_smithy_types::config_bag::ConfigBag;
#[derive(Debug)]
struct TestInterceptor;
impl Interceptor for TestInterceptor {}
#[test]
fn test_disable_interceptors() {
#[derive(Debug)]
struct PanicInterceptor;
impl Interceptor for PanicInterceptor {
fn read_before_transmit(
&self,
_context: &BeforeTransmitInterceptorContextRef<'_>,
_rc: &RuntimeComponents,
_cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
Err("boom".into())
}
}
let rc = RuntimeComponentsBuilder::for_tests()
.with_interceptor(SharedInterceptor::new(PanicInterceptor))
.with_interceptor(SharedInterceptor::new(TestInterceptor))
.build()
.unwrap();
let mut cfg = ConfigBag::base();
let interceptors = Interceptors::new(rc.interceptors());
assert_eq!(
interceptors
.into_iter()
.filter(|i| i.if_enabled(&cfg).is_some())
.count(),
2
);
Interceptors::new(rc.interceptors())
.read_before_transmit(&InterceptorContext::new(Input::new(5)), &rc, &mut cfg)
.expect_err("interceptor returns error");
cfg.interceptor_state()
.store_put(disable_interceptor::<PanicInterceptor>("test"));
assert_eq!(
Interceptors::new(rc.interceptors())
.into_iter()
.filter(|i| i.if_enabled(&cfg).is_some())
.count(),
1
);
// shouldn't error because interceptors won't run
Interceptors::new(rc.interceptors())
.read_before_transmit(&InterceptorContext::new(Input::new(5)), &rc, &mut cfg)
.expect("interceptor is now disabled");
}
}

View File

@ -37,9 +37,13 @@ use std::{fmt, mem};
use tracing::{debug, error, trace};
// TODO(enableNewSmithyRuntimeLaunch): New-type `Input`/`Output`/`Error`
/// Type-erased operation input.
pub type Input = TypeErasedBox;
/// Type-erased operation output.
pub type Output = TypeErasedBox;
/// Type-erased operation error.
pub type Error = TypeErasedError;
/// Type-erased result for an operation.
pub type OutputOrError = Result<Output, OrchestratorError<Error>>;
type Request = HttpRequest;
@ -89,38 +93,7 @@ impl InterceptorContext<Input, Output, Error> {
}
}
impl<I, O, E: Debug> InterceptorContext<I, O, E> {
/// Decomposes the context into its constituent parts.
#[doc(hidden)]
#[allow(clippy::type_complexity)]
pub fn into_parts(
self,
) -> (
Option<I>,
Option<Result<O, OrchestratorError<E>>>,
Option<Request>,
Option<Response>,
) {
(
self.input,
self.output_or_error,
self.request,
self.response,
)
}
pub fn finalize(self) -> Result<O, SdkError<E, HttpResponse>> {
let Self {
output_or_error,
response,
phase,
..
} = self;
output_or_error
.expect("output_or_error must always be set before finalize is called.")
.map_err(|error| OrchestratorError::into_sdk_error(error, &phase, response))
}
impl<I, O, E> InterceptorContext<I, O, E> {
/// Retrieve the input for the operation being invoked.
pub fn input(&self) -> Option<&I> {
self.input.as_ref()
@ -188,6 +161,14 @@ impl<I, O, E: Debug> InterceptorContext<I, O, E> {
self.output_or_error.as_mut()
}
/// Return `true` if this context's `output_or_error` is an error. Otherwise, return `false`.
pub fn is_failed(&self) -> bool {
self.output_or_error
.as_ref()
.map(Result::is_err)
.unwrap_or_default()
}
/// Advance to the Serialization phase.
#[doc(hidden)]
pub fn enter_serialization_phase(&mut self) {
@ -314,6 +295,44 @@ impl<I, O, E: Debug> InterceptorContext<I, O, E> {
self.output_or_error = None;
RewindResult::Occurred
}
}
impl<I, O, E> InterceptorContext<I, O, E>
where
E: Debug,
{
/// Decomposes the context into its constituent parts.
#[doc(hidden)]
#[allow(clippy::type_complexity)]
pub fn into_parts(
self,
) -> (
Option<I>,
Option<Result<O, OrchestratorError<E>>>,
Option<Request>,
Option<Response>,
) {
(
self.input,
self.output_or_error,
self.request,
self.response,
)
}
/// Convert this context into the final operation result that is returned in client's the public API.
#[doc(hidden)]
pub fn finalize(self) -> Result<O, SdkError<E, HttpResponse>> {
let Self {
output_or_error,
response,
phase,
..
} = self;
output_or_error
.expect("output_or_error must always be set before finalize is called.")
.map_err(|error| OrchestratorError::into_sdk_error(error, &phase, response))
}
/// Mark this context as failed due to errors during the operation. Any errors already contained
/// by the context will be replaced by the given error.
@ -328,14 +347,6 @@ impl<I, O, E: Debug> InterceptorContext<I, O, E> {
error!("orchestrator context received an error but one was already present; Throwing away previous error: {:?}", existing_err);
}
}
/// Return `true` if this context's `output_or_error` is an error. Otherwise, return `false`.
pub fn is_failed(&self) -> bool {
self.output_or_error
.as_ref()
.map(Result::is_err)
.unwrap_or_default()
}
}
/// The result of attempting to rewind a request.

View File

@ -8,191 +8,222 @@ use crate::client::interceptors::context::{Request, Response};
use crate::client::orchestrator::OrchestratorError;
use std::fmt::Debug;
macro_rules! output {
(&Option<Result<$o_ty:ty, $e_ty:ty>>) => {
Option<Result<&$o_ty, &$e_ty>>
};
(&Option<$ty:ty>) => {
Option<&$ty>
};
(&mut Option<$ty:ty>) => {
Option<&mut $ty>
};
(&Result<$o_ty:ty, $e_ty:ty>) => {
Result<&$o_ty, &$e_ty>
};
(&$($tt:tt)+) => {
&$($tt)+
};
(&mut $($tt:tt)+) => {
&mut $($tt)+
};
}
macro_rules! declare_method {
(&mut $name:ident, $inner_name:ident, $doc:literal, Option<$ty:ty>) => {
#[doc=$doc]
pub fn $name(&mut self) -> Option<&mut $ty> {
self.inner.$inner_name.as_ref()
}
};
(&$name:ident, $inner_name:ident, $doc:literal, Option<$ty:ty>) => {
#[doc=$doc]
pub fn $name(&self) -> Option<$ty> {
self.inner.$inner_name.as_mut()
}
};
(&mut $name:ident, $doc:literal, $($tt:tt)+) => {
#[doc=$doc]
pub fn $name(&mut self) -> output!(&mut $($tt)+) {
self.inner.$name().expect(concat!("`", stringify!($name), "` wasn't set in the underlying interceptor context. This is a bug."))
}
};
(&$name:ident, $doc:literal, $($tt:tt)+) => {
#[doc=$doc]
pub fn $name(&self) -> output!(&$($tt)+) {
self.inner.$name().expect(concat!("`", stringify!($name), "` wasn't set in the underlying interceptor context. This is a bug."))
}
};
}
macro_rules! declare_known_method {
(output_or_error: &mut $($tt:tt)+) => {
declare_method!(&mut output_or_error_mut, "Returns a mutable reference to the deserialized output or error.", $($tt)+);
};
(output_or_error: &$($tt:tt)+) => {
declare_method!(&output_or_error, "Returns a reference to the deserialized output or error.", $($tt)+);
};
(input: &mut $($tt:tt)+) => {
declare_method!(&mut input_mut, "Returns a mutable reference to the input.", $($tt)+);
};
(input: &$($tt:tt)+) => {
declare_method!(&input, "Returns a reference to the input.", $($tt)+);
};
(request: &mut $($tt:tt)+) => {
declare_method!(&mut request_mut, "Returns a mutable reference to the transmittable request for the operation being invoked.", $($tt)+);
};
(request: &$($tt:tt)+) => {
declare_method!(&request, "Returns a reference to the transmittable request for the operation being invoked.", $($tt)+);
};
(response: &mut $($tt:tt)+) => {
declare_method!(&mut response_mut, "Returns a mutable reference to the response.", $($tt)+);
};
(response: &$($tt:tt)+) => {
declare_method!(&response, "Returns a reference to the response.", $($tt)+);
};
}
macro_rules! declare_wrapper {
(($ref_struct_name:ident readonly)$($tt:tt)+) => {
pub struct $ref_struct_name<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
impl<'a, I, O, E: Debug> From<&'a InterceptorContext<I, O, E>> for $ref_struct_name<'a, I, O, E>
{
macro_rules! impl_from_interceptor_context {
(ref $wrapper:ident) => {
impl<'a, I, O, E> From<&'a InterceptorContext<I, O, E>> for $wrapper<'a, I, O, E> {
fn from(inner: &'a InterceptorContext<I, O, E>) -> Self {
Self { inner }
}
}
impl<'a, I, O, E: Debug> $ref_struct_name<'a, I, O, E> {
declare_ref_wrapper_methods!($($tt)+);
}
};
(($ref_struct_name:ident $mut_struct_name:ident)$($tt:tt)+) => {
declare_wrapper!(($ref_struct_name readonly) $($tt)+);
pub struct $mut_struct_name<'a, I = Input, O = Output, E = Error> {
inner: &'a mut InterceptorContext<I, O, E>,
}
impl<'a, I, O, E: Debug> From<&'a mut InterceptorContext<I, O, E>> for $mut_struct_name<'a, I, O, E>
{
(mut $wrapper:ident) => {
impl<'a, I, O, E> From<&'a mut InterceptorContext<I, O, E>> for $wrapper<'a, I, O, E> {
fn from(inner: &'a mut InterceptorContext<I, O, E>) -> Self {
Self { inner }
}
}
impl<'a, I, O, E: Debug> $mut_struct_name<'a, I, O, E> {
declare_ref_wrapper_methods!($($tt)+);
declare_mut_wrapper_methods!($($tt)+);
}
};
}
macro_rules! declare_ref_wrapper_methods {
(($field:ident: $($head:tt)+)$($tail:tt)+) => {
declare_known_method!($field: &$($head)+);
declare_ref_wrapper_methods!($($tail)+);
};
(($field:ident: $($tt:tt)+)) => {
declare_known_method!($field: &$($tt)+);
macro_rules! expect {
($self:ident, $what:ident) => {
$self.inner.$what().expect(concat!(
"`",
stringify!($what),
"` wasn't set in the underlying interceptor context. This is a bug."
))
};
}
macro_rules! declare_mut_wrapper_methods {
(($field:ident: $($head:tt)+)$($tail:tt)+) => {
declare_known_method!($field: &mut $($head)+);
declare_mut_wrapper_methods!($($tail)+);
};
(($field:ident: $($tt:tt)+)) => {
declare_known_method!($field: &mut $($tt)+);
};
//
// BeforeSerializationInterceptorContextRef
//
/// Interceptor context for the `read_before_execution` and `read_before_serialization` hooks.
///
/// Only the input is available at this point in the operation.
#[derive(Debug)]
pub struct BeforeSerializationInterceptorContextRef<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
declare_wrapper!(
(BeforeSerializationInterceptorContextRef BeforeSerializationInterceptorContextMut)
(input: I)
);
impl_from_interceptor_context!(ref BeforeSerializationInterceptorContextRef);
declare_wrapper!(
(BeforeTransmitInterceptorContextRef BeforeTransmitInterceptorContextMut)
(request: Request)
);
impl<'a, I, O, E> BeforeSerializationInterceptorContextRef<'a, I, O, E> {
/// Returns a reference to the input.
pub fn input(&self) -> &I {
expect!(self, input)
}
}
declare_wrapper!(
(BeforeDeserializationInterceptorContextRef BeforeDeserializationInterceptorContextMut)
(input: I)
(request: Request)
(response: Response)
);
//
// BeforeSerializationInterceptorContextMut
//
/// Interceptor context for the `modify_before_serialization` hook.
///
/// Only the input is available at this point in the operation.
#[derive(Debug)]
pub struct BeforeSerializationInterceptorContextMut<'a, I = Input, O = Output, E = Error> {
inner: &'a mut InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(mut BeforeSerializationInterceptorContextMut);
impl<'a, I, O, E> BeforeSerializationInterceptorContextMut<'a, I, O, E> {
/// Returns a reference to the input.
pub fn input(&self) -> &I {
expect!(self, input)
}
/// Returns a mutable reference to the input.
pub fn input_mut(&mut self) -> &mut I {
expect!(self, input_mut)
}
}
//
// BeforeSerializationInterceptorContextRef
//
/// Interceptor context for several hooks in between serialization and transmission.
///
/// Only the request is available at this point in the operation.
#[derive(Debug)]
pub struct BeforeTransmitInterceptorContextRef<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(ref BeforeTransmitInterceptorContextRef);
impl<'a, I, O, E> BeforeTransmitInterceptorContextRef<'a, I, O, E> {
/// Returns a reference to the transmittable request for the operation being invoked.
pub fn request(&self) -> &Request {
expect!(self, request)
}
}
//
// BeforeSerializationInterceptorContextMut
//
/// Interceptor context for several hooks in between serialization and transmission.
///
/// Only the request is available at this point in the operation.
#[derive(Debug)]
pub struct BeforeTransmitInterceptorContextMut<'a, I = Input, O = Output, E = Error> {
inner: &'a mut InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(mut BeforeTransmitInterceptorContextMut);
impl<'a, I, O, E> BeforeTransmitInterceptorContextMut<'a, I, O, E> {
/// Returns a reference to the transmittable request for the operation being invoked.
pub fn request(&self) -> &Request {
expect!(self, request)
}
/// Returns a mutable reference to the transmittable request for the operation being invoked.
pub fn request_mut(&mut self) -> &mut Request {
expect!(self, request_mut)
}
}
//
// BeforeDeserializationInterceptorContextRef
//
/// Interceptor context for hooks before deserializing the response.
///
/// Only the response is available at this point in the operation.
#[derive(Debug)]
pub struct BeforeDeserializationInterceptorContextRef<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(ref BeforeDeserializationInterceptorContextRef);
impl<'a, I, O, E> BeforeDeserializationInterceptorContextRef<'a, I, O, E> {
/// Returns a reference to the response.
pub fn response(&self) -> &Response {
expect!(self, response)
}
}
//
// BeforeDeserializationInterceptorContextMut
//
/// Interceptor context for hooks before deserializing the response.
///
/// Only the response is available at this point in the operation.
pub struct BeforeDeserializationInterceptorContextMut<'a, I = Input, O = Output, E = Error> {
inner: &'a mut InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(mut BeforeDeserializationInterceptorContextMut);
impl<'a, I, O, E> BeforeDeserializationInterceptorContextMut<'a, I, O, E> {
/// Returns a reference to the response.
pub fn response(&self) -> &Response {
expect!(self, response)
}
/// Returns a mutable reference to the response.
pub fn response_mut(&mut self) -> &mut Response {
expect!(self, response_mut)
}
impl<'a, I, O, E: Debug> BeforeDeserializationInterceptorContextMut<'a, I, O, E> {
#[doc(hidden)]
/// Downgrade this helper struct, returning the underlying InterceptorContext. There's no good
/// reason to use this unless you're writing tests or you have to interact with an API that
/// doesn't support the helper structs.
pub fn into_inner(&mut self) -> &'_ mut InterceptorContext<I, O, E> {
pub fn inner_mut(&mut self) -> &'_ mut InterceptorContext<I, O, E> {
self.inner
}
}
declare_wrapper!(
(AfterDeserializationInterceptorContextRef readonly)
(input: I)
(request: Request)
(response: Response)
(output_or_error: Result<O, OrchestratorError<E>>
));
//
// AfterDeserializationInterceptorContextRef
//
// Why are all the rest of these defined with a macro but these last two aren't? I simply ran out of
// time. Consider updating the macros to support these last two if you're looking for a challenge.
// - Zelda
/// Interceptor context for hooks after deserializing the response.
///
/// The response and the deserialized output or error are available at this point in the operation.
pub struct AfterDeserializationInterceptorContextRef<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
impl_from_interceptor_context!(ref AfterDeserializationInterceptorContextRef);
impl<'a, I, O, E> AfterDeserializationInterceptorContextRef<'a, I, O, E> {
/// Returns a reference to the response.
pub fn response(&self) -> &Response {
expect!(self, response)
}
/// Returns a reference to the deserialized output or error.
pub fn output_or_error(&self) -> Result<&O, &OrchestratorError<E>> {
expect!(self, output_or_error)
}
}
//
// FinalizerInterceptorContextRef
//
/// Interceptor context for finalization hooks.
///
/// This context is used by the `read_after_attempt` and `read_after_execution` hooks
/// that are all called upon both success and failure, and may have varying levels
/// of context available depending on where a failure occurred if the operation failed.
pub struct FinalizerInterceptorContextRef<'a, I = Input, O = Output, E = Error> {
inner: &'a InterceptorContext<I, O, E>,
}
impl<'a, I, O, E: Debug> From<&'a InterceptorContext<I, O, E>>
for FinalizerInterceptorContextRef<'a, I, O, E>
{
fn from(inner: &'a InterceptorContext<I, O, E>) -> Self {
Self { inner }
}
}
impl_from_interceptor_context!(ref FinalizerInterceptorContextRef);
impl<'a, I, O, E: Debug> FinalizerInterceptorContextRef<'a, I, O, E> {
impl<'a, I, O, E> FinalizerInterceptorContextRef<'a, I, O, E> {
/// Returns the operation input.
pub fn input(&self) -> Option<&I> {
self.inner.input.as_ref()
@ -214,19 +245,22 @@ impl<'a, I, O, E: Debug> FinalizerInterceptorContextRef<'a, I, O, E> {
}
}
//
// FinalizerInterceptorContextMut
//
/// Interceptor context for finalization hooks.
///
/// This context is used by the `modify_before_attempt_completion` and `modify_before_completion` hooks
/// that are all called upon both success and failure, and may have varying levels
/// of context available depending on where a failure occurred if the operation failed.
pub struct FinalizerInterceptorContextMut<'a, I = Input, O = Output, E = Error> {
inner: &'a mut InterceptorContext<I, O, E>,
}
impl<'a, I, O, E: Debug> From<&'a mut InterceptorContext<I, O, E>>
for FinalizerInterceptorContextMut<'a, I, O, E>
{
fn from(inner: &'a mut InterceptorContext<I, O, E>) -> Self {
Self { inner }
}
}
impl_from_interceptor_context!(mut FinalizerInterceptorContextMut);
impl<'a, I, O, E: Debug> FinalizerInterceptorContextMut<'a, I, O, E> {
impl<'a, I, O, E> FinalizerInterceptorContextMut<'a, I, O, E> {
/// Returns the operation input.
pub fn input(&self) -> Option<&I> {
self.inner.input.as_ref()

View File

@ -71,62 +71,110 @@ impl Storable for LoadedRequestBody {
type Storer = StoreReplace<Self>;
}
// TODO(enableNewSmithyRuntimeLaunch): Make OrchestratorError adhere to the errors RFC
/// Errors that can occur while running the orchestrator.
#[derive(Debug)]
#[non_exhaustive]
pub enum OrchestratorError<E> {
enum ErrorKind<E> {
/// An error occurred within an interceptor.
Interceptor { err: InterceptorError },
Interceptor { source: InterceptorError },
/// An error returned by a service.
Operation { err: E },
/// An error that occurs when a request times out.
Timeout { err: BoxError },
Timeout { source: BoxError },
/// An error that occurs when request dispatch fails.
Connector { err: ConnectorError },
Connector { source: ConnectorError },
/// An error that occurs when a response can't be deserialized.
Response { err: BoxError },
Response { source: BoxError },
/// A general orchestrator error.
Other { err: BoxError },
Other { source: BoxError },
}
impl<E: Debug> OrchestratorError<E> {
/// Create a new `OrchestratorError` from a [`BoxError`].
pub fn other(err: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>) -> Self {
let err = err.into();
Self::Other { err }
/// Errors that can occur while running the orchestrator.
#[derive(Debug)]
pub struct OrchestratorError<E> {
kind: ErrorKind<E>,
}
impl<E> OrchestratorError<E> {
/// Create a new `OrchestratorError` from the given source.
pub fn other(source: impl Into<Box<dyn std::error::Error + Send + Sync + 'static>>) -> Self {
Self {
kind: ErrorKind::Other {
source: source.into(),
},
}
}
/// Create a new `OrchestratorError` from an error received from a service.
/// Create an operation error.
pub fn operation(err: E) -> Self {
Self::Operation { err }
Self {
kind: ErrorKind::Operation { err },
}
}
/// Create a new `OrchestratorError::Interceptor` from an [`InterceptorError`].
pub fn interceptor(err: InterceptorError) -> Self {
Self::Interceptor { err }
/// True if the underlying error is an operation error.
pub fn is_operation_error(&self) -> bool {
matches!(self.kind, ErrorKind::Operation { .. })
}
/// Create a new `OrchestratorError::Timeout` from a [`BoxError`].
pub fn timeout(err: BoxError) -> Self {
Self::Timeout { err }
}
/// Create a new `OrchestratorError::Response` from a [`BoxError`].
pub fn response(err: BoxError) -> Self {
Self::Response { err }
}
/// Create a new `OrchestratorError::Connector` from a [`ConnectorError`].
pub fn connector(err: ConnectorError) -> Self {
Self::Connector { err }
}
/// Convert the `OrchestratorError` into `Some` operation specific error if it is one. Otherwise,
/// return `None`.
/// Return this orchestrator error as an operation error if possible.
pub fn as_operation_error(&self) -> Option<&E> {
match self {
Self::Operation { err } => Some(err),
match &self.kind {
ErrorKind::Operation { err } => Some(err),
_ => None,
}
}
/// Create an interceptor error with the given source.
pub fn interceptor(source: InterceptorError) -> Self {
Self {
kind: ErrorKind::Interceptor { source },
}
}
/// True if the underlying error is an interceptor error.
pub fn is_interceptor_error(&self) -> bool {
matches!(self.kind, ErrorKind::Interceptor { .. })
}
/// Create a timeout error with the given source.
pub fn timeout(source: BoxError) -> Self {
Self {
kind: ErrorKind::Timeout { source },
}
}
/// True if the underlying error is a timeout error.
pub fn is_timeout_error(&self) -> bool {
matches!(self.kind, ErrorKind::Timeout { .. })
}
/// Create a response error with the given source.
pub fn response(source: BoxError) -> Self {
Self {
kind: ErrorKind::Response { source },
}
}
/// True if the underlying error is a response error.
pub fn is_response_error(&self) -> bool {
matches!(self.kind, ErrorKind::Response { .. })
}
/// Create a connector error with the given source.
pub fn connector(source: ConnectorError) -> Self {
Self {
kind: ErrorKind::Connector { source },
}
}
/// True if the underlying error is a [`ConnectorError`].
pub fn is_connector_error(&self) -> bool {
matches!(self.kind, ErrorKind::Connector { .. })
}
/// Return this orchestrator error as a connector error if possible.
pub fn as_connector_error(&self) -> Option<&ConnectorError> {
match &self.kind {
ErrorKind::Connector { source } => Some(source),
_ => None,
}
}
@ -137,34 +185,36 @@ impl<E: Debug> OrchestratorError<E> {
phase: &Phase,
response: Option<HttpResponse>,
) -> SdkError<E, HttpResponse> {
match self {
Self::Interceptor { err } => {
match self.kind {
ErrorKind::Interceptor { source } => {
use Phase::*;
match phase {
BeforeSerialization | Serialization => SdkError::construction_failure(err),
BeforeSerialization | Serialization => SdkError::construction_failure(source),
BeforeTransmit | Transmit => match response {
Some(response) => SdkError::response_error(err, response),
None => SdkError::dispatch_failure(ConnectorError::other(err.into(), None)),
Some(response) => SdkError::response_error(source, response),
None => {
SdkError::dispatch_failure(ConnectorError::other(source.into(), None))
}
},
BeforeDeserialization | Deserialization | AfterDeserialization => {
SdkError::response_error(err, response.expect("phase has a response"))
SdkError::response_error(source, response.expect("phase has a response"))
}
}
}
Self::Operation { err } => {
ErrorKind::Operation { err } => {
debug_assert!(phase.is_after_deserialization(), "operation errors are a result of successfully receiving and parsing a response from the server. Therefore, we must be in the 'After Deserialization' phase.");
SdkError::service_error(err, response.expect("phase has a response"))
}
Self::Connector { err } => SdkError::dispatch_failure(err),
Self::Timeout { err } => SdkError::timeout_error(err),
Self::Response { err } => SdkError::response_error(err, response.unwrap()),
Self::Other { err } => {
ErrorKind::Connector { source } => SdkError::dispatch_failure(source),
ErrorKind::Timeout { source } => SdkError::timeout_error(source),
ErrorKind::Response { source } => SdkError::response_error(source, response.unwrap()),
ErrorKind::Other { source } => {
use Phase::*;
match phase {
BeforeSerialization | Serialization => SdkError::construction_failure(err),
BeforeTransmit | Transmit => convert_dispatch_error(err, response),
BeforeSerialization | Serialization => SdkError::construction_failure(source),
BeforeTransmit | Transmit => convert_dispatch_error(source, response),
BeforeDeserialization | Deserialization | AfterDeserialization => {
SdkError::response_error(err, response.expect("phase has a response"))
SdkError::response_error(source, response.expect("phase has a response"))
}
}
}
@ -202,12 +252,3 @@ impl From<TypeErasedError> for OrchestratorError<TypeErasedError> {
Self::operation(err)
}
}
impl<E> From<aws_smithy_http::byte_stream::error::Error> for OrchestratorError<E>
where
E: Debug + std::error::Error + 'static,
{
fn from(err: aws_smithy_http::byte_stream::error::Error) -> Self {
Self::other(err)
}
}

View File

@ -4,8 +4,7 @@
*/
#![warn(
// TODO(enableNewSmithyRuntimeLaunch): Add in remaining missing docs
// missing_docs,
missing_docs,
rustdoc::missing_crate_level_docs,
unreachable_pub,
rust_2018_idioms

View File

@ -72,7 +72,7 @@ impl Interceptor for ConnectionPoisoningInterceptor {
.ok_or("retry classifiers are required for connection poisoning to work")?;
let error_is_transient = retry_classifiers
.classify_retry(context.into_inner())
.classify_retry(context.inner_mut())
.map(|reason| reason == RetryReason::Error(ErrorKind::TransientError))
.unwrap_or_default();
let connection_poisoning_is_enabled =

View File

@ -5,15 +5,246 @@
use aws_smithy_http::body::SdkBody;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextMut;
use aws_smithy_runtime_api::client::interceptors::Interceptor;
use aws_smithy_runtime_api::client::interceptors::context::{
BeforeSerializationInterceptorContextRef, BeforeTransmitInterceptorContextMut,
FinalizerInterceptorContextMut, FinalizerInterceptorContextRef,
};
use aws_smithy_runtime_api::client::interceptors::context::{
Error, Input, InterceptorContext, Output,
};
use aws_smithy_runtime_api::client::interceptors::{
Interceptor, InterceptorError, SharedInterceptor,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::ConfigBag;
use aws_smithy_types::error::display::DisplayErrorContext;
use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
macro_rules! interceptor_impl_fn {
(mut $interceptor:ident) => {
pub(crate) fn $interceptor(
self,
ctx: &mut InterceptorContext,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!(concat!(
"running `",
stringify!($interceptor),
"` interceptors"
));
let mut result: Result<(), BoxError> = Ok(());
let mut ctx = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.$interceptor(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::$interceptor)
}
};
(ref $interceptor:ident) => {
pub(crate) fn $interceptor(
self,
ctx: &InterceptorContext,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
let mut result: Result<(), BoxError> = Ok(());
let ctx = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) = interceptor.$interceptor(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::$interceptor)
}
};
}
#[derive(Debug)]
pub(crate) struct Interceptors<I> {
interceptors: I,
}
impl<I> Interceptors<I>
where
I: Iterator<Item = SharedInterceptor>,
{
pub(crate) fn new(interceptors: I) -> Self {
Self { interceptors }
}
fn into_iter(self) -> impl Iterator<Item = ConditionallyEnabledInterceptor> {
self.interceptors.map(ConditionallyEnabledInterceptor)
}
pub(crate) fn read_before_execution(
self,
operation: bool,
ctx: &InterceptorContext<Input, Output, Error>,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!(
"running {} `read_before_execution` interceptors",
if operation { "operation" } else { "client" }
);
let mut result: Result<(), BoxError> = Ok(());
let ctx: BeforeSerializationInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) = interceptor.read_before_execution(&ctx, cfg) {
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_before_execution)
}
interceptor_impl_fn!(mut modify_before_serialization);
interceptor_impl_fn!(ref read_before_serialization);
interceptor_impl_fn!(ref read_after_serialization);
interceptor_impl_fn!(mut modify_before_retry_loop);
interceptor_impl_fn!(ref read_before_attempt);
interceptor_impl_fn!(mut modify_before_signing);
interceptor_impl_fn!(ref read_before_signing);
interceptor_impl_fn!(ref read_after_signing);
interceptor_impl_fn!(mut modify_before_transmit);
interceptor_impl_fn!(ref read_before_transmit);
interceptor_impl_fn!(ref read_after_transmit);
interceptor_impl_fn!(mut modify_before_deserialization);
interceptor_impl_fn!(ref read_before_deserialization);
interceptor_impl_fn!(ref read_after_deserialization);
pub(crate) fn modify_before_attempt_completion(
self,
ctx: &mut InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_attempt_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.modify_before_attempt_completion(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::modify_before_attempt_completion)
}
pub(crate) fn read_after_attempt(
self,
ctx: &InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_attempt` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.read_after_attempt(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_after_attempt)
}
pub(crate) fn modify_before_completion(
self,
ctx: &mut InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `modify_before_completion` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let mut ctx: FinalizerInterceptorContextMut<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.modify_before_completion(&mut ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::modify_before_completion)
}
pub(crate) fn read_after_execution(
self,
ctx: &InterceptorContext<Input, Output, Error>,
runtime_components: &RuntimeComponents,
cfg: &mut ConfigBag,
) -> Result<(), InterceptorError> {
tracing::trace!("running `read_after_execution` interceptors");
let mut result: Result<(), BoxError> = Ok(());
let ctx: FinalizerInterceptorContextRef<'_> = ctx.into();
for interceptor in self.into_iter() {
if let Some(interceptor) = interceptor.if_enabled(cfg) {
if let Err(new_error) =
interceptor.read_after_execution(&ctx, runtime_components, cfg)
{
if let Err(last_error) = result {
tracing::debug!("{}", DisplayErrorContext(&*last_error));
}
result = Err(new_error);
}
}
}
result.map_err(InterceptorError::read_after_execution)
}
}
/// A interceptor wrapper to conditionally enable the interceptor based on
/// [`DisableInterceptor`](aws_smithy_runtime_api::client::interceptors::DisableInterceptor)
struct ConditionallyEnabledInterceptor(SharedInterceptor);
impl ConditionallyEnabledInterceptor {
fn if_enabled(&self, cfg: &ConfigBag) -> Option<&dyn Interceptor> {
if self.0.enabled(cfg) {
Some(self.0.as_ref())
} else {
None
}
}
}
pub struct MapRequestInterceptor<F, E> {
f: F,
_phantom: PhantomData<E>,
@ -86,3 +317,71 @@ where
Ok(())
}
}
#[cfg(all(test, feature = "test-util"))]
mod tests {
use super::*;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::{
BeforeTransmitInterceptorContextRef, Input, InterceptorContext,
};
use aws_smithy_runtime_api::client::interceptors::{
disable_interceptor, Interceptor, SharedInterceptor,
};
use aws_smithy_runtime_api::client::runtime_components::{
RuntimeComponents, RuntimeComponentsBuilder,
};
use aws_smithy_types::config_bag::ConfigBag;
#[derive(Debug)]
struct TestInterceptor;
impl Interceptor for TestInterceptor {}
#[test]
fn test_disable_interceptors() {
#[derive(Debug)]
struct PanicInterceptor;
impl Interceptor for PanicInterceptor {
fn read_before_transmit(
&self,
_context: &BeforeTransmitInterceptorContextRef<'_>,
_rc: &RuntimeComponents,
_cfg: &mut ConfigBag,
) -> Result<(), BoxError> {
Err("boom".into())
}
}
let rc = RuntimeComponentsBuilder::for_tests()
.with_interceptor(SharedInterceptor::new(PanicInterceptor))
.with_interceptor(SharedInterceptor::new(TestInterceptor))
.build()
.unwrap();
let mut cfg = ConfigBag::base();
let interceptors = Interceptors::new(rc.interceptors());
assert_eq!(
interceptors
.into_iter()
.filter(|i| i.if_enabled(&cfg).is_some())
.count(),
2
);
Interceptors::new(rc.interceptors())
.read_before_transmit(&InterceptorContext::new(Input::new(5)), &rc, &mut cfg)
.expect_err("interceptor returns error");
cfg.interceptor_state()
.store_put(disable_interceptor::<PanicInterceptor>("test"));
assert_eq!(
Interceptors::new(rc.interceptors())
.into_iter()
.filter(|i| i.if_enabled(&cfg).is_some())
.count(),
1
);
// shouldn't error because interceptors won't run
Interceptors::new(rc.interceptors())
.read_before_transmit(&InterceptorContext::new(Input::new(5)), &rc, &mut cfg)
.expect("interceptor is now disabled");
}
}

View File

@ -7,6 +7,7 @@
#![allow(unknown_lints)]
use self::auth::orchestrate_auth;
use crate::client::interceptors::Interceptors;
use crate::client::orchestrator::endpoints::orchestrate_endpoint;
use crate::client::orchestrator::http::read_body;
use crate::client::timeout::{MaybeTimeout, MaybeTimeoutConfig, TimeoutKind};
@ -19,7 +20,6 @@ use aws_smithy_runtime_api::client::connectors::HttpConnector;
use aws_smithy_runtime_api::client::interceptors::context::{
Error, Input, InterceptorContext, Output, RewindResult,
};
use aws_smithy_runtime_api::client::interceptors::Interceptors;
use aws_smithy_runtime_api::client::orchestrator::{
HttpResponse, LoadedRequestBody, OrchestratorError,
};
@ -200,7 +200,10 @@ async fn try_op(
debug!("loading request body into memory");
let mut body = SdkBody::taken();
mem::swap(&mut body, ctx.request_mut().expect("set above").body_mut());
let loaded_body = halt_on_err!([ctx] => ByteStream::new(body).collect().await).into_bytes();
let loaded_body = halt_on_err!([ctx] =>
ByteStream::new(body).collect().await.map_err(OrchestratorError::other)
)
.into_bytes();
*ctx.request_mut().as_mut().expect("set above").body_mut() =
SdkBody::from(loaded_body.clone());
cfg.interceptor_state()

View File

@ -4,7 +4,6 @@
*/
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
use aws_smithy_runtime_api::client::retries::{ClassifyRetry, RetryReason};
use aws_smithy_types::retry::{ErrorKind, ProvideErrorKind};
use std::borrow::Cow;
@ -77,17 +76,16 @@ where
Err(err) => err,
};
match error {
OrchestratorError::Response { .. } | OrchestratorError::Timeout { .. } => {
if error.is_response_error() || error.is_timeout_error() {
Some(RetryReason::Error(ErrorKind::TransientError))
} else if let Some(error) = error.as_connector_error() {
if error.is_timeout() || error.is_io() {
Some(RetryReason::Error(ErrorKind::TransientError))
} else {
error.as_other().map(RetryReason::Error)
}
OrchestratorError::Connector { err } if err.is_timeout() || err.is_io() => {
Some(RetryReason::Error(ErrorKind::TransientError))
}
OrchestratorError::Connector { err } if err.is_other().is_some() => {
err.is_other().map(RetryReason::Error)
}
_ => None,
} else {
None
}
}