Skip to content

Commit c2594c7

Browse files
committed
queue fixes
remove downloadexception field code updates code updates code updates
1 parent d4a59a9 commit c2594c7

File tree

4 files changed

+759
-104
lines changed

4 files changed

+759
-104
lines changed

sdk/src/Services/S3/Custom/Transfer/Internal/IDownloadManager.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ internal interface IDownloadManager : IDisposable
5555
/// with the previous two-method API.
5656
/// </remarks>
5757
Task<DownloadResult> StartDownloadAsync(EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken);
58-
59-
/// <summary>
60-
/// Exception that occurred during downloads, if any.
61-
/// </summary>
62-
Exception DownloadException { get; }
6358
}
6459

6560
/// <summary>

sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ internal class MultipartDownloadManager : IDownloadManager
4646
private readonly SemaphoreSlim _httpConcurrencySlots;
4747
private readonly bool _ownsHttpThrottler;
4848
private readonly RequestEventHandler _requestEventHandler;
49-
50-
private Exception _downloadException;
5149
private bool _disposed = false;
5250
private bool _discoveryCompleted = false;
5351
private Task _downloadCompletionTask;
@@ -166,15 +164,6 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request,
166164
}
167165
}
168166

169-
/// <inheritdoc/>
170-
public Exception DownloadException
171-
{
172-
get
173-
{
174-
return _downloadException;
175-
}
176-
}
177-
178167
/// <summary>
179168
/// Discovers the download strategy and starts concurrent downloads in a single unified operation.
180169
/// This eliminates resource leakage by managing HTTP slots and buffer capacity internally.
@@ -259,7 +248,6 @@ private async Task<DownloadResult> PerformDiscoveryAsync(CancellationToken cance
259248
}
260249
catch (Exception ex)
261250
{
262-
_downloadException = ex;
263251
_logger.Error(ex, "MultipartDownloadManager: Discovery failed");
264252
throw;
265253
}
@@ -336,7 +324,6 @@ private async Task PerformDownloadsAsync(DownloadResult downloadResult, EventHan
336324
}
337325
catch (Exception ex)
338326
{
339-
_downloadException = ex;
340327
_logger.Error(ex, "MultipartDownloadManager: Download failed");
341328

342329
HandleDownloadError(ex, internalCts);
@@ -414,7 +401,7 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult,
414401
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);
415402

416403
// Wait for all downloads to complete (fails fast on first exception)
417-
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
404+
await TaskHelpers.WhenAllFailFastAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
418405

419406
_logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully");
420407

@@ -429,7 +416,6 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult,
429416
#pragma warning disable CA1031 // Do not catch general exception types
430417
catch (Exception ex)
431418
{
432-
_downloadException = ex;
433419
HandleDownloadError(ex, internalCts);
434420
throw;
435421
}
@@ -451,13 +437,21 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
451437
// Pre-acquire capacity in sequential order to prevent race condition deadlock
452438
// This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order
453439
// parts from consuming all buffer slots and blocking the next expected part
454-
for (int partNum = 2; partNum <= downloadResult.TotalParts; partNum++)
440+
for (int partNum = 2; partNum <= downloadResult.TotalParts && !internalCts.IsCancellationRequested; partNum++)
455441
{
456442
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);
457443

458444
// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
459445
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);
460446

447+
// Check cancellation after acquiring capacity - a task may have failed while waiting
448+
if (internalCts.IsCancellationRequested)
449+
{
450+
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after capacity acquired", partNum);
451+
_dataHandler.ReleaseCapacity();
452+
break;
453+
}
454+
461455
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);
462456

463457
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
@@ -466,6 +460,15 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
466460
// Acquire HTTP slot in the loop before creating task
467461
// Loop will block here if all slots are in use
468462
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);
463+
464+
// Check cancellation after acquiring HTTP slot - a task may have failed while waiting
465+
if (internalCts.IsCancellationRequested)
466+
{
467+
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after HTTP slot acquired", partNum);
468+
_httpConcurrencySlots.Release();
469+
_dataHandler.ReleaseCapacity();
470+
break;
471+
}
469472

470473
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);
471474

@@ -478,10 +481,16 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
478481
{
479482
// If task creation fails, release the HTTP slot we just acquired
480483
_httpConcurrencySlots.Release();
484+
_dataHandler.ReleaseCapacity();
481485
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
482486
throw;
483487
}
484488
}
489+
490+
if (internalCts.IsCancellationRequested && downloadTasks.Count < downloadResult.TotalParts - 1)
491+
{
492+
_logger.InfoFormat("MultipartDownloadManager: Stopped queuing early at {0} parts due to cancellation", downloadTasks.Count);
493+
}
485494
}
486495

487496
/// <summary>
@@ -491,7 +500,7 @@ private void ValidateDownloadCompletion(int expectedTaskCount, int totalParts)
491500
{
492501
// SEP Part GET Step 6 / Ranged GET Step 8:
493502
// "validate that the total number of part GET requests sent matches with the expected PartsCount"
494-
// Note: This should always be true if we reach this point, since WhenAllOrFirstException
503+
// Note: This should always be true if we reach this point, since WhenAllFailFastAsync
495504
// ensures all tasks completed successfully (or threw on first failure).
496505
// The check serves as a defensive assertion for SEP compliance.
497506
// Note: expectedTaskCount + 1 accounts for Part 1 being buffered during discovery

sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,52 @@ namespace Amazon.S3.Transfer.Internal
2727
/// </summary>
2828
internal static class TaskHelpers
2929
{
30+
/// <summary>
31+
/// Waits for all tasks to complete, failing fast on the first exception.
32+
/// When any task faults, its exception is immediately propagated without waiting for other tasks.
33+
/// </summary>
34+
/// <param name="pendingTasks">List of tasks to wait for completion. This list is not modified.</param>
35+
/// <param name="cancellationToken">Cancellation token to observe (not actively checked - caller handles cancellation)</param>
36+
/// <returns>A task that represents the completion of all tasks or throws on first exception</returns>
37+
/// <remarks>
38+
/// This method creates an internal copy of the task list for tracking purposes,
39+
/// so the caller's list remains unchanged after this method completes.
40+
/// The caller is responsible for cancelling remaining tasks when this method throws.
41+
/// </remarks>
42+
internal static async Task WhenAllFailFastAsync(List<Task> pendingTasks, CancellationToken cancellationToken)
43+
{
44+
var remaining = new HashSet<Task>(pendingTasks);
45+
int total = remaining.Count;
46+
int processed = 0;
47+
48+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Starting with TotalTasks={0}", total);
49+
50+
while (remaining.Count > 0)
51+
{
52+
// Wait for any task to complete
53+
var completedTask = await Task.WhenAny(remaining)
54+
.ConfigureAwait(continueOnCapturedContext: false);
55+
56+
// Process the completed task - will throw if faulted
57+
// The caller's catch block handles cancellation AFTER this exception propagates,
58+
// which ensures the original exception is always thrown (not OperationCanceledException)
59+
await completedTask
60+
.ConfigureAwait(continueOnCapturedContext: false);
61+
62+
remaining.Remove(completedTask);
63+
processed++;
64+
65+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Task completed (Processed={0}/{1}, Remaining={2})",
66+
processed, total, remaining.Count);
67+
}
68+
69+
Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: All tasks completed (Total={0})", total);
70+
}
71+
3072
/// <summary>
3173
/// Waits for all tasks to complete or till any task fails or is canceled.
3274
/// </summary>
33-
/// <param name="pendingTasks">List of tasks to wait for completion</param>
75+
/// <param name="pendingTasks">List of tasks to wait for completion. Note: This list is mutated during processing.</param>
3476
/// <param name="cancellationToken">Cancellation token to observe</param>
3577
/// <returns>A task that represents the completion of all tasks or the first exception</returns>
3678
internal static async Task WhenAllOrFirstExceptionAsync(List<Task> pendingTasks, CancellationToken cancellationToken)
@@ -47,8 +89,8 @@ internal static async Task WhenAllOrFirstExceptionAsync(List<Task> pendingTasks,
4789
var completedTask = await Task.WhenAny(pendingTasks)
4890
.ConfigureAwait(continueOnCapturedContext: false);
4991

50-
//If RanToCompletion a response will be returned
51-
//If Faulted or Canceled an appropriate exception will be thrown
92+
// If RanToCompletion a response will be returned
93+
// If Faulted or Canceled an appropriate exception will be thrown
5294
await completedTask
5395
.ConfigureAwait(continueOnCapturedContext: false);
5496

0 commit comments

Comments
 (0)