Merge smithy-rs-release-1.x.y back into main (#3248)

AWS SDK Rust Bot 2023-11-22 14:33:36 -06:00 committed by GitHub
commit 8754c99ba8
12 changed files with 469 additions and 602 deletions

View File

@@ -1,4 +1,9 @@
<!-- Do not manually edit this file. Use the `changelogger` tool. -->
November 21st, 2023
===================
**Internal changes only with this release**
November 17th, 2023
===================
**Breaking Changes:**

View File

@@ -5,423 +5,6 @@
{
"smithy-rs": [],
"aws-sdk-rust": [
{
"message": "(Behavior Break!) The SSO credentials provider is no longer enabled by default in `aws-config`, and so SSO profile config will no longer work out of box. The `credentials-sso` feature in `aws-config` was removed from the default features, and renamed to `sso`. If you need credentials from SSO, then enable the `sso` feature in `aws-config`.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#2917"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The `SsoCredentialsProvider` now supports token refresh and is compatible with the token cache file paths the latest AWS CLI uses.",
"meta": {
"bug": false,
"breaking": false,
"tada": true
},
"author": "jdisanti",
"references": [
"smithy-rs#2917",
"aws-sdk-rust#703",
"aws-sdk-rust#699"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "HTTP connector configuration has changed significantly. See the [upgrade guidance](https://github.com/smithy-lang/smithy-rs/discussions/3022) for details.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#3011"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Several breaking changes were made to the aws-sigv4 API to remove the direct HTTP dependency:\n- The `take_parameters` and `take_headers` APIs were removed from `SigningInstructions`. Use `into_parts()` instead\n- The arguments of `SignableRequest::new` were changed to accept string types instead of types from the HTTP crate\n- `SigningInstructions::apply_to_request` was gated beyond an `http0-compat` feature flag for backwards compatibility. This API MAY be removed in a future release.\n- Several public accessors were removed from `SigningInstructions`.\n",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#2921"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "In sigV4-related code, rename 'signing service' to 'signing name'. This aligns with the terminology used by the endpoint resolver.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "Velfi",
"references": [
"smithy-rs#2911"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Struct members modeled as required are no longer wrapped in `Option`s [when possible](https://smithy.io/2.0/spec/aggregate-types.html#structure-member-optionality). For upgrade guidance and more info, see [here](https://github.com/smithy-lang/smithy-rs/discussions/2929).",
"meta": {
"bug": false,
"breaking": true,
"tada": true
},
"author": "Velfi",
"references": [
"smithy-rs#2916",
"aws-sdk-rust#536"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "All versions of SigningParams have been updated to contain an [`Identity`](https://docs.rs/aws-smithy-runtime-api/latest/aws_smithy_runtime_api/client/identity/struct.Identity.html)\nas opposed to AWS credentials in `&str` form. [Read more](https://github.com/awslabs/aws-sdk-rust/discussions/868).\n",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "Velfi",
"references": [
"smithy-rs#2913"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Update MSRV to Rust 1.70.0",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "Velfi",
"references": [
"smithy-rs#2948"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Correctly identify HTTP 200 responses from S3 with `<Error>` as the root Element as errors. **Note**: This a behavior change and will change the error type returned by the SDK in some cases.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#2958",
"aws-sdk-rust#873"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Allow `no_credentials` to be used with all S3 operations.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#2955",
"aws-sdk-rust#878"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "`CustomizableOperation`, created as a result of calling the `.customize` method on a fluent builder, ceased to be `Send` and `Sync` in the previous releases. It is now `Send` and `Sync` again.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "ysaito1001",
"references": [
"smithy-rs#2944",
"smithy-rs#2951"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Make `bucket` required for request construction for S3. When `bucket` is not set, a **different** operation than intended can be triggered.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#1668",
"aws-sdk-rust#873",
"smithy-rs#2964"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Remove `once_cell` from public API.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "ysaito1001",
"references": [
"smithy-rs#2973"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Fix regression with redacting sensitive HTTP response bodies.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "ysaito1001",
"references": [
"smithy-rs#2926",
"smithy-rs#2972"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Structure members with the type `Option<Vec<T>>` now produce an accessor with the type `&[T]` instead of `Option<&[T]>`. To determine if the field was actually set use `.<field_name>.is_some()`.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#2995"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The `futures_core::stream::Stream` trait has been removed from public API. It should not affect usual SDK use cases. If your code uses paginators, you do not need to use the `Stream` trait or its exntension traits, but only the `next`, `try_next`, `collect`, and `try_collect` methods are supported on `PaginationStream`. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner. Finally, `fn_stream` has been moved to be a child module of `pagination_stream`.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "ysaito1001",
"references": [
"smithy-rs#2978"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Add support for Sigv4A request signing. Sigv4a signing will be used automatically when appropriate for a given operation. Currently, it's used for S3 and EventBridge.",
"meta": {
"bug": false,
"breaking": true,
"tada": true
},
"author": "Velfi",
"references": [
"smithy-rs#1797"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The `futures_core::stream::Stream` trait has been removed from [`ByteStream`](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html). The methods mentioned in the [doc](https://docs.rs/aws-smithy-http/latest/aws_smithy_http/byte_stream/struct.ByteStream.html#getting-data-out-of-a-bytestream) will continue to be supported. Other stream operations that were previously available through the trait or its extension traits can be added later in a backward compatible manner.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "ysaito1001",
"references": [
"smithy-rs#2983"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The IMDS Client builder's `build()` method is no longer async.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#2997"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The API for [`AssumeRoleProvider`](https://docs.rs/aws-config/latest/aws_config/sts/struct.AssumeRoleProvider.html) has been updated to derive configuration from [`SdkConfig`](https://docs.rs/aws-config/latest/aws_config/struct.SdkConfig.html) instead of `ProviderConfig`.\n\nFor more information, see the [Change Log Discussion](https://github.com/awslabs/aws-sdk-rust/discussions/906)",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#3014"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "STS and SSO-based credential providers will now respect both `use_fips` and `use_dual_stack` when those settings are configured in a user's environment or profile.",
"meta": {
"bug": true,
"breaking": true,
"tada": true
},
"author": "Velfi",
"references": [
"aws-sdk-rust#882",
"smithy-rs#3007"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Retry classifiers are now configurable at the service and operation levels. Users may also define their own custom retry classifiers.\n\nFor more information, see the [guide](https://github.com/smithy-lang/smithy-rs/discussions/3050).\n",
"meta": {
"bug": false,
"breaking": true,
"tada": true
},
"author": "Velfi",
"references": [
"smithy-rs#2417",
"smithy-rs#3018"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The future return types on traits `EndpointResolver` and `IdentityResolver` changed to new-types `EndpointFuture` and `IdentityFuture` respectively.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#3055"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Lifetimes have been added to `EndpointResolver` and `IdentityResolver` traits.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#3061"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Several traits have been renamed from noun form to verb form to be more idiomatic:\n- `EndpointResolver` -> `ResolveEndpoint`\n- `Interceptor` -> `Intercept`\n",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#3065"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "[`PresignedRequest`](https://docs.rs/aws-sdk-s3/latest/aws_sdk_s3/presigning/struct.PresignedRequest.html) now returns standard-library types instead of types from the `http` crate. `to_http_request` has been renamed `to_http_02x_request`.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#3059"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "A bug was fixed where the credentials-process provider was executing the subprocess in the worker thread, potentially stalling the runtime.",
"meta": {
"bug": true,
"breaking": false,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#3052"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "The `credentials-process` feature was added to `aws-config`. If you currently use `no-default-features` for `aws-config`, you MUST enable this feature to use the [`CredentialProcessProvider`](https://docs.rs/aws-config/latest/aws_config/credential_process/struct.CredentialProcessProvider.html) provider directly or via `~/.aws/config`.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "rcoh",
"references": [
"smithy-rs#3052"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "**This change has [detailed upgrade guidance](https://github.com/awslabs/aws-sdk-rust/discussions/923).** <br><br>The AWS credentials cache has been replaced with a more generic identity cache.",
"meta": {
"bug": false,
"breaking": true,
"tada": false
},
"author": "jdisanti",
"references": [
"smithy-rs#3077"
],
"since-commit": "b56d34847591494a15b8fabcce55f730400ebea9",
"age": 5
},
{
"message": "Change `ByteStream::into_async_read` to return `AsyncBufRead`",
"meta": {
@@ -434,7 +17,7 @@
"smithy-rs#3164"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "[Upgrade guidance for HTTP Request/Response changes](https://github.com/awslabs/aws-sdk-rust/discussions/950). HTTP request types moved, and a new HTTP response type was added.",
@@ -449,7 +32,7 @@
"smithy-rs#3148"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "An operation output that supports receiving events from stream now provides a new-type wrapping `aws_smithy_http::event_stream::receiver::Receiver`. The new-type supports the `.recv()` method whose signature is the same as [`aws_smithy_http::event_stream::receiver::Receiver::recv`](https://docs.rs/aws-smithy-http/0.57.0/aws_smithy_http/event_stream/struct.Receiver.html#method.recv).\n",
@@ -464,7 +47,7 @@
"smithy-rs#3114"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "The `RequestId` trait has moved from the aws-http crate into aws-types.",
@@ -478,7 +61,7 @@
"smithy-rs#3160"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "Clients now require a `BehaviorVersion` to be provided. For must customers, `latest` is the best choice. This will be enabled automatically if you enable the `behavior-version-latest` cargo feature on `aws-config` or on an SDK crate. For customers that wish to pin to a specific behavior major version, it can be set in `aws-config` or when constructing the service client.\n\n```rust\nasync fn example() {\n // with aws-config\n let conf = aws_config::defaults(aws_config::BehaviorVersion::v2023_11_09());\n\n // when creating a client\n let client = my_service::Client::from_conf(my_service::Config::builder().behavior_version(..).<other params>.build());\n}\n```",
@@ -492,7 +75,7 @@
"smithy-rs#3151"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "Add `ProvideErrorMetadata` impl for service `Error` type.",
@@ -507,7 +90,7 @@
"smithy-rs#3189"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "Remove deprecated error kind type aliases.",
@@ -521,7 +104,7 @@
"smithy-rs#3189"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "Unhandled errors have been made opaque to ensure code is written in a future-proof manner. Where previously, you\nmight have:\n```rust\nmatch service_error.err() {\n GetStorageError::StorageAccessNotAuthorized(_) => { /* ... */ }\n GetStorageError::Unhandled(unhandled) if unhandled.code() == Some(\"SomeUnmodeledErrorCode\") {\n // unhandled error handling\n }\n _ => { /* ... */ }\n}\n```\nIt should now look as follows:\n```rust\nmatch service_error.err() {\n GetStorageError::StorageAccessNotAuthorized(_) => { /* ... */ }\n err if err.code() == Some(\"SomeUnmodeledErrorCode\") {\n // unhandled error handling\n }\n _ => { /* ... */ }\n}\n```\nThe `Unhandled` variant should never be referenced directly.\n",
@@ -535,7 +118,7 @@
"smithy-rs#3191"
],
"since-commit": "f78ee50d9b28c1d2337ca6236e592dfc243ae1c9",
"age": 3
"age": 4
},
{
"message": "imds::client::Builder::endpoint has been updated to accept a string instead of a URI. The method now returns a result instead.",
@@ -549,7 +132,7 @@
"smithy-rs#3205"
],
"since-commit": "681bf77733a242e01458e87381a0700616c918da",
"age": 2
"age": 3
},
{
"message": "The `AssumeRoleBuilder::policy_arns` now accepts strings instead of an STS specific type",
@@ -563,7 +146,7 @@
"smithy-rs#3205"
],
"since-commit": "681bf77733a242e01458e87381a0700616c918da",
"age": 2
"age": 3
},
{
"message": "Fix optional types in S3. Many types in S3 were modeled as non-optional but this causes serialization issues.",
@@ -577,7 +160,7 @@
"smithy-rs#3213"
],
"since-commit": "681bf77733a242e01458e87381a0700616c918da",
"age": 2
"age": 3
},
{
"message": "Add configurable stalled-stream protection for downloads.\n\nWhen making HTTP calls,\nit's possible for a connection to 'stall out' and emit no more data due to server-side issues.\nIn the event this happens, it's desirable for the stream to error out as quickly as possible.\nWhile timeouts can protect you from this issue, they aren't adaptive to the amount of data\nbeing sent and so must be configured specifically for each use case. When enabled, stalled-stream\nprotection will ensure that bad streams error out quickly, regardless of the amount of data being\ndownloaded.\n\nProtection is enabled by default for all clients but can be configured or disabled.\nSee [this discussion](https://github.com/awslabs/aws-sdk-rust/discussions/956) for more details.\n",
@@ -591,7 +174,7 @@
"smithy-rs#3202"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Make certain types for EMR Serverless optional. Previously, they defaulted to 0, but this created invalid requests.",
@@ -605,7 +188,7 @@
"smithy-rs#3217"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Prevent multiplication overflow in backoff computation",
@@ -620,7 +203,7 @@
"aws-sdk-rust#960"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Make some types for various services optional. Previously, they defaulted to 0, but this created invalid requests.",
@@ -634,7 +217,7 @@
"smithy-rs#3228"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Types/functions that were deprecated in previous releases were removed. Unfortunately, some of these deprecations\nwere ignored by the Rust compiler (we found out later that `#[deprecated]` on `pub use` doesn't work). See\nthe [deprecations removal list](https://github.com/smithy-lang/smithy-rs/discussions/3223) for more details.\n",
@@ -648,7 +231,7 @@
"smithy-rs#3222"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Add `Display` impl for `DateTime`.",
@@ -662,7 +245,7 @@
"smithy-rs#3183"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 1
"age": 2
},
{
"message": "Types/functions that were previously `#[doc(hidden)]` in `aws-config`, `aws-inlineable`, `aws-types`, and the SDK crates are now visible. For those that are not intended to be used directly, they are called out in their docs as such.",
@@ -676,6 +259,20 @@
"smithy-rs#3226"
],
"since-commit": "f66f9246bccc376462ef47aec5707569fca214f5",
"age": 2
},
{
"message": "Make properties of S3Control PublicAccessBlockConfiguration optional. Previously, they defaulted to false, but this created invalid requests.",
"meta": {
"bug": true,
"breaking": true,
"tada": false
},
"author": "milesziemer",
"references": [
"smithy-rs#3246"
],
"since-commit": "e155c3048b9989fe406ef575d461ea01dfaf294c",
"age": 1
}
],

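The accessor change recorded in the `smithy-rs#2995` entry above is easiest to see in code. The following is a minimal sketch; the `Instance` type and `tags` member are hypothetical placeholders, not a real SDK shape:

```rust
// Hypothetical generated struct illustrating the accessor change
// from smithy-rs#2995; not a real SDK type.
struct Instance {
    tags: Option<Vec<String>>,
}

impl Instance {
    /// New-style accessor: returns an empty slice when the field is unset.
    fn tags(&self) -> &[String] {
        self.tags.as_deref().unwrap_or_default()
    }
}

fn main() {
    let instance = Instance { tags: None };
    // Iterate without unwrapping an Option.
    for tag in instance.tags() {
        println!("{tag}");
    }
    // To distinguish "unset" from "empty", check the field itself,
    // as the changelog entry suggests with `.<field_name>.is_some()`.
    assert!(instance.tags.is_none());
}
```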
View File

@@ -7,6 +7,7 @@ package software.amazon.smithy.rustsdk.customize
import software.amazon.smithy.model.Model
import software.amazon.smithy.model.shapes.ServiceShape
import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.rust.codegen.client.smithy.ClientRustSettings
import software.amazon.smithy.rust.codegen.client.smithy.customize.ClientCodegenDecorator
import software.amazon.smithy.rust.codegen.core.util.shapeId
@@ -24,38 +25,43 @@ class RemoveDefaultsDecorator : ClientCodegenDecorator {
// Service shape id -> Shape id of each root shape to remove the default from.
// TODO(https://github.com/smithy-lang/smithy-rs/issues/3220): Remove this customization after model updates.
private val removeDefaults = mapOf(
"com.amazonaws.amplifyuibuilder#AmplifyUIBuilder".shapeId() to setOf(
"com.amazonaws.amplifyuibuilder#ListComponentsLimit".shapeId(),
"com.amazonaws.amplifyuibuilder#ListFormsLimit".shapeId(),
"com.amazonaws.amplifyuibuilder#ListThemesLimit".shapeId(),
private val removeDefaults: Map<ShapeId, Set<ShapeId>> = mapOf(
"com.amazonaws.amplifyuibuilder#AmplifyUIBuilder" to setOf(
"com.amazonaws.amplifyuibuilder#ListComponentsLimit",
"com.amazonaws.amplifyuibuilder#ListFormsLimit",
"com.amazonaws.amplifyuibuilder#ListThemesLimit",
),
"com.amazonaws.drs#ElasticDisasterRecoveryService".shapeId() to setOf(
"com.amazonaws.drs#Validity".shapeId(),
"com.amazonaws.drs#CostOptimizationConfiguration\$burstBalanceThreshold".shapeId(),
"com.amazonaws.drs#CostOptimizationConfiguration\$burstBalanceDeltaThreshold".shapeId(),
"com.amazonaws.drs#ListStagingAccountsRequest\$maxResults".shapeId(),
"com.amazonaws.drs#StrictlyPositiveInteger".shapeId(),
"com.amazonaws.drs#MaxResultsType".shapeId(),
"com.amazonaws.drs#MaxResultsReplicatingSourceServers".shapeId(),
"com.amazonaws.drs#LaunchActionOrder".shapeId(),
"com.amazonaws.drs#ElasticDisasterRecoveryService" to setOf(
"com.amazonaws.drs#Validity",
"com.amazonaws.drs#CostOptimizationConfiguration\$burstBalanceThreshold",
"com.amazonaws.drs#CostOptimizationConfiguration\$burstBalanceDeltaThreshold",
"com.amazonaws.drs#ListStagingAccountsRequest\$maxResults",
"com.amazonaws.drs#StrictlyPositiveInteger",
"com.amazonaws.drs#MaxResultsType",
"com.amazonaws.drs#MaxResultsReplicatingSourceServers",
"com.amazonaws.drs#LaunchActionOrder",
),
"com.amazonaws.evidently#Evidently".shapeId() to setOf(
"com.amazonaws.evidently#ResultsPeriod".shapeId(),
"com.amazonaws.evidently#Evidently" to setOf(
"com.amazonaws.evidently#ResultsPeriod",
),
"com.amazonaws.location#LocationService".shapeId() to setOf(
"com.amazonaws.location#ListPlaceIndexesRequest\$MaxResults".shapeId(),
"com.amazonaws.location#SearchPlaceIndexForSuggestionsRequest\$MaxResults".shapeId(),
"com.amazonaws.location#PlaceIndexSearchResultLimit".shapeId(),
"com.amazonaws.location#LocationService" to setOf(
"com.amazonaws.location#ListPlaceIndexesRequest\$MaxResults",
"com.amazonaws.location#SearchPlaceIndexForSuggestionsRequest\$MaxResults",
"com.amazonaws.location#PlaceIndexSearchResultLimit",
),
"com.amazonaws.paymentcryptographydata#PaymentCryptographyDataPlane".shapeId() to setOf(
"com.amazonaws.paymentcryptographydata#IntegerRangeBetween4And12".shapeId(),
"com.amazonaws.paymentcryptographydata#PaymentCryptographyDataPlane" to setOf(
"com.amazonaws.paymentcryptographydata#IntegerRangeBetween4And12",
),
"com.amazonaws.emrserverless#AwsToledoWebService".shapeId() to setOf(
// Service expects this to have a min value > 0
"com.amazonaws.emrserverless#WorkerCounts".shapeId(),
"com.amazonaws.emrserverless#AwsToledoWebService" to setOf(
"com.amazonaws.emrserverless#WorkerCounts",
),
)
"com.amazonaws.s3control#AWSS3ControlServiceV20180820" to setOf(
"com.amazonaws.s3control#PublicAccessBlockConfiguration\$BlockPublicAcls",
"com.amazonaws.s3control#PublicAccessBlockConfiguration\$IgnorePublicAcls",
"com.amazonaws.s3control#PublicAccessBlockConfiguration\$BlockPublicPolicy",
"com.amazonaws.s3control#PublicAccessBlockConfiguration\$RestrictPublicBuckets",
),
).map { (k, v) -> k.shapeId() to v.map { it.shapeId() }.toSet() }.toMap()
private fun applies(service: ServiceShape) =
removeDefaults.containsKey(service.id)

View File

@@ -24,7 +24,7 @@ object CrateVersioner {
IndependentCrateVersioner(
VersionsManifest.fromFile(versionsManifestPath),
ModelMetadata.fromFile(modelMetadataPath),
devPreview = true,
devPreview = false,
smithyRsVersion = getSmithyRsVersion(rootProject),
)
}

View File

@@ -12,10 +12,7 @@ rust.msrv=1.70.0
org.gradle.jvmargs=-Xmx1024M
# Version number to use for the generated stable runtime crates
#
# TODO(GA): This is currently a placeholder for crates we are going to stabilize at the time of GA.
# Until then, a value of this key can contain 0 for the major version.
smithy.rs.runtime.crate.stable.version=1.0.0
smithy.rs.runtime.crate.stable.version=1.0.1
# Version number to use for the generated unstable runtime crates
smithy.rs.runtime.crate.unstable.version=0.60.0

View File

@@ -49,9 +49,10 @@ aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["test
aws-smithy-types = { path = "../aws-smithy-types", features = ["test-util"] }
futures-util = "0.3.28"
pretty_assertions = "1.4.0"
tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "test-util"] }
tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "test-util", "full"] }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
tracing-test = "0.2.1"
hyper_0_14 = { package = "hyper", version = "0.14.27", features = ["client", "server", "tcp", "http1", "http2"] }
[package.metadata.docs.rs]
all-features = true

View File

@@ -12,6 +12,7 @@ pub mod http_body_0_4_x;
/// Options for a [`MinimumThroughputBody`].
pub mod options;
pub use throughput::Throughput;
mod throughput;
use aws_smithy_async::rt::sleep::Sleep;
@@ -21,7 +22,8 @@ use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::shared::IntoShared;
use options::MinimumThroughputBodyOptions;
use std::fmt;
use throughput::{Throughput, ThroughputLogs};
use std::time::SystemTime;
use throughput::ThroughputLogs;
pin_project_lite::pin_project! {
/// A body-wrapping type that ensures data is being streamed faster than some lower limit.
@@ -41,7 +43,7 @@
}
}
const SIZE_OF_ONE_LOG: usize = std::mem::size_of::<(std::time::SystemTime, u64)>(); // 24 bytes per log
const SIZE_OF_ONE_LOG: usize = std::mem::size_of::<(SystemTime, u64)>(); // 24 bytes per log
const NUMBER_OF_LOGS_IN_ONE_KB: f64 = 1024.0 / SIZE_OF_ONE_LOG as f64;
impl<B> MinimumThroughputBody<B> {
@@ -57,7 +59,6 @@ impl<B> MinimumThroughputBody<B> {
// Never keep more than 10KB of logs in memory. This currently
// equates to 426 logs.
(NUMBER_OF_LOGS_IN_ONE_KB * 10.0) as usize,
options.minimum_throughput().per_time_elapsed(),
),
async_sleep: async_sleep.into_shared(),
time_source: time_source.into_shared(),

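As a quick check of the log-buffer sizing above, the arithmetic works out as follows (a sketch; the tuple size is target-dependent, and the 24-byte figure assumes a typical 64-bit Unix target):

```rust
fn main() {
    // One log entry is a (SystemTime, u64) pair: usually 16 + 8 = 24 bytes.
    let size_of_one_log = std::mem::size_of::<(std::time::SystemTime, u64)>();
    // About 42.67 logs fit in one KB...
    let logs_in_one_kb = 1024.0 / size_of_one_log as f64;
    // ...so the 10KB cap allows 426 logs, matching the "426 logs" comment above.
    assert_eq!((logs_in_one_kb * 10.0) as usize, 426);
}
```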
View File

@@ -21,49 +21,69 @@
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// This code is called quite frequently in production, about once every millisecond while
// downloading a stream. However, SystemTime::now only costs on the order of nanoseconds,
// so checking the time on every poll is cheap.
let now = self.time_source.now();
// Attempt to read the data from the inner body, then update the
// throughput logs.
let mut this = self.as_mut().project();
let poll_res = match this.inner.poll_data(cx) {
Poll::Ready(Some(Ok(bytes))) => {
tracing::trace!("received data: {}", bytes.len());
this.throughput_logs.push((now, bytes.len() as u64));
Poll::Ready(Some(Ok(bytes)))
}
Poll::Pending => Poll::Pending,
Poll::Pending => {
tracing::trace!("received poll pending");
this.throughput_logs.push((now, 0));
Poll::Pending
}
// If we've read all the data or an error occurred, then return that result.
res => return res,
};
// Push a start log if we haven't already done so.
if this.throughput_logs.is_empty() {
this.throughput_logs.push((now, 0));
}
// Check the sleep future to see if it needs refreshing.
let mut sleep_fut = this.sleep_fut.take().unwrap_or_else(|| {
this.async_sleep
.sleep(this.options.minimum_throughput().per_time_elapsed())
});
let mut sleep_fut = this
.sleep_fut
.take()
.unwrap_or_else(|| this.async_sleep.sleep(this.options.check_interval()));
if let Poll::Ready(()) = pin!(&mut sleep_fut).poll(cx) {
tracing::trace!("sleep future triggered—triggering a wakeup");
// Whenever the sleep future expires, we replace it.
sleep_fut = this
.async_sleep
.sleep(this.options.minimum_throughput().per_time_elapsed());
sleep_fut = this.async_sleep.sleep(this.options.check_interval());
// We also schedule a wake up for current task to ensure that
// it gets polled at least one more time.
cx.waker().wake_by_ref();
};
this.sleep_fut.replace(sleep_fut);
let calculated_tpt = match this
.throughput_logs
.calculate_throughput(now, this.options.check_window())
{
Some(tpt) => tpt,
None => {
tracing::trace!("calculated throughput is None!");
return poll_res;
}
};
tracing::trace!(
"calculated throughput {:?} (window: {:?})",
calculated_tpt,
this.options.check_window()
);
// Calculate the current throughput and emit an error if it's too low and
// the grace period has elapsed.
let actual_throughput = this.throughput_logs.calculate_throughput(now);
let is_below_minimum_throughput = actual_throughput
.map(|t| t < this.options.minimum_throughput())
.unwrap_or_default();
let is_below_minimum_throughput = calculated_tpt <= this.options.minimum_throughput();
if is_below_minimum_throughput {
// Check the grace period future to see if it needs creating.
tracing::trace!(
in_grace_period = this.grace_period_fut.is_some(),
observed_throughput = ?calculated_tpt,
minimum_throughput = ?this.options.minimum_throughput(),
"below minimum throughput"
);
let mut grace_period_fut = this
.grace_period_fut
.take()
@@ -72,7 +92,7 @@ where
// The grace period has ended!
return Poll::Ready(Some(Err(Box::new(Error::ThroughputBelowMinimum {
expected: self.options.minimum_throughput(),
actual: actual_throughput.unwrap(),
actual: calculated_tpt,
}))));
};
this.grace_period_fut.replace(grace_period_fut);
@@ -95,10 +115,11 @@ where
}
// These tests use `hyper::body::Body::wrap_stream`
#[cfg(all(test, feature = "connector-hyper-0-14-x"))]
#[cfg(all(test, feature = "connector-hyper-0-14-x", feature = "test-util"))]
mod test {
use super::{super::Throughput, Error, MinimumThroughputBody};
use crate::client::http::body::minimum_throughput::options::MinimumThroughputBodyOptions;
use crate::test_util::capture_test_logs::capture_test_logs;
use aws_smithy_async::rt::sleep::AsyncSleep;
use aws_smithy_async::test_util::{instant_time_and_sleep, InstantSleep, ManualTimeSource};
use aws_smithy_types::body::SdkBody;
@@ -111,8 +132,8 @@ mod test {
use pretty_assertions::assert_eq;
use std::convert::Infallible;
use std::error::Error as StdError;
use std::future::Future;
use std::pin::Pin;
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::{Duration, UNIX_EPOCH};
@@ -216,7 +237,7 @@
eight_byte_per_second_stream_with_minimum_throughput_timeout(minimum_throughput);
let expected_err = Error::ThroughputBelowMinimum {
expected: minimum_throughput,
actual: Throughput::new(8.889, Duration::from_secs(1)),
actual: Throughput::new(8, Duration::from_secs(1)),
};
match res.await {
Ok(_) => {
@@ -236,7 +257,7 @@
#[tokio::test]
async fn test_throughput_timeout_less_than() {
let minimum_throughput = Throughput::new_bytes_per_second(9.0);
let minimum_throughput = Throughput::new_bytes_per_second(9);
expect_error(minimum_throughput).await;
}
@@ -255,39 +276,42 @@
#[tokio::test]
async fn test_throughput_timeout_equal_to() {
let minimum_throughput = Throughput::new(32.0, Duration::from_secs(4));
let (_guard, _) = capture_test_logs();
// A tiny bit less than the actual throughput. (To capture 0-throughput properly, we need to allow 0 to be 0.)
let minimum_throughput = Throughput::new(31, Duration::from_secs(4));
expect_success(minimum_throughput).await;
}
#[tokio::test]
async fn test_throughput_timeout_greater_than() {
let minimum_throughput = Throughput::new(20.0, Duration::from_secs(3));
let minimum_throughput = Throughput::new(20, Duration::from_secs(3));
expect_success(minimum_throughput).await;
}
// The stream's data rate in bytes per second; chosen arbitrarily.
const BYTE_COUNT_UPPER_LIMIT: f64 = 100.0;
const BYTE_COUNT_UPPER_LIMIT: u64 = 1000;
fn create_shrinking_sine_wave_stream(
/// Emits 1000 B/s for 5 seconds, then suddenly stops
fn sudden_stop(
async_sleep: impl AsyncSleep + Clone,
) -> impl futures_util::Stream<Item = Result<Bytes, Infallible>> {
let sleep_dur = Duration::from_millis(50);
fastrand::seed(0);
futures_util::stream::unfold(1, move |i| {
let async_sleep = async_sleep.clone();
async move {
if i > 255 {
None
let number_seconds = (i * sleep_dur).as_secs_f64();
async_sleep.sleep(sleep_dur).await;
if number_seconds > 5.0 {
Some((Result::<Bytes, Infallible>::Ok(Bytes::new()), i + 1))
} else {
let byte_count = (i as f64).sin().floor().abs() + 1.0 / (i as f64 + 1.0);
let byte_count = (byte_count * BYTE_COUNT_UPPER_LIMIT) as u64;
let mut bytes = BytesMut::new();
bytes.put_u8(i as u8);
if byte_count > 0 {
for _ in 0..byte_count {
bytes.put_u8(0)
}
let bytes_per_segment =
(BYTE_COUNT_UPPER_LIMIT as f64) * sleep_dur.as_secs_f64();
for _ in 0..bytes_per_segment as usize {
bytes.put_u8(0)
}
async_sleep.sleep(Duration::from_secs(1)).await;
Some((Result::<Bytes, Infallible>::Ok(bytes.into()), i + 1))
}
}
@@ -295,17 +319,52 @@
}
#[tokio::test]
async fn test_throughput_timeout_shrinking_sine_wave() {
async fn test_stalled_stream_detection() {
test_suddenly_stopping_stream(0, Duration::from_secs(6)).await
}
#[tokio::test]
async fn test_slow_stream_detection() {
test_suddenly_stopping_stream(BYTE_COUNT_UPPER_LIMIT / 2, Duration::from_secs_f64(5.50))
.await
}
#[tokio::test]
async fn test_check_interval() {
let (_guard, _) = capture_test_logs();
let (ts, sleep) = instant_time_and_sleep(UNIX_EPOCH);
let mut body = MinimumThroughputBody::new(
ts,
sleep.clone(),
NeverBody,
MinimumThroughputBodyOptions::builder()
.check_interval(Duration::from_millis(1234))
.grace_period(Duration::from_millis(456))
.build(),
);
let mut body = pin!(body);
let _ = poll_fn(|cx| body.as_mut().poll_data(cx)).await;
assert_eq!(
sleep.logs(),
vec![
// sleep, by second sleep we know we have no data, then the grace period
Duration::from_millis(1234),
Duration::from_millis(1234),
Duration::from_millis(456)
]
);
}
async fn test_suddenly_stopping_stream(throughput_limit: u64, time_until_timeout: Duration) {
let (_guard, _) = capture_test_logs();
let options = MinimumThroughputBodyOptions::builder()
// Minimum throughput per second will be approx. half of the BYTE_COUNT_UPPER_LIMIT.
.minimum_throughput(Throughput::new_bytes_per_second(
BYTE_COUNT_UPPER_LIMIT / 2.0 + 2.0,
))
.minimum_throughput(Throughput::new_bytes_per_second(throughput_limit))
.build();
let (time_source, async_sleep) = instant_time_and_sleep(UNIX_EPOCH);
let time_clone = time_source.clone();
let stream = create_shrinking_sine_wave_stream(async_sleep.clone());
let stream = sudden_stop(async_sleep.clone());
let body = ByteStream::new(SdkBody::from_body_0_4(hyper_0_14::body::Body::wrap_stream(
stream,
)));
@@ -326,14 +385,17 @@
match res.await {
Ok(_res) => {
assert_eq!(255.0, time_source.seconds_since_unix_epoch());
assert_eq!(Duration::from_secs(255), async_sleep.total_duration());
panic!("stream should have timed out");
}
Err(err) => {
dbg!(err);
assert_eq!(
async_sleep.total_duration(),
time_until_timeout,
"With throughput limit {:?} expected timeout after {:?} (stream starts sending 0's at 5 seconds.",
throughput_limit, time_until_timeout
);
}
Err(err) => panic!(
"test stopped after {:?} due to {}",
async_sleep.total_duration(),
DisplayErrorContext(err.source().unwrap())
),
}
}
}

View File

@@ -19,9 +19,18 @@ pub struct MinimumThroughputBodyOptions {
/// If this is set to a positive value, whenever throughput is below the minimum throughput,
/// a timer is started. If the timer expires before throughput rises above the minimum,
/// an error is emitted.
///
/// It is recommended to set this to a small value (e.g. 200ms) to avoid issues during
/// stream-startup.
grace_period: Duration,
/// The interval at which the throughput is checked.
check_interval: Duration,
/// The period of time to consider when computing the throughput.
///
/// This SHOULD be longer than the check interval, or stuck streams may evade detection.
check_window: Duration,
}
impl MinimumThroughputBodyOptions {
@@ -52,6 +61,10 @@ impl MinimumThroughputBodyOptions {
self.minimum_throughput
}
pub(crate) fn check_window(&self) -> Duration {
self.check_window
}
/// The rate at which the throughput is checked.
///
/// The actual rate throughput is checked may be higher than this value,
@@ -67,6 +80,7 @@ impl Default for MinimumThroughputBodyOptions {
minimum_throughput: DEFAULT_MINIMUM_THROUGHPUT,
grace_period: DEFAULT_GRACE_PERIOD,
check_interval: DEFAULT_CHECK_INTERVAL,
check_window: DEFAULT_CHECK_WINDOW,
}
}
}
@@ -79,13 +93,15 @@ pub struct MinimumThroughputBodyOptionsBuilder {
grace_period: Option<Duration>,
}
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_secs(1);
const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(500);
const DEFAULT_GRACE_PERIOD: Duration = Duration::from_secs(0);
const DEFAULT_MINIMUM_THROUGHPUT: Throughput = Throughput {
bytes_read: 1.0,
bytes_read: 1,
per_time_elapsed: Duration::from_secs(1),
};
const DEFAULT_CHECK_WINDOW: Duration = Duration::from_secs(1);
impl MinimumThroughputBodyOptionsBuilder {
/// Create a new `MinimumThroughputBodyOptionsBuilder`.
pub fn new() -> Self {
@@ -146,6 +162,7 @@ impl MinimumThroughputBodyOptionsBuilder {
.minimum_throughput
.unwrap_or(DEFAULT_MINIMUM_THROUGHPUT),
check_interval: self.check_interval.unwrap_or(DEFAULT_CHECK_INTERVAL),
check_window: DEFAULT_CHECK_WINDOW,
}
}
}
@@ -156,6 +173,7 @@ impl From<StalledStreamProtectionConfig> for MinimumThroughputBodyOptions {
grace_period: value.grace_period(),
minimum_throughput: DEFAULT_MINIMUM_THROUGHPUT,
check_interval: DEFAULT_CHECK_INTERVAL,
check_window: DEFAULT_CHECK_WINDOW,
}
}
}

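Putting the builder together, here is a minimal usage sketch. The module paths are assumed from the `use` statements elsewhere in this commit, and the setter methods (`minimum_throughput`, `check_interval`, `grace_period`) are the ones exercised by the tests earlier in this commit:

```rust
use std::time::Duration;
// Paths assumed from the `use` statements elsewhere in this commit.
use aws_smithy_runtime::client::http::body::minimum_throughput::options::MinimumThroughputBodyOptions;
use aws_smithy_runtime::client::http::body::minimum_throughput::Throughput;

fn main() {
    // Require at least 1 byte/sec, checked every 500ms, with a 200ms
    // grace period before a stalled stream errors out.
    let options = MinimumThroughputBodyOptions::builder()
        .minimum_throughput(Throughput::new_bytes_per_second(1))
        .check_interval(Duration::from_millis(500))
        .grace_period(Duration::from_millis(200))
        .build();
    let _ = options;
}
```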
View File

@@ -7,23 +7,16 @@ use std::collections::VecDeque;
use std::fmt;
use std::time::{Duration, SystemTime};
/// Throughput representation for use when configuring [`super::MinimumThroughputBody`]
#[derive(Debug, Clone, Copy)]
pub struct Throughput {
pub(super) bytes_read: f64,
pub(super) bytes_read: u64,
pub(super) per_time_elapsed: Duration,
}
impl Throughput {
/// Create a new throughput with the given bytes read and time elapsed.
pub fn new(bytes_read: f64, per_time_elapsed: Duration) -> Self {
debug_assert!(
!bytes_read.is_nan(),
"cannot create a throughput if bytes_read == NaN"
);
debug_assert!(
bytes_read.is_finite(),
"cannot create a throughput if bytes_read == Inf"
);
pub fn new(bytes_read: u64, per_time_elapsed: Duration) -> Self {
debug_assert!(
!per_time_elapsed.is_zero(),
"cannot create a throughput if per_time_elapsed == 0"
@@ -36,7 +29,7 @@ impl Throughput {
}
/// Create a new throughput in bytes per second.
pub fn new_bytes_per_second(bytes: f64) -> Self {
pub fn new_bytes_per_second(bytes: u64) -> Self {
Self {
bytes_read: bytes,
per_time_elapsed: Duration::from_secs(1),
@@ -44,32 +37,28 @@
}
/// Create a new throughput in kilobytes per second.
pub fn new_kilobytes_per_second(kilobytes: f64) -> Self {
pub fn new_kilobytes_per_second(kilobytes: u64) -> Self {
Self {
bytes_read: kilobytes * 1000.0,
bytes_read: kilobytes * 1000,
per_time_elapsed: Duration::from_secs(1),
}
}
/// Create a new throughput in megabytes per second.
pub fn new_megabytes_per_second(megabytes: f64) -> Self {
pub fn new_megabytes_per_second(megabytes: u64) -> Self {
Self {
bytes_read: megabytes * 1000.0 * 1000.0,
bytes_read: megabytes * 1000 * 1000,
per_time_elapsed: Duration::from_secs(1),
}
}
pub(super) fn per_time_elapsed(&self) -> Duration {
self.per_time_elapsed
}
pub(super) fn bytes_per_second(&self) -> f64 {
let per_time_elapsed_secs = self.per_time_elapsed.as_secs_f64();
if per_time_elapsed_secs == 0.0 {
return 0.0; // Avoid dividing by zero.
};
self.bytes_read / per_time_elapsed_secs
self.bytes_read as f64 / per_time_elapsed_secs
}
}
@@ -102,7 +91,7 @@ impl fmt::Display for Throughput {
impl From<(u64, Duration)> for Throughput {
fn from(value: (u64, Duration)) -> Self {
Self {
bytes_read: value.0 as f64,
bytes_read: value.0,
per_time_elapsed: value.1,
}
}
@@ -111,62 +100,80 @@ impl From<(u64, Duration)> for Throughput {
#[derive(Clone)]
pub(super) struct ThroughputLogs {
max_length: usize,
min_elapsed_time: Duration,
inner: VecDeque<(SystemTime, u64)>,
bytes_processed: u64,
}
impl ThroughputLogs {
pub(super) fn new(max_length: usize, min_elapsed_time: Duration) -> Self {
pub(super) fn new(max_length: usize) -> Self {
Self {
inner: VecDeque::new(),
min_elapsed_time,
inner: VecDeque::with_capacity(max_length),
max_length,
bytes_processed: 0,
}
}
pub(super) fn is_empty(&self) -> bool {
self.inner.is_empty()
}
pub(super) fn push(&mut self, throughput: (SystemTime, u64)) {
self.inner.push_back(throughput);
// When the number of logs exceeds the max length, toss the oldest log.
if self.inner.len() > self.max_length {
self.inner.pop_front();
if self.inner.len() == self.max_length {
self.bytes_processed -= self.inner.pop_front().map(|(_, sz)| sz).unwrap_or_default();
}
debug_assert!(self.inner.capacity() > self.inner.len());
self.bytes_processed += throughput.1;
self.inner.push_back(throughput);
}
pub(super) fn front(&self) -> Option<&(SystemTime, u64)> {
self.inner.front()
fn buffer_full(&self) -> bool {
self.inner.len() == self.max_length
}
pub(super) fn calculate_throughput(&self, now: SystemTime) -> Option<Throughput> {
match self.front() {
Some((front_t, _)) => {
// Ensure that enough time has passed between the first and last logs.
// If not, we can't calculate throughput so we return `None`.
// In the case that `now` is earlier than the first log time, we also return `None`.
let time_elapsed = now.duration_since(*front_t).unwrap_or_default();
if time_elapsed < self.min_elapsed_time {
return None;
}
// Floating back never contains bytes, so we don't care that
// it's missed in this calculation.
let total_bytes_logged = self
.inner
.iter()
.fold(0, |acc, (_, bytes_read)| acc + bytes_read)
as f64;
pub(super) fn calculate_throughput(
&self,
now: SystemTime,
time_window: Duration,
) -> Option<Throughput> {
// There are a lot of pathological cases that produce 0 throughput. These cases largely shouldn't
// happen, because the check interval MUST be less than the check window.
let total_length = self
.inner
.iter()
.last()?
.0
.duration_since(self.inner.get(0)?.0)
.ok()?;
// during a "healthy" request we'll only have a few milliseconds of logs (shorter than the check window)
if total_length < time_window {
// if we haven't hit our requested time window & the buffer still isn't full, then
// return `None` — this is the "startup grace period"
return if !self.buffer_full() {
None
} else {
// Otherwise, if the entire buffer fits in the time window, we can take the shortcut to
// avoid recomputing all the data.
Some(Throughput {
bytes_read: total_bytes_logged,
per_time_elapsed: time_elapsed,
bytes_read: self.bytes_processed,
per_time_elapsed: total_length,
})
}
_ => None,
};
}
let minimum_ts = now - time_window;
let first_item = self.inner.iter().find(|(ts, _)| *ts >= minimum_ts)?.0;
let time_elapsed = now.duration_since(first_item).unwrap_or_default();
let total_bytes_logged = self
.inner
.iter()
.rev()
.take_while(|(ts, _)| *ts > minimum_ts)
.map(|t| t.1)
.sum::<u64>();
Some(Throughput {
bytes_read: total_bytes_logged,
per_time_elapsed: time_elapsed,
})
}
}
@@ -177,9 +184,9 @@ mod test {
#[test]
fn test_throughput_eq() {
let t1 = Throughput::new(1.0, Duration::from_secs(1));
let t2 = Throughput::new(25.0, Duration::from_secs(25));
let t3 = Throughput::new(100.0, Duration::from_secs(100));
let t1 = Throughput::new(1, Duration::from_secs(1));
let t2 = Throughput::new(25, Duration::from_secs(25));
let t3 = Throughput::new(100, Duration::from_secs(100));
assert_eq!(t1, t2);
assert_eq!(t2, t3);
@@ -190,7 +197,7 @@
tick_duration: Duration,
rate: u64,
) -> (ThroughputLogs, SystemTime) {
let mut throughput_logs = ThroughputLogs::new(length as usize, Duration::from_secs(1));
let mut throughput_logs = ThroughputLogs::new(length as usize);
for i in 1..=length {
throughput_logs.push((UNIX_EPOCH + (tick_duration * i), rate));
}
@@ -199,48 +206,78 @@
(throughput_logs, UNIX_EPOCH + (tick_duration * length))
}
const EPSILON: f64 = 0.001;
macro_rules! assert_delta {
($x:expr, $y:expr, $d:expr) => {
// Assert that x and y differ by no more than d.
if (($x as f64) - $y).abs() > $d {
panic!("{} and {} differ by more than {}", $x, $y, $d);
}
};
}
#[test]
fn test_throughput_log_calculate_throughput_1() {
let (throughput_logs, now) = build_throughput_log(1000, Duration::from_secs(1), 1);
let throughput = throughput_logs.calculate_throughput(now).unwrap();
// Floats being what they are
assert_eq!(1.001001001001001, throughput.bytes_per_second());
for dur in [10, 100, 100] {
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs(dur))
.unwrap();
assert_eq!(1.0, throughput.bytes_per_second());
}
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs_f64(101.5))
.unwrap();
assert_delta!(1, throughput.bytes_per_second(), EPSILON);
}
#[test]
fn test_throughput_log_calculate_throughput_2() {
let (throughput_logs, now) = build_throughput_log(1000, Duration::from_secs(5), 5);
let throughput = throughput_logs.calculate_throughput(now).unwrap();
// Floats being what they are
assert_eq!(1.001001001001001, throughput.bytes_per_second());
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs(1000))
.unwrap();
assert_eq!(1.0, throughput.bytes_per_second());
}
#[test]
fn test_throughput_log_calculate_throughput_3() {
let (throughput_logs, now) = build_throughput_log(1000, Duration::from_millis(200), 1024);
let throughput = throughput_logs.calculate_throughput(now).unwrap();
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs(5))
.unwrap();
let expected_throughput = 1024.0 * 5.0;
// Floats being what they are
assert_eq!(
expected_throughput + 5.125125125125,
throughput.bytes_per_second()
);
assert_eq!(expected_throughput, throughput.bytes_per_second());
}
#[test]
fn test_throughput_log_calculate_throughput_4() {
let (throughput_logs, now) = build_throughput_log(1000, Duration::from_millis(100), 12);
let throughput = throughput_logs.calculate_throughput(now).unwrap();
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs(1))
.unwrap();
let expected_throughput = 12.0 * 10.0;
// Floats being what they are
assert_eq!(
expected_throughput + 0.12012012012012,
throughput.bytes_per_second()
);
assert_eq!(expected_throughput, throughput.bytes_per_second());
}
#[test]
fn test_throughput_followed_by_0() {
let tick = Duration::from_millis(100);
let (mut throughput_logs, now) = build_throughput_log(1000, tick, 12);
let throughput = throughput_logs
.calculate_throughput(now, Duration::from_secs(1))
.unwrap();
let expected_throughput = 12.0 * 10.0;
assert_eq!(expected_throughput, throughput.bytes_per_second());
throughput_logs.push((now + tick, 0));
let throughput = throughput_logs
.calculate_throughput(now + tick, Duration::from_secs(1))
.unwrap();
assert_eq!(108.0, throughput.bytes_per_second());
}
}

View File

@@ -0,0 +1,115 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#![cfg(all(feature = "client", feature = "test-util"))]
use aws_smithy_async::rt::sleep::TokioSleep;
use aws_smithy_async::time::{SystemTimeSource, TimeSource};
use aws_smithy_runtime::client::http::body::minimum_throughput::MinimumThroughputBody;
use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;
use bytes::{BufMut, Bytes, BytesMut};
use hyper_0_14::server::conn::AddrStream;
use hyper_0_14::service::{make_service_fn, service_fn, Service};
use hyper_0_14::{Body, Server};
use std::convert::Infallible;
use std::net::TcpListener;
use std::time::Duration;
fn make_block(sz: usize) -> Bytes {
let mut b = BytesMut::with_capacity(sz);
b.put_bytes(1, sz);
b.freeze()
}
// TODO(postGA): convert this to an actual benchmark
// This test evaluates streaming 1GB of data over the loopback with and without the body wrapper
// enabled. After optimizations, the body wrapper seems to make a minimal difference.
// NOTE: make sure you run this test in release mode to get a sensible result
#[tokio::test]
#[ignore]
async fn stalled_stream_performance() {
// 1GB
let data_size = 1_000_000_000;
// observed block size during actual HTTP requests
let block_size = 16384;
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let make_service = make_service_fn(move |_connection: &AddrStream| async move {
Ok::<_, Infallible>(service_fn(
move |_: http::Request<hyper_0_14::Body>| async move {
let (mut sender, body) = hyper_0_14::Body::channel();
tokio::task::spawn(async move {
for _i in 0..(data_size / block_size) {
sender
.send_data(make_block(block_size))
.await
.expect("failed to write data");
}
});
Ok::<_, Infallible>(http::Response::new(body))
},
))
});
let addr = format!("http://localhost:{}", listener.local_addr().unwrap().port());
let server = Server::from_tcp(listener).unwrap().serve(make_service);
tokio::spawn(server);
let mut no_wrapping = vec![];
let mut wrapping = vec![];
let runs = 10;
for _i in 0..runs {
no_wrapping.push(make_request(&addr, false).await);
wrapping.push(make_request(&addr, true).await);
}
println!(
"Average w/ wrapping: {}",
wrapping.iter().map(|it| it.as_millis() as f64).sum::<f64>() / runs as f64
);
println!(
"Average w/o wrapping: {}",
no_wrapping
.iter()
.map(|it: &Duration| it.as_millis() as f64)
.sum::<f64>()
/ runs as f64
)
}
async fn make_request(address: &str, wrap_body: bool) -> Duration {
let mut client = hyper_0_14::Client::new();
let req = ::http::Request::builder()
.uri(address)
.body(Body::empty())
.unwrap();
let resp = client.call(req).await;
let body = resp.unwrap().into_body();
let mut body = SdkBody::from_body_0_4(body);
if wrap_body {
body = body.map_preserve_contents(|body| {
let time_source = SystemTimeSource::new();
let sleep = TokioSleep::new();
let opts = StalledStreamProtectionConfig::enabled().build();
let mtb = MinimumThroughputBody::new(time_source, sleep, body, opts.into());
SdkBody::from_body_0_4(mtb)
});
}
let sdk_body = ByteStream::new(body);
let ts = SystemTimeSource::new();
let start = ts.now();
// This is a slow way to count total bytes, but we need to actually read the bytes into segments
// otherwise some of our code seems to be optimized away
let total_bytes = sdk_body
.collect()
.await
.unwrap()
.into_segments()
.map(|seg| seg.len())
.sum::<usize>();
println!("total: {:?}", total_bytes);
let end = ts.now();
end.duration_since(start).unwrap()
}

View File

@@ -42,6 +42,7 @@ pub struct UpgradeRuntimeCratesVersionArgs {
pub async fn subcommand_upgrade_runtime_crates_version(
args: &UpgradeRuntimeCratesVersionArgs,
) -> Result<(), anyhow::Error> {
check_crate_ver_against_stability(&args.version, PackageStability::Unstable)?;
let upgraded_unstable_version = semver::Version::parse(args.version.as_str())
.with_context(|| format!("{} is not a valid semver version", &args.version))?;
let fs = Fs::Real;
@@ -57,9 +58,8 @@ pub async fn subcommand_upgrade_runtime_crates_version(
&args.gradle_properties_path
)
})?;
// TODO(GA): Error out if args.stable_version starts with "0."
// https://github.com/smithy-lang/smithy-rs/pull/3082#discussion_r1378637315
let updated_gradle_properties = if let Some(stable_version) = &args.stable_version {
check_crate_ver_against_stability(stable_version, PackageStability::Stable)?;
let upgraded_stable_version = semver::Version::parse(stable_version.as_str())
.with_context(|| format!("{} is not a valid semver version", &stable_version))?;
update_gradle_properties(
@@ -125,9 +125,26 @@ async fn update_gradle_properties_file(
Ok(())
}
fn check_crate_ver_against_stability(
crate_ver: &str,
package_stability: PackageStability,
) -> Result<(), anyhow::Error> {
match package_stability {
PackageStability::Stable if crate_ver.starts_with("0.") => Err(anyhow::Error::msg(
format!("{} is an invalid stable crate version", &crate_ver),
)),
PackageStability::Unstable if !crate_ver.starts_with("0.") => Err(anyhow::Error::msg(
format!("{} is an invalid unstable crate version", &crate_ver),
)),
_ => Ok(()),
}
}
#[cfg(test)]
mod tests {
use crate::subcommand::upgrade_runtime_crates_version::update_gradle_properties;
use crate::subcommand::upgrade_runtime_crates_version::{
check_crate_ver_against_stability, update_gradle_properties,
};
use smithy_rs_tool_common::package::PackageStability;
#[test]
@@ -189,4 +206,15 @@ mod tests {
assert!(result.is_err());
assert!(format!("{:?}", result).contains("downgrade"));
}
#[test]
fn test_check_crate_ver_against_stability() {
assert!(check_crate_ver_against_stability("0.60.0", PackageStability::Stable).is_err());
assert!(check_crate_ver_against_stability("1.0.0", PackageStability::Stable).is_ok());
assert!(check_crate_ver_against_stability("2.0.0", PackageStability::Stable).is_ok());
assert!(check_crate_ver_against_stability("0.60.0", PackageStability::Unstable).is_ok());
assert!(check_crate_ver_against_stability("1.0.0", PackageStability::Unstable).is_err());
assert!(check_crate_ver_against_stability("2.0.0", PackageStability::Unstable).is_err());
}
}