Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ private void ProcessStreamingPart(
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
streamingDataSource = null;

// Release capacity immediately since we're not holding anything in memory
_partBufferManager.ReleaseBufferSpace();
Copy link
Contributor Author

@GarrettBeatty GarrettBeatty Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was being released here and then also when the reader read the part later on. so we remove this to fix the double release.

Side note:
The original reason i had this hear was because my intent was not count streaming to the user as "holding the part in memory". But for simplicitys sake and this bug fix, we now consider a part that gets directly streamed to the user as "taking up memory". in the future we could probably optimize/change this if we want.


_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] StreamingDataSource added and capacity released",
partNumber);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was being released here and then also when the reader read the part later on. so we remove this to fix the double release.

Side note:
The original reason i had this hear was because my intent was not count streaming to the user as "holding the part in memory". But for simplicity's takes and this bug fix we now consider a part that gets directly streamed to the user as "taking up memory". in the future we could probably optimize/change this if we want.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request,
}
else
{
_httpConcurrencySlots = new SemaphoreSlim(_config.ConcurrentServiceRequests);
_httpConcurrencySlots = new SemaphoreSlim(
_config.ConcurrentServiceRequests, // initialCount
_config.ConcurrentServiceRequests // maxCount - prevents exceeding configured limit
);
_ownsHttpThrottler = true; // We own it, so we dispose it
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ public PartBufferManager(BufferedDownloadConfiguration config)
throw new ArgumentNullException(nameof(config));

_partDataSources = new ConcurrentDictionary<int, IPartDataSource>();
_bufferSpaceAvailable = new SemaphoreSlim(config.MaxInMemoryParts);
_bufferSpaceAvailable = new SemaphoreSlim(
config.MaxInMemoryParts, // initialCount
config.MaxInMemoryParts // maxCount - prevents exceeding configured limit
);
_partAvailable = new AutoResetEvent(false);

Logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts);
Expand Down
144 changes: 136 additions & 8 deletions sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,6 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin
// Part 3 should use buffering path (ArrayPool allocation)
Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered");
Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered");

// Verify ReleaseBufferSpace was called for streaming path (immediate capacity release)
mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Once,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isnt true anymore since we hold it until the reader consumes it which is why i removed it

"Streaming path should release capacity immediately");
}
finally
{
Expand Down Expand Up @@ -525,10 +521,6 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming()
Assert.AreEqual(i + 1, streamingPartNumbers[i],
$"Part {i + 1} should have streamed in order");
}

// Verify capacity was released 5 times (once per streaming part)
mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Exactly(5),
"Capacity should be released immediately for each streaming part");
}
finally
{
Expand Down Expand Up @@ -766,6 +758,142 @@ public void Dispose_MultipleCalls_IsIdempotent()

#endregion

#region Semaphore Double Release Fix Tests

[TestMethod]
public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce()
{
// This test verifies the fix for the double release bug in BufferedPartDataHandler.
// Before the fix: ProcessStreamingPart() called ReleaseBufferSpace() immediately after
// adding the StreamingDataSource, causing capacity to be released twice (once immediately,
// once later when the consumer finished reading the part).
// After the fix: The immediate ReleaseBufferSpace() call was removed. Capacity is released
// only once when the consumer finishes reading the part through PartBufferManager.

// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);

var releaseCount = 0;
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
.Callback(() => releaseCount++);

mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>()));

var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);

try
{
var response = CreateMockGetObjectResponse(512);

// Act - Process an in-order (streaming) part
await handler.ProcessPartAsync(1, response, CancellationToken.None);

// Assert - ReleaseBufferSpace should NOT have been called during ProcessPartAsync
// (The removed code that called it immediately has been deleted)
// Capacity will be released later by PartBufferManager when consumer finishes reading
Assert.AreEqual(0, releaseCount,
"ProcessPartAsync should not release capacity for streaming parts. " +
"Capacity is released by PartBufferManager when consumer completes reading.");

// Verify AddBuffer was called with StreamingDataSource (streaming path taken)
mockBufferManager.Verify(m => m.AddBuffer(
It.Is<IPartDataSource>(ds => ds is StreamingDataSource)), Times.Once);
}
finally
{
handler.Dispose();
}
}

[TestMethod]
public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediately()
{
// This test verifies that buffered (out-of-order) parts don't release capacity immediately.
// Capacity is released later by PartBufferManager when the consumer finishes reading the part.

// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);

var releaseCount = 0;
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
.Callback(() => releaseCount++);

mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<StreamPartBuffer>()));

var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);

try
{
var response = CreateMockGetObjectResponse(512);

// Act - Process an out-of-order (buffered) part
await handler.ProcessPartAsync(3, response, CancellationToken.None);

// Assert - ReleaseBufferSpace should NOT have been called
// Capacity will be released later by PartBufferManager when consumer finishes reading
Assert.AreEqual(0, releaseCount,
"ProcessPartAsync should not release capacity for buffered parts. " +
"Capacity is released by PartBufferManager when consumer completes reading.");

// Verify AddBuffer was called with StreamPartBuffer (buffering path taken)
mockBufferManager.Verify(m => m.AddBuffer(
It.IsAny<StreamPartBuffer>()), Times.Once);
}
finally
{
handler.Dispose();
}
}

[TestMethod]
public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease()
{
// This test verifies that when an error occurs during streaming part processing,
// capacity is released correctly through ReleaseCapacity() without double-releasing.

// Arrange
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var mockBufferManager = new Mock<IPartBufferManager>();
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);

var releaseCount = 0;
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
.Callback(() => releaseCount++);

// Simulate error when adding buffer
mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>()))
.Throws(new InvalidOperationException("Test error"));

var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);

try
{
var response = CreateMockGetObjectResponse(512);

// Act & Assert - Should throw
await Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
{
await handler.ProcessPartAsync(1, response, CancellationToken.None);
});

// Verify ReleaseBufferSpace was NOT called during error handling
// (The old double-release bug would have called it, causing issues)
Assert.AreEqual(0, releaseCount,
"Error handling should not release capacity for streaming parts. " +
"Streaming parts don't hold capacity slots in BufferedPartDataHandler.");
}
finally
{
handler.Dispose();
}
}

#endregion

#region Helper Methods

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,25 +601,19 @@ public async Task DiscoverUsingRangeStrategy_CalculatesPartCount()
public async Task StartDownloadsAsync_SinglePart_ReturnsImmediately()
{
// Arrange
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had to update all of these tests to call DiscoverDownloadStrategyAsync/reservere buffer space before doing any action that would release buffer space. otherwise it was getting exceeded semaphore maximum error (which is expected now)

var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
(req, ct) => Task.FromResult(mockResponse));
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object);

var discoveryResult = new DownloadDiscoveryResult
{
TotalParts = 1,
ObjectSize = 1024,
InitialResponse = new GetObjectResponse()
};

var mockBufferManager = new Mock<IPartBufferManager>();

// Act
// Act - Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);

// Assert - should complete without any downloads
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never);
// Assert - should complete without any additional downloads (discovery already made the call)
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once);
}

[TestMethod]
Expand Down Expand Up @@ -1230,22 +1224,35 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource
// Arrange - Test CancellationTokenSource disposal when error occurs before background task starts
var mockDataHandler = new Mock<IPartDataHandler>();

// WaitForCapacityAsync succeeds (needed for discovery)
mockDataHandler
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

// ProcessPartAsync succeeds for Part 1 (discovery)
mockDataHandler
.Setup(x => x.ProcessPartAsync(1, It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
.Returns(Task.CompletedTask);

// Simulate error during PrepareAsync (before background task is created)
mockDataHandler
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(new InvalidOperationException("Simulated prepare failure"));

var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
var totalParts = 2;
var partSize = 8 * 1024 * 1024;
var totalObjectSize = totalParts * partSize;

var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);

var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
downloadType: MultipartDownloadType.PART);
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);

var discoveryResult = new DownloadDiscoveryResult
{
TotalParts = 2,
ObjectSize = 16 * 1024 * 1024,
InitialResponse = new GetObjectResponse()
};
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);

// Act & Assert
try
Expand Down Expand Up @@ -1599,26 +1606,25 @@ public async Task StartDownloadsAsync_PassesCancellationTokenToBufferManager()
public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation()
{
// Arrange - Single part download should return immediately without using cancellation token
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
(req, ct) => Task.FromResult(mockResponse));

var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object);

var discoveryResult = new DownloadDiscoveryResult
{
TotalParts = 1,
ObjectSize = 1024,
InitialResponse = new GetObjectResponse()
};
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);

var cts = new CancellationTokenSource();
cts.Cancel();

// Act - should complete without throwing even though token is cancelled
await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token);

// Assert - no exception thrown, no S3 calls made
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never);
// Assert - discovery already made the S3 call, StartDownloadsAsync doesn't make additional calls for single-part
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once);
}

[TestMethod]
Expand Down Expand Up @@ -1831,19 +1837,17 @@ public async Task StartDownloadsAsync_ReturnsImmediately_PreventsDeadlock()
public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgroundTask()
{
// Arrange - Single-part downloads should not create background tasks
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
(req, ct) => Task.FromResult(mockResponse));
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();

var mockDataHandler = CreateMockDataHandler();
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);

var discoveryResult = new DownloadDiscoveryResult
{
TotalParts = 1,
ObjectSize = 1024,
InitialResponse = new GetObjectResponse()
};
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);

// Act
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
Expand Down
Loading