Fix request compression (#3820)

## Description
<!--- Describe your changes in detail -->
This PR includes several fixes for request compression:
- `aws_smithy_compression::body::compress::CompressedBody` no longer
incorrectly returns the inner body's `SizeHint`; it now returns
`SizeHint::default()` instead.
- Fixed a bug where compressed payloads would be sent with an incorrect
content length, causing those requests to hang.
- In-memory request payloads are now compressed eagerly instead of with
the previous lazy (streaming) approach (see the sketch after this list).
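
As a rough illustration of the last point, here is a minimal, standalone sketch (not the SDK's actual code) of the eager approach for in-memory payloads, using `flate2` directly (the same compression crate the SDK uses). The payload is compressed up front so that `content-length` can be derived from the compressed bytes rather than the original payload; streaming bodies keep the lazy wrapper and simply have their `content-length` header removed. The helper name below is illustrative, not part of the SDK's API.

```rust
use std::io::Write;

use flate2::write::GzEncoder;
use flate2::Compression;

/// Gzip an in-memory payload up front so the caller can set
/// `content-length` from the compressed output.
fn compress_in_memory_payload(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(payload)?;
    encoder.finish()
}

fn main() -> std::io::Result<()> {
    let payload = b"hello world, hello world, hello world";
    let compressed = compress_in_memory_payload(payload)?;
    // `content-length` must reflect the compressed size; setting it to the
    // uncompressed size makes the service wait for bytes that never arrive,
    // which is why affected requests hung.
    println!(
        "uncompressed: {} bytes, compressed (content-length): {} bytes",
        payload.len(),
        compressed.len()
    );
    Ok(())
}
```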

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
Tests are included. Additionally, I ran a reproducer for a user-reported
issue to ensure that their use case works:

```rust
#[tokio::test]
async fn use_case_reproducer() {
    tracing_subscriber::fmt::init();
    let shared_config = aws_config::from_env()
        .region(aws_sdk_cloudwatch::config::Region::new("us-west-2"))
        .load()
        .await;
    let service_config = aws_sdk_cloudwatch::config::Config::from(&shared_config)
        .to_builder()
        .request_min_compression_size_bytes(1)
        .build();
    let client = aws_sdk_cloudwatch::Client::from_conf(service_config);

    tracing::info!("sending metrics...");
    client
        .put_metric_data()
        .namespace("CloudWatchTestMetricsBrivinc")
        .metric_data(
            aws_sdk_cloudwatch::types::MetricDatum::builder()
                .metric_name("MyMetricNameIsALittleLong")
                .value(0.0)
                .build(),
        )
        .send()
        .await
        .unwrap();
}
``` 

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: ysaito1001 <awsaito@amazon.com>
Zelda Hessler 2024-09-18 12:06:58 -05:00 committed by GitHub
parent ec226c0223
commit 1cecc3baa6
12 changed files with 380 additions and 145 deletions

.changelog/1725553080.md (new file)

@ -0,0 +1,10 @@
---
applies_to: ["client", "aws-sdk-rust"]
authors: ["Velfi"]
references: ["smithy-rs#3820"]
breaking: false
new_feature: false
bug_fix: true
---
Fixed a bug with the content length of compressed payloads that caused such requests to hang.


@ -149,7 +149,7 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.4.2"
version = "1.4.3"
dependencies = [
"arbitrary",
"aws-credential-types",
@ -195,7 +195,7 @@ version = "0.60.3"
[[package]]
name = "aws-sigv4"
version = "1.2.3"
version = "1.2.4"
dependencies = [
"aws-credential-types",
"aws-smithy-eventstream",
@ -259,7 +259,7 @@ dependencies = [
[[package]]
name = "aws-smithy-eventstream"
version = "0.60.4"
version = "0.60.5"
dependencies = [
"aws-smithy-types",
"bytes",
@ -268,7 +268,7 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.10"
version = "0.60.11"
dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
@ -343,7 +343,7 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "1.2.5"
version = "1.2.6"
dependencies = [
"base64-simd",
"bytes",


@ -89,7 +89,7 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.4.2"
version = "1.4.3"
dependencies = [
"aws-credential-types",
"aws-sigv4",
@ -173,7 +173,7 @@ dependencies = [
[[package]]
name = "aws-sigv4"
version = "1.2.3"
version = "1.2.4"
dependencies = [
"aws-credential-types",
"aws-smithy-http",
@ -203,7 +203,7 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.10"
version = "0.60.11"
dependencies = [
"aws-smithy-runtime-api",
"aws-smithy-types",
@ -298,7 +298,7 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "1.2.5"
version = "1.2.6"
dependencies = [
"base64-simd",
"bytes",
@ -319,7 +319,7 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.60.8"
version = "0.60.9"
dependencies = [
"xmlparser",
]


@ -1,6 +1,6 @@
[package]
name = "aws-config"
version = "1.5.6"
version = "1.5.7"
authors = [
"AWS Rust SDK Team <aws-sdk-rust@amazon.com>",
"Russell Cohen <rcoh@amazon.com>",


@ -15,11 +15,8 @@ import software.amazon.smithy.rust.codegen.core.testutil.integrationTest
class HttpRequestCompressionDecoratorTest {
companion object {
// Can't use the dollar sign in a multiline string without doing it like this.
private const val PREFIX = "\$version: \"2\""
val model =
"""
$PREFIX
namespace test
use aws.api#service
@ -100,7 +97,7 @@ class HttpRequestCompressionDecoratorTest {
@output
structure SomeIncompressibleOutput {}
""".asSmithyModel()
""".asSmithyModel(smithyVersion = "2.0")
}
@Test
@ -147,14 +144,22 @@ class HttpRequestCompressionDecoratorTest {
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.not_a_compressible_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let _ = client
.not_a_compressible_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is not set.
assert_eq!(None, request.headers().get("content-encoding"));
assert_eq!(None, request.headers().get(#{http}::header::CONTENT_ENCODING));
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was not compressed
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice())
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -168,14 +173,25 @@ class HttpRequestCompressionDecoratorTest {
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is not set to "gzip"
assert_ne!(Some("gzip"), request.headers().get("content-encoding"));
assert_ne!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was not compressed
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice())
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -189,15 +205,26 @@ class HttpRequestCompressionDecoratorTest {
.request_min_compression_size_bytes(128)
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let client = $moduleName::Client::from_conf(config);
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is set to "gzip"
assert_eq!(Some("gzip"), request.headers().get("content-encoding"));
assert_eq!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was compressed
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice())
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -211,15 +238,26 @@ class HttpRequestCompressionDecoratorTest {
.request_min_compression_size_bytes(256)
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let client = $moduleName::Client::from_conf(config);
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is not set to "gzip"
assert_ne!(Some("gzip"), request.headers().get("content-encoding"));
assert_ne!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was not compressed
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice())
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -232,15 +270,26 @@ class HttpRequestCompressionDecoratorTest {
.request_min_compression_size_bytes(128)
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let client = $moduleName::Client::from_conf(config);
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is set to "gzip"
assert_eq!(Some("gzip"), request.headers().get("content-encoding"));
assert_eq!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was compressed
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice())
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -253,15 +302,26 @@ class HttpRequestCompressionDecoratorTest {
.disable_request_compression(false)
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client.some_operation().body(Blob::new(UNCOMPRESSED_INPUT)).send().await;
let client = $moduleName::Client::from_conf(config);
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is not set to "gzip"
assert_ne!(Some("gzip"), request.headers().get("content-encoding"));
assert_ne!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body was not compressed
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice())
assert_eq!(UNCOMPRESSED_INPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
@ -276,39 +336,116 @@ class HttpRequestCompressionDecoratorTest {
.request_min_compression_size_bytes(128)
.build();
let client = $moduleName::Client::from_conf(config);
// ByteStreams created from a file are streaming and have a known size
let mut file = #{tempfile}::NamedTempFile::new().unwrap();
use std::io::Write;
file.write_all(UNCOMPRESSED_INPUT).unwrap();
let client = $moduleName::Client::from_conf(config);
// ByteStreams created from a file are streaming
let mut file = #{tempfile}::NamedTempFile::new().unwrap();
use std::io::Write;
file.write_all(UNCOMPRESSED_INPUT).unwrap();
let body = ByteStream::read_from()
.path(file.path())
.buffer_size(1024)
.length(#{Length}::Exact(UNCOMPRESSED_INPUT.len() as u64))
.build()
.await
.unwrap();
let _ = client
.some_streaming_operation()
.body(body)
.send()
.await;
let request = rx.expect_request();
// Check that the content-encoding header is set to "gzip"
assert_eq!(Some("gzip"), request.headers().get("content-encoding"));
let body = ByteStream::read_from()
.path(file.path())
.buffer_size(1024)
.length(::aws_smithy_types::byte_stream::Length::Exact(
UNCOMPRESSED_INPUT.len() as u64,
))
.build()
.await
.unwrap();
let _ = client.some_streaming_operation().body(body).send().await;
let request = rx.expect_request();
// Check that the content-encoding header is set to "gzip"
assert_eq!(
Some("gzip"),
request.headers().get(#{http}::header::CONTENT_ENCODING)
);
let compressed_body = ByteStream::from(request.into_body()).collect().await.unwrap().to_vec();
// Assert input body is different from uncompressed input
assert_ne!(UNCOMPRESSED_INPUT, compressed_body.as_slice());
// Assert input body was compressed
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice());
let compressed_body = ByteStream::from(request.into_body())
.collect()
.await
.unwrap()
.to_vec();
// Assert input body is different from uncompressed input
assert_ne!(
UNCOMPRESSED_INPUT,
compressed_body.as_slice(),
"input was not compressed"
);
// Assert input body was compressed
assert_eq!(COMPRESSED_OUTPUT, compressed_body.as_slice());
}
##[#{tokio}::test]
async fn test_compressed_content_length() {
let (http_client, rx) = ::aws_smithy_runtime::client::http::test_util::capture_request(None);
let config = $moduleName::Config::builder()
.region(Region::from_static("doesntmatter"))
.with_test_defaults()
.http_client(http_client)
.disable_request_compression(false)
.request_min_compression_size_bytes(0)
.build();
let client = $moduleName::Client::from_conf(config);
let _ = client
.some_operation()
.body(Blob::new(UNCOMPRESSED_INPUT))
.send()
.await;
let request = rx.expect_request();
// Check that the content-length header is set correctly.
if let Some(content_length) = request
.headers()
.get(#{http}::header::CONTENT_LENGTH)
.and_then(|len| len.parse::<usize>().ok())
{
assert_ne!(
content_length, UNCOMPRESSED_INPUT.len(),
"`content-length` of in-memory payload was incorrectly set to the length of the uncompressed input but should have been set to the length of the compressed payload"
);
assert_eq!(COMPRESSED_OUTPUT.len(), content_length);
}
let (http_client, rx) = ::aws_smithy_runtime::client::http::test_util::capture_request(None);
let config = $moduleName::Config::builder()
.region(Region::from_static("doesntmatter"))
.with_test_defaults()
.http_client(http_client)
.disable_request_compression(false)
.request_min_compression_size_bytes(0)
.build();
let client = $moduleName::Client::from_conf(config);
// ByteStreams created from a file are streaming
let mut file = #{tempfile}::NamedTempFile::new().unwrap();
use std::io::Write;
file.write_all(UNCOMPRESSED_INPUT).unwrap();
let body = ByteStream::read_from()
.path(file.path())
.buffer_size(1024)
.length(::aws_smithy_types::byte_stream::Length::Exact(
UNCOMPRESSED_INPUT.len() as u64,
))
.build()
.await
.unwrap();
let _ = client.some_streaming_operation().body(body).send().await;
let request = rx.expect_request();
assert!(
request
.headers()
.get(#{http}::header::CONTENT_LENGTH)
.is_none(),
"expected that no content length header is set because the request is streaming."
);
}
""",
*preludeScope,
"ByteStream" to RuntimeType.smithyTypes(rc).resolve("byte_stream::ByteStream"),
"Blob" to RuntimeType.smithyTypes(rc).resolve("Blob"),
"Region" to AwsRuntimeType.awsTypes(rc).resolve("region::Region"),
"http" to CargoDependency.Http.toType(),
"tokio" to CargoDependency.Tokio.toType(),
"capture_request" to RuntimeType.captureRequest(rc),
"pretty_assertions" to CargoDependency.PrettyAssertions.toType(),


@ -369,13 +369,14 @@ version = "0.60.3"
[[package]]
name = "aws-smithy-compression"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"aws-smithy-runtime-api 1.7.2",
"aws-smithy-types 1.2.6",
"bytes",
"bytes-utils",
"flate2",
"futures-util",
"http 0.2.12",
"http 1.1.0",
"http-body 0.4.6",


@ -1,6 +1,6 @@
[package]
name = "aws-smithy-compression"
version = "0.0.1"
version = "0.0.2"
authors = [
"AWS Rust SDK Team <aws-sdk-rust@amazon.com>",
"Zelda Hessler <zhessler@amazon.com>",
@ -30,13 +30,14 @@ aws-smithy-types = { path = "../aws-smithy-types" }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" }
bytes = "1.4.0"
flate2 = "1.0.30"
pin-project-lite = "0.2.14"
tracing = "0.1.40"
futures-util = "0.3"
http-0-2 = { package = "http", version = "0.2.9", optional = true }
http-1-0 = { package = "http", version = "1", optional = true }
http-body-0-4 = { package = "http-body", version = "0.4.5", optional = true }
http-body-1-0 = { package = "http-body", version = "1", optional = true }
http-body-util = { version = "0.1.1", optional = true }
pin-project-lite = "0.2.14"
tracing = "0.1.40"
[dev-dependencies]
bytes-utils = "0.1.2"


@ -21,6 +21,7 @@ pub mod compress {
#[pin]
body: InnerBody,
compress_request: CompressionImpl,
is_end_stream: bool,
}
}
@ -30,6 +31,7 @@ pub mod compress {
Self {
body,
compress_request,
is_end_stream: false,
}
}
}
@ -39,6 +41,7 @@ pub mod compress {
pub mod http_body_0_4_x {
use super::CompressedBody;
use crate::http::http_body_0_4_x::CompressRequest;
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_types::body::SdkBody;
use http_0_2::HeaderMap;
use http_body_0_4::{Body, SizeHint};
@ -60,7 +63,10 @@ pub mod compress {
this.compress_request.compress_bytes(&data[..], &mut out)?;
Poll::Ready(Some(Ok(out.into())))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
*this.is_end_stream = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
@ -74,11 +80,28 @@ pub mod compress {
}
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
self.is_end_stream
}
fn size_hint(&self) -> SizeHint {
self.body.size_hint()
// We can't return a hint because we don't know exactly how
// compression will affect the content length
SizeHint::default()
}
}
impl CompressedBody<SdkBody, Box<dyn CompressRequest>> {
/// Consumes this `CompressedBody` and returns an [`SdkBody`] containing the compressed data.
///
/// This *requires* that the inner `SdkBody` is in-memory (i.e. not streaming). Otherwise, an error is returned.
/// If compression fails, an error is returned.
pub fn into_compressed_sdk_body(mut self) -> Result<SdkBody, BoxError> {
let mut compressed_body = Vec::new();
let bytes = self.body.bytes().ok_or_else(|| "`into_compressed_sdk_body` requires that the inner body is 'in-memory', but it was streaming".to_string())?;
self.compress_request
.compress_bytes(bytes, &mut compressed_body)?;
Ok(SdkBody::from(compressed_body))
}
}
}
@ -86,7 +109,7 @@ pub mod compress {
/// Support for the `http-body-1-0` and `http-1-0` crates.
#[cfg(feature = "http-body-1-x")]
pub mod http_body_1_x {
use super::CompressedBody;
use crate::body::compress::CompressedBody;
use crate::http::http_body_1_x::CompressRequest;
use aws_smithy_types::body::SdkBody;
use http_body_1_0::{Body, Frame, SizeHint};
@ -116,16 +139,22 @@ pub mod compress {
unreachable!("Frame is either data or trailers")
}
}
None => {
*this.is_end_stream = true;
None
}
other => other,
})
}
fn is_end_stream(&self) -> bool {
self.body.is_end_stream()
self.is_end_stream
}
fn size_hint(&self) -> SizeHint {
self.body.size_hint()
// We can't return a hint because we don't know exactly how
// compression will affect the content length
SizeHint::default()
}
}
}
@ -134,6 +163,12 @@ pub mod compress {
#[cfg(any(feature = "http-body-0-4-x", feature = "http-body-1-x"))]
#[cfg(test)]
mod test {
use crate::body::compress::CompressedBody;
use crate::{CompressionAlgorithm, CompressionOptions};
use aws_smithy_types::body::SdkBody;
use bytes::Buf;
use bytes_utils::SegmentedBuf;
use std::io::Read;
const UNCOMPRESSED_INPUT: &[u8] = b"hello world";
const COMPRESSED_OUTPUT: &[u8] = &[
31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 203, 72, 205, 201, 201, 87, 40, 207, 47, 202, 73, 1, 0,
@ -141,77 +176,89 @@ mod test {
];
#[cfg(feature = "http-body-0-4-x")]
#[tokio::test]
async fn test_compressed_body_http_body_0_4_x() {
use super::super::{CompressionAlgorithm, CompressionOptions};
use crate::body::compress::CompressedBody;
use aws_smithy_types::body::SdkBody;
use bytes::Buf;
use bytes_utils::SegmentedBuf;
mod http_body_0_4_x {
use super::*;
use http_body_0_4::Body;
use std::io::Read;
let compression_algorithm = CompressionAlgorithm::Gzip;
let compression_options = CompressionOptions::default()
.with_min_compression_size_bytes(0)
.unwrap();
let compress_request =
compression_algorithm.into_impl_http_body_0_4_x(&compression_options);
let body = SdkBody::from(UNCOMPRESSED_INPUT);
let mut compressed_body = CompressedBody::new(body, compress_request);
#[tokio::test]
async fn test_body_is_compressed() {
let compression_options = CompressionOptions::default()
.with_min_compression_size_bytes(0)
.unwrap();
let compress_request =
CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
let body = SdkBody::from(UNCOMPRESSED_INPUT);
let mut compressed_body = CompressedBody::new(body, compress_request);
let mut output = SegmentedBuf::new();
while let Some(buf) = compressed_body.data().await {
output.push(buf.unwrap());
let mut output = SegmentedBuf::new();
while let Some(buf) = compressed_body.data().await {
output.push(buf.unwrap());
}
let mut actual_output = Vec::new();
output
.reader()
.read_to_end(&mut actual_output)
.expect("Doesn't cause IO errors");
// Verify data is compressed as expected
assert_eq!(COMPRESSED_OUTPUT, actual_output);
}
let mut actual_output = Vec::new();
output
.reader()
.read_to_end(&mut actual_output)
.expect("Doesn't cause IO errors");
// Verify data is compressed as expected
assert_eq!(COMPRESSED_OUTPUT, actual_output);
#[tokio::test]
async fn test_into_compressed_sdk_body() {
let compression_options = CompressionOptions::default()
.with_min_compression_size_bytes(0)
.unwrap();
let compress_request =
CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&compression_options);
let body = SdkBody::from(UNCOMPRESSED_INPUT);
let compressed_sdk_body = CompressedBody::new(body, compress_request)
.into_compressed_sdk_body()
.unwrap();
// Verify data is compressed as expected
assert_eq!(
COMPRESSED_OUTPUT,
compressed_sdk_body.bytes().expect("body is in-memory")
);
}
}
#[cfg(feature = "http-body-1-x")]
#[tokio::test]
async fn test_compressed_body_http_body_1_x() {
use super::super::{CompressionAlgorithm, CompressionOptions};
use crate::body::compress::CompressedBody;
use aws_smithy_types::body::SdkBody;
use bytes::Buf;
use bytes_utils::SegmentedBuf;
mod http_body_1_x {
use super::*;
use http_body_util::BodyExt;
use std::io::Read;
let compression_algorithm = CompressionAlgorithm::Gzip;
let compression_options = CompressionOptions::default()
.with_min_compression_size_bytes(0)
.unwrap();
let compress_request = compression_algorithm.into_impl_http_body_1_x(&compression_options);
let body = SdkBody::from(UNCOMPRESSED_INPUT);
let mut compressed_body = CompressedBody::new(body, compress_request);
#[tokio::test]
async fn test_body_is_compressed() {
let compression_options = CompressionOptions::default()
.with_min_compression_size_bytes(0)
.unwrap();
let compress_request =
CompressionAlgorithm::Gzip.into_impl_http_body_1_x(&compression_options);
let body = SdkBody::from(UNCOMPRESSED_INPUT);
let mut compressed_body = CompressedBody::new(body, compress_request);
let mut output = SegmentedBuf::new();
let mut output = SegmentedBuf::new();
loop {
let data = match compressed_body.frame().await {
Some(Ok(frame)) => frame.into_data(),
Some(Err(e)) => panic!("Error: {}", e),
// No more frames, break out of loop
None => break,
loop {
let data = match compressed_body.frame().await {
Some(Ok(frame)) => frame.into_data(),
Some(Err(e)) => panic!("Error: {}", e),
// No more frames, break out of loop
None => break,
}
.expect("frame is OK");
output.push(data);
}
.expect("frame is OK");
output.push(data);
}
let mut actual_output = Vec::new();
output
.reader()
.read_to_end(&mut actual_output)
.expect("Doesn't cause IO errors");
// Verify data is compressed as expected
assert_eq!(COMPRESSED_OUTPUT, actual_output);
let mut actual_output = Vec::new();
output
.reader()
.read_to_end(&mut actual_output)
.expect("Doesn't cause IO errors");
// Verify data is compressed as expected
assert_eq!(COMPRESSED_OUTPUT, actual_output);
}
}
}


@ -1,6 +1,6 @@
[package]
name = "aws-smithy-types"
version = "1.2.6"
version = "1.2.7"
authors = [
"AWS Rust SDK Team <aws-sdk-rust@amazon.com>",
"Russell Cohen <rcoh@amazon.com>",


@ -234,6 +234,11 @@ impl SdkBody {
})
}
/// Return `true` if this SdkBody is streaming, `false` if it is in-memory.
pub fn is_streaming(&self) -> bool {
matches!(self.inner, Inner::Dyn { .. })
}
/// Return the length, in bytes, of this SdkBody. If this returns `None`, then the body does not
/// have a known length.
pub fn content_length(&self) -> Option<u64> {


@ -98,6 +98,7 @@ impl Intercept for RequestCompressionInterceptor {
layer.store_put(RequestCompressionInterceptorState {
options: Some(options),
});
cfg.push_layer(layer);
Ok(())
@ -126,7 +127,8 @@ impl Intercept for RequestCompressionInterceptor {
//
// Because compressing small amounts of data can actually increase its size,
// we check to see if the data is big enough to make compression worthwhile.
if let Some(known_size) = http_body::Body::size_hint(request.body()).exact() {
let size_hint = http_body::Body::size_hint(request.body()).exact();
if let Some(known_size) = size_hint {
if known_size < options.min_compression_size_bytes() as u64 {
tracing::trace!(
min_compression_size_bytes = options.min_compression_size_bytes(),
@ -135,16 +137,15 @@ impl Intercept for RequestCompressionInterceptor {
);
return Ok(());
}
tracing::trace!("compressing non-streaming request body...")
tracing::trace!("compressing sized request body...");
} else {
tracing::trace!("compressing streaming request body...");
tracing::trace!("compressing unsized request body...");
}
wrap_request_body_in_compressed_body(
request,
CompressionAlgorithm::Gzip.into_impl_http_body_0_4_x(&options),
)?;
cfg.interceptor_state()
.store_append::<SmithySdkFeature>(SmithySdkFeature::GzipRequestCompression);
@ -162,11 +163,24 @@ fn wrap_request_body_in_compressed_body(
);
let mut body = {
let body = mem::replace(request.body_mut(), SdkBody::taken());
body.map(move |body| {
let body = CompressedBody::new(body, request_compress_impl.clone());
SdkBody::from_body_0_4(body)
})
if body.is_streaming() {
request.headers_mut().remove(http::header::CONTENT_LENGTH);
body.map(move |body| {
let body = CompressedBody::new(body, request_compress_impl.clone());
SdkBody::from_body_0_4(body)
})
} else {
let body = CompressedBody::new(body, request_compress_impl.clone());
let body = body.into_compressed_sdk_body().map_err(BuildError::other)?;
let content_length = body.content_length().expect("this payload is in-memory");
request
.headers_mut()
.insert(http::header::CONTENT_LENGTH, content_length.to_string());
body
}
};
mem::swap(request.body_mut(), &mut body);


@ -1231,6 +1231,19 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.9.34+deprecated"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47"
dependencies = [
"indexmap 2.2.6",
"itoa",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "sha2"
version = "0.10.8"
@ -1302,6 +1315,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"serde_yaml",
"thiserror",
"tokio",
"toml 0.5.11",
@ -1674,6 +1688,12 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]]
name = "unsafe-libyaml"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861"
[[package]]
name = "url"
version = "2.5.0"