Skip to content

Commit 52b64a0

Browse files
committed
fix reviews on awschunking and default behavior
unit test for chunking stream
1 parent 74517bf commit 52b64a0

File tree

6 files changed

+191
-43
lines changed

6 files changed

+191
-43
lines changed

src/aws-cpp-sdk-core/include/smithy/client/AwsSmithyClientBase.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,11 @@ namespace client
100100
m_serviceUserAgentName(std::move(serviceUserAgentName)),
101101
m_httpClient(std::move(httpClient)),
102102
m_errorMarshaller(std::move(errorMarshaller)),
103-
m_interceptors{Aws::MakeShared<ChecksumInterceptor>("AwsSmithyClientBase", *m_clientConfig)}
103+
m_interceptors{
104+
Aws::MakeShared<ChecksumInterceptor>("AwsSmithyClientBase", *m_clientConfig),
105+
Aws::MakeShared<features::ChunkingInterceptor>("AwsSmithyClientBase", *m_clientConfig)
106+
}
104107
{
105-
if (m_clientConfig->httpClientChunkedMode == Aws::Client::HttpClientChunkedMode::DEFAULT) {
106-
m_interceptors.emplace_back(Aws::MakeShared<features::ChunkingInterceptor>("AwsSmithyClientBase", *m_clientConfig));
107-
}
108108
baseInit();
109109
}
110110

src/aws-cpp-sdk-core/include/smithy/client/features/ChunkingInterceptor.h

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77

88
#include <aws/core/Core_EXPORTS.h>
99
#include <aws/core/client/ClientConfiguration.h>
10-
#include <aws/core/utils/stream/AwsChunkedStream.h>
10+
#include <aws/core/utils/HashingUtils.h>
11+
#include <aws/core/utils/StringUtils.h>
12+
#include <aws/core/utils/Array.h>
1113
#include <smithy/interceptor/Interceptor.h>
1214
#include <smithy/client/common/AwsSmithyClientUtils.h>
1315

@@ -17,50 +19,107 @@ namespace features {
1719

1820
namespace {
1921

20-
// String constants
2122
static const char* const CHECKSUM_HEADER_PREFIX = "x-amz-checksum-";
2223
static const char* const ALLOCATION_TAG = "ChunkingInterceptor";
24+
static const size_t DATA_BUFFER_SIZE = 65536;
2325

24-
class AwsChunkedStreamBuf : public std::streambuf {
26+
} // anonymous namespace
27+
28+
class AwsChunkedStreamWrapper {
2529
public:
26-
AwsChunkedStreamBuf(Aws::Http::HttpRequest* request, const std::shared_ptr<Aws::IOStream>& originalBody)
27-
: m_chunkStream(request, originalBody) {
28-
setg(nullptr, nullptr, nullptr);
29-
}
30+
AwsChunkedStreamWrapper(Aws::Http::HttpRequest* request, const std::shared_ptr<Aws::IOStream>& originalBody, size_t bufferSize = DATA_BUFFER_SIZE)
31+
: m_streambuf(request, originalBody, bufferSize), m_iostream(&m_streambuf) {}
32+
33+
Aws::IOStream* GetIOStream() { return &m_iostream; }
34+
35+
private:
36+
class AwsChunkedStreamBuf : public std::streambuf {
37+
public:
38+
AwsChunkedStreamBuf(Aws::Http::HttpRequest* request, const std::shared_ptr<Aws::IOStream>& originalBody, size_t bufferSize)
39+
: m_request(request), m_stream(originalBody), m_data(bufferSize), m_bufferSize(bufferSize),
40+
m_chunkingStream(Aws::MakeShared<Aws::StringStream>(ALLOCATION_TAG)) {
41+
setg(nullptr, nullptr, nullptr);
42+
}
43+
44+
protected:
45+
int_type underflow() override {
46+
if (gptr() < egptr()) {
47+
return traits_type::to_int_type(*gptr());
48+
}
3049

31-
protected:
32-
int_type underflow() override {
33-
if (gptr() < egptr()) {
50+
if (m_stream->good()) {
51+
m_stream->read(m_data.GetUnderlyingData(), m_bufferSize);
52+
size_t bytesRead = static_cast<size_t>(m_stream->gcount());
53+
writeChunk(bytesRead);
54+
55+
if ((m_stream->peek() == EOF || m_stream->eof()) && !m_stream->bad()) {
56+
writeTrailerToUnderlyingStream();
57+
}
58+
}
59+
60+
if ((m_chunkingStream->peek() == EOF || m_chunkingStream->eof()) && !m_chunkingStream->bad()) {
61+
return traits_type::eof();
62+
}
63+
64+
m_chunkingStream->read(m_buffer, sizeof(m_buffer));
65+
size_t bytesRead = static_cast<size_t>(m_chunkingStream->gcount());
66+
if (bytesRead == 0) {
67+
return traits_type::eof();
68+
}
69+
70+
setg(m_buffer, m_buffer, m_buffer + bytesRead);
3471
return traits_type::to_int_type(*gptr());
3572
}
36-
37-
size_t bytesRead = m_chunkStream.BufferedRead(m_buffer, sizeof(m_buffer));
38-
if (bytesRead == 0) {
39-
return traits_type::eof();
73+
74+
private:
75+
void writeTrailerToUnderlyingStream() {
76+
Aws::StringStream chunkedTrailerStream;
77+
chunkedTrailerStream << "0\r\n";
78+
if (m_request->GetRequestHash().second != nullptr) {
79+
chunkedTrailerStream << "x-amz-checksum-" << m_request->GetRequestHash().first << ":"
80+
<< Aws::Utils::HashingUtils::Base64Encode(m_request->GetRequestHash().second->GetHash().GetResult()) << "\r\n";
4081
}
41-
42-
setg(m_buffer, m_buffer, m_buffer + bytesRead);
43-
return traits_type::to_int_type(*gptr());
82+
chunkedTrailerStream << "\r\n";
83+
const auto chunkedTrailer = chunkedTrailerStream.str();
84+
if (m_chunkingStream->eof()) {
85+
m_chunkingStream->clear();
86+
}
87+
*m_chunkingStream << chunkedTrailer;
4488
}
45-
46-
std::streamsize showmanyc() override {
47-
return egptr() - gptr();
89+
90+
void writeChunk(size_t bytesRead) {
91+
if (m_request->GetRequestHash().second != nullptr) {
92+
m_request->GetRequestHash().second->Update(reinterpret_cast<unsigned char*>(m_data.GetUnderlyingData()), bytesRead);
93+
}
94+
95+
if (bytesRead > 0 && m_chunkingStream != nullptr && !m_chunkingStream->bad()) {
96+
if (m_chunkingStream->eof()) {
97+
m_chunkingStream->clear();
98+
}
99+
*m_chunkingStream << Aws::Utils::StringUtils::ToHexString(bytesRead) << "\r\n";
100+
m_chunkingStream->write(m_data.GetUnderlyingData(), bytesRead);
101+
*m_chunkingStream << "\r\n";
102+
}
48103
}
49104

50-
private:
51-
Aws::Utils::Stream::AwsChunkedStream<> m_chunkStream;
52-
char m_buffer[8192];
105+
Aws::Http::HttpRequest* m_request;
106+
std::shared_ptr<Aws::IOStream> m_stream;
107+
Aws::Utils::Array<char> m_data;
108+
size_t m_bufferSize;
109+
std::shared_ptr<Aws::IOStream> m_chunkingStream;
110+
char m_buffer[8192];
111+
};
112+
113+
AwsChunkedStreamBuf m_streambuf;
114+
Aws::IOStream m_iostream;
53115
};
54116

55-
} // anonymous namespace
56-
57117
/**
58118
* Interceptor that handles chunked encoding for streaming requests with checksums.
59119
* Wraps request body with chunked stream and sets appropriate headers.
60120
*/
61121
class ChunkingInterceptor : public smithy::interceptor::Interceptor {
62122
public:
63-
ChunkingInterceptor() : m_httpClientChunkedMode(Aws::Client::HttpClientChunkedMode::DEFAULT) {}
64123
explicit ChunkingInterceptor(const Aws::Client::ClientConfiguration& config)
65124
: m_httpClientChunkedMode(config.httpClientChunkedMode) {}
66125
~ChunkingInterceptor() override = default;
@@ -100,11 +159,11 @@ class ChunkingInterceptor : public smithy::interceptor::Interceptor {
100159
}
101160
}
102161

103-
auto chunkedBuf = Aws::MakeUnique<AwsChunkedStreamBuf>(
162+
auto wrapper = Aws::MakeShared<AwsChunkedStreamWrapper>(
104163
ALLOCATION_TAG, request.get(), originalBody);
105164
auto chunkedBody = std::shared_ptr<Aws::IOStream>(
106-
new Aws::IOStream(chunkedBuf.release()));
107-
165+
wrapper, wrapper->GetIOStream());
166+
108167
request->AddContentBody(chunkedBody);
109168
return request;
110169
}

src/aws-cpp-sdk-core/source/client/AWSClient.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ AWSClient::AWSClient(const Aws::Client::ClientConfiguration& configuration,
140140
m_enableClockSkewAdjustment(configuration.enableClockSkewAdjustment),
141141
m_requestCompressionConfig(configuration.requestCompressionConfig),
142142
m_userAgentInterceptor{Aws::MakeShared<smithy::client::UserAgentInterceptor>(AWS_CLIENT_LOG_TAG, configuration, m_retryStrategy->GetStrategyName(), m_serviceName)},
143-
m_interceptors{Aws::MakeShared<smithy::client::ChecksumInterceptor>(AWS_CLIENT_LOG_TAG), Aws::MakeShared<smithy::client::features::ChunkingInterceptor>(AWS_CLIENT_LOG_TAG), m_userAgentInterceptor}
143+
m_interceptors{Aws::MakeShared<smithy::client::ChecksumInterceptor>(AWS_CLIENT_LOG_TAG), Aws::MakeShared<smithy::client::features::ChunkingInterceptor>(AWS_CLIENT_LOG_TAG, configuration), m_userAgentInterceptor}
144144
{
145145
}
146146

src/aws-cpp-sdk-core/source/client/ClientConfiguration.cpp

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,15 +220,9 @@ void setLegacyClientConfigurationParameters(ClientConfiguration& clientConfig)
220220
clientConfig.writeRateLimiter = nullptr;
221221
clientConfig.readRateLimiter = nullptr;
222222
clientConfig.httpLibOverride = Aws::Http::TransferLibType::DEFAULT_CLIENT;
223-
224-
// Set chunking mode based on HTTP client type
225-
// AWS built-in clients should use SDK's ChunkingInterceptor (DEFAULT mode)
226-
// Custom clients should handle chunking themselves (CLIENT_IMPLEMENTATION mode)
227-
if (clientConfig.httpLibOverride == Aws::Http::TransferLibType::DEFAULT_CLIENT) {
228-
clientConfig.httpClientChunkedMode = HttpClientChunkedMode::CLIENT_IMPLEMENTATION;
229-
} else {
230-
clientConfig.httpClientChunkedMode = HttpClientChunkedMode::DEFAULT;
231-
}
223+
224+
// Users can explicitly set CLIENT_IMPLEMENTATION if their custom client handles chunking
225+
clientConfig.httpClientChunkedMode = HttpClientChunkedMode::DEFAULT;
232226

233227
clientConfig.followRedirects = FollowRedirectsPolicy::DEFAULT;
234228
clientConfig.disableExpectHeader = false;

src/aws-cpp-sdk-core/source/http/curl/CurlHttpClient.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,14 @@ static size_t SeekBody(void* userdata, curl_off_t offset, int origin)
370370
return CURL_SEEKFUNC_FAIL;
371371
}
372372

373+
// Fail the seek for an aws-chunked encoded body, as its length and offset are unknown
374+
if (context->m_request &&
375+
context->m_request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) &&
376+
context->m_request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER).find(Aws::Http::AWS_CHUNKED_VALUE) != Aws::String::npos)
377+
{
378+
return CURL_SEEKFUNC_FAIL;
379+
}
380+
373381

374382
HttpRequest* request = context->m_request;
375383
const std::shared_ptr<Aws::IOStream>& ioStream = request->GetContentBody();
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
#include <aws/core/http/standard/StandardHttpRequest.h>
6+
#include <aws/core/utils/crypto/CRC32.h>
7+
#include <smithy/client/features/ChunkingInterceptor.h>
8+
#include <aws/testing/AwsCppSdkGTestSuite.h>
9+
10+
using namespace Aws;
11+
using namespace Aws::Http::Standard;
12+
using namespace smithy::client::features;
13+
using namespace Aws::Utils::Crypto;
14+
15+
class ChunkingInterceptorTest : public Aws::Testing::AwsCppSdkGTestSuite {};
16+
17+
// Allocation tag handed to Aws::MakeShared by the tests in this file. Declared with
// internal linkage and as a const pointer so it cannot be reassigned or clash with a
// same-named symbol in another test translation unit.
static const char* const CHUNKING_TEST_LOG_TAG = "CHUNKING_INTERCEPTOR_TEST";
18+
19+
// Helper function to read all data from a stream
20+
std::string ReadAllFromStream(std::shared_ptr<Aws::IOStream> stream) {
21+
std::stringstream result;
22+
char buffer[1024];
23+
while (stream->good() && !stream->eof()) {
24+
stream->read(buffer, sizeof(buffer));
25+
auto bytesRead = stream->gcount();
26+
if (bytesRead > 0) {
27+
result.write(buffer, bytesRead);
28+
}
29+
}
30+
return result.str();
31+
}
32+
33+
// A 25-byte payload read through a 10-byte chunk buffer must come out as two 10-byte
// chunks, one 5-byte chunk, and a terminating chunk that carries the CRC32 trailer.
TEST_F(ChunkingInterceptorTest, ChunkedStreamShouldWork) {
    StandardHttpRequest httpRequest{"www.elda.com/will", Http::HttpMethod::HTTP_GET};
    httpRequest.SetRequestHash("crc32", Aws::MakeShared<CRC32>(CHUNKING_TEST_LOG_TAG));

    std::shared_ptr<Aws::IOStream> payload =
        Aws::MakeShared<Aws::StringStream>(CHUNKING_TEST_LOG_TAG, "1234567890123456789012345");
    AwsChunkedStreamWrapper chunker{&httpRequest, payload, 10};

    // GetIOStream() points at a member of the wrapper, so share it with a no-op deleter.
    const std::shared_ptr<Aws::IOStream> borrowed(chunker.GetIOStream(), [](Aws::IOStream*) {});
    const std::string encoded = ReadAllFromStream(borrowed);

    const char* const expected =
        "A\r\n1234567890\r\nA\r\n1234567890\r\n5\r\n12345\r\n0\r\nx-amz-checksum-crc32:78DeVw==\r\n\r\n";
    EXPECT_EQ(expected, encoded);
}
46+
47+
// A payload smaller than the chunk buffer (5 bytes against 100) must be emitted as a
// single chunk directly followed by the terminating chunk and its CRC32 trailer.
TEST_F(ChunkingInterceptorTest, ShouldNotRequireTwoReadsOnSmallChunk) {
    StandardHttpRequest httpRequest{"www.clemar.com/strohl", Http::HttpMethod::HTTP_GET};
    httpRequest.SetRequestHash("crc32", Aws::MakeShared<CRC32>(CHUNKING_TEST_LOG_TAG));

    std::shared_ptr<Aws::IOStream> payload =
        Aws::MakeShared<Aws::StringStream>(CHUNKING_TEST_LOG_TAG, "12345");
    AwsChunkedStreamWrapper chunker{&httpRequest, payload, 100};

    // GetIOStream() points at a member of the wrapper, so share it with a no-op deleter.
    const std::shared_ptr<Aws::IOStream> borrowed(chunker.GetIOStream(), [](Aws::IOStream*) {});
    const std::string encoded = ReadAllFromStream(borrowed);

    const char* const expected = "5\r\n12345\r\n0\r\nx-amz-checksum-crc32:y/U6HA==\r\n\r\n";
    EXPECT_EQ(expected, encoded);
}
60+
61+
// A 10-byte payload that is an exact multiple of the 5-byte chunk buffer must produce two
// full chunks and then the terminating chunk with the CRC32 trailer.
TEST_F(ChunkingInterceptorTest, ShouldWorkOnSmallBuffer) {
    StandardHttpRequest httpRequest{"www.eugief.com/hesimay", Http::HttpMethod::HTTP_GET};
    httpRequest.SetRequestHash("crc32", Aws::MakeShared<CRC32>(CHUNKING_TEST_LOG_TAG));

    std::shared_ptr<Aws::IOStream> payload =
        Aws::MakeShared<Aws::StringStream>(CHUNKING_TEST_LOG_TAG, "1234567890");
    AwsChunkedStreamWrapper chunker{&httpRequest, payload, 5};

    // GetIOStream() points at a member of the wrapper, so share it with a no-op deleter.
    const std::shared_ptr<Aws::IOStream> borrowed(chunker.GetIOStream(), [](Aws::IOStream*) {});
    const std::string encoded = ReadAllFromStream(borrowed);

    const char* const expected = "5\r\n12345\r\n5\r\n67890\r\n0\r\nx-amz-checksum-crc32:Jh2u5Q==\r\n\r\n";
    EXPECT_EQ(expected, encoded);
}
74+
75+
// An empty payload must still yield a well-formed body: only the terminating chunk and
// the CRC32 trailer, with no data chunks in front of it.
TEST_F(ChunkingInterceptorTest, ShouldWorkOnEmptyStream) {
    StandardHttpRequest httpRequest{"www.nidia.com/juna", Http::HttpMethod::HTTP_GET};
    httpRequest.SetRequestHash("crc32", Aws::MakeShared<CRC32>(CHUNKING_TEST_LOG_TAG));

    std::shared_ptr<Aws::IOStream> payload =
        Aws::MakeShared<Aws::StringStream>(CHUNKING_TEST_LOG_TAG, "");
    AwsChunkedStreamWrapper chunker{&httpRequest, payload, 5};

    // GetIOStream() points at a member of the wrapper, so share it with a no-op deleter.
    const std::shared_ptr<Aws::IOStream> borrowed(chunker.GetIOStream(), [](Aws::IOStream*) {});
    const std::string encoded = ReadAllFromStream(borrowed);

    const char* const expected = "0\r\nx-amz-checksum-crc32:AAAAAA==\r\n\r\n";
    EXPECT_EQ(expected, encoded);
}

0 commit comments

Comments
 (0)