Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
private const int MAX_CHUNK_SIZE_BYTES = 16 * 1024 * 1024; // 16 MB
private const int MIN_PARALLELISM = 2;
private const int MAX_PARALLELISM = 4;
private const int CLIENT_TIMEOUTS_BEFORE_DOWNSCALE = 2;

private const double MULTIPLIER_2_X = 2.0;
private const double MULTIPLIER_1_75_X = 1.75;
Expand All @@ -36,6 +37,7 @@
private readonly Queue<long> _recentBytes;
private int _successesInWindow;
private int _windowSize;
private int _consecutiveClientTimeouts;
private readonly ILogger<AdaptiveUploadController> _logger;
private readonly object _syncRoot = new();

Expand Down Expand Up @@ -86,11 +88,20 @@
{
lock (_syncRoot)
{
if (IsClientSideFailure(uploadResult.FailureKind))
if (uploadResult.FailureKind == UploadFailureKind.ClientCancellation)
{
Comment thread
paul-fresquet marked this conversation as resolved.
_consecutiveClientTimeouts = 0;
return;
}

if (uploadResult.FailureKind == UploadFailureKind.ClientTimeout)
{
HandleClientTimeout(uploadResult.FileId);
return;
}

_consecutiveClientTimeouts = 0;

EnqueueSample(uploadResult.Elapsed, uploadResult.IsSuccess, uploadResult.ActualBytes);

if (HandleBandwidthReset(uploadResult.IsSuccess, uploadResult.StatusCode))
Expand All @@ -105,13 +116,13 @@

var maxElapsed = GetMaxElapsedInWindow();

_logger.LogDebug(
"Adaptive: file {FileId} maxElapsed={MaxElapsedMs} ms, window={Window}, parallelism={Parallelism}, chunkSize={ChunkKb} KB",
uploadResult.FileId ?? "-",
maxElapsed.TotalMilliseconds,
_windowSize,
_currentParallelism,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 125 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqEO&open=AZ3Efs2J2TusSyGBZqEO&pullRequest=291

if (TryHandleDownscale(maxElapsed, uploadResult.FileId))
{
Expand Down Expand Up @@ -151,9 +162,26 @@
}
}

private static bool IsClientSideFailure(UploadFailureKind failureKind)
private void HandleClientTimeout(string? fileId)
{
return failureKind is UploadFailureKind.ClientCancellation or UploadFailureKind.ClientTimeout;
_consecutiveClientTimeouts += 1;
if (_consecutiveClientTimeouts < CLIENT_TIMEOUTS_BEFORE_DOWNSCALE)
{
_logger.LogDebug(
"Adaptive: file {FileId} client timeout {TimeoutCount}/{Threshold}. Waiting before downscale",
fileId ?? "-",
_consecutiveClientTimeouts,
CLIENT_TIMEOUTS_BEFORE_DOWNSCALE);

Check warning on line 174 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqEP&open=AZ3Efs2J2TusSyGBZqEP&pullRequest=291

return;
}

_logger.LogInformation(
"Adaptive: file {FileId} client timeout threshold reached ({TimeoutCount}). Downscaling upload settings",
fileId ?? "-",
_consecutiveClientTimeouts);

Check warning on line 182 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqEQ&open=AZ3Efs2J2TusSyGBZqEQ&pullRequest=291
_consecutiveClientTimeouts = 0;
Downscale(fileId, "client timeouts");
}

private bool HandleBandwidthReset(bool isSuccess, int? statusCode)
Expand Down Expand Up @@ -192,40 +220,45 @@
{
if (maxElapsed > _downscaleThreshold)
{
if (_currentParallelism > MIN_PARALLELISM)
{
_logger.LogInformation(
"Adaptive: file {FileId} Downscale. Reducing parallelism {Prev} -> {Next}. Resetting window (window before {WindowBefore})",
fileId ?? "-",
_currentParallelism, _currentParallelism - 1,
_windowSize);
_currentParallelism -= 1;
_windowSize = _currentParallelism;
ResetWindow();

return true;
}

var reduced = (int)Math.Max(MIN_CHUNK_SIZE_BYTES, _currentChunkSizeBytes * 0.75);
if (reduced != _currentChunkSizeBytes)
{
_currentChunkSizeBytes = reduced;
_logger.LogInformation(
"Adaptive: file {FileId} Downscale. maxElapsed={MaxElapsedMs} ms > {ThresholdMs} ms. New chunkSize={ChunkKb} KB",
fileId ?? "-",
maxElapsed.TotalMilliseconds,
_downscaleThreshold.TotalMilliseconds,
Math.Round(_currentChunkSizeBytes / 1024d));
}

ResetWindow();

Downscale(fileId, $"maxElapsed={maxElapsed.TotalMilliseconds} ms > {_downscaleThreshold.TotalMilliseconds} ms");
return true;
}

return false;
}

private void Downscale(string? fileId, string reason)
{
if (_currentParallelism > MIN_PARALLELISM)
{
_logger.LogInformation(
"Adaptive: file {FileId} Downscale ({Reason}). Reducing parallelism {Prev} -> {Next}. Resetting window (window before {WindowBefore})",
fileId ?? "-",
reason,
_currentParallelism,
_currentParallelism - 1,
_windowSize);

Check warning on line 240 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqER&open=AZ3Efs2J2TusSyGBZqER&pullRequest=291
_currentParallelism -= 1;
_windowSize = _currentParallelism;
ResetWindow();

return;
}

var reduced = (int)Math.Max(MIN_CHUNK_SIZE_BYTES, _currentChunkSizeBytes * 0.75);
if (reduced != _currentChunkSizeBytes)
{
_currentChunkSizeBytes = reduced;
_logger.LogInformation(
"Adaptive: file {FileId} Downscale ({Reason}). New chunkSize={ChunkKb} KB",
fileId ?? "-",
reason,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 256 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqES&open=AZ3Efs2J2TusSyGBZqES&pullRequest=291
}

ResetWindow();
}

private void TryHandleUpscale(string? fileId)
{
var recentDurations = _recentDurations.ToArray();
Expand Down Expand Up @@ -273,12 +306,12 @@
var increased = (int)Math.Round(_currentChunkSizeBytes * multiplier);
_currentChunkSizeBytes = Math.Clamp(increased, MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES);

_logger.LogInformation(
"Adaptive: file {FileId} Upscale. maxElapsed={MaxElapsedMs} ms <= {ThresholdMs} ms. New chunkSize={ChunkKb} KB",
fileId ?? "-",
maxElapsedEligible.TotalMilliseconds,
_upscaleThreshold.TotalMilliseconds,
Math.Round(_currentChunkSizeBytes / 1024d));

Check warning on line 314 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqET&open=AZ3Efs2J2TusSyGBZqET&pullRequest=291

UpdateParallelismOnUpscale(fileId);
_currentParallelism = Math.Min(_currentParallelism, MAX_PARALLELISM);
Expand Down Expand Up @@ -316,8 +349,8 @@
_currentParallelism = Math.Max(_currentParallelism, 4);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: file {FileId} Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=8MB",
fileId ?? "-", prev, _currentParallelism);

Check warning on line 353 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqEU&open=AZ3Efs2J2TusSyGBZqEU&pullRequest=291
}
}
else if (_currentChunkSizeBytes >= FOUR_MB)
Expand All @@ -326,8 +359,8 @@
_currentParallelism = Math.Max(_currentParallelism, 3);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: file {FileId} Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=4MB",
fileId ?? "-", prev, _currentParallelism);

Check warning on line 363 in src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Evaluation of this argument may be expensive and unnecessary if logging is disabled

See more on https://sonarcloud.io/project/issues?id=POW-Software_ByteSync&issues=AZ3Efs2J2TusSyGBZqEV&open=AZ3Efs2J2TusSyGBZqEV&pullRequest=291
}
}
}
Expand Down Expand Up @@ -362,6 +395,7 @@
_currentChunkSizeBytes = Math.Clamp(INITIAL_CHUNK_SIZE_BYTES, MIN_CHUNK_SIZE_BYTES, MAX_CHUNK_SIZE_BYTES);
_currentParallelism = MIN_PARALLELISM;
_windowSize = _currentParallelism;
_consecutiveClientTimeouts = 0;
}

ResetWindow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void Downscale_ReducesParallelism_WhenAboveMin()
FeedFastWindow(_controller);
}

_controller.CurrentChunkSizeBytes.Should().BeGreaterThanOrEqualTo(4 * 1024 * 1024);
_controller.CurrentParallelism.Should().BeGreaterThan(2);
var beforeParallelism = _controller.CurrentParallelism;
var beforeChunk = _controller.CurrentChunkSizeBytes;
Expand Down Expand Up @@ -184,40 +185,58 @@ public void ClientTimeout_DoesNotResetChunkSize()
}

[Test]
public void ClientTimeout_DoesNotEnterAdaptiveWindow_AndDoesNotResetChunkSize()
public void ClientTimeouts_DownscaleBelowInitialChunkSize_WhenAtMinParallelism()
{
// Arrange
_controller.CurrentParallelism.Should().Be(2);
_controller.CurrentChunkSizeBytes.Should().Be(500 * 1024);

// Act - Two consecutive client-side timeouts trigger controlled downscale
FeedClientTimeouts(_controller, 2);

// Assert
_controller.CurrentParallelism.Should().Be(2);
_controller.CurrentChunkSizeBytes.Should().BeLessThan(500 * 1024);
_controller.CurrentChunkSizeBytes.Should().Be(375 * 1024);
}

[Test]
public void ClientTimeouts_ReduceParallelismFirst_WhenAboveMinParallelism()
{
// Arrange - Inflate chunk and make sure parallelism is just at min (=2)
var safety = 10;
while (_controller.CurrentChunkSizeBytes < 1024 * 1024 && safety-- > 0)
// Arrange
var safety = 100;
while (_controller.CurrentChunkSizeBytes < 4 * 1024 * 1024 && safety-- > 0)
{
FeedFastWindow(_controller);
}

Comment thread
paul-fresquet marked this conversation as resolved.
_controller.CurrentParallelism.Should().BeGreaterThan(2);
var beforeParallelism = _controller.CurrentParallelism;
var beforeChunk = _controller.CurrentChunkSizeBytes;

// Act
FeedClientTimeouts(_controller, 2);

// Assert
_controller.CurrentParallelism.Should().Be(beforeParallelism - 1);
_controller.CurrentChunkSizeBytes.Should().Be(beforeChunk);
}

[Test]
public void ClientTimeouts_DoNotReduceChunkSizeBelowMinimum()
{
// Arrange
_controller.CurrentParallelism.Should().Be(2);
var inflatedChunk = _controller.CurrentChunkSizeBytes;

// Act - Feed a window of slow client-timeout failures (with the new failure kind)
var p = _controller.CurrentParallelism;
for (var i = 0; i < p; i++)
// Act
for (var i = 0; i < 20; i++)
{
RecordUploadResult(
_controller,
TimeSpan.FromSeconds(60),
isSuccess: false,
partNumber: i + 1,
statusCode: 0,
failureKind: UploadFailureKind.ClientTimeout);
FeedClientTimeouts(_controller, 2);
}

RecordUploadResult(
_controller,
TimeSpan.FromSeconds(1),
isSuccess: true,
partNumber: 100);

// Assert - the timeout samples were ignored and cannot trigger a later downscale
_controller.CurrentChunkSizeBytes.Should().NotBe(500 * 1024);
_controller.CurrentChunkSizeBytes.Should().Be(inflatedChunk,
because: "client-side cancellations are not bandwidth signals and must not influence chunk sizing");
// Assert
_controller.CurrentChunkSizeBytes.Should().Be(64 * 1024);
_controller.CurrentParallelism.Should().Be(2);
}

[Test]
Expand Down Expand Up @@ -248,6 +267,20 @@ private static void FeedWindow(AdaptiveUploadController controller, TimeSpan ela
}
}

private static void FeedClientTimeouts(AdaptiveUploadController controller, int count)
{
for (var i = 0; i < count; i++)
{
RecordUploadResult(
controller,
TimeSpan.FromSeconds(60),
isSuccess: false,
partNumber: i + 1,
statusCode: 0,
failureKind: UploadFailureKind.ClientTimeout);
}
}

private static void RecordUploadResult(
AdaptiveUploadController controller,
TimeSpan elapsed,
Expand Down
Loading