Skip to content

Commit 397a1d1

Browse files
[fix] Improve stale upload slice resilience (#293)
1 parent 8f1afaa commit 397a1d1

7 files changed

Lines changed: 196 additions & 32 deletions

File tree

src/ByteSync.Client/Services/Communications/Transfers/Strategies/UploadFailureClassifier.cs

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,12 @@ namespace ByteSync.Services.Communications.Transfers.Strategies;
99

1010
public static class UploadFailureClassifier
1111
{
12+
private static readonly string[] UnexpectedTransportClosureMessageFragments =
13+
{
14+
"unexpected EOF",
15+
"0 bytes from the transport stream",
16+
};
17+
1218
public static UploadFileResponse Classify(Exception exception, CancellationToken cancellationToken)
1319
{
1420
if (exception is OperationCanceledException && cancellationToken.IsCancellationRequested)
@@ -21,6 +27,11 @@ public static UploadFileResponse Classify(Exception exception, CancellationToken
2127
return UploadFileResponse.ClientTimeout(exception);
2228
}
2329

30+
if (cancellationToken.IsCancellationRequested)
31+
{
32+
return UploadFileResponse.ClientCancellation(exception);
33+
}
34+
2435
if (IsClientNetworkError(exception))
2536
{
2637
return UploadFileResponse.ClientNetworkError(exception);
@@ -43,16 +54,35 @@ private static bool IsClientNetworkError(Exception exception)
4354
{
4455
return socketException.SocketErrorCode is SocketError.ConnectionReset
4556
or SocketError.ConnectionAborted
57+
or SocketError.OperationAborted
4658
or SocketError.TimedOut
4759
or SocketError.NetworkDown
4860
or SocketError.NetworkUnreachable
4961
or SocketError.HostDown
5062
or SocketError.HostUnreachable;
5163
}
5264

65+
if (current is IOException ioException && HasUnexpectedTransportClosureMessage(ioException))
66+
{
67+
return true;
68+
}
69+
5370
current = current.InnerException;
5471
}
5572

5673
return false;
5774
}
75+
76+
/// <summary>
/// Tells whether the message of <paramref name="exception"/> contains at least one entry of
/// <c>UnexpectedTransportClosureMessageFragments</c>. The comparison is ordinal and ignores case.
/// </summary>
/// <param name="exception">The I/O exception whose message is inspected.</param>
/// <returns><c>true</c> when a fragment occurs in the message; otherwise <c>false</c>.</returns>
private static bool HasUnexpectedTransportClosureMessage(IOException exception)
{
    return Array.Exists(
        UnexpectedTransportClosureMessageFragments,
        knownFragment => exception.Message.Contains(knownFragment, StringComparison.OrdinalIgnoreCase));
}
5888
}

src/ByteSync.Client/Services/Communications/Transfers/Uploading/AdaptiveUploadController.cs

Lines changed: 25 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ public class AdaptiveUploadController : IAdaptiveUploadController
1515
private const int MIN_PARALLELISM = 2;
1616
private const int MAX_PARALLELISM = 4;
1717
private const int CLIENT_NETWORK_ISSUES_BEFORE_DOWNSCALE = 2;
18+
private const double CLIENT_NETWORK_ISSUE_CHUNK_FACTOR = 0.5;
1819

1920
private const double MULTIPLIER_2_X = 2.0;
2021
private const double MULTIPLIER_1_75_X = 1.75;
@@ -182,7 +183,7 @@ private void HandleClientNetworkIssue(string? fileId, UploadFailureKind failureK
182183
fileId ?? "-",
183184
_consecutiveClientNetworkIssues);
184185
_consecutiveClientNetworkIssues = 0;
185-
Downscale(fileId, "client network issues");
186+
DownscaleForClientNetworkIssue(fileId, failureKind);
186187
}
187188

188189
private bool HandleBandwidthReset(bool isSuccess, int? statusCode)
@@ -259,6 +260,29 @@ private void Downscale(string? fileId, string reason)
259260

260261
ResetWindow();
261262
}
263+
264+
/// <summary>
/// Downscales the upload settings in one step after client network issues:
/// parallelism is set to <c>MIN_PARALLELISM</c>, the chunk size is multiplied by
/// <c>CLIENT_NETWORK_ISSUE_CHUNK_FACTOR</c> (never going below <c>MIN_CHUNK_SIZE_BYTES</c>),
/// the window size is set to the new parallelism, the transition is logged and the window is reset.
/// </summary>
/// <param name="fileId">Identifier of the file being uploaded; logged as "-" when null.</param>
/// <param name="failureKind">The failure kind reported in the log entry.</param>
private void DownscaleForClientNetworkIssue(string? fileId, UploadFailureKind failureKind)
{
    // Remember the values in force before the downscale so the log can show the transition.
    var parallelismBefore = _currentParallelism;
    var chunkSizeBytesBefore = _currentChunkSizeBytes;

    var scaledChunkSizeBytes = (int)Math.Round(chunkSizeBytesBefore * CLIENT_NETWORK_ISSUE_CHUNK_FACTOR);

    _currentParallelism = MIN_PARALLELISM;
    _currentChunkSizeBytes = Math.Max(MIN_CHUNK_SIZE_BYTES, scaledChunkSizeBytes);
    _windowSize = _currentParallelism;

    var chunkKbBefore = Math.Round(chunkSizeBytesBefore / 1024d);
    var chunkKbAfter = Math.Round(_currentChunkSizeBytes / 1024d);

    _logger.LogInformation(
        "Adaptive: file {FileId} client network issue downscale ({FailureKind}). Parallelism {PrevParallelism}->{NextParallelism}, chunkSize {PrevChunkKb}->{NextChunkKb} KB",
        fileId ?? "-",
        failureKind,
        parallelismBefore,
        _currentParallelism,
        chunkKbBefore,
        chunkKbAfter);

    ResetWindow();
}
262286

263287
private void TryHandleUpscale(string? fileId)
264288
{

src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -174,18 +174,22 @@ private async Task<UploadFileResponse> ExecuteUploadAttemptAsync(FileUploaderSli
174174
slice.MemoryStream.Length,
175175
attempt,
176176
currentChunkSizeBytes);
177+
var chunkRatio = currentChunkSizeBytes > 0
178+
? slice.MemoryStream.Length / (double)currentChunkSizeBytes
179+
: 0d;
177180
using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(globalToken);
178181
attemptCts.CancelAfter(TimeSpan.FromSeconds(timeoutSec));
179182

180183
var beforeWait = _uploadSlots.CurrentCount;
181184
_logger.LogDebug(
182-
"UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available}), attempt {Attempt}, timeout {TimeoutSec}s, slice {SliceKb} KB, currentChunk {CurrentChunkKb} KB",
185+
"UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available}), attempt {Attempt}, timeout {TimeoutSec}s, slice {SliceKb} KB, currentChunk {CurrentChunkKb} KB, chunkRatio {ChunkRatio}",
183186
workerId,
184187
beforeWait,
185188
attempt,
186189
timeoutSec,
187190
Math.Round(slice.MemoryStream.Length / 1024d),
188-
Math.Round(currentChunkSizeBytes / 1024d));
191+
Math.Round(currentChunkSizeBytes / 1024d),
192+
Math.Round(chunkRatio, 2));
189193

190194
var acquired = false;
191195
try

src/ByteSync.Client/Services/Communications/Transfers/Uploading/UploadAttemptTimeoutPolicy.cs

Lines changed: 33 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -6,22 +6,26 @@ public static class UploadAttemptTimeoutPolicy
66
{
77
private const int AttemptTimeoutFloorSeconds = 60;
88
private const int AttemptTimeoutCeilingSeconds = 180;
9+
private const int ExtendedOversizedSliceTimeoutCeilingSeconds = 300;
910
private const int SecondsPerMegabyteHeuristic = 3;
1011
private const int RetryGrowthSeconds = 15;
11-
private const int StaleChunkPenaltySeconds = 5;
12+
private const int OversizedSliceSecondsPerCurrentChunk = 30;
13+
private const int ExtendedOversizedSliceThresholdBytes = 1024 * 1024;
14+
private const int ExtendedOversizedSliceChunkRatioThreshold = 8;
1215

1316
public static int ComputeTimeoutSeconds(long sliceLengthBytes, int attempt, int currentChunkSizeBytes)
1417
{
15-
var timeoutSec = (long)ComputeBaseTimeoutSeconds(sliceLengthBytes);
16-
if (attempt <= 1)
18+
var timeoutCeilingSeconds = ComputeTimeoutCeilingSeconds(sliceLengthBytes, currentChunkSizeBytes);
19+
var timeoutSec = Math.Max(
20+
(long)ComputeBaseTimeoutSeconds(sliceLengthBytes),
21+
ComputeOversizedSliceTimeoutSeconds(sliceLengthBytes, currentChunkSizeBytes, timeoutCeilingSeconds));
22+
23+
if (attempt > 1)
1724
{
18-
return (int)timeoutSec;
25+
timeoutSec += (long)(attempt - 1) * RetryGrowthSeconds;
1926
}
2027

21-
var staleChunkPenalty = ComputeStaleChunkPenaltySeconds(sliceLengthBytes, currentChunkSizeBytes);
22-
timeoutSec += (long)(attempt - 1) * RetryGrowthSeconds + staleChunkPenalty;
23-
24-
return (int)Math.Clamp(timeoutSec, AttemptTimeoutFloorSeconds, AttemptTimeoutCeilingSeconds);
28+
return (int)Math.Clamp(timeoutSec, AttemptTimeoutFloorSeconds, timeoutCeilingSeconds);
2529
}
2630

2731
private static int ComputeBaseTimeoutSeconds(long sliceLengthBytes)
@@ -39,19 +43,36 @@ private static int ComputeBaseTimeoutSeconds(long sliceLengthBytes)
3943
AttemptTimeoutCeilingSeconds);
4044
}
4145

42-
private static long ComputeStaleChunkPenaltySeconds(long sliceLengthBytes, int currentChunkSizeBytes)
46+
/// <summary>
/// Chooses the upper bound, in seconds, for an upload attempt timeout.
/// </summary>
/// <param name="sliceLengthBytes">Length of the slice being uploaded, in bytes.</param>
/// <param name="currentChunkSizeBytes">Chunk size currently in use, in bytes.</param>
/// <returns>
/// <c>ExtendedOversizedSliceTimeoutCeilingSeconds</c> when the slice is larger than a positive current
/// chunk size, is at least <c>ExtendedOversizedSliceThresholdBytes</c> long and spans at least
/// <c>ExtendedOversizedSliceChunkRatioThreshold</c> chunks (ratio rounded up);
/// otherwise <c>AttemptTimeoutCeilingSeconds</c>.
/// </returns>
private static int ComputeTimeoutCeilingSeconds(long sliceLengthBytes, int currentChunkSizeBytes)
{
    var sliceExceedsCurrentChunk = currentChunkSizeBytes > 0 && sliceLengthBytes > currentChunkSizeBytes;
    if (sliceExceedsCurrentChunk)
    {
        // Number of current-size chunks needed to hold the slice, rounded up.
        var chunksInSlice = Math.Ceiling(sliceLengthBytes / (double)currentChunkSizeBytes);
        var qualifiesForExtendedCeiling = sliceLengthBytes >= ExtendedOversizedSliceThresholdBytes
                                          && chunksInSlice >= ExtendedOversizedSliceChunkRatioThreshold;
        if (qualifiesForExtendedCeiling)
        {
            return ExtendedOversizedSliceTimeoutCeilingSeconds;
        }
    }

    return AttemptTimeoutCeilingSeconds;
}
62+
63+
private static long ComputeOversizedSliceTimeoutSeconds(long sliceLengthBytes, int currentChunkSizeBytes, int timeoutCeilingSeconds)
4364
{
4465
if (currentChunkSizeBytes <= 0 || sliceLengthBytes <= currentChunkSizeBytes)
4566
{
4667
return 0;
4768
}
4869

4970
var chunkRatio = Math.Ceiling(sliceLengthBytes / (double)currentChunkSizeBytes);
50-
if (chunkRatio >= AttemptTimeoutCeilingSeconds / (double)StaleChunkPenaltySeconds + 1)
71+
if (chunkRatio >= timeoutCeilingSeconds / (double)OversizedSliceSecondsPerCurrentChunk)
5172
{
52-
return AttemptTimeoutCeilingSeconds;
73+
return timeoutCeilingSeconds;
5374
}
5475

55-
return (long)(chunkRatio - 1) * StaleChunkPenaltySeconds;
76+
return (long)chunkRatio * OversizedSliceSecondsPerCurrentChunk;
5677
}
5778
}

tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Strategies/UploadFailureClassifierTests.cs

Lines changed: 45 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,6 @@ public void Classify_TaskCanceledException_WithNonCancelledToken_ShouldReturnCli
7070
public void Classify_GenericException_ShouldReturnServerError500()
7171
{
7272
using var cts = new CancellationTokenSource();
73-
cts.Cancel();
7473
var ex = new InvalidOperationException("broken");
7574

7675
var response = UploadFailureClassifier.Classify(ex, cts.Token);
@@ -111,6 +110,22 @@ public void Classify_HttpRequestExceptionWithConnectionReset_ShouldReturnClientN
111110
response.Exception.Should().BeSameAs(ex);
112111
}
113112

113+
// An HttpRequestException wrapping an IOException with a transport-closure message,
// classified with a non-cancelled token, is expected to be reported as a client network error.
[TestCase("Received an unexpected EOF from the transport stream.")]
[TestCase("Received 0 bytes from the transport stream.")]
public void Classify_HttpRequestExceptionWithUnexpectedTransportClosureMessage_ShouldReturnClientNetworkError(string message)
{
    // Arrange
    using var tokenSource = new CancellationTokenSource();
    var transportFailure = new IOException(message);
    var requestFailure = new HttpRequestException(
        "The SSL connection could not be established, see inner exception.",
        transportFailure);

    // Act
    var classification = UploadFailureClassifier.Classify(requestFailure, tokenSource.Token);

    // Assert
    classification.IsSuccess.Should().BeFalse();
    classification.StatusCode.Should().Be(0);
    classification.FailureKind.Should().Be(UploadFailureKind.ClientNetworkError);
    classification.Exception.Should().BeSameAs(requestFailure);
}
128+
114129
[Test]
115130
public void Classify_DirectSocketExceptionWithConnectionReset_ShouldReturnClientNetworkError()
116131
{
@@ -124,4 +139,33 @@ public void Classify_DirectSocketExceptionWithConnectionReset_ShouldReturnClient
124139
response.FailureKind.Should().Be(UploadFailureKind.ClientNetworkError);
125140
response.Exception.Should().BeSameAs(ex);
126141
}
142+
143+
// A bare SocketException carrying SocketError.OperationAborted, classified with a
// non-cancelled token, is expected to be reported as a client network error.
[Test]
public void Classify_DirectSocketExceptionWithOperationAborted_ShouldReturnClientNetworkError()
{
    // Arrange
    using var tokenSource = new CancellationTokenSource();
    var socketFailure = new SocketException((int)SocketError.OperationAborted);

    // Act
    var classification = UploadFailureClassifier.Classify(socketFailure, tokenSource.Token);

    // Assert
    classification.IsSuccess.Should().BeFalse();
    classification.StatusCode.Should().Be(0);
    classification.FailureKind.Should().Be(UploadFailureKind.ClientNetworkError);
    classification.Exception.Should().BeSameAs(socketFailure);
}
156+
157+
// The same OperationAborted SocketException, classified with an already-cancelled token,
// is expected to be reported as a client cancellation rather than a network error.
[Test]
public void Classify_DirectSocketExceptionWithOperationAbortedAndCancelledToken_ShouldReturnClientCancellation()
{
    // Arrange
    using var tokenSource = new CancellationTokenSource();
    tokenSource.Cancel();
    var socketFailure = new SocketException((int)SocketError.OperationAborted);

    // Act
    var classification = UploadFailureClassifier.Classify(socketFailure, tokenSource.Token);

    // Assert
    classification.IsSuccess.Should().BeFalse();
    classification.StatusCode.Should().Be(0);
    classification.FailureKind.Should().Be(UploadFailureKind.ClientCancellation);
    classification.Exception.Should().BeSameAs(socketFailure);
}
127171
}

tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/AdaptiveUploadControllerTests.cs

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,7 @@ public void Downscale_ReducesChunkSize_WhenAtMinParallelism()
120120
_controller.CurrentParallelism.Should().Be(2);
121121
_controller.CurrentChunkSizeBytes.Should().BeLessThan(before);
122122
_controller.CurrentChunkSizeBytes.Should().BeGreaterThanOrEqualTo(64 * 1024);
123+
_controller.CurrentChunkSizeBytes.Should().Be(375 * 1024);
123124
}
124125

125126
[Test]
@@ -197,7 +198,7 @@ public void ClientTimeouts_DownscaleBelowInitialChunkSize_WhenAtMinParallelism()
197198
// Assert
198199
_controller.CurrentParallelism.Should().Be(2);
199200
_controller.CurrentChunkSizeBytes.Should().BeLessThan(500 * 1024);
200-
_controller.CurrentChunkSizeBytes.Should().Be(375 * 1024);
201+
_controller.CurrentChunkSizeBytes.Should().Be(250 * 1024);
201202
}
202203

203204
[Test]
@@ -212,11 +213,11 @@ public void ClientNetworkErrors_DownscaleBelowInitialChunkSize_WhenAtMinParallel
212213

213214
// Assert
214215
_controller.CurrentParallelism.Should().Be(2);
215-
_controller.CurrentChunkSizeBytes.Should().Be(375 * 1024);
216+
_controller.CurrentChunkSizeBytes.Should().Be(250 * 1024);
216217
}
217218

218219
[Test]
219-
public void ClientTimeouts_ReduceParallelismFirst_WhenAboveMinParallelism()
220+
public void ClientTimeouts_DownscaleParallelismAndChunk_WhenAboveMinParallelism()
220221
{
221222
// Arrange
222223
var safety = 100;
@@ -233,8 +234,9 @@ public void ClientTimeouts_ReduceParallelismFirst_WhenAboveMinParallelism()
233234
FeedClientTimeouts(_controller, 2);
234235

235236
// Assert
236-
_controller.CurrentParallelism.Should().Be(beforeParallelism - 1);
237-
_controller.CurrentChunkSizeBytes.Should().Be(beforeChunk);
237+
_controller.CurrentParallelism.Should().Be(2);
238+
_controller.CurrentParallelism.Should().BeLessThan(beforeParallelism);
239+
_controller.CurrentChunkSizeBytes.Should().Be(Math.Max(64 * 1024, (int)Math.Round(beforeChunk * 0.5)));
238240
}
239241

240242
[Test]

0 commit comments

Comments
 (0)