Simplify default config and add default async sleep (#3071)

While implementing identity caching, I noticed we don't have a default
async sleep impl wired up for generic clients, which was causing caching
to panic in many tests since it needs a sleep impl for timeouts. I
likely need to figure out what to do other than panic if there's no
sleep impl, but that's a problem for a different PR.

This PR adds a default sleep impl to generic clients, and also
simplifies how default config works. Previously, the generated config
`Builder::build` method set all the defaults with a series of "if not
set, then set default" statements. In this PR, defaults are registered
via default ordered runtime plugins.

Additionally, I cleaned up the standard retry strategy:
- The `TokenBucketPartition` didn't appear to be used at all, so I
deleted it.
- `StandardRetryStrategy` was taking retry config at construction, which
means it isn't possible to config override the retry config unless the
strategy is recreated. It now doesn't take any config at all during
construction.
- The adaptive retry client rate limiter was created at construction
based on retry config at that point in time. This means config overrides
wouldn't work with it, so it is also no longer set up at construction
time.
- Removed some unused runtime plugins.

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [ ] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [ ] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._
This commit is contained in:
John DiSanti 2023-10-17 15:15:51 -07:00 committed by GitHub
parent d48acae8a2
commit bcfc211277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 352 additions and 331 deletions

View File

@ -238,13 +238,14 @@ impl ImdsCommonRuntimePlugin {
fn new(
config: &ProviderConfig,
endpoint_resolver: ImdsEndpointResolver,
retry_config: &RetryConfig,
retry_config: RetryConfig,
timeout_config: TimeoutConfig,
) -> Self {
let mut layer = Layer::new("ImdsCommonRuntimePlugin");
layer.store_put(AuthSchemeOptionResolverParams::new(()));
layer.store_put(EndpointResolverParams::new(()));
layer.store_put(SensitiveOutput);
layer.store_put(retry_config);
layer.store_put(timeout_config);
layer.store_put(user_agent());
@ -255,7 +256,7 @@ impl ImdsCommonRuntimePlugin {
.with_endpoint_resolver(Some(endpoint_resolver))
.with_interceptor(UserAgentInterceptor::new())
.with_retry_classifier(SharedRetryClassifier::new(ImdsResponseRetryClassifier))
.with_retry_strategy(Some(StandardRetryStrategy::new(retry_config)))
.with_retry_strategy(Some(StandardRetryStrategy::new()))
.with_time_source(Some(config.time_source()))
.with_sleep_impl(config.sleep_impl()),
}
@ -423,7 +424,7 @@ impl Builder {
let common_plugin = SharedRuntimePlugin::new(ImdsCommonRuntimePlugin::new(
&config,
endpoint_resolver,
&retry_config,
retry_config,
timeout_config,
));
let operation = Operation::builder()

View File

@ -7,20 +7,16 @@ package software.amazon.smithy.rust.codegen.client.smithy.customizations
import software.amazon.smithy.rust.codegen.client.smithy.ClientCodegenContext
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustModule
import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.ServiceRuntimePluginSection
import software.amazon.smithy.rust.codegen.client.smithy.generators.config.ConfigCustomization
import software.amazon.smithy.rust.codegen.client.smithy.generators.config.ServiceConfig
import software.amazon.smithy.rust.codegen.core.rustlang.Attribute
import software.amazon.smithy.rust.codegen.core.rustlang.Writable
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
import software.amazon.smithy.rust.codegen.core.util.sdkId
class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenContext) : ConfigCustomization() {
class ResiliencyConfigCustomization(codegenContext: ClientCodegenContext) : ConfigCustomization() {
private val runtimeConfig = codegenContext.runtimeConfig
private val retryConfig = RuntimeType.smithyTypes(runtimeConfig).resolve("retry")
private val sleepModule = RuntimeType.smithyAsync(runtimeConfig).resolve("rt::sleep")
@ -44,8 +40,6 @@ class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenCon
"StandardRetryStrategy" to retries.resolve("strategy::StandardRetryStrategy"),
"SystemTime" to RuntimeType.std.resolve("time::SystemTime"),
"TimeoutConfig" to timeoutModule.resolve("TimeoutConfig"),
"TokenBucket" to retries.resolve("TokenBucket"),
"TokenBucketPartition" to retries.resolve("TokenBucketPartition"),
)
override fun section(section: ServiceConfig) =
@ -281,57 +275,6 @@ class ResiliencyConfigCustomization(private val codegenContext: ClientCodegenCon
)
}
is ServiceConfig.BuilderBuild -> {
rustTemplate(
"""
if layer.load::<#{RetryConfig}>().is_none() {
layer.store_put(#{RetryConfig}::disabled());
}
let retry_config = layer.load::<#{RetryConfig}>().expect("set to default above").clone();
if layer.load::<#{RetryPartition}>().is_none() {
layer.store_put(#{RetryPartition}::new("${codegenContext.serviceShape.sdkId()}"));
}
let retry_partition = layer.load::<#{RetryPartition}>().expect("set to default above").clone();
if retry_config.has_retry() {
#{debug}!("using retry strategy with partition '{}'", retry_partition);
}
if retry_config.mode() == #{RetryMode}::Adaptive {
if let #{Some}(time_source) = self.runtime_components.time_source() {
let seconds_since_unix_epoch = time_source
.now()
.duration_since(#{SystemTime}::UNIX_EPOCH)
.expect("the present takes place after the UNIX_EPOCH")
.as_secs_f64();
let client_rate_limiter_partition = #{ClientRateLimiterPartition}::new(retry_partition.clone());
let client_rate_limiter = CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || {
#{ClientRateLimiter}::new(seconds_since_unix_epoch)
});
layer.store_put(client_rate_limiter);
}
}
// The token bucket is used for both standard AND adaptive retries.
let token_bucket_partition = #{TokenBucketPartition}::new(retry_partition);
let token_bucket = TOKEN_BUCKET.get_or_init(token_bucket_partition, #{TokenBucket}::default);
layer.store_put(token_bucket);
// TODO(enableNewSmithyRuntimeCleanup): Should not need to provide a default once smithy-rs##2770
// is resolved
if layer.load::<#{TimeoutConfig}>().is_none() {
layer.store_put(#{TimeoutConfig}::disabled());
}
self.runtime_components.set_retry_strategy(#{Some}(
#{SharedRetryStrategy}::new(#{StandardRetryStrategy}::new(&retry_config)))
);
""",
*codegenScope,
)
}
else -> emptySection
}
}
@ -366,32 +309,3 @@ class ResiliencyReExportCustomization(codegenContext: ClientCodegenContext) {
}
}
}
class ResiliencyServiceRuntimePluginCustomization(codegenContext: ClientCodegenContext) : ServiceRuntimePluginCustomization() {
private val runtimeConfig = codegenContext.runtimeConfig
private val smithyRuntime = RuntimeType.smithyRuntime(runtimeConfig)
private val retries = smithyRuntime.resolve("client::retries")
private val codegenScope = arrayOf(
"TokenBucket" to retries.resolve("TokenBucket"),
"TokenBucketPartition" to retries.resolve("TokenBucketPartition"),
"ClientRateLimiter" to retries.resolve("ClientRateLimiter"),
"ClientRateLimiterPartition" to retries.resolve("ClientRateLimiterPartition"),
"StaticPartitionMap" to smithyRuntime.resolve("static_partition_map::StaticPartitionMap"),
)
override fun section(section: ServiceRuntimePluginSection): Writable = writable {
when (section) {
is ServiceRuntimePluginSection.DeclareSingletons -> {
rustTemplate(
"""
static TOKEN_BUCKET: #{StaticPartitionMap}<#{TokenBucketPartition}, #{TokenBucket}> = #{StaticPartitionMap}::new();
static CLIENT_RATE_LIMITER: #{StaticPartitionMap}<#{ClientRateLimiterPartition}, #{ClientRateLimiter}> = #{StaticPartitionMap}::new();
""",
*codegenScope,
)
}
else -> emptySection
}
}
}

View File

@ -14,7 +14,6 @@ import software.amazon.smithy.rust.codegen.client.smithy.customizations.Intercep
import software.amazon.smithy.rust.codegen.client.smithy.customizations.MetadataCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyConfigCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyReExportCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.ResiliencyServiceRuntimePluginCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierConfigCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierOperationCustomization
import software.amazon.smithy.rust.codegen.client.smithy.customizations.RetryClassifierServiceRuntimePluginCustomization
@ -113,7 +112,6 @@ class RequiredCustomizations : ClientCodegenDecorator {
codegenContext: ClientCodegenContext,
baseCustomizations: List<ServiceRuntimePluginCustomization>,
): List<ServiceRuntimePluginCustomization> = baseCustomizations +
ResiliencyServiceRuntimePluginCustomization(codegenContext) +
ConnectionPoisoningRuntimePluginCustomization(codegenContext) +
RetryClassifierServiceRuntimePluginCustomization(codegenContext)
}

View File

@ -40,7 +40,6 @@ import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.stripOuter
import software.amazon.smithy.rust.codegen.core.rustlang.withBlockTemplate
import software.amazon.smithy.rust.codegen.core.rustlang.writable
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType.Companion.preludeScope
import software.amazon.smithy.rust.codegen.core.smithy.RustCrate
@ -50,9 +49,11 @@ import software.amazon.smithy.rust.codegen.core.smithy.expectRustMetadata
import software.amazon.smithy.rust.codegen.core.smithy.generators.getterName
import software.amazon.smithy.rust.codegen.core.smithy.generators.setterName
import software.amazon.smithy.rust.codegen.core.smithy.rustType
import software.amazon.smithy.rust.codegen.core.util.dq
import software.amazon.smithy.rust.codegen.core.util.inputShape
import software.amazon.smithy.rust.codegen.core.util.orNull
import software.amazon.smithy.rust.codegen.core.util.outputShape
import software.amazon.smithy.rust.codegen.core.util.sdkId
import software.amazon.smithy.rust.codegen.core.util.toSnakeCase
class FluentClientGenerator(
@ -161,7 +162,7 @@ class FluentClientGenerator(
}
""",
*clientScope,
"base_client_runtime_plugins" to baseClientRuntimePluginsFn(runtimeConfig),
"base_client_runtime_plugins" to baseClientRuntimePluginsFn(codegenContext),
)
}
@ -446,8 +447,10 @@ class FluentClientGenerator(
}
}
private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeType =
private fun baseClientRuntimePluginsFn(codegenContext: ClientCodegenContext): RuntimeType = codegenContext.runtimeConfig.let { rc ->
RuntimeType.forInlineFun("base_client_runtime_plugins", ClientRustModule.config) {
val api = RuntimeType.smithyRuntimeApi(rc)
val rt = RuntimeType.smithyRuntime(rc)
rustTemplate(
"""
pub(crate) fn base_client_runtime_plugins(
@ -455,13 +458,25 @@ private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeTyp
) -> #{RuntimePlugins} {
let mut configured_plugins = #{Vec}::new();
::std::mem::swap(&mut config.runtime_plugins, &mut configured_plugins);
let defaults = [
#{default_http_client_plugin}(),
#{default_retry_config_plugin}(${codegenContext.serviceShape.sdkId().dq()}),
#{default_sleep_impl_plugin}(),
#{default_time_source_plugin}(),
#{default_timeout_config_plugin}(),
].into_iter().flatten();
let mut plugins = #{RuntimePlugins}::new()
.with_client_plugin(#{default_http_client_plugin}())
// defaults
.with_client_plugins(defaults)
// user config
.with_client_plugin(
#{StaticRuntimePlugin}::new()
.with_config(config.config.clone())
.with_runtime_components(config.runtime_components.clone())
)
// codegen config
.with_client_plugin(crate::config::ServiceRuntimePlugin::new(config))
.with_client_plugin(#{NoAuthRuntimePlugin}::new());
for plugin in configured_plugins {
@ -471,15 +486,17 @@ private fun baseClientRuntimePluginsFn(runtimeConfig: RuntimeConfig): RuntimeTyp
}
""",
*preludeScope,
"RuntimePlugins" to RuntimeType.runtimePlugins(runtimeConfig),
"NoAuthRuntimePlugin" to RuntimeType.smithyRuntime(runtimeConfig)
.resolve("client::auth::no_auth::NoAuthRuntimePlugin"),
"StaticRuntimePlugin" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::runtime_plugin::StaticRuntimePlugin"),
"default_http_client_plugin" to RuntimeType.smithyRuntime(runtimeConfig)
.resolve("client::http::default_http_client_plugin"),
"default_http_client_plugin" to rt.resolve("client::defaults::default_http_client_plugin"),
"default_retry_config_plugin" to rt.resolve("client::defaults::default_retry_config_plugin"),
"default_sleep_impl_plugin" to rt.resolve("client::defaults::default_sleep_impl_plugin"),
"default_timeout_config_plugin" to rt.resolve("client::defaults::default_timeout_config_plugin"),
"default_time_source_plugin" to rt.resolve("client::defaults::default_time_source_plugin"),
"NoAuthRuntimePlugin" to rt.resolve("client::auth::no_auth::NoAuthRuntimePlugin"),
"RuntimePlugins" to RuntimeType.runtimePlugins(rc),
"StaticRuntimePlugin" to api.resolve("client::runtime_plugin::StaticRuntimePlugin"),
)
}
}
/**
* For a given `operation` shape, return a list of strings where each string describes the name and input type of one of

View File

@ -45,7 +45,7 @@ internal class ResiliencyConfigCustomizationTest {
project.withModule(ClientRustModule.config) {
ServiceRuntimePluginGenerator(codegenContext).render(
this,
listOf(ResiliencyServiceRuntimePluginCustomization(codegenContext)),
emptyList(),
)
}
ResiliencyReExportCustomization(codegenContext).extras(project)

View File

@ -8,6 +8,7 @@ package software.amazon.smithy.rust.codegen.client.smithy.customizations
import org.junit.jupiter.api.Test
import software.amazon.smithy.rust.codegen.client.testutil.clientIntegrationTest
import software.amazon.smithy.rust.codegen.core.rustlang.Attribute
import software.amazon.smithy.rust.codegen.core.rustlang.CargoDependency
import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeConfig
import software.amazon.smithy.rust.codegen.core.smithy.RuntimeType
@ -16,6 +17,8 @@ import software.amazon.smithy.rust.codegen.core.testutil.integrationTest
class SensitiveOutputDecoratorTest {
private fun codegenScope(runtimeConfig: RuntimeConfig): Array<Pair<String, Any>> = arrayOf(
"capture_test_logs" to CargoDependency.smithyRuntimeTestUtil(runtimeConfig).toType()
.resolve("test_util::capture_test_logs::capture_test_logs"),
"capture_request" to RuntimeType.captureRequest(runtimeConfig),
"SdkBody" to RuntimeType.sdkBody(runtimeConfig),
)
@ -48,10 +51,10 @@ class SensitiveOutputDecoratorTest {
rustCrate.integrationTest("redacting_sensitive_response_body") {
val moduleName = codegenContext.moduleUseName()
Attribute.TokioTest.render(this)
Attribute.TracedTest.render(this)
rustTemplate(
"""
async fn redacting_sensitive_response_body() {
let (_logs, logs_rx) = #{capture_test_logs}();
let (http_client, _r) = #{capture_request}(Some(
http::Response::builder()
.status(200)
@ -69,7 +72,8 @@ class SensitiveOutputDecoratorTest {
.await
.expect("success");
assert!(logs_contain("** REDACTED **"));
let log_contents = logs_rx.contents();
assert!(log_contents.contains("** REDACTED **"));
}
""",
*codegenScope(codegenContext.runtimeConfig),

View File

@ -142,7 +142,7 @@ internal class ConfigOverrideRuntimePluginGeneratorTest {
}
@Test
fun `operation overrides retry strategy`() {
fun `operation overrides retry config`() {
clientIntegrationTest(model) { clientCodegenContext, rustCrate ->
val runtimeConfig = clientCodegenContext.runtimeConfig
val codegenScope = arrayOf(
@ -164,11 +164,13 @@ internal class ConfigOverrideRuntimePluginGeneratorTest {
.resolve("client::retries::RetryClassifiers"),
"RuntimeComponentsBuilder" to RuntimeType.runtimeComponentsBuilder(runtimeConfig),
"RuntimePlugin" to RuntimeType.runtimePlugin(runtimeConfig),
"StandardRetryStrategy" to RuntimeType.smithyRuntime(runtimeConfig)
.resolve("client::retries::strategy::StandardRetryStrategy"),
"ShouldAttempt" to RuntimeType.smithyRuntimeApi(runtimeConfig)
.resolve("client::retries::ShouldAttempt"),
)
rustCrate.testModule {
unitTest("test_operation_overrides_retry_strategy") {
unitTest("test_operation_overrides_retry_config") {
rustTemplate(
"""
use #{RuntimePlugin};
@ -193,6 +195,8 @@ internal class ConfigOverrideRuntimePluginGeneratorTest {
// Emulate the merging of runtime components from runtime plugins that the orchestrator does
let runtime_components = #{RuntimeComponentsBuilder}::for_tests()
// emulate the default retry config plugin by setting a retry strategy
.with_retry_strategy(#{Some}(#{StandardRetryStrategy}::new()))
.merge_from(&client_config.runtime_components)
.merge_from(&retry_classifiers_component)
.build()
@ -219,6 +223,8 @@ internal class ConfigOverrideRuntimePluginGeneratorTest {
// Emulate the merging of runtime components from runtime plugins that the orchestrator does
let runtime_components = #{RuntimeComponentsBuilder}::for_tests()
// emulate the default retry config plugin by setting a retry strategy
.with_retry_strategy(#{Some}(#{StandardRetryStrategy}::new()))
.merge_from(&client_config.runtime_components)
.merge_from(&retry_classifiers_component)
.merge_from(&config_override.runtime_components)

View File

@ -524,7 +524,6 @@ class Attribute(val inner: Writable, val isDeriveHelper: Boolean = false) {
val Test = Attribute("test")
val TokioTest = Attribute(RuntimeType.Tokio.resolve("test").writable)
val TracedTest = Attribute(RuntimeType.TracingTest.resolve("traced_test").writable)
val AwsSdkUnstableAttribute = Attribute(cfg("aws_sdk_unstable"))
/**

View File

@ -236,10 +236,22 @@ pub struct RuntimePlugins {
}
impl RuntimePlugins {
/// Create a new empty set of runtime plugins.
pub fn new() -> Self {
Default::default()
}
/// Add several client-level runtime plugins from an iterator.
pub fn with_client_plugins(
mut self,
plugins: impl IntoIterator<Item = SharedRuntimePlugin>,
) -> Self {
for plugin in plugins.into_iter() {
self = self.with_client_plugin(plugin);
}
self
}
/// Adds a client-level runtime plugin.
pub fn with_client_plugin(mut self, plugin: impl RuntimePlugin + 'static) -> Self {
insert_plugin!(
@ -249,6 +261,17 @@ impl RuntimePlugins {
self
}
/// Add several operation-level runtime plugins from an iterator.
pub fn with_operation_plugins(
mut self,
plugins: impl IntoIterator<Item = SharedRuntimePlugin>,
) -> Self {
for plugin in plugins.into_iter() {
self = self.with_operation_plugin(plugin);
}
self
}
/// Adds an operation-level runtime plugin.
pub fn with_operation_plugin(mut self, plugin: impl RuntimePlugin + 'static) -> Self {
insert_plugin!(
@ -258,6 +281,7 @@ impl RuntimePlugins {
self
}
/// Apply the client-level runtime plugins' config to the given config bag.
pub fn apply_client_configuration(
&self,
cfg: &mut ConfigBag,
@ -265,6 +289,7 @@ impl RuntimePlugins {
apply_plugins!(client, self.client_plugins, cfg)
}
/// Apply the operation-level runtime plugins' config to the given config bag.
pub fn apply_operation_configuration(
&self,
cfg: &mut ConfigBag,

View File

@ -6,6 +6,8 @@
/// Smithy auth scheme implementations.
pub mod auth;
pub mod defaults;
pub mod dns;
/// Built-in Smithy HTTP clients and connectors.

View File

@ -0,0 +1,104 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
//! Runtime plugins that provide defaults for clients.
//!
//! Note: these are the absolute base-level defaults. They may not be the defaults
//! for _your_ client, since many things can change these defaults on the way to
//! code generating and constructing a full client.
use crate::client::retries::strategy::StandardRetryStrategy;
use crate::client::retries::RetryPartition;
use aws_smithy_async::rt::sleep::default_async_sleep;
use aws_smithy_async::time::SystemTimeSource;
use aws_smithy_runtime_api::client::http::SharedHttpClient;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
use aws_smithy_runtime_api::client::runtime_plugin::{
Order, SharedRuntimePlugin, StaticRuntimePlugin,
};
use aws_smithy_runtime_api::shared::IntoShared;
use aws_smithy_types::config_bag::{FrozenLayer, Layer};
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::timeout::TimeoutConfig;
use std::borrow::Cow;
fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
where
CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
{
StaticRuntimePlugin::new()
.with_order(Order::Defaults)
.with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
}
fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
where
LayerFn: FnOnce(&mut Layer),
{
let mut layer = Layer::new(name);
(layer_fn)(&mut layer);
layer.freeze()
}
/// Runtime plugin that provides a default connector.
pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
let _default: Option<SharedHttpClient> = None;
#[cfg(feature = "connector-hyper-0-14-x")]
let _default = crate::client::http::hyper_014::default_client();
_default.map(|default| {
default_plugin("default_http_client_plugin", |components| {
components.with_http_client(Some(default))
})
.into_shared()
})
}
/// Runtime plugin that provides a default async sleep implementation.
pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
default_async_sleep().map(|default| {
default_plugin("default_sleep_impl_plugin", |components| {
components.with_sleep_impl(Some(default))
})
.into_shared()
})
}
/// Runtime plugin that provides a default time source.
pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
Some(
default_plugin("default_time_source_plugin", |components| {
components.with_time_source(Some(SystemTimeSource::new()))
})
.into_shared(),
)
}
/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
pub fn default_retry_config_plugin(
default_partition_name: impl Into<Cow<'static, str>>,
) -> Option<SharedRuntimePlugin> {
Some(
default_plugin("default_retry_config_plugin", |components| {
components.with_retry_strategy(Some(StandardRetryStrategy::new()))
})
.with_config(layer("default_retry_config", |layer| {
layer.store_put(RetryConfig::disabled());
layer.store_put(RetryPartition::new(default_partition_name));
}))
.into_shared(),
)
}
/// Runtime plugin that sets the default timeout config (no timeouts).
pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
Some(
default_plugin("default_timeout_config_plugin", |c| c)
.with_config(layer("default_timeout_config", |layer| {
layer.store_put(TimeoutConfig::disabled());
}))
.into_shared(),
)
}

View File

@ -3,12 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/
use aws_smithy_runtime_api::client::http::SharedHttpClient;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
use aws_smithy_runtime_api::client::runtime_plugin::{
Order, SharedRuntimePlugin, StaticRuntimePlugin,
};
/// Interceptor for connection poisoning.
pub mod connection_poisoning;
@ -21,17 +15,3 @@ pub mod test_util;
/// needing to provide equivalent functionality for hyper 1.x in the future.
#[cfg(feature = "connector-hyper-0-14-x")]
pub mod hyper_014;
/// Runtime plugin that provides a default connector. Intended to be used by the generated code.
pub fn default_http_client_plugin() -> SharedRuntimePlugin {
let _default: Option<SharedHttpClient> = None;
#[cfg(feature = "connector-hyper-0-14-x")]
let _default = hyper_014::default_client();
let plugin = StaticRuntimePlugin::new()
.with_order(Order::Defaults)
.with_runtime_components(
RuntimeComponentsBuilder::new("default_http_client_plugin").with_http_client(_default),
);
SharedRuntimePlugin::new(plugin)
}

View File

@ -4,8 +4,11 @@
*/
use crate::client::auth::no_auth::{NoAuthScheme, NO_AUTH_SCHEME_ID};
use crate::client::defaults::{
default_http_client_plugin, default_retry_config_plugin, default_sleep_impl_plugin,
default_time_source_plugin, default_timeout_config_plugin,
};
use crate::client::http::connection_poisoning::ConnectionPoisoningInterceptor;
use crate::client::http::default_http_client_plugin;
use crate::client::identity::no_auth::NoAuthIdentityResolver;
use crate::client::orchestrator::endpoints::StaticUriEndpointResolver;
use crate::client::retries::strategy::{NeverRetryStrategy, StandardRetryStrategy};
@ -222,9 +225,7 @@ impl<I, O, E> OperationBuilder<I, O, E> {
pub fn standard_retry(mut self, retry_config: &RetryConfig) -> Self {
self.config.store_put(retry_config.clone());
self.runtime_components
.set_retry_strategy(Some(SharedRetryStrategy::new(StandardRetryStrategy::new(
retry_config,
))));
.set_retry_strategy(Some(SharedRetryStrategy::new(StandardRetryStrategy::new())));
self
}
@ -323,8 +324,19 @@ impl<I, O, E> OperationBuilder<I, O, E> {
pub fn build(self) -> Operation<I, O, E> {
let service_name = self.service_name.expect("service_name required");
let operation_name = self.operation_name.expect("operation_name required");
let defaults = [
default_http_client_plugin(),
default_retry_config_plugin(service_name.clone()),
default_sleep_impl_plugin(),
default_time_source_plugin(),
default_timeout_config_plugin(),
]
.into_iter()
.flatten();
let mut runtime_plugins = RuntimePlugins::new()
.with_client_plugin(default_http_client_plugin())
.with_client_plugins(defaults)
.with_client_plugin(
StaticRuntimePlugin::new()
.with_config(self.config.freeze())

View File

@ -15,30 +15,29 @@ mod token_bucket;
use aws_smithy_types::config_bag::{Storable, StoreReplace};
use std::fmt;
pub use client_rate_limiter::{ClientRateLimiter, ClientRateLimiterRuntimePlugin};
pub use token_bucket::{TokenBucket, TokenBucketRuntimePlugin};
pub use client_rate_limiter::ClientRateLimiter;
pub use token_bucket::TokenBucket;
#[doc(hidden)]
pub use client_rate_limiter::ClientRateLimiterPartition;
#[doc(hidden)]
pub use token_bucket::TokenBucketPartition;
use std::borrow::Cow;
#[doc(hidden)]
#[non_exhaustive]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct RetryPartition {
inner: &'static str,
name: Cow<'static, str>,
}
impl RetryPartition {
pub fn new(name: &'static str) -> Self {
Self { inner: name }
pub fn new(name: impl Into<Cow<'static, str>>) -> Self {
Self { name: name.into() }
}
}
impl fmt::Display for RetryPartition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.inner)
f.write_str(&self.name)
}
}

View File

@ -9,38 +9,11 @@
#![allow(dead_code)]
use crate::client::retries::RetryPartition;
use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin;
use aws_smithy_runtime_api::{builder, builder_methods, builder_struct};
use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::debug;
/// A [`RuntimePlugin`] to provide a client rate limiter, usable by a retry strategy.
#[non_exhaustive]
#[derive(Debug)]
pub struct ClientRateLimiterRuntimePlugin {
rate_limiter: ClientRateLimiter,
}
impl ClientRateLimiterRuntimePlugin {
/// Create a new [`ClientRateLimiterRuntimePlugin`].
pub fn new(seconds_since_unix_epoch: f64) -> Self {
Self {
rate_limiter: ClientRateLimiter::new(seconds_since_unix_epoch),
}
}
}
impl RuntimePlugin for ClientRateLimiterRuntimePlugin {
fn config(&self) -> Option<FrozenLayer> {
let mut cfg = Layer::new("client rate limiter");
cfg.store_put(self.rate_limiter.clone());
Some(cfg.freeze())
}
}
#[doc(hidden)]
#[non_exhaustive]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
@ -104,10 +77,6 @@ pub(crate) enum RequestReason {
InitialRequest,
}
impl Storable for ClientRateLimiter {
type Storer = StoreReplace<Self>;
}
impl ClientRateLimiter {
/// Creates a new [`ClientRateLimiter`].
pub fn new(seconds_since_unix_epoch: f64) -> Self {

View File

@ -9,29 +9,26 @@ use crate::client::retries::strategy::standard::ReleaseResult::{
APermitWasReleased, NoPermitWasReleased,
};
use crate::client::retries::token_bucket::TokenBucket;
use crate::client::retries::{ClientRateLimiterPartition, RetryPartition};
use crate::static_partition_map::StaticPartitionMap;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext;
use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_types::config_bag::{ConfigBag, Storable, StoreReplace};
use aws_smithy_types::retry::{ErrorKind, RetryConfig};
use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode};
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
use tokio::sync::OwnedSemaphorePermit;
use tracing::debug;
// The initial attempt, plus three retries.
const DEFAULT_MAX_ATTEMPTS: u32 = 4;
static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
StaticPartitionMap::new();
/// Retry strategy with exponential backoff, max attempts, and a token bucket.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct StandardRetryStrategy {
// Retry settings
base: fn() -> f64,
initial_backoff: Duration,
max_attempts: u32,
max_backoff: Duration,
retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
}
@ -41,41 +38,8 @@ impl Storable for StandardRetryStrategy {
impl StandardRetryStrategy {
/// Create a new standard retry strategy with the given config.
pub fn new(retry_config: &RetryConfig) -> Self {
let base = if retry_config.use_static_exponential_base() {
|| 1.0
} else {
fastrand::f64
};
Self::default()
.with_base(base)
.with_max_backoff(retry_config.max_backoff())
.with_max_attempts(retry_config.max_attempts())
.with_initial_backoff(retry_config.initial_backoff())
}
/// Changes the exponential backoff base.
pub fn with_base(mut self, base: fn() -> f64) -> Self {
self.base = base;
self
}
/// Changes the max number of attempts.
pub fn with_max_attempts(mut self, max_attempts: u32) -> Self {
self.max_attempts = max_attempts;
self
}
/// Changes the initial backoff time.
pub fn with_initial_backoff(mut self, initial_backoff: Duration) -> Self {
self.initial_backoff = initial_backoff;
self
}
/// Changes the maximum backoff time.
pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
self.max_backoff = max_backoff;
self
pub fn new() -> Self {
Default::default()
}
fn release_retry_permit(&self) -> ReleaseResult {
@ -98,10 +62,37 @@ impl StandardRetryStrategy {
}
}
/// Returns a [`ClientRateLimiter`] if adaptive retry is configured.
fn adaptive_retry_rate_limiter(
runtime_components: &RuntimeComponents,
cfg: &ConfigBag,
) -> Option<ClientRateLimiter> {
let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
if retry_config.mode() == RetryMode::Adaptive {
if let Some(time_source) = runtime_components.time_source() {
let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
let seconds_since_unix_epoch = time_source
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("the present takes place after the UNIX_EPOCH")
.as_secs_f64();
let client_rate_limiter_partition =
ClientRateLimiterPartition::new(retry_partition.clone());
let client_rate_limiter = CLIENT_RATE_LIMITER
.get_or_init(client_rate_limiter_partition, || {
ClientRateLimiter::new(seconds_since_unix_epoch)
});
return Some(client_rate_limiter);
}
}
None
}
fn calculate_backoff(
&self,
runtime_components: &RuntimeComponents,
cfg: &ConfigBag,
retry_cfg: &RetryConfig,
retry_reason: &RetryAction,
) -> Result<Duration, ShouldAttempt> {
let request_attempts = cfg
@ -119,13 +110,13 @@ impl StandardRetryStrategy {
);
if let Some(delay) = *retry_after {
let delay = delay.min(self.max_backoff);
let delay = delay.min(retry_cfg.max_backoff());
debug!("explicit request from server to delay {delay:?} before retrying");
Ok(delay)
} else if let Some(delay) =
check_rate_limiter_for_delay(runtime_components, cfg, *kind)
{
let delay = delay.min(self.max_backoff);
let delay = delay.min(retry_cfg.max_backoff());
debug!("rate limiter has requested a {delay:?} delay before retrying");
Ok(delay)
} else {
@ -139,23 +130,28 @@ impl StandardRetryStrategy {
}
}
let base = if retry_cfg.use_static_exponential_base() {
1.0
} else {
fastrand::f64()
};
let backoff = calculate_exponential_backoff(
// Generate a random base multiplier to create jitter
(self.base)(),
base,
// Get the backoff time multiplier in seconds (with fractional seconds)
self.initial_backoff.as_secs_f64(),
retry_cfg.initial_backoff().as_secs_f64(),
// `self.local.attempts` tracks number of requests made including the initial request
// The initial attempt shouldn't count towards backoff calculations so we subtract it
request_attempts - 1,
);
Ok(Duration::from_secs_f64(backoff).min(self.max_backoff))
Ok(Duration::from_secs_f64(backoff).min(retry_cfg.max_backoff()))
}
}
RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
update_rate_limiter_if_exists(runtime_components, cfg, false);
debug!(
attempts = request_attempts,
max_attempts = self.max_attempts,
max_attempts = retry_cfg.max_attempts(),
"encountered unretryable error"
);
Err(ShouldAttempt::No)
@ -170,26 +166,13 @@ enum ReleaseResult {
NoPermitWasReleased,
}
impl Default for StandardRetryStrategy {
fn default() -> Self {
Self {
max_attempts: DEFAULT_MAX_ATTEMPTS,
max_backoff: Duration::from_secs(20),
// by default, use a random base for exponential backoff
base: fastrand::f64,
initial_backoff: Duration::from_secs(1),
retry_permit: Mutex::new(None),
}
}
}
impl RetryStrategy for StandardRetryStrategy {
fn should_attempt_initial_request(
&self,
runtime_components: &RuntimeComponents,
cfg: &ConfigBag,
) -> Result<ShouldAttempt, BoxError> {
if let Some(crl) = cfg.load::<ClientRateLimiter>() {
if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
if let Err(delay) = crl.acquire_permission_to_send_a_request(
seconds_since_unix_epoch,
@ -210,6 +193,7 @@ impl RetryStrategy for StandardRetryStrategy {
runtime_components: &RuntimeComponents,
cfg: &ConfigBag,
) -> Result<ShouldAttempt, BoxError> {
let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
// Look a the result. If it's OK then we're done; No retry required. Otherwise, we need to inspect it
let output_or_error = ctx.output_or_error().expect(
"This must never be called without reaching the point where the result exists.",
@ -237,12 +221,12 @@ impl RetryStrategy for StandardRetryStrategy {
.load::<RequestAttempts>()
.expect("at least one request attempt is made before any retry is attempted")
.attempts();
if request_attempts >= self.max_attempts {
if request_attempts >= retry_cfg.max_attempts() {
update_rate_limiter_if_exists(runtime_components, cfg, false);
debug!(
attempts = request_attempts,
max_attempts = self.max_attempts,
max_attempts = retry_cfg.max_attempts(),
"not retrying because we are out of attempts"
);
return Ok(ShouldAttempt::No);
@ -253,11 +237,12 @@ impl RetryStrategy for StandardRetryStrategy {
let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);
// Calculate the appropriate backoff time.
let backoff = match self.calculate_backoff(runtime_components, cfg, &classifier_result) {
Ok(value) => value,
// In some cases, backoff calculation will decide that we shouldn't retry at all.
Err(value) => return Ok(value),
};
let backoff =
match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
Ok(value) => value,
// In some cases, backoff calculation will decide that we shouldn't retry at all.
Err(value) => return Ok(value),
};
debug!(
"attempt #{request_attempts} failed with {:?}; retrying after {:?}",
classifier_result, backoff,
@ -272,7 +257,7 @@ fn update_rate_limiter_if_exists(
cfg: &ConfigBag,
is_throttling_error: bool,
) {
if let Some(crl) = cfg.load::<ClientRateLimiter>() {
if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
}
@ -283,7 +268,7 @@ fn check_rate_limiter_for_delay(
cfg: &ConfigBag,
kind: ErrorKind,
) -> Option<Duration> {
if let Some(crl) = cfg.load::<ClientRateLimiter>() {
if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
let retry_reason = if kind == ErrorKind::ThrottlingError {
RequestReason::RetryTimeout
} else {
@ -336,7 +321,11 @@ mod tests {
#[test]
fn no_retry_necessary_for_ok_result() {
let cfg = ConfigBag::base();
let cfg = ConfigBag::of_layers(vec![{
let mut layer = Layer::new("test");
layer.store_put(RetryConfig::standard());
layer
}]);
let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
let strategy = StandardRetryStrategy::default();
@ -350,6 +339,7 @@ mod tests {
fn set_up_cfg_and_context(
error_kind: ErrorKind,
current_request_attempts: u32,
retry_config: RetryConfig,
) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
@ -359,6 +349,7 @@ mod tests {
.unwrap();
let mut layer = Layer::new("test");
layer.store_put(RequestAttempts::new(current_request_attempts));
layer.store_put(retry_config);
let cfg = ConfigBag::of_layers(vec![layer]);
(ctx, rc, cfg)
@ -367,8 +358,14 @@ mod tests {
// Test that error kinds produce the correct "retry after X seconds" output.
// All error kinds are handled in the same way for the standard strategy.
fn test_should_retry_error_kind(error_kind: ErrorKind) {
let (ctx, rc, cfg) = set_up_cfg_and_context(error_kind, 3);
let strategy = StandardRetryStrategy::default().with_base(|| 1.0);
let (ctx, rc, cfg) = set_up_cfg_and_context(
error_kind,
3,
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(4),
);
let strategy = StandardRetryStrategy::new();
let actual = strategy
.should_attempt_retry(&ctx, &rc, &cfg)
.expect("method is infallible for this use");
@ -399,10 +396,14 @@ mod tests {
fn dont_retry_when_out_of_attempts() {
let current_attempts = 4;
let max_attempts = current_attempts;
let (ctx, rc, cfg) = set_up_cfg_and_context(ErrorKind::TransientError, current_attempts);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(max_attempts);
let (ctx, rc, cfg) = set_up_cfg_and_context(
ErrorKind::TransientError,
current_attempts,
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(max_attempts),
);
let strategy = StandardRetryStrategy::new();
let actual = strategy
.should_attempt_retry(&ctx, &rc, &cfg)
.expect("method is infallible for this use");
@ -471,6 +472,7 @@ mod tests {
#[cfg(feature = "test-util")]
fn setup_test(
retry_reasons: Vec<RetryAction>,
retry_config: RetryConfig,
) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
let rc = RuntimeComponentsBuilder::for_tests()
.with_retry_classifier(SharedRetryClassifier::new(
@ -478,7 +480,9 @@ mod tests {
))
.build()
.unwrap();
let cfg = ConfigBag::base();
let mut layer = Layer::new("test");
layer.store_put(retry_config);
let cfg = ConfigBag::of_layers(vec![layer]);
let mut ctx = InterceptorContext::new(Input::doesnt_matter());
// This type doesn't matter b/c the classifier will just return whatever we tell it to.
ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
@ -489,10 +493,13 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn eventual_success() {
let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::server_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(5);
let (mut cfg, rc, mut ctx) = setup_test(
vec![RetryAction::server_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::default());
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -519,10 +526,13 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn no_more_attempts() {
let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(3);
let (mut cfg, rc, ctx) = setup_test(
vec![RetryAction::server_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(3),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::default());
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -547,10 +557,13 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn no_quota() {
let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(5);
let (mut cfg, rc, ctx) = setup_test(
vec![RetryAction::server_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::new(5));
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -569,16 +582,19 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn quota_replenishes_on_success() {
let (mut cfg, rc, mut ctx) = setup_test(vec![
RetryAction::transient_error(),
RetryAction::retryable_error_with_explicit_delay(
ErrorKind::TransientError,
Duration::from_secs(1),
),
]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(5);
let (mut cfg, rc, mut ctx) = setup_test(
vec![
RetryAction::transient_error(),
RetryAction::retryable_error_with_explicit_delay(
ErrorKind::TransientError,
Duration::from_secs(1),
),
],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::new(100));
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -607,10 +623,13 @@ mod tests {
#[test]
fn quota_replenishes_on_first_try_success() {
const PERMIT_COUNT: usize = 20;
let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::transient_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(u32::MAX);
let (mut cfg, rc, mut ctx) = setup_test(
vec![RetryAction::transient_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(u32::MAX),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state()
.store_put(TokenBucket::new(PERMIT_COUNT));
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -657,10 +676,13 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn backoff_timing() {
let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(5);
let (mut cfg, rc, ctx) = setup_test(
vec![RetryAction::server_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::default());
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();
@ -697,12 +719,15 @@ mod tests {
#[cfg(feature = "test-util")]
#[test]
fn max_backoff_time() {
let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()]);
let strategy = StandardRetryStrategy::default()
.with_base(|| 1.0)
.with_max_attempts(5)
.with_initial_backoff(Duration::from_secs(1))
.with_max_backoff(Duration::from_secs(3));
let (mut cfg, rc, ctx) = setup_test(
vec![RetryAction::server_error()],
RetryConfig::standard()
.with_use_static_exponential_base(true)
.with_max_attempts(5)
.with_initial_backoff(Duration::from_secs(1))
.with_max_backoff(Duration::from_secs(3)),
);
let strategy = StandardRetryStrategy::new();
cfg.interceptor_state().store_put(TokenBucket::default());
let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

View File

@ -3,52 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/
use crate::client::retries::RetryPartition;
use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugin;
use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace};
use aws_smithy_types::config_bag::{Storable, StoreReplace};
use aws_smithy_types::retry::ErrorKind;
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::trace;
/// A [`RuntimePlugin`] to provide a token bucket, usable by a retry strategy.
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct TokenBucketRuntimePlugin {
token_bucket: TokenBucket,
}
impl TokenBucketRuntimePlugin {
/// Creates a new `TokenBucketRuntimePlugin` with the given initial quota.
pub fn new(initial_tokens: usize) -> Self {
Self {
token_bucket: TokenBucket::new(initial_tokens),
}
}
}
impl RuntimePlugin for TokenBucketRuntimePlugin {
fn config(&self) -> Option<FrozenLayer> {
let mut cfg = Layer::new("standard token bucket");
cfg.store_put(self.token_bucket.clone());
Some(cfg.freeze())
}
}
#[doc(hidden)]
#[non_exhaustive]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct TokenBucketPartition {
retry_partition: RetryPartition,
}
impl TokenBucketPartition {
pub fn new(retry_partition: RetryPartition) -> Self {
Self { retry_partition }
}
}
const DEFAULT_CAPACITY: usize = 500;
const RETRY_COST: u32 = 5;
const RETRY_TIMEOUT_COST: u32 = RETRY_COST * 2;

View File

@ -393,6 +393,12 @@ impl RetryConfig {
self
}
/// Set the maximum backoff time.
pub fn with_max_backoff(mut self, max_backoff: Duration) -> Self {
self.max_backoff = max_backoff;
self
}
/// Hint to the retry strategy whether to use a static exponential base.
///
/// When a retry strategy uses exponential backoff, it calculates a random base. This causes the