From 1cecc3baa645816f661701b4e62a7ad07dbdd898 Mon Sep 17 00:00:00 2001 From: Zelda Hessler Date: Wed, 18 Sep 2024 12:06:58 -0500 Subject: [PATCH] Fix request compression (#3820) ## Description This PR includes several fixes for request compression: - `aws_smithy_compression::body::compress::CompressedBody` will no longer incorrectly return the inner body's `SizeHint`, returning `SizeHint::default()` instead. - Fixed a bug where compressed payloads would have an incorrect content length, causing those requests to hang. - Compress in-memory request payloads instead of the previous lazy approach. ## Testing Tests are included. Additionally, I ran a reproducer for a user issues to ensure that their use case works: ``` #[tokio::test] async fn use_case_reproducer() { tracing_subscriber::fmt::init(); let shared_config = aws_config::from_env() .region(aws_sdk_cloudwatch::config::Region::new("us-west-2")) .load() .await; let service_config = aws_sdk_cloudwatch::config::Config::from(&shared_config) .to_builder() .request_min_compression_size_bytes(1) .build(); let client = Client::from_conf(service_config); tracing::info!("sending metrics..."); client .put_metric_data() .namespace("CloudWatchTestMetricsBrivinc") .metric_data( aws_sdk_cloudwatch::types::MetricDatum::builder() .metric_name(format!("MyMetricNameIsALittleLong")) .value(0.0) .build(), ) .send() .await .unwrap(); } ``` ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: ysaito1001 --- .changelog/1725553080.md | 10 + aws/rust-runtime/Cargo.lock | 10 +- aws/rust-runtime/aws-config/Cargo.lock | 10 +- aws/rust-runtime/aws-config/Cargo.toml | 2 +- .../HttpRequestCompressionDecoratorTest.kt | 251 ++++++++++++++---- rust-runtime/Cargo.lock | 3 +- .../aws-smithy-compression/Cargo.toml | 7 +- .../aws-smithy-compression/src/body.rs | 175 +++++++----- rust-runtime/aws-smithy-types/Cargo.toml | 2 +- rust-runtime/aws-smithy-types/src/body.rs | 5 + .../src/client_request_compression.rs | 30 ++- tools/ci-build/publisher/Cargo.lock | 20 ++ 12 files changed, 380 insertions(+), 145 deletions(-) create mode 100644 .changelog/1725553080.md diff --git a/.changelog/1725553080.md b/.changelog/1725553080.md new file mode 100644 index 0000000000..2af69e4e0e --- /dev/null +++ b/.changelog/1725553080.md @@ -0,0 +1,10 @@ +--- +applies_to: ["client", "aws-sdk-rust"] +authors: ["Velfi"] +references: ["smithy-rs#3820"] +breaking: false +new_feature: false +bug_fix: true +--- + +Fixed a bug with the content length of compressed payloads that caused such requests to hang. diff --git a/aws/rust-runtime/Cargo.lock b/aws/rust-runtime/Cargo.lock index a523cbb5ab..1774e88d38 100644 --- a/aws/rust-runtime/Cargo.lock +++ b/aws/rust-runtime/Cargo.lock @@ -149,7 +149,7 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.2" +version = "1.4.3" dependencies = [ "arbitrary", "aws-credential-types", @@ -195,7 +195,7 @@ version = "0.60.3" [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.4" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -259,7 +259,7 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" dependencies = [ "aws-smithy-types", "bytes", @@ -268,7 +268,7 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.10" +version = "0.60.11" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -343,7 +343,7 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.5" +version = "1.2.6" dependencies = [ "base64-simd", "bytes", diff --git a/aws/rust-runtime/aws-config/Cargo.lock b/aws/rust-runtime/aws-config/Cargo.lock index 179c7e9003..f849dcd596 100644 --- a/aws/rust-runtime/aws-config/Cargo.lock +++ b/aws/rust-runtime/aws-config/Cargo.lock @@ -89,7 +89,7 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.2" +version = "1.4.3" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -173,7 +173,7 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.4" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -203,7 +203,7 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.10" +version = "0.60.11" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -298,7 +298,7 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.5" +version = "1.2.6" dependencies = [ "base64-simd", "bytes", @@ -319,7 +319,7 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.8" +version = "0.60.9" dependencies = [ "xmlparser", ] diff --git a/aws/rust-runtime/aws-config/Cargo.toml b/aws/rust-runtime/aws-config/Cargo.toml index 07361b366d..bedb7d0bcc 100644 --- a/aws/rust-runtime/aws-config/Cargo.toml +++ b/aws/rust-runtime/aws-config/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-config" -version = "1.5.6" +version = "1.5.7" authors = [ "AWS Rust SDK Team ", "Russell Cohen ", diff --git a/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/HttpRequestCompressionDecoratorTest.kt b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/HttpRequestCompressionDecoratorTest.kt index 4fadca51ef..65f439fe17 100644 --- a/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/HttpRequestCompressionDecoratorTest.kt +++ b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/HttpRequestCompressionDecoratorTest.kt @@ -15,11 +15,8 @@ import software.amazon.smithy.rust.codegen.core.testutil.integrationTest class HttpRequestCompressionDecoratorTest { companion object { - // Can't use the dollar sign in a multiline string with doing it like this. - private const val PREFIX = "\$version: \"2\"" val model = """ - $PREFIX namespace test use aws.api#service @@ -100,7 +97,7 @@ class HttpRequestCompressionDecoratorTest { @output structure SomeIncompressibleOutput {} - """.asSmithyModel() + """.asSmithyModel(smithyVersion = "2.0") } @Test @@ -147,14 +144,22 @@ class HttpRequestCompressionDecoratorTest { .build(); let client = $moduleName::Client::from_conf(config); - let _ = client.not_a_compressible_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let _ = client + .not_a_compressible_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is not set. - assert_eq!(None, request.headers().get("content-encoding")); + assert_eq!(None, request.headers().get(#{http}::header::CONTENT_ENCODING)); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was not compressed - assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()) + assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -168,14 +173,25 @@ class HttpRequestCompressionDecoratorTest { .build(); let client = $moduleName::Client::from_conf(config); - let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is not set to "gzip" - assert_ne!(Some("gzip"), request.headers().get("content-encoding")); + assert_ne!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was not compressed - assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()) + assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -189,15 +205,26 @@ class HttpRequestCompressionDecoratorTest { .request_min_compression_size_bytes(128) .build(); - let client = $moduleName::Client::from_conf(config); - let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let client = $moduleName::Client::from_conf(config); + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is set to "gzip" - assert_eq!(Some("gzip"), request.headers().get("content-encoding")); + assert_eq!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was compressed - assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()) + assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -211,15 +238,26 @@ class HttpRequestCompressionDecoratorTest { .request_min_compression_size_bytes(256) .build(); - let client = $moduleName::Client::from_conf(config); - let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let client = $moduleName::Client::from_conf(config); + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is not set to "gzip" - assert_ne!(Some("gzip"), request.headers().get("content-encoding")); + assert_ne!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was not compressed - assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()) + assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -232,15 +270,26 @@ class HttpRequestCompressionDecoratorTest { .request_min_compression_size_bytes(128) .build(); - let client = $moduleName::Client::from_conf(config); - let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let client = $moduleName::Client::from_conf(config); + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is set to "gzip" - assert_eq!(Some("gzip"), request.headers().get("content-encoding")); + assert_eq!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was compressed - assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()) + assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -253,15 +302,26 @@ class HttpRequestCompressionDecoratorTest { .disable_request_compression(false) .build(); - let client = $moduleName::Client::from_conf(config); - let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await; + let client = $moduleName::Client::from_conf(config); + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; let request = rx.expect_request(); // Check that the content-encoding header is not set to "gzip" - assert_ne!(Some("gzip"), request.headers().get("content-encoding")); + assert_ne!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); // Assert input body was not compressed - assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()) + assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice()); } ##[#{tokio}::test] @@ -276,39 +336,116 @@ class HttpRequestCompressionDecoratorTest { .request_min_compression_size_bytes(128) .build(); - let client = $moduleName::Client::from_conf(config); - // ByteStreams created from a file are streaming and have a known size - let mut file = #{tempfile}::NamedTempFile::new().unwrap(); - use std::io::Write; - file.write_all(UNCOMPRESSED_INPUT).unwrap(); + let client = $moduleName::Client::from_conf(config); + // ByteStreams created from a file are streaming + let mut file = #{tempfile}::NamedTempFile::new().unwrap(); + use std::io::Write; + file.write_all(UNCOMPRESSED_INPUT).unwrap(); - let body = ByteStream::read_from() - .path(file.path()) - .buffer_size(1024) - .length(#{Length}::Exact(UNCOMPRESSED_INPUT.len() as u64)) - .build() - .await - .unwrap(); - let _ = client - .some_streaming_operation() - .body(body) - .send() - .await; - let request = rx.expect_request(); - // Check that the content-encoding header is set to "gzip" - assert_eq!(Some("gzip"), request.headers().get("content-encoding")); + let body = ByteStream::read_from() + .path(file.path()) + .buffer_size(1024) + .length(::aws_smithy_types::byte_stream::Length::Exact( + UNCOMPRESSED_INPUT.len() as u64, + )) + .build() + .await + .unwrap(); + let _ = client.some_streaming_operation().body(body).send().await; + let request = rx.expect_request(); + // Check that the content-encoding header is set to "gzip" + assert_eq!( + Some("gzip"), + request.headers().get(#{http}::header::CONTENT_ENCODING) + ); - let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec(); - // Assert input body is different from uncompressed input - assert_ne!(UNCOMPRESSED_INPUT, compressed_body.as_slice()); - // Assert input body was compressed - assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()); + let compressed_body = ByteStream::from(request.into_body()) + .collect() + .await + .unwrap() + .to_vec(); + // Assert input body is different from uncompressed input + assert_ne!( + UNCOMPRESSED_INPUT, + compressed_body.as_slice(), + "input was not compressed" + ); + // Assert input body was compressed + assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice()); + } + + ##[#{tokio}::test] + async fn test_compressed_content_length() { + let (http_client, rx) = ::aws_smithy_runtime::client::http::test_util::capture_request(None); + let config = $moduleName::Config::builder() + .region(Region::from_static("doesntmatter")) + .with_test_defaults() + .http_client(http_client) + .disable_request_compression(false) + .request_min_compression_size_bytes(0) + .build(); + + let client = $moduleName::Client::from_conf(config); + let _ = client + .some_operation() + .body(Blob::new(UNCOMPRESSED_INPUT)) + .send() + .await; + let request = rx.expect_request(); + // Check that the content-length header is set correctly. + if let Some(content_length) = request + .headers() + .get(#{http}::header::CONTENT_LENGTH) + .and_then(|len| len.parse::().ok()) + { + assert_ne!( + content_length, UNCOMPRESSED_INPUT.len(), + "`content-length` of in-memory payload was incorrectly set to the length of the uncompressed input but should have been set to the length of the compressed payload" + ); + assert_eq!(COMPRESSED_OUTPUT.len(), content_length); + } + + let (http_client, rx) = ::aws_smithy_runtime::client::http::test_util::capture_request(None); + let config = $moduleName::Config::builder() + .region(Region::from_static("doesntmatter")) + .with_test_defaults() + .http_client(http_client) + .disable_request_compression(false) + .request_min_compression_size_bytes(0) + .build(); + + let client = $moduleName::Client::from_conf(config); + // ByteStreams created from a file are streaming + let mut file = #{tempfile}::NamedTempFile::new().unwrap(); + use std::io::Write; + file.write_all(UNCOMPRESSED_INPUT).unwrap(); + + let body = ByteStream::read_from() + .path(file.path()) + .buffer_size(1024) + .length(::aws_smithy_types::byte_stream::Length::Exact( + UNCOMPRESSED_INPUT.len() as u64, + )) + .build() + .await + .unwrap(); + let _ = client.some_streaming_operation().body(body).send().await; + let request = rx.expect_request(); + + assert!( + request + .headers() + .get(#{http}::header::CONTENT_LENGTH) + .is_none(), + "expected that no content length header is set because the request is streaming." + ); } """, *preludeScope, "ByteStream" to RuntimeType.smithyTypes(rc).resolve("byte_stream::ByteStream"), "Blob" to RuntimeType.smithyTypes(rc).resolve("Blob"), "Region" to AwsRuntimeType.awsTypes(rc).resolve("region::Region"), + "http" to CargoDependency.Http.toType(), "tokio" to CargoDependency.Tokio.toType(), "capture_request" to RuntimeType.captureRequest(rc), "pretty_assertions" to CargoDependency.PrettyAssertions.toType(), diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 24d29e233b..5373a5e3ca 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -369,13 +369,14 @@ version = "0.60.3" [[package]] name = "aws-smithy-compression" -version = "0.0.1" +version = "0.0.2" dependencies = [ "aws-smithy-runtime-api 1.7.2", "aws-smithy-types 1.2.6", "bytes", "bytes-utils", "flate2", + "futures-util", "http 0.2.12", "http 1.1.0", "http-body 0.4.6", diff --git a/rust-runtime/aws-smithy-compression/Cargo.toml b/rust-runtime/aws-smithy-compression/Cargo.toml index 95ecba68f0..3d8552d455 100644 --- a/rust-runtime/aws-smithy-compression/Cargo.toml +++ b/rust-runtime/aws-smithy-compression/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-compression" -version = "0.0.1" +version = "0.0.2" authors = [ "AWS Rust SDK Team ", "Zelda Hessler ", @@ -30,13 +30,14 @@ aws-smithy-types = { path = "../aws-smithy-types" } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" } bytes = "1.4.0" flate2 = "1.0.30" -pin-project-lite = "0.2.14" -tracing = "0.1.40" +futures-util = "0.3" http-0-2 = { package = "http", version = "0.2.9", optional = true } http-1-0 = { package = "http", version = "1", optional = true } http-body-0-4 = { package = "http-body", version = "0.4.5", optional = true } http-body-1-0 = { package = "http-body", version = "1", optional = true } http-body-util = { version = "0.1.1", optional = true } +pin-project-lite = "0.2.14" +tracing = "0.1.40" [dev-dependencies] bytes-utils = "0.1.2" diff --git a/rust-runtime/aws-smithy-compression/src/body.rs b/rust-runtime/aws-smithy-compression/src/body.rs index a91b3f76ed..59a547723e 100644 --- a/rust-runtime/aws-smithy-compression/src/body.rs +++ b/rust-runtime/aws-smithy-compression/src/body.rs @@ -21,6 +21,7 @@ pub mod compress { #[pin] body: InnerBody, compress_request: CompressionImpl, + is_end_stream: bool, } } @@ -30,6 +31,7 @@ pub mod compress { Self { body, compress_request, + is_end_stream: false, } } } @@ -39,6 +41,7 @@ pub mod compress { pub mod http_body_0_4_x { use super::CompressedBody; use crate::http::http_body_0_4_x::CompressRequest; + use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_types::body::SdkBody; use http_0_2::HeaderMap; use http_body_0_4::{Body, SizeHint}; @@ -60,7 +63,10 @@ pub mod compress { this.compress_request.compress_bytes(&data[..], &mut out)?; Poll::Ready(Some(Ok(out.into()))) } - Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(None) => { + *this.is_end_stream = true; + Poll::Ready(None) + } Poll::Pending => Poll::Pending, } } @@ -74,11 +80,28 @@ pub mod compress { } fn is_end_stream(&self) -> bool { - self.body.is_end_stream() + self.is_end_stream } fn size_hint(&self) -> SizeHint { - self.body.size_hint() + // We can't return a hint because we don't know exactly how + // compression will affect the content length + SizeHint::default() + } + } + + impl CompressedBody> { + /// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data. + /// + /// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned. + /// If compression fails, an error is returned. + pub fn into_compressed_sdk_body(mut self) -> Result { + let mut compressed_body = Vec::new(); + let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?; + + self.compress_request + .compress_bytes(bytes, &mut compressed_body)?; + Ok(SdkBody::from(compressed_body)) } } } @@ -86,7 +109,7 @@ pub mod compress { /// Support for the `http-body-1-0` and `http-1-0` crates. #[cfg(feature = "http-body-1-x")] pub mod http_body_1_x { - use super::CompressedBody; + use crate::body::compress::CompressedBody; use crate::http::http_body_1_x::CompressRequest; use aws_smithy_types::body::SdkBody; use http_body_1_0::{Body, Frame, SizeHint}; @@ -116,16 +139,22 @@ pub mod compress { unreachable!("Frame is either data or trailers") } } + None => { + *this.is_end_stream = true; + None + } other => other, }) } fn is_end_stream(&self) -> bool { - self.body.is_end_stream() + self.is_end_stream } fn size_hint(&self) -> SizeHint { - self.body.size_hint() + // We can't return a hint because we don't know exactly how + // compression will affect the content length + SizeHint::default() } } } @@ -134,6 +163,12 @@ pub mod compress { #[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))] #[cfg(test)] mod test { + use crate::body::compress::CompressedBody; + use crate::{CompressionAlgorithm, CompressionOptions}; + use aws_smithy_types::body::SdkBody; + use bytes::Buf; + use bytes_utils::SegmentedBuf; + use std::io::Read; const UNCOMPRESSED_INPUT: &[u8] = b"hello world"; const COMPRESSED_OUTPUT: &[u8] = &[ 31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0, @@ -141,77 +176,89 @@ mod test { ]; #[cfg(feature = "http-body-0-4-x")] - #[tokio::test] - async fn test_compressed_body_http_body_0_4_x() { - use super::super::{CompressionAlgorithm, CompressionOptions}; - use crate::body::compress::CompressedBody; - use aws_smithy_types::body::SdkBody; - use bytes::Buf; - use bytes_utils::SegmentedBuf; + mod http_body_0_4_x { + use super::*; use http_body_0_4::Body; - use std::io::Read; - let compression_algorithm = CompressionAlgorithm::Gzip; - let compression_options = CompressionOptions::default() - .with_min_compression_size_bytes(0) - .unwrap(); - let compress_request = - compression_algorithm.into_impl_http_body_0_4_x(&compression_options); - let body = SdkBody::from(UNCOMPRESSED_INPUT); - let mut compressed_body = CompressedBody::new(body, compress_request); + #[tokio::test] + async fn test_body_is_compressed() { + let compression_options = CompressionOptions::default() + .with_min_compression_size_bytes(0) + .unwrap(); + let compress_request = + CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options); + let body = SdkBody::from(UNCOMPRESSED_INPUT); + let mut compressed_body = CompressedBody::new(body, compress_request); - let mut output = SegmentedBuf::new(); - while let Some(buf) = compressed_body.data().await { - output.push(buf.unwrap()); + let mut output = SegmentedBuf::new(); + while let Some(buf) = compressed_body.data().await { + output.push(buf.unwrap()); + } + + let mut actual_output = Vec::new(); + output + .reader() + .read_to_end(&mut actual_output) + .expect("Doesn't cause IO errors"); + // Verify data is compressed as expected + assert_eq!(COMPRESSED_OUTPUT, actual_output); } - let mut actual_output = Vec::new(); - output - .reader() - .read_to_end(&mut actual_output) - .expect("Doesn't cause IO errors"); - // Verify data is compressed as expected - assert_eq!(COMPRESSED_OUTPUT, actual_output); + #[tokio::test] + async fn test_into_compressed_sdk_body() { + let compression_options = CompressionOptions::default() + .with_min_compression_size_bytes(0) + .unwrap(); + let compress_request = + CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options); + let body = SdkBody::from(UNCOMPRESSED_INPUT); + let compressed_sdk_body = CompressedBody::new(body, compress_request) + .into_compressed_sdk_body() + .unwrap(); + + // Verify data is compressed as expected + assert_eq!( + COMPRESSED_OUTPUT, + compressed_sdk_body.bytes().expect("body is in-memory") + ); + } } #[cfg(feature = "http-body-1-x")] - #[tokio::test] - async fn test_compressed_body_http_body_1_x() { - use super::super::{CompressionAlgorithm, CompressionOptions}; - use crate::body::compress::CompressedBody; - use aws_smithy_types::body::SdkBody; - use bytes::Buf; - use bytes_utils::SegmentedBuf; + mod http_body_1_x { + use super::*; use http_body_util::BodyExt; - use std::io::Read; - let compression_algorithm = CompressionAlgorithm::Gzip; - let compression_options = CompressionOptions::default() - .with_min_compression_size_bytes(0) - .unwrap(); - let compress_request = compression_algorithm.into_impl_http_body_1_x(&compression_options); - let body = SdkBody::from(UNCOMPRESSED_INPUT); - let mut compressed_body = CompressedBody::new(body, compress_request); + #[tokio::test] + async fn test_body_is_compressed() { + let compression_options = CompressionOptions::default() + .with_min_compression_size_bytes(0) + .unwrap(); + let compress_request = + CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&compression_options); + let body = SdkBody::from(UNCOMPRESSED_INPUT); + let mut compressed_body = CompressedBody::new(body, compress_request); - let mut output = SegmentedBuf::new(); + let mut output = SegmentedBuf::new(); - loop { - let data = match compressed_body.frame().await { - Some(Ok(frame)) => frame.into_data(), - Some(Err(e)) => panic!("Error: {}", e), - // No more frames, break out of loop - None => break, + loop { + let data = match compressed_body.frame().await { + Some(Ok(frame)) => frame.into_data(), + Some(Err(e)) => panic!("Error: {}", e), + // No more frames, break out of loop + None => break, + } + .expect("frame is OK"); + output.push(data); } - .expect("frame is OK"); - output.push(data); - } - let mut actual_output = Vec::new(); - output - .reader() - .read_to_end(&mut actual_output) - .expect("Doesn't cause IO errors"); - // Verify data is compressed as expected - assert_eq!(COMPRESSED_OUTPUT, actual_output); + let mut actual_output = Vec::new(); + output + .reader() + .read_to_end(&mut actual_output) + .expect("Doesn't cause IO errors"); + // Verify data is compressed as expected + assert_eq!(COMPRESSED_OUTPUT, actual_output); + } } } diff --git a/rust-runtime/aws-smithy-types/Cargo.toml b/rust-runtime/aws-smithy-types/Cargo.toml index c91e7d3ca6..7b52b8b6d8 100644 --- a/rust-runtime/aws-smithy-types/Cargo.toml +++ b/rust-runtime/aws-smithy-types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-types" -version = "1.2.6" +version = "1.2.7" authors = [ "AWS Rust SDK Team ", "Russell Cohen ", diff --git a/rust-runtime/aws-smithy-types/src/body.rs b/rust-runtime/aws-smithy-types/src/body.rs index 0065043966..f4e4285a8c 100644 --- a/rust-runtime/aws-smithy-types/src/body.rs +++ b/rust-runtime/aws-smithy-types/src/body.rs @@ -234,6 +234,11 @@ impl SdkBody { }) } + /// Return `true` if this SdkBody is streaming, `false` if it is in-memory. + pub fn is_streaming(&self) -> bool { + matches!(self.inner, Inner::Dyn { .. }) + } + /// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not /// have a known length. pub fn content_length(&self) -> Option { diff --git a/rust-runtime/inlineable/src/client_request_compression.rs b/rust-runtime/inlineable/src/client_request_compression.rs index 6211ca88ca..92063dd739 100644 --- a/rust-runtime/inlineable/src/client_request_compression.rs +++ b/rust-runtime/inlineable/src/client_request_compression.rs @@ -98,6 +98,7 @@ impl Intercept for RequestCompressionInterceptor { layer.store_put(RequestCompressionInterceptorState { options: Some(options), }); + cfg.push_layer(layer); Ok(()) @@ -126,7 +127,8 @@ impl Intercept for RequestCompressionInterceptor { // // Because compressing small amounts of data can actually increase its size, // we check to see if the data is big enough to make compression worthwhile. - if let Some(known_size) = http_body::Body::size_hint(request.body()).exact() { + let size_hint = http_body::Body::size_hint(request.body()).exact(); + if let Some(known_size) = size_hint { if known_size < options.min_compression_size_bytes() as u64 { tracing::trace!( min_compression_size_bytes = options.min_compression_size_bytes(), @@ -135,16 +137,15 @@ impl Intercept for RequestCompressionInterceptor { ); return Ok(()); } - tracing::trace!("compressing non-streaming request body...") + tracing::trace!("compressing sized request body..."); } else { - tracing::trace!("compressing streaming request body..."); + tracing::trace!("compressing unsized request body..."); } wrap_request_body_in_compressed_body( request, CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&options), )?; - cfg.interceptor_state() .store_append::(SmithySdkFeature::GzipRequestCompression); @@ -162,11 +163,24 @@ fn wrap_request_body_in_compressed_body( ); let mut body = { let body = mem::replace(request.body_mut(), SdkBody::taken()); - body.map(move |body| { - let body = CompressedBody::new(body, request_compress_impl.clone()); - SdkBody::from_body_0_4(body) - }) + if body.is_streaming() { + request.headers_mut().remove(http::header::CONTENT_LENGTH); + body.map(move |body| { + let body = CompressedBody::new(body, request_compress_impl.clone()); + SdkBody::from_body_0_4(body) + }) + } else { + let body = CompressedBody::new(body, request_compress_impl.clone()); + let body = body.into_compressed_sdk_body().map_err(BuildError::other)?; + + let content_length = body.content_length().expect("this payload is in-memory"); + request + .headers_mut() + .insert(http::header::CONTENT_LENGTH, content_length.to_string()); + + body + } }; mem::swap(request.body_mut(), &mut body); diff --git a/tools/ci-build/publisher/Cargo.lock b/tools/ci-build/publisher/Cargo.lock index 6a12b7e0f4..e69a7b72a5 100644 --- a/tools/ci-build/publisher/Cargo.lock +++ b/tools/ci-build/publisher/Cargo.lock @@ -1231,6 +1231,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap 2.2.6", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1302,6 +1315,7 @@ dependencies = [ "semver", "serde", "serde_json", + "serde_yaml", "thiserror", "tokio", "toml 0.5.11", @@ -1674,6 +1688,12 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "url" version = "2.5.0"