diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs index 82d0a8f4d590..256a0228d086 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/BufferedPartDataHandler.cs @@ -153,9 +153,6 @@ private void ProcessStreamingPart( // If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it streamingDataSource = null; - // Release capacity immediately since we're not holding anything in memory - _partBufferManager.ReleaseBufferSpace(); - _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] StreamingDataSource added and capacity released", partNumber); } diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index 939cedffad55..1537d48c58da 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -159,7 +159,10 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request, } else { - _httpConcurrencySlots = new SemaphoreSlim(_config.ConcurrentServiceRequests); + _httpConcurrencySlots = new SemaphoreSlim( + _config.ConcurrentServiceRequests, // initialCount + _config.ConcurrentServiceRequests // maxCount - prevents exceeding configured limit + ); _ownsHttpThrottler = true; // We own it, so we dispose it } } diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs index 16baf6644384..33edf2fa0ad1 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/PartBufferManager.cs @@ -199,7 +199,10 @@ public PartBufferManager(BufferedDownloadConfiguration config) throw new ArgumentNullException(nameof(config)); _partDataSources = new ConcurrentDictionary(); - _bufferSpaceAvailable = new SemaphoreSlim(config.MaxInMemoryParts); + _bufferSpaceAvailable = new SemaphoreSlim( + config.MaxInMemoryParts, // initialCount + config.MaxInMemoryParts // maxCount - prevents exceeding configured limit + ); _partAvailable = new AutoResetEvent(false); Logger.DebugFormat("PartBufferManager initialized with MaxInMemoryParts={0}", config.MaxInMemoryParts); diff --git a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs index 6a98b8fbaba2..645dae927051 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/BufferedPartDataHandlerTests.cs @@ -463,10 +463,6 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin // Part 3 should use buffering path (ArrayPool allocation) Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered"); Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered"); - - // Verify ReleaseBufferSpace was called for streaming path (immediate capacity release) - mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Once, - "Streaming path should release capacity immediately"); } finally { @@ -525,10 +521,6 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming() Assert.AreEqual(i + 1, streamingPartNumbers[i], $"Part {i + 1} should have streamed in order"); } - - // Verify capacity was released 5 times (once per streaming part) - mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Exactly(5), - "Capacity should be released immediately for each streaming part"); } finally { @@ -766,6 +758,142 @@ public void Dispose_MultipleCalls_IsIdempotent() #endregion + #region Semaphore Double Release Fix Tests + + [TestMethod] + public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce() + { + // This test verifies the fix for the double release bug in BufferedPartDataHandler. + // Before the fix: ProcessStreamingPart() called ReleaseBufferSpace() immediately after + // adding the StreamingDataSource, causing capacity to be released twice (once immediately, + // once later when the consumer finished reading the part). + // After the fix: The immediate ReleaseBufferSpace() call was removed. Capacity is released + // only once when the consumer finishes reading the part through PartBufferManager. + + // Arrange + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var mockBufferManager = new Mock(); + mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); + + var releaseCount = 0; + mockBufferManager.Setup(m => m.ReleaseBufferSpace()) + .Callback(() => releaseCount++); + + mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + + var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); + + try + { + var response = CreateMockGetObjectResponse(512); + + // Act - Process an in-order (streaming) part + await handler.ProcessPartAsync(1, response, CancellationToken.None); + + // Assert - ReleaseBufferSpace should NOT have been called during ProcessPartAsync + // (The removed code that called it immediately has been deleted) + // Capacity will be released later by PartBufferManager when consumer finishes reading + Assert.AreEqual(0, releaseCount, + "ProcessPartAsync should not release capacity for streaming parts. " + + "Capacity is released by PartBufferManager when consumer completes reading."); + + // Verify AddBuffer was called with StreamingDataSource (streaming path taken) + mockBufferManager.Verify(m => m.AddBuffer( + It.Is(ds => ds is StreamingDataSource)), Times.Once); + } + finally + { + handler.Dispose(); + } + } + + [TestMethod] + public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediately() + { + // This test verifies that buffered (out-of-order) parts don't release capacity immediately. + // Capacity is released later by PartBufferManager when the consumer finishes reading the part. + + // Arrange + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var mockBufferManager = new Mock(); + mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); + + var releaseCount = 0; + mockBufferManager.Setup(m => m.ReleaseBufferSpace()) + .Callback(() => releaseCount++); + + mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())); + + var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); + + try + { + var response = CreateMockGetObjectResponse(512); + + // Act - Process an out-of-order (buffered) part + await handler.ProcessPartAsync(3, response, CancellationToken.None); + + // Assert - ReleaseBufferSpace should NOT have been called + // Capacity will be released later by PartBufferManager when consumer finishes reading + Assert.AreEqual(0, releaseCount, + "ProcessPartAsync should not release capacity for buffered parts. " + + "Capacity is released by PartBufferManager when consumer completes reading."); + + // Verify AddBuffer was called with StreamPartBuffer (buffering path taken) + mockBufferManager.Verify(m => m.AddBuffer( + It.IsAny()), Times.Once); + } + finally + { + handler.Dispose(); + } + } + + [TestMethod] + public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease() + { + // This test verifies that when an error occurs during streaming part processing, + // capacity is released correctly through ReleaseCapacity() without double-releasing. + + // Arrange + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var mockBufferManager = new Mock(); + mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); + + var releaseCount = 0; + mockBufferManager.Setup(m => m.ReleaseBufferSpace()) + .Callback(() => releaseCount++); + + // Simulate error when adding buffer + mockBufferManager.Setup(m => m.AddBuffer(It.IsAny())) + .Throws(new InvalidOperationException("Test error")); + + var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); + + try + { + var response = CreateMockGetObjectResponse(512); + + // Act & Assert - Should throw + await Assert.ThrowsExceptionAsync(async () => + { + await handler.ProcessPartAsync(1, response, CancellationToken.None); + }); + + // Verify ReleaseBufferSpace was NOT called during error handling + // (The old double-release bug would have called it, causing issues) + Assert.AreEqual(0, releaseCount, + "Error handling should not release capacity for streaming parts. " + + "Streaming parts don't hold capacity slots in BufferedPartDataHandler."); + } + finally + { + handler.Dispose(); + } + } + + #endregion + #region Helper Methods /// diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index ecf90cbc4d67..d39db70c38d3 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -601,25 +601,19 @@ public async Task DiscoverUsingRangeStrategy_CalculatesPartCount() public async Task StartDownloadsAsync_SinglePart_ReturnsImmediately() { // Arrange - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = new DownloadDiscoveryResult - { - TotalParts = 1, - ObjectSize = 1024, - InitialResponse = new GetObjectResponse() - }; - - var mockBufferManager = new Mock(); - - // Act + // Act - Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); - // Assert - should complete without any downloads - mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Never); + // Assert - should complete without any additional downloads (discovery already made the call) + mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -1230,22 +1224,35 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource // Arrange - Test CancellationTokenSource disposal when error occurs before background task starts var mockDataHandler = new Mock(); + // WaitForCapacityAsync succeeds (needed for discovery) + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // ProcessPartAsync succeeds for Part 1 (discovery) + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + // Simulate error during PrepareAsync (before background task is created) mockDataHandler .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); + var totalParts = 2; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = new DownloadDiscoveryResult - { - TotalParts = 2, - ObjectSize = 16 * 1024 * 1024, - InitialResponse = new GetObjectResponse() - }; + // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); // Act & Assert try @@ -1599,17 +1606,16 @@ public async Task StartDownloadsAsync_PassesCancellationTokenToBufferManager() public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation() { // Arrange - Single part download should return immediately without using cancellation token - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); - var discoveryResult = new DownloadDiscoveryResult - { - TotalParts = 1, - ObjectSize = 1024, - InitialResponse = new GetObjectResponse() - }; + // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); var cts = new CancellationTokenSource(); cts.Cancel(); @@ -1617,8 +1623,8 @@ public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation() // Act - should complete without throwing even though token is cancelled await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token); - // Assert - no exception thrown, no S3 calls made - mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Never); + // Assert - discovery already made the S3 call, StartDownloadsAsync doesn't make additional calls for single-part + mockClient.Verify(x => x.GetObjectAsync(It.IsAny(), It.IsAny()), Times.Once); } [TestMethod] @@ -1831,19 +1837,17 @@ public async Task StartDownloadsAsync_ReturnsImmediately_PreventsDeadlock() public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgroundTask() { // Arrange - Single-part downloads should not create background tasks - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); var mockDataHandler = CreateMockDataHandler(); var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); - var discoveryResult = new DownloadDiscoveryResult - { - TotalParts = 1, - ObjectSize = 1024, - InitialResponse = new GetObjectResponse() - }; + // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); // Act var stopwatch = System.Diagnostics.Stopwatch.StartNew(); diff --git a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs index 121c50f0dede..37ab24be0fb3 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/PartBufferManagerTests.cs @@ -82,6 +82,7 @@ public async Task NextExpectedPartNumber_IncrementsAfterPartComplete() // Add part 1 byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); // Read part 1 completely @@ -244,6 +245,8 @@ public async Task AddBuffer_CreatesBufferedDataSource() byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); + // Act manager.AddBuffer(partBuffer); @@ -301,6 +304,7 @@ public async Task AddBuffer_SignalsPartAvailable() // Add the part byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); // Assert - Read should complete @@ -331,6 +335,7 @@ public async Task AddDataSource_AddsToCollection() var dataSource = new BufferedDataSource(partBuffer); // Act + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddDataSource(dataSource); // Assert - Should be able to read from part 1 @@ -415,6 +420,7 @@ public async Task ReadAsync_ReadsDataSequentially() Buffer.BlockCopy(testData, 0, testBuffer, 0, 512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); // Act @@ -443,6 +449,7 @@ public async Task ReadAsync_AdvancesNextExpectedPartNumber() // Add part 1 byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); // Read part 1 completely @@ -576,6 +583,7 @@ public async Task ReadAsync_WaitsForPartAvailability() // Add the part asynchronously byte[] testBuffer = ArrayPool.Shared.Rent(512); var partBuffer = new StreamPartBuffer(1, testBuffer, 512); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); // Assert - Read should complete @@ -657,6 +665,7 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer() byte[] testBuffer1 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData1, 0, testBuffer1, 0, 100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer1); // Add Part 2 (100 bytes) @@ -664,6 +673,7 @@ public async Task ReadAsync_ReadingAcrossPartBoundary_FillsBuffer() byte[] testBuffer2 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer2); // Act - Request 150 bytes (spans both parts) @@ -704,6 +714,7 @@ public async Task ReadAsync_MultiplePartsInSingleRead_AdvancesCorrectly() byte[] testBuffer = ArrayPool.Shared.Rent(50); Buffer.BlockCopy(testData, 0, testBuffer, 0, 50); var partBuffer = new StreamPartBuffer(i, testBuffer, 50); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer); } @@ -733,6 +744,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() // Add part 1 byte[] testBuffer1 = ArrayPool.Shared.Rent(100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 100); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer1); // Read part 1 completely @@ -745,6 +757,7 @@ public async Task ReadAsync_PartCompletes_AdvancesToNextPart() // Add part 2 byte[] testBuffer2 = ArrayPool.Shared.Rent(100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer2); // Read part 2 @@ -772,6 +785,7 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart() // Add empty part 1 byte[] testBuffer1 = ArrayPool.Shared.Rent(100); var partBuffer1 = new StreamPartBuffer(1, testBuffer1, 0); // 0 bytes + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer1); // Add part 2 with data @@ -779,6 +793,7 @@ public async Task ReadAsync_EmptyPart_ContinuesToNextPart() byte[] testBuffer2 = ArrayPool.Shared.Rent(100); Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 100); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 100); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer2); // Act - Try to read 100 bytes starting from part 1 @@ -959,6 +974,7 @@ public async Task AddBufferAsync_IPartDataSource_WithStreamingDataSource_AddsSuc var streamingSource = new StreamingDataSource(1, response); // Act + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(streamingSource); // Assert - Should be able to read from part 1 @@ -990,6 +1006,7 @@ public async Task AddBufferAsync_IPartDataSource_WithBufferedDataSource_AddsSucc var bufferedSource = new BufferedDataSource(partBuffer); // Act + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(bufferedSource); // Assert - Should be able to read from part 1 @@ -1054,6 +1071,7 @@ public async Task AddBufferAsync_IPartDataSource_SignalsPartAvailable() var streamingSource = new StreamingDataSource(1, response); // Act + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(streamingSource); // Assert - Read should complete @@ -1087,6 +1105,7 @@ public async Task ReadAsync_FromStreamingDataSource_ReadsCorrectly() ResponseStream = new MemoryStream(testData) }; var streamingSource = new StreamingDataSource(1, response); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(streamingSource); // Act - Read in multiple chunks @@ -1126,6 +1145,7 @@ public async Task ReadAsync_FromMixedSources_ReadsSequentially() ResponseStream = new MemoryStream(testData1) }; var streamingSource = new StreamingDataSource(1, response1); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer((IPartDataSource)streamingSource); // Add buffered source for part 2 @@ -1133,6 +1153,7 @@ public async Task ReadAsync_FromMixedSources_ReadsSequentially() byte[] testBuffer2 = ArrayPool.Shared.Rent(500); Buffer.BlockCopy(testData2, 0, testBuffer2, 0, 500); var partBuffer2 = new StreamPartBuffer(2, testBuffer2, 500); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(partBuffer2); // Act - Read across both parts @@ -1173,6 +1194,7 @@ public async Task ReadAsync_StreamingDataSource_DisposesAfterCompletion() ResponseStream = new MemoryStream(testData) }; var streamingSource = new StreamingDataSource(1, response); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(streamingSource); // Act - Read all data @@ -1208,6 +1230,7 @@ public async Task ReadAsync_MultipleStreamingSources_ReadsSequentially() ResponseStream = new MemoryStream(testData) }; var streamingSource = new StreamingDataSource(i, response); + await manager.WaitForBufferSpaceAsync(CancellationToken.None); manager.AddBuffer(streamingSource); } @@ -1368,6 +1391,9 @@ public async Task NextExpectedPartNumber_ConcurrentReads_SeeConsistentValue() // by adding and reading parts sequentially for (int partNum = 1; partNum <= NumIncrements; partNum++) { + // Wait for buffer space before adding part + await manager.WaitForBufferSpaceAsync(CancellationToken.None); + // Add part byte[] testBuffer = ArrayPool.Shared.Rent(100); var partBuffer = new StreamPartBuffer(partNum, testBuffer, 100); @@ -1413,5 +1439,218 @@ public async Task NextExpectedPartNumber_ConcurrentReads_SeeConsistentValue() } #endregion + + #region Semaphore MaxCount Tests + + [TestMethod] + public async Task WaitForBufferSpaceAsync_WithMaxCount_DoesNotExceedConfiguredLimit() + { + // This test verifies the fix for the double release bug. + // Before the fix: SemaphoreSlim without maxCount allowed unlimited Release() calls, + // which could corrupt the semaphore state and allow more concurrent operations than configured. + // After the fix: maxCount parameter prevents exceeding MaxInMemoryParts limit. + + // Arrange + const int maxInMemoryParts = 3; + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(maxInMemoryParts: maxInMemoryParts); + var manager = new PartBufferManager(config); + + try + { + // Acquire all available slots + for (int i = 0; i < maxInMemoryParts; i++) + { + await manager.WaitForBufferSpaceAsync(CancellationToken.None); + } + + // Release all acquired slots + for (int i = 0; i < maxInMemoryParts; i++) + { + manager.ReleaseBufferSpace(); + } + + // Attempt to release beyond maxCount (should throw) + Assert.ThrowsException(() => + { + manager.ReleaseBufferSpace(); + }, "Releasing beyond maxCount should throw SemaphoreFullException"); + + // Attempt one more release to confirm protection is consistent + Assert.ThrowsException(() => + { + manager.ReleaseBufferSpace(); + }, "Second excessive release should also throw SemaphoreFullException"); + + // Act - Try to acquire slots again + var acquiredSlots = 0; + for (int i = 0; i < maxInMemoryParts + 2; i++) + { + var waitTask = manager.WaitForBufferSpaceAsync(CancellationToken.None); + if (await Task.WhenAny(waitTask, Task.Delay(100)) == waitTask) + { + acquiredSlots++; + } + else + { + break; // Task didn't complete, no more slots available + } + } + + // Assert - Should only be able to acquire maxInMemoryParts slots, not more + // With maxCount fix: Can only acquire 3 slots (respects limit) + // Without maxCount fix: Could acquire 5 slots (2 extra from double releases) + Assert.AreEqual(maxInMemoryParts, acquiredSlots, + $"Semaphore should respect maxCount={maxInMemoryParts} limit despite excessive releases"); + } + finally + { + manager.Dispose(); + } + } + + [TestMethod] + public async Task ReleaseBufferSpace_ExcessiveReleases_MaintainsSemaphoreIntegrity() + { + // This test verifies that excessive Release() calls don't corrupt semaphore state. + // The maxCount parameter ensures CurrentCount never exceeds MaxInMemoryParts. + + // Arrange + const int maxInMemoryParts = 5; + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(maxInMemoryParts: maxInMemoryParts); + var manager = new PartBufferManager(config); + + try + { + // Acquire half the slots + for (int i = 0; i < maxInMemoryParts / 2; i++) + { + await manager.WaitForBufferSpaceAsync(CancellationToken.None); + } + + // Release the acquired slots + for (int i = 0; i < maxInMemoryParts / 2; i++) + { + manager.ReleaseBufferSpace(); + } + + // Now semaphore should be at full capacity (maxInMemoryParts) + // Attempt to release beyond maxCount - each should throw + var excessiveReleaseCount = 0; + for (int i = 0; i < 5; i++) + { + try + { + manager.ReleaseBufferSpace(); + Assert.Fail($"Release #{i + 1} beyond maxCount should have thrown SemaphoreFullException"); + } + catch (SemaphoreFullException) + { + excessiveReleaseCount++; + } + } + + // Assert - All excessive releases should have thrown + Assert.AreEqual(5, excessiveReleaseCount, "All excessive releases should throw SemaphoreFullException"); + + // Act - Count how many slots are now available + var availableSlots = 0; + for (int i = 0; i < maxInMemoryParts * 2; i++) + { + var waitTask = manager.WaitForBufferSpaceAsync(CancellationToken.None); + if (waitTask.IsCompleted) + { + availableSlots++; + await waitTask; + } + else + { + break; + } + } + + // Assert - Should never exceed maxInMemoryParts + Assert.IsTrue(availableSlots <= maxInMemoryParts, + $"Available slots ({availableSlots}) should not exceed maxInMemoryParts ({maxInMemoryParts})"); + } + finally + { + manager.Dispose(); + } + } + + [TestMethod] + public async Task BufferCapacity_ConcurrentOperations_RespectsMaxCountLimit() + { + // This test simulates the real-world scenario where multiple parts are being + // processed concurrently, verifying that the maxCount parameter prevents + // exceeding the configured buffer capacity limit. + + // Arrange + const int maxInMemoryParts = 4; + const int totalParts = 10; + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(maxInMemoryParts: maxInMemoryParts); + var manager = new PartBufferManager(config); + + try + { + var activeParts = 0; + var maxActiveParts = 0; + var lockObj = new object(); + + // Simulate concurrent part processing + var tasks = new List(); + for (int partNum = 1; partNum <= totalParts; partNum++) + { + int capturedPartNum = partNum; + tasks.Add(Task.Run(async () => + { + // Wait for buffer space (enforces maxInMemoryParts limit) + await manager.WaitForBufferSpaceAsync(CancellationToken.None); + + lock (lockObj) + { + activeParts++; + if (activeParts > maxActiveParts) + { + maxActiveParts = activeParts; + } + } + + // Simulate buffering the part + byte[] testBuffer = ArrayPool.Shared.Rent(100); + var partBuffer = new StreamPartBuffer(capturedPartNum, testBuffer, 100); + manager.AddBuffer(partBuffer); + + // Simulate some processing time + await Task.Delay(10); + + // Consumer reads the part (happens asynchronously in real scenario) + // For this test, we'll manually release after a delay + await Task.Delay(20); + + lock (lockObj) + { + activeParts--; + } + + // Release is normally done by consumer after reading part + manager.ReleaseBufferSpace(); + })); + } + + // Wait for all parts to be processed + await Task.WhenAll(tasks); + + // Assert - Should never have exceeded maxInMemoryParts + Assert.IsTrue(maxActiveParts <= maxInMemoryParts, + $"Maximum concurrent buffered parts ({maxActiveParts}) exceeded configured limit ({maxInMemoryParts})"); + } + finally + { + manager.Dispose(); + } + } + + #endregion } }