-
Notifications
You must be signed in to change notification settings - Fork 873
use max size semaphore #4220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use max size semaphore #4220
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -153,9 +153,6 @@ private void ProcessStreamingPart( | |
| // If ReleaseBufferSpace() throws, we no longer own the data source, so we won't dispose it | ||
| streamingDataSource = null; | ||
|
|
||
| // Release capacity immediately since we're not holding anything in memory | ||
| _partBufferManager.ReleaseBufferSpace(); | ||
|
|
||
| _logger.DebugFormat("BufferedPartDataHandler: [Part {0}] StreamingDataSource added and capacity released", | ||
| partNumber); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this was being released here and then also when the reader read the part later on. so we remove this to fix the double release. Side note: |
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -463,10 +463,6 @@ public async Task ProcessPartAsync_InOrderVsOutOfOrder_VerifyStreamingVsBufferin | |
| // Part 3 should use buffering path (ArrayPool allocation) | ||
| Assert.AreEqual(1, bufferedPartNumbers.Count, "Expected exactly 1 part to be buffered"); | ||
| Assert.AreEqual(3, bufferedPartNumbers[0], "Part 3 should be buffered"); | ||
|
|
||
| // Verify ReleaseBufferSpace was called for streaming path (immediate capacity release) | ||
| mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Once, | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this isnt true anymore since we hold it until the reader consumes it which is why i removed it |
||
| "Streaming path should release capacity immediately"); | ||
| } | ||
| finally | ||
| { | ||
|
|
@@ -525,10 +521,6 @@ public async Task ProcessPartAsync_AllInOrderParts_NoBufferingAllStreaming() | |
| Assert.AreEqual(i + 1, streamingPartNumbers[i], | ||
| $"Part {i + 1} should have streamed in order"); | ||
| } | ||
|
|
||
| // Verify capacity was released 5 times (once per streaming part) | ||
| mockBufferManager.Verify(m => m.ReleaseBufferSpace(), Times.Exactly(5), | ||
| "Capacity should be released immediately for each streaming part"); | ||
| } | ||
| finally | ||
| { | ||
|
|
@@ -766,6 +758,142 @@ public void Dispose_MultipleCalls_IsIdempotent() | |
|
|
||
| #endregion | ||
|
|
||
| #region Semaphore Double Release Fix Tests | ||
|
|
||
| [TestMethod] | ||
| public async Task ProcessPartAsync_StreamingPart_ReleasesCapacityOnlyOnce() | ||
| { | ||
| // This test verifies the fix for the double release bug in BufferedPartDataHandler. | ||
| // Before the fix: ProcessStreamingPart() called ReleaseBufferSpace() immediately after | ||
| // adding the StreamingDataSource, causing capacity to be released twice (once immediately, | ||
| // once later when the consumer finished reading the part). | ||
| // After the fix: The immediate ReleaseBufferSpace() call was removed. Capacity is released | ||
| // only once when the consumer finishes reading the part through PartBufferManager. | ||
|
|
||
| // Arrange | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var mockBufferManager = new Mock<IPartBufferManager>(); | ||
| mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); | ||
|
|
||
| var releaseCount = 0; | ||
| mockBufferManager.Setup(m => m.ReleaseBufferSpace()) | ||
| .Callback(() => releaseCount++); | ||
|
|
||
| mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>())); | ||
|
|
||
| var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); | ||
|
|
||
| try | ||
| { | ||
| var response = CreateMockGetObjectResponse(512); | ||
|
|
||
| // Act - Process an in-order (streaming) part | ||
| await handler.ProcessPartAsync(1, response, CancellationToken.None); | ||
|
|
||
| // Assert - ReleaseBufferSpace should NOT have been called during ProcessPartAsync | ||
| // (The removed code that called it immediately has been deleted) | ||
| // Capacity will be released later by PartBufferManager when consumer finishes reading | ||
| Assert.AreEqual(0, releaseCount, | ||
| "ProcessPartAsync should not release capacity for streaming parts. " + | ||
| "Capacity is released by PartBufferManager when consumer completes reading."); | ||
|
|
||
| // Verify AddBuffer was called with StreamingDataSource (streaming path taken) | ||
| mockBufferManager.Verify(m => m.AddBuffer( | ||
| It.Is<IPartDataSource>(ds => ds is StreamingDataSource)), Times.Once); | ||
| } | ||
| finally | ||
| { | ||
| handler.Dispose(); | ||
| } | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public async Task ProcessPartAsync_BufferedPart_DoesNotReleaseCapacityImmediately() | ||
| { | ||
| // This test verifies that buffered (out-of-order) parts don't release capacity immediately. | ||
| // Capacity is released later by PartBufferManager when the consumer finishes reading the part. | ||
|
|
||
| // Arrange | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var mockBufferManager = new Mock<IPartBufferManager>(); | ||
| mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); | ||
|
|
||
| var releaseCount = 0; | ||
| mockBufferManager.Setup(m => m.ReleaseBufferSpace()) | ||
| .Callback(() => releaseCount++); | ||
|
|
||
| mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<StreamPartBuffer>())); | ||
|
|
||
| var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); | ||
|
|
||
| try | ||
| { | ||
| var response = CreateMockGetObjectResponse(512); | ||
|
|
||
| // Act - Process an out-of-order (buffered) part | ||
| await handler.ProcessPartAsync(3, response, CancellationToken.None); | ||
|
|
||
| // Assert - ReleaseBufferSpace should NOT have been called | ||
| // Capacity will be released later by PartBufferManager when consumer finishes reading | ||
| Assert.AreEqual(0, releaseCount, | ||
| "ProcessPartAsync should not release capacity for buffered parts. " + | ||
| "Capacity is released by PartBufferManager when consumer completes reading."); | ||
|
|
||
| // Verify AddBuffer was called with StreamPartBuffer (buffering path taken) | ||
| mockBufferManager.Verify(m => m.AddBuffer( | ||
| It.IsAny<StreamPartBuffer>()), Times.Once); | ||
| } | ||
| finally | ||
| { | ||
| handler.Dispose(); | ||
| } | ||
| } | ||
|
|
||
| [TestMethod] | ||
| public async Task ProcessPartAsync_StreamingPartError_DoesNotDoubleRelease() | ||
| { | ||
| // This test verifies that when an error occurs during streaming part processing, | ||
| // capacity is released correctly through ReleaseCapacity() without double-releasing. | ||
|
|
||
| // Arrange | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var mockBufferManager = new Mock<IPartBufferManager>(); | ||
| mockBufferManager.Setup(m => m.NextExpectedPartNumber).Returns(1); | ||
|
|
||
| var releaseCount = 0; | ||
| mockBufferManager.Setup(m => m.ReleaseBufferSpace()) | ||
| .Callback(() => releaseCount++); | ||
|
|
||
| // Simulate error when adding buffer | ||
| mockBufferManager.Setup(m => m.AddBuffer(It.IsAny<IPartDataSource>())) | ||
| .Throws(new InvalidOperationException("Test error")); | ||
|
|
||
| var handler = new BufferedPartDataHandler(mockBufferManager.Object, config); | ||
|
|
||
| try | ||
| { | ||
| var response = CreateMockGetObjectResponse(512); | ||
|
|
||
| // Act & Assert - Should throw | ||
| await Assert.ThrowsExceptionAsync<InvalidOperationException>(async () => | ||
| { | ||
| await handler.ProcessPartAsync(1, response, CancellationToken.None); | ||
| }); | ||
|
|
||
| // Verify ReleaseBufferSpace was NOT called during error handling | ||
| // (The old double-release bug would have called it, causing issues) | ||
| Assert.AreEqual(0, releaseCount, | ||
| "Error handling should not release capacity for streaming parts. " + | ||
| "Streaming parts don't hold capacity slots in BufferedPartDataHandler."); | ||
| } | ||
| finally | ||
| { | ||
| handler.Dispose(); | ||
| } | ||
| } | ||
|
|
||
| #endregion | ||
|
|
||
| #region Helper Methods | ||
|
|
||
| /// <summary> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -601,25 +601,19 @@ public async Task DiscoverUsingRangeStrategy_CalculatesPartCount() | |
| public async Task StartDownloadsAsync_SinglePart_ReturnsImmediately() | ||
| { | ||
| // Arrange | ||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); | ||
| var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. had to update all of these tests to call DiscoverDownloadStrategyAsync/reservere buffer space before doing any action that would release buffer space. otherwise it was getting exceeded semaphore maximum error (which is expected now) |
||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( | ||
| (req, ct) => Task.FromResult(mockResponse)); | ||
| var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); | ||
|
|
||
| var discoveryResult = new DownloadDiscoveryResult | ||
| { | ||
| TotalParts = 1, | ||
| ObjectSize = 1024, | ||
| InitialResponse = new GetObjectResponse() | ||
| }; | ||
|
|
||
| var mockBufferManager = new Mock<IPartBufferManager>(); | ||
|
|
||
| // Act | ||
| // Act - Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore | ||
| var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); | ||
| await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); | ||
|
|
||
| // Assert - should complete without any downloads | ||
| mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never); | ||
| // Assert - should complete without any additional downloads (discovery already made the call) | ||
| mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once); | ||
| } | ||
|
|
||
| [TestMethod] | ||
|
|
@@ -1230,22 +1224,35 @@ public async Task StartDownloadsAsync_EarlyError_DisposesCancellationTokenSource | |
| // Arrange - Test CancellationTokenSource disposal when error occurs before background task starts | ||
| var mockDataHandler = new Mock<IPartDataHandler>(); | ||
|
|
||
| // WaitForCapacityAsync succeeds (needed for discovery) | ||
| mockDataHandler | ||
| .Setup(x => x.WaitForCapacityAsync(It.IsAny<CancellationToken>())) | ||
| .Returns(Task.CompletedTask); | ||
|
|
||
| // ProcessPartAsync succeeds for Part 1 (discovery) | ||
| mockDataHandler | ||
| .Setup(x => x.ProcessPartAsync(1, It.IsAny<GetObjectResponse>(), It.IsAny<CancellationToken>())) | ||
| .Returns(Task.CompletedTask); | ||
|
|
||
| // Simulate error during PrepareAsync (before background task is created) | ||
| mockDataHandler | ||
| .Setup(x => x.PrepareAsync(It.IsAny<DownloadDiscoveryResult>(), It.IsAny<CancellationToken>())) | ||
| .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); | ||
|
|
||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); | ||
| var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); | ||
| var totalParts = 2; | ||
| var partSize = 8 * 1024 * 1024; | ||
| var totalObjectSize = totalParts * partSize; | ||
|
|
||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( | ||
| totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); | ||
|
|
||
| var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( | ||
| downloadType: MultipartDownloadType.PART); | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); | ||
|
|
||
| var discoveryResult = new DownloadDiscoveryResult | ||
| { | ||
| TotalParts = 2, | ||
| ObjectSize = 16 * 1024 * 1024, | ||
| InitialResponse = new GetObjectResponse() | ||
| }; | ||
| // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore | ||
| var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); | ||
|
|
||
| // Act & Assert | ||
| try | ||
|
|
@@ -1599,26 +1606,25 @@ public async Task StartDownloadsAsync_PassesCancellationTokenToBufferManager() | |
| public async Task StartDownloadsAsync_SinglePart_DoesNotThrowOnCancellation() | ||
| { | ||
| // Arrange - Single part download should return immediately without using cancellation token | ||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); | ||
| var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); | ||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( | ||
| (req, ct) => Task.FromResult(mockResponse)); | ||
|
|
||
| var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
| var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, CreateMockDataHandler().Object); | ||
|
|
||
| var discoveryResult = new DownloadDiscoveryResult | ||
| { | ||
| TotalParts = 1, | ||
| ObjectSize = 1024, | ||
| InitialResponse = new GetObjectResponse() | ||
| }; | ||
| // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore | ||
| var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); | ||
|
|
||
| var cts = new CancellationTokenSource(); | ||
| cts.Cancel(); | ||
|
|
||
| // Act - should complete without throwing even though token is cancelled | ||
| await coordinator.StartDownloadsAsync(discoveryResult, null, cts.Token); | ||
|
|
||
| // Assert - no exception thrown, no S3 calls made | ||
| mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Never); | ||
| // Assert - discovery already made the S3 call, StartDownloadsAsync doesn't make additional calls for single-part | ||
| mockClient.Verify(x => x.GetObjectAsync(It.IsAny<GetObjectRequest>(), It.IsAny<CancellationToken>()), Times.Once); | ||
| } | ||
|
|
||
| [TestMethod] | ||
|
|
@@ -1831,19 +1837,17 @@ public async Task StartDownloadsAsync_ReturnsImmediately_PreventsDeadlock() | |
| public async Task StartDownloadsAsync_SinglePart_ReturnsImmediatelyWithoutBackgroundTask() | ||
| { | ||
| // Arrange - Single-part downloads should not create background tasks | ||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); | ||
| var mockResponse = MultipartDownloadTestHelpers.CreateSinglePartResponse(1024); | ||
| var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( | ||
| (req, ct) => Task.FromResult(mockResponse)); | ||
| var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); | ||
| var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); | ||
|
|
||
| var mockDataHandler = CreateMockDataHandler(); | ||
| var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object); | ||
|
|
||
| var discoveryResult = new DownloadDiscoveryResult | ||
| { | ||
| TotalParts = 1, | ||
| ObjectSize = 1024, | ||
| InitialResponse = new GetObjectResponse() | ||
| }; | ||
| // Call DiscoverDownloadStrategyAsync first to properly acquire HTTP semaphore | ||
| var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); | ||
|
|
||
| // Act | ||
| var stopwatch = System.Diagnostics.Stopwatch.StartNew(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this was being released here and then also when the reader read the part later on. so we remove this to fix the double release.
Side note:
The original reason i had this hear was because my intent was not count streaming to the user as "holding the part in memory". But for simplicitys sake and this bug fix, we now consider a part that gets directly streamed to the user as "taking up memory". in the future we could probably optimize/change this if we want.