Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ internal sealed class CloudFetchDownloader : ICloudFetchDownloader
/// <param name="httpClient">The HTTP client to use for downloads.</param>
/// <param name="maxParallelDownloads">The maximum number of parallel downloads.</param>
/// <param name="isLz4Compressed">Whether the results are LZ4 compressed.</param>
/// <param name="logger">The logger instance.</param>
/// <param name="maxRetries">The maximum number of retry attempts.</param>
/// <param name="retryDelayMs">The delay between retry attempts in milliseconds.</param>
public CloudFetchDownloader(
Expand Down Expand Up @@ -184,6 +185,12 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
{
await Task.Yield();

int totalFiles = 0;
int successfulDownloads = 0;
int failedDownloads = 0;
long totalBytes = 0;
var overallStopwatch = Stopwatch.StartNew();

try
{
// Keep track of active download tasks
Expand All @@ -193,6 +200,8 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
// Process items from the download queue until it's completed
foreach (var downloadResult in _downloadQueue.GetConsumingEnumerable(cancellationToken))
{
totalFiles++;

// Check if there's an error before processing more downloads
if (HasError)
{
Expand All @@ -213,7 +222,7 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
}
catch (Exception ex)
{
Debug.WriteLine($"Error waiting for downloads to complete: {ex.Message}");
Trace.TraceWarning($"Error waiting for downloads to complete: {ex.Message}");
// Don't set error here, as individual download tasks will handle their own errors
}
}
Expand Down Expand Up @@ -245,17 +254,23 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
if (t.IsFaulted)
{
Exception ex = t.Exception?.InnerException ?? new Exception("Unknown error");
Debug.WriteLine($"Download failed: {ex.Message}");
Trace.TraceError($"Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");

// Set the download as failed
downloadResult.SetFailed(ex);
failedDownloads++;

// Set the error state to stop the download process
SetError(ex);

// Signal that we should stop processing downloads
downloadTaskCompletionSource.TrySetException(ex);
}
else if (!t.IsFaulted && !t.IsCanceled)
{
successfulDownloads++;
totalBytes += downloadResult.Size;
}
}, cancellationToken);

// Add the task to the dictionary
Expand All @@ -274,14 +289,21 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Expected when cancellation is requested
Trace.TraceInformation("Download process was cancelled");
}
catch (Exception ex)
{
Debug.WriteLine($"Error in download loop: {ex.Message}");
Trace.TraceError($"Error in download loop: {ex.Message}");
SetError(ex);
}
finally
{
overallStopwatch.Stop();

Trace.TraceInformation(
Comment thread
jadewang-db marked this conversation as resolved.
$"Download process completed. Total files: {totalFiles}, Successful: {successfulDownloads}, " +
$"Failed: {failedDownloads}, Total size: {totalBytes / 1024.0 / 1024.0:F2} MB, Total time: {overallStopwatch.ElapsedMilliseconds / 1000.0:F2} sec");

// If there's an error, add the error to the result queue
if (HasError)
{
Expand All @@ -293,11 +315,18 @@ private async Task DownloadFilesAsync(CancellationToken cancellationToken)
private async Task DownloadFileAsync(IDownloadResult downloadResult, CancellationToken cancellationToken)
{
string url = downloadResult.Link.FileLink;
string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
byte[]? fileData = null;

// Use the size directly from the download result
long size = downloadResult.Size;

// Create a stopwatch to track download time
var stopwatch = Stopwatch.StartNew();

// Log download start
Trace.TraceInformation($"Starting download of file {sanitizedUrl}, expected size: {size / 1024.0:F2} KB");

// Acquire memory before downloading
await _memoryManager.AcquireMemoryAsync(size, cancellationToken).ConfigureAwait(false);

Expand All @@ -318,7 +347,7 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
long? contentLength = response.Content.Headers.ContentLength;
if (contentLength.HasValue && contentLength.Value > 0)
{
Debug.WriteLine($"Downloading file of size: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
Trace.TraceInformation($"Actual file size for {sanitizedUrl}: {contentLength.Value / 1024.0 / 1024.0:F2} MB");
}

// Read the file data
Expand All @@ -328,36 +357,50 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
catch (Exception ex) when (retry < _maxRetries - 1 && !cancellationToken.IsCancellationRequested)
{
// Log the error and retry
Debug.WriteLine($"Error downloading file (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
Trace.TraceError($"Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");

await Task.Delay(_retryDelayMs * (retry + 1), cancellationToken).ConfigureAwait(false);
}
}

if (fileData == null)
{
stopwatch.Stop();
Trace.TraceError($"Failed to download file {sanitizedUrl} after {_maxRetries} attempts. Elapsed time: {stopwatch.ElapsedMilliseconds} ms");

// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
throw new InvalidOperationException($"Failed to download file from {url} after {_maxRetries} attempts.");
}

// Process the downloaded file data
MemoryStream dataStream;
long actualSize = fileData.Length;

// If the data is LZ4 compressed, decompress it
if (_isLz4Compressed)
{
try
{
var decompressStopwatch = Stopwatch.StartNew();
dataStream = new MemoryStream();
using (var inputStream = new MemoryStream(fileData))
using (var decompressor = LZ4Stream.Decode(inputStream))
{
await decompressor.CopyToAsync(dataStream, 81920, cancellationToken).ConfigureAwait(false);
}
dataStream.Position = 0;
decompressStopwatch.Stop();

Trace.TraceInformation($"Decompressed file {sanitizedUrl} in {decompressStopwatch.ElapsedMilliseconds} ms. Compressed size: {actualSize / 1024.0:F2} KB, Decompressed size: {dataStream.Length / 1024.0:F2} KB");

actualSize = dataStream.Length;
}
catch (Exception ex)
{
stopwatch.Stop();
Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms");

// Release the memory we acquired
_memoryManager.ReleaseMemory(size);
throw new InvalidOperationException($"Error decompressing data: {ex.Message}", ex);
Expand All @@ -368,6 +411,10 @@ private async Task DownloadFileAsync(IDownloadResult downloadResult, Cancellatio
dataStream = new MemoryStream(fileData);
}

// Stop the stopwatch and log download completion
stopwatch.Stop();
Trace.TraceInformation($"Completed download of file {sanitizedUrl}. Size: {actualSize / 1024.0:F2} KB, Latency: {stopwatch.ElapsedMilliseconds} ms, Throughput: {(actualSize / 1024.0 / 1024.0) / (stopwatch.ElapsedMilliseconds / 1000.0):F2} MB/s");

// Set the download as completed with the original size
downloadResult.SetCompleted(dataStream, size);
}
Expand All @@ -378,6 +425,7 @@ private void SetError(Exception ex)
{
if (_error == null)
{
Trace.TraceError($"Setting error state: {ex.Message}");
_error = ex;
}
}
Expand All @@ -395,7 +443,22 @@ private void CompleteWithError()
}
catch (Exception ex)
{
Debug.WriteLine($"Error completing with error: {ex.Message}");
Trace.TraceError($"Error completing with error: {ex.Message}");
}
}

/// <summary>
/// Reduces a URL to <c>scheme://host/file-name</c> for logging, so that the query string and
/// the intermediate path segments of the original URL are not written to the trace output.
/// </summary>
/// <param name="url">The URL to sanitize. Null, empty, or malformed values are tolerated.</param>
/// <returns>
/// The sanitized URL, or the generic identifier <c>cloud-storage-url</c> when
/// <paramref name="url"/> cannot be parsed as an absolute URI.
/// </returns>
private static string SanitizeUrl(string url)
{
    const string GenericIdentifier = "cloud-storage-url";

    // A malformed URL is an expected input for a logging helper, so parse with TryCreate
    // instead of relying on an exception from the Uri constructor.
    if (!Uri.TryCreate(url, UriKind.Absolute, out var uri))
    {
        return GenericIdentifier;
    }

    try
    {
        return $"{uri.Scheme}://{uri.Host}/{Path.GetFileName(uri.LocalPath)}";
    }
    catch (ArgumentException)
    {
        // Path.GetFileName rejects invalid path characters on older runtimes; logging must
        // stay best-effort, so fall back to the generic identifier rather than propagate.
        return GenericIdentifier;
    }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal sealed class CloudFetchReader : IArrowArrayStream
{
private readonly Schema schema;
private readonly bool isLz4Compressed;
private readonly ICloudFetchDownloadManager downloadManager;
private ICloudFetchDownloadManager? downloadManager;
private ArrowStreamReader? currentReader;
private IDownloadResult? currentDownloadResult;
private bool isPrefetchEnabled;
Expand Down Expand Up @@ -136,6 +136,8 @@ public CloudFetchReader(DatabricksStatement statement, Schema schema, bool isLz4
this.currentDownloadResult = await this.downloadManager.GetNextDownloadedFileAsync(cancellationToken);
if (this.currentDownloadResult == null)
{
this.downloadManager.Dispose();
this.downloadManager = null;
// No more files
return null;
}
Expand Down
2 changes: 2 additions & 0 deletions csharp/test/Drivers/Databricks/CloudFetchE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ private async Task TestRealDatabricksCloudFetchLargeQuery(string query, int rowC

Assert.True(totalRows >= rowCount);

Assert.Null(await result.Stream.ReadNextRecordBatchAsync());

// Also log to the test output helper if available
OutputHelper?.WriteLine($"Read {totalRows} rows from range function");
}
Expand Down
Loading