Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ internal interface IDownloadManager : IDisposable
/// with the previous two-method API.
/// </remarks>
Task<DownloadResult> StartDownloadAsync(EventHandler<WriteObjectProgressArgs> progressCallback, CancellationToken cancellationToken);

/// <summary>
/// Exception that occurred during downloads, if any.
/// </summary>
Exception DownloadException { get; }
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ internal class MultipartDownloadManager : IDownloadManager
private readonly SemaphoreSlim _httpConcurrencySlots;
private readonly bool _ownsHttpThrottler;
private readonly RequestEventHandler _requestEventHandler;

private Exception _downloadException;
private bool _disposed = false;
private bool _discoveryCompleted = false;
private Task _downloadCompletionTask;
Expand Down Expand Up @@ -166,15 +164,6 @@ public MultipartDownloadManager(IAmazonS3 s3Client, BaseDownloadRequest request,
}
}

/// <inheritdoc/>
public Exception DownloadException
{
get
{
return _downloadException;
}
}

/// <summary>
/// Discovers the download strategy and starts concurrent downloads in a single unified operation.
/// This eliminates resource leakage by managing HTTP slots and buffer capacity internally.
Expand Down Expand Up @@ -259,7 +248,6 @@ private async Task<DownloadResult> PerformDiscoveryAsync(CancellationToken cance
}
catch (Exception ex)
{
_downloadException = ex;
_logger.Error(ex, "MultipartDownloadManager: Discovery failed");
throw;
}
Expand Down Expand Up @@ -336,7 +324,6 @@ private async Task PerformDownloadsAsync(DownloadResult downloadResult, EventHan
}
catch (Exception ex)
{
_downloadException = ex;
_logger.Error(ex, "MultipartDownloadManager: Download failed");

HandleDownloadError(ex, internalCts);
Expand Down Expand Up @@ -414,7 +401,7 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult,
_logger.DebugFormat("MultipartDownloadManager: Background task waiting for {0} download tasks", expectedTaskCount);

// Wait for all downloads to complete (fails fast on first exception)
await TaskHelpers.WhenAllOrFirstExceptionAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);
await TaskHelpers.WhenAllFailFastAsync(downloadTasks, internalCts.Token).ConfigureAwait(false);

_logger.DebugFormat("MultipartDownloadManager: All download tasks completed successfully");

Expand All @@ -429,7 +416,6 @@ private async Task StartBackgroundDownloadsAsync(DownloadResult downloadResult,
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{
_downloadException = ex;
HandleDownloadError(ex, internalCts);
throw;
}
Expand All @@ -451,13 +437,21 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
// Pre-acquire capacity in sequential order to prevent race condition deadlock
// This ensures Part 2 gets capacity before Part 3, etc., preventing out-of-order
// parts from consuming all buffer slots and blocking the next expected part
for (int partNum = 2; partNum <= downloadResult.TotalParts; partNum++)
for (int partNum = 2; partNum <= downloadResult.TotalParts && !internalCts.IsCancellationRequested; partNum++)
{
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for buffer space", partNum);

// Acquire capacity sequentially - guarantees Part 2 before Part 3, etc.
await _dataHandler.WaitForCapacityAsync(internalCts.Token).ConfigureAwait(false);

// Check cancellation after acquiring capacity - a task may have failed while waiting
if (internalCts.IsCancellationRequested)
{
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after capacity acquired", partNum);
_dataHandler.ReleaseCapacity();
break;
}

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Buffer space acquired", partNum);

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})",
Expand All @@ -466,6 +460,15 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
// Acquire HTTP slot in the loop before creating task
// Loop will block here if all slots are in use
await _httpConcurrencySlots.WaitAsync(internalCts.Token).ConfigureAwait(false);

// Check cancellation after acquiring HTTP slot - a task may have failed while waiting
if (internalCts.IsCancellationRequested)
{
_logger.InfoFormat("MultipartDownloadManager: [Part {0}] Stopping early - cancellation requested after HTTP slot acquired", partNum);
_httpConcurrencySlots.Release();
_dataHandler.ReleaseCapacity();
break;
}

_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNum);

Expand All @@ -478,10 +481,16 @@ private async Task CreateDownloadTasksAsync(DownloadResult downloadResult, Event
{
// If task creation fails, release the HTTP slot we just acquired
_httpConcurrencySlots.Release();
_dataHandler.ReleaseCapacity();
_logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released due to task creation failure: {1}", partNum, ex);
throw;
}
}

if (internalCts.IsCancellationRequested && downloadTasks.Count < downloadResult.TotalParts - 1)
{
_logger.InfoFormat("MultipartDownloadManager: Stopped queuing early at {0} parts due to cancellation", downloadTasks.Count);
}
}

/// <summary>
Expand All @@ -491,7 +500,7 @@ private void ValidateDownloadCompletion(int expectedTaskCount, int totalParts)
{
// SEP Part GET Step 6 / Ranged GET Step 8:
// "validate that the total number of part GET requests sent matches with the expected PartsCount"
// Note: This should always be true if we reach this point, since WhenAllOrFirstException
// Note: This should always be true if we reach this point, since WhenAllFailFastAsync
// ensures all tasks completed successfully (or threw on first failure).
// The check serves as a defensive assertion for SEP compliance.
// Note: expectedTaskCount + 1 accounts for Part 1 being buffered during discovery
Expand Down
48 changes: 45 additions & 3 deletions sdk/src/Services/S3/Custom/Transfer/Internal/TaskHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,52 @@ namespace Amazon.S3.Transfer.Internal
/// </summary>
internal static class TaskHelpers
{
/// <summary>
/// Waits for all tasks to complete, failing fast on the first exception.
/// When any task faults, its exception is immediately propagated without waiting for other tasks.
/// </summary>
/// <param name="pendingTasks">List of tasks to wait for completion. This list is not modified.</param>
/// <param name="cancellationToken">Cancellation token to observe (not actively checked - caller handles cancellation)</param>
/// <returns>A task that represents the completion of all tasks or throws on first exception</returns>
/// <remarks>
/// This method creates an internal copy of the task list for tracking purposes,
/// so the caller's list remains unchanged after this method completes.
/// The caller is responsible for cancelling remaining tasks when this method throws.
/// </remarks>
internal static async Task WhenAllFailFastAsync(List<Task> pendingTasks, CancellationToken cancellationToken)
{
var remaining = new HashSet<Task>(pendingTasks);
int total = remaining.Count;
int processed = 0;

Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Starting with TotalTasks={0}", total);

while (remaining.Count > 0)
{
// Wait for any task to complete
var completedTask = await Task.WhenAny(remaining)
.ConfigureAwait(continueOnCapturedContext: false);

// Process the completed task - will throw if faulted
// The caller's catch block handles cancellation AFTER this exception propagates,
// which ensures the original exception is always thrown (not OperationCanceledException)
await completedTask
.ConfigureAwait(continueOnCapturedContext: false);

remaining.Remove(completedTask);
processed++;

Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: Task completed (Processed={0}/{1}, Remaining={2})",
processed, total, remaining.Count);
}

Logger.GetLogger(typeof(TaskHelpers)).DebugFormat("TaskHelpers.WhenAllFailFastAsync: All tasks completed (Total={0})", total);
}

/// <summary>
/// Waits for all tasks to complete or till any task fails or is canceled.
/// </summary>
/// <param name="pendingTasks">List of tasks to wait for completion</param>
/// <param name="pendingTasks">List of tasks to wait for completion. Note: This list is mutated during processing.</param>
/// <param name="cancellationToken">Cancellation token to observe</param>
/// <returns>A task that represents the completion of all tasks or the first exception</returns>
internal static async Task WhenAllOrFirstExceptionAsync(List<Task> pendingTasks, CancellationToken cancellationToken)
Expand All @@ -47,8 +89,8 @@ internal static async Task WhenAllOrFirstExceptionAsync(List<Task> pendingTasks,
var completedTask = await Task.WhenAny(pendingTasks)
.ConfigureAwait(continueOnCapturedContext: false);

//If RanToCompletion a response will be returned
//If Faulted or Canceled an appropriate exception will be thrown
// If RanToCompletion a response will be returned
// If Faulted or Canceled an appropriate exception will be thrown
await completedTask
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
Loading