Skip to content

Commit 58d4444

Browse files
committed
use max size semaphore
stack-info: PR: #4220, branch: GarrettBeatty/gcbeatty/taskoptimization/3
1 parent 2643108 commit 58d4444

File tree

6 files changed

+424
-50
lines changed

6 files changed

+424
-50
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,6 @@ private void ProcessStreamingPart(
153153
// If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it
154154
streamingDataSource = null;
155155

156-
// Release capacity immediately since we're not holding anything in memory
157-
_partBufferManager.ReleaseBufferSpace();
158-
159156
_logger.DebugFormat("BufferedPartDataHandler: [Part {0}] StreamingDataSource added and capacity released",
160157
partNumber);
161158
}

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request,
159159
}
160160
else
161161
{
162-
_httpConcurrencySlots = new SemaphoreSlim(_config.ConcurrentServiceRequests);
162+
_httpConcurrencySlots = new SemaphoreSlim(
163+
_config.ConcurrentServiceRequests, // initialCount
164+
_config.ConcurrentServiceRequests // maxCount - prevents exceeding configured limit
165+
);
163166
_ownsHttpThrottler = true; // We own it, so we dispose it
164167
}
165168
}

sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,10 @@ public PartBufferManager(BufferedDownloadConfiguration config)
199199
throw new ArgumentNullException(nameof(config));
200200

201201
_partDataSources = new ConcurrentDictionary<int, IPartDataSource>();
202-
_bufferSpaceAvailable = new SemaphoreSlim(config.MaxInMemoryParts);
202+
_bufferSpaceAvailable = new SemaphoreSlim(
203+
config.MaxInMemoryParts, // initialCount
204+
config.MaxInMemoryParts // maxCount - prevents exceeding configured limit
205+
);
203206
_partAvailable = new AutoResetEvent(false);
204207

205208
Logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts);

sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs

Lines changed: 136 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,6 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin
463463
// Part 3 should use buffering path (ArrayPool allocation)
464464
Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered");
465465
Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered");
466-
467-
// Verify ReleaseBufferSpace was called for streaming path (immediate capacity release)
468-
mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Once,
469-
"Streaming path should release capacity immediately");
470466
}
471467
finally
472468
{
@@ -525,10 +521,6 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming()
525521
Assert.AreEqual(i + 1, streamingPartNumbers[i],
526522
$"Part {i + 1} should have streamed in order");
527523
}
528-
529-
// Verify capacity was released 5 times (once per streaming part)
530-
mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Exactly(5),
531-
"Capacity should be released immediately for each streaming part");
532524
}
533525
finally
534526
{
@@ -766,6 +758,142 @@ public void Dispose_MultipleCalls_IsIdempotent()
766758

767759
#endregion
768760

761+
#region Semaphore Double Release Fix Tests
762+
763+
[TestMethod]
764+
public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce()
765+
{
766+
// This test verifies the fix for the double release bug in BufferedPartDataHandler.
767+
// Before the fix: ProcessStreamingPart() called ReleaseBufferSpace() immediately after
768+
// adding the StreamingDataSource, causing capacity to be released twice (once immediately,
769+
// once later when the consumer finished reading the part).
770+
// After the fix: The immediate ReleaseBufferSpace() call was removed. Capacity is released
771+
// only once when the consumer finishes reading the part through PartBufferManager.
772+
773+
// Arrange
774+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
775+
var mockBufferManager = new Mock<IPartBufferManager>();
776+
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
777+
778+
var releaseCount = 0;
779+
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
780+
.Callback(() => releaseCount++);
781+
782+
mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>()));
783+
784+
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
785+
786+
try
787+
{
788+
var response = CreateMockGetObjectResponse(512);
789+
790+
// Act - Process an in-order (streaming) part
791+
await handler.ProcessPartAsync(1, response, CancellationToken.None);
792+
793+
// Assert - ReleaseBufferSpace should NOT have been called during ProcessPartAsync
794+
// (The removed code that called it immediately has been deleted)
795+
// Capacity will be released later by PartBufferManager when consumer finishes reading
796+
Assert.AreEqual(0, releaseCount,
797+
"ProcessPartAsync should not release capacity for streaming parts. " +
798+
"Capacity is released by PartBufferManager when consumer completes reading.");
799+
800+
// Verify AddBuffer was called with StreamingDataSource (streaming path taken)
801+
mockBufferManager.Verify(m => m.AddBuffer(
802+
It.Is<IPartDataSource>(ds => ds is StreamingDataSource)), Times.Once);
803+
}
804+
finally
805+
{
806+
handler.Dispose();
807+
}
808+
}
809+
810+
[TestMethod]
811+
public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediately()
812+
{
813+
// This test verifies that buffered (out-of-order) parts don't release capacity immediately.
814+
// Capacity is released later by PartBufferManager when the consumer finishes reading the part.
815+
816+
// Arrange
817+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
818+
var mockBufferManager = new Mock<IPartBufferManager>();
819+
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
820+
821+
var releaseCount = 0;
822+
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
823+
.Callback(() => releaseCount++);
824+
825+
mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<StreamPartBuffer>()));
826+
827+
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
828+
829+
try
830+
{
831+
var response = CreateMockGetObjectResponse(512);
832+
833+
// Act - Process an out-of-order (buffered) part
834+
await handler.ProcessPartAsync(3, response, CancellationToken.None);
835+
836+
// Assert - ReleaseBufferSpace should NOT have been called
837+
// Capacity will be released later by PartBufferManager when consumer finishes reading
838+
Assert.AreEqual(0, releaseCount,
839+
"ProcessPartAsync should not release capacity for buffered parts. " +
840+
"Capacity is released by PartBufferManager when consumer completes reading.");
841+
842+
// Verify AddBuffer was called with StreamPartBuffer (buffering path taken)
843+
mockBufferManager.Verify(m => m.AddBuffer(
844+
It.IsAny<StreamPartBuffer>()), Times.Once);
845+
}
846+
finally
847+
{
848+
handler.Dispose();
849+
}
850+
}
851+
852+
[TestMethod]
853+
public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease()
854+
{
855+
// This test verifies that when an error occurs during streaming part processing,
856+
// capacity is released correctly through ReleaseCapacity() without double-releasing.
857+
858+
// Arrange
859+
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
860+
var mockBufferManager = new Mock<IPartBufferManager>();
861+
mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1);
862+
863+
var releaseCount = 0;
864+
mockBufferManager.Setup(m => m.ReleaseBufferSpace())
865+
.Callback(() => releaseCount++);
866+
867+
// Simulate error when adding buffer
868+
mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>()))
869+
.Throws(new InvalidOperationException("Test error"));
870+
871+
var handler = new BufferedPartDataHandler(mockBufferManager.Object, config);
872+
873+
try
874+
{
875+
var response = CreateMockGetObjectResponse(512);
876+
877+
// Act & Assert - Should throw
878+
await Assert.ThrowsExceptionAsync<InvalidOperationException>(async () =>
879+
{
880+
await handler.ProcessPartAsync(1, response, CancellationToken.None);
881+
});
882+
883+
// Verify ReleaseBufferSpace was NOT called during error handling
884+
// (The old double-release bug would have called it, causing issues)
885+
Assert.AreEqual(0, releaseCount,
886+
"Error handling should not release capacity for streaming parts. " +
887+
"Streaming parts don't hold capacity slots in BufferedPartDataHandler.");
888+
}
889+
finally
890+
{
891+
handler.Dispose();
892+
}
893+
}
894+
895+
#endregion
896+
769897
#region Helper Methods
770898

771899
/// <summary>

sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -601,25 +601,19 @@ public async Task DiscoverUsingRangeStrategy_CalculatesPartCount()
601601
public async Task StartDownloadsAsync_SinglePart_ReturnsImmediately()
602602
{
603603
// Arrange
604-
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
604+
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
605+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
606+
(req, ct) => Task.FromResult(mockResponse));
605607
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
606608
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
607609
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object);
608610

609-
var discoveryResult = new DownloadDiscoveryResult
610-
{
611-
TotalParts = 1,
612-
ObjectSize = 1024,
613-
InitialResponse = new GetObjectResponse()
614-
};
615-
616-
var mockBufferManager = new Mock<IPartBufferManager>();
617-
618-
// Act
611+
// Act - Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
612+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
619613
await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None);
620614

621-
// Assert - should complete without any downloads
622-
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never);
615+
// Assert - should complete without any additional downloads (discovery already made the call)
616+
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once);
623617
}
624618

625619
[TestMethod]
@@ -1230,22 +1224,35 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource
12301224
// Arrange - Test CancellationTokenSource disposal when error occurs before background task starts
12311225
var mockDataHandler = new Mock<IPartDataHandler>();
12321226

1227+
// WaitForCapacityAsync succeeds (needed for discovery)
1228+
mockDataHandler
1229+
.Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>()))
1230+
.Returns(Task.CompletedTask);
1231+
1232+
// ProcessPartAsync succeeds for Part 1 (discovery)
1233+
mockDataHandler
1234+
.Setup(x => x.ProcessPartAsync(1, It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>()))
1235+
.Returns(Task.CompletedTask);
1236+
12331237
// Simulate error during PrepareAsync (before background task is created)
12341238
mockDataHandler
12351239
.Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>()))
12361240
.ThrowsAsync(new InvalidOperationException("Simulated prepare failure"));
12371241

1238-
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
1239-
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
1242+
var totalParts = 2;
1243+
var partSize = 8 * 1024 * 1024;
1244+
var totalObjectSize = totalParts * partSize;
1245+
1246+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart(
1247+
totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true);
1248+
1249+
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(
1250+
downloadType: MultipartDownloadType.PART);
12401251
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
12411252
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
12421253

1243-
var discoveryResult = new DownloadDiscoveryResult
1244-
{
1245-
TotalParts = 2,
1246-
ObjectSize = 16 * 1024 * 1024,
1247-
InitialResponse = new GetObjectResponse()
1248-
};
1254+
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
1255+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
12491256

12501257
// Act & Assert
12511258
try
@@ -1599,26 +1606,25 @@ public async Task StartDownloadsAsync_PassesCancellationTokenToBufferManager()
15991606
public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation()
16001607
{
16011608
// Arrange - Single part download should return immediately without using cancellation token
1602-
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
1609+
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
1610+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
1611+
(req, ct) => Task.FromResult(mockResponse));
1612+
16031613
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
16041614
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
16051615
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object);
16061616

1607-
var discoveryResult = new DownloadDiscoveryResult
1608-
{
1609-
TotalParts = 1,
1610-
ObjectSize = 1024,
1611-
InitialResponse = new GetObjectResponse()
1612-
};
1617+
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
1618+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
16131619

16141620
var cts = new CancellationTokenSource();
16151621
cts.Cancel();
16161622

16171623
// Act - should complete without throwing even though token is cancelled
16181624
await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token);
16191625

1620-
// Assert - no exception thrown, no S3 calls made
1621-
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never);
1626+
// Assert - discovery already made the S3 call, StartDownloadsAsync doesn't make additional calls for single-part
1627+
mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once);
16221628
}
16231629

16241630
[TestMethod]
@@ -1831,19 +1837,17 @@ public async Task StartDownloadsAsync_ReturnsImmediately_PreventsDeadlock()
18311837
public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgroundTask()
18321838
{
18331839
// Arrange - Single-part downloads should not create background tasks
1834-
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client();
1840+
var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024);
1841+
var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(
1842+
(req, ct) => Task.FromResult(mockResponse));
18351843
var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest();
18361844
var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration();
18371845

18381846
var mockDataHandler = CreateMockDataHandler();
18391847
var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object);
18401848

1841-
var discoveryResult = new DownloadDiscoveryResult
1842-
{
1843-
TotalParts = 1,
1844-
ObjectSize = 1024,
1845-
InitialResponse = new GetObjectResponse()
1846-
};
1849+
// Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore
1850+
var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None);
18471851

18481852
// Act
18491853
var stopwatch = System.Diagnostics.Stopwatch.StartNew();

0 commit comments

Comments
 (0)