Skip to content

Commit 8254274

Browse files
committed
[fix] Improve upload retry resilience
Give stale pre-downscale slices a larger retry timeout budget and stop sibling upload workers promptly after fatal upload failures. Made-with: Cursor
1 parent ee8fc4e commit 8254274

6 files changed

Lines changed: 335 additions & 75 deletions

File tree

src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadCoordinator.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class FileUploadCoordinator : IFileUploadCoordinator
1212
private readonly ManualResetEvent _uploadingIsFinished;
1313
private readonly ManualResetEvent _exceptionOccurred;
1414
private readonly ILogger<FileUploadCoordinator> _logger;
15-
private const int CHANNEL_CAPACITY = 8;
15+
private const int CHANNEL_CAPACITY = 2;
1616

1717
public FileUploadCoordinator(ILogger<FileUploadCoordinator> logger)
1818
{

src/ByteSync.Client/Services/Communications/Transfers/Uploading/FileUploadWorker.cs

Lines changed: 86 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ public class FileUploadWorker : IFileUploadWorker
2929

3030
private static int _workerTaskCounter;
3131

32-
private const int AttemptTimeoutFloorSeconds = 60;
33-
private const int AttemptTimeoutCeilingSeconds = 120;
34-
private const int SecondsPerMegabyteHeuristic = 3;
35-
3632
public FileUploadWorker(
3733
IPolicyFactory policyFactory,
3834
IFileTransferApiClient fileTransferApiClient,
@@ -86,66 +82,84 @@ public FileUploadWorker(
8682
public async Task UploadAvailableSlicesAdaptiveAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState)
8783
{
8884
var workerId = Interlocked.Increment(ref _workerTaskCounter);
89-
while (await availableSlices.Reader.WaitToReadAsync())
85+
try
9086
{
91-
if (!availableSlices.Reader.TryRead(out var slice))
87+
while (await availableSlices.Reader.WaitToReadAsync(CancellationTokenSource.Token))
9288
{
93-
continue;
94-
}
95-
96-
try
97-
{
98-
var sliceStart = Stopwatch.StartNew();
99-
100-
await IncrementConcurrentAsync(progressState);
101-
var policy = _policyFactory.BuildFileUploadPolicy();
102-
var attempt = 0;
103-
104-
var response = await policy.ExecuteAsync(async () =>
89+
if (_exceptionOccurred.WaitOne(0))
10590
{
106-
attempt++;
107-
108-
return await ExecuteUploadAttemptAsync(slice, workerId, attempt, CancellationTokenSource.Token);
109-
});
110-
111-
EnsureSuccessOrThrow(response);
112-
113-
var fileName = _sharedFileDefinition.GetFileName(slice.PartNumber);
114-
var assertSw = Stopwatch.StartNew();
115-
_logger.LogDebug("UploadAvailableSlice: worker {WorkerId} start asserting slice {Number} for {FileName}",
116-
workerId, slice.PartNumber, fileName);
117-
118-
var transferParameters = new TransferParameters
91+
return;
92+
}
93+
94+
if (!availableSlices.Reader.TryRead(out var slice))
11995
{
120-
SessionId = _sharedFileDefinition.SessionId,
121-
SharedFileDefinition = _sharedFileDefinition,
122-
PartNumber = slice.PartNumber,
123-
PartSizeInBytes = slice.MemoryStream.Length
124-
};
125-
126-
await AssertSliceUploadedAsync(policy, transferParameters, workerId, slice.PartNumber, fileName, assertSw);
127-
assertSw.Stop();
128-
_logger.LogDebug(
129-
"UploadAvailableSlice: worker {WorkerId} finished asserting slice {Number} for {FileName} in {ElapsedMs} ms",
130-
workerId, slice.PartNumber, fileName, assertSw.ElapsedMilliseconds);
131-
132-
// Success path bookkeeping
133-
await UpdateProgressOnSuccessAsync(progressState, slice, sliceStart);
134-
}
135-
catch (Exception ex)
136-
{
137-
await HandleUploadExceptionAsync(progressState, ex, workerId);
138-
139-
return;
140-
}
141-
finally
142-
{
143-
DisposeSlice(slice);
144-
await DecrementConcurrentAsync(progressState);
145-
146-
// No final release here: attempts handled slot release per attempt
96+
continue;
97+
}
98+
99+
try
100+
{
101+
var sliceStart = Stopwatch.StartNew();
102+
103+
await IncrementConcurrentAsync(progressState);
104+
var policy = _policyFactory.BuildFileUploadPolicy();
105+
var attempt = 0;
106+
107+
var response = await policy.ExecuteAsync(async () =>
108+
{
109+
attempt++;
110+
111+
return await ExecuteUploadAttemptAsync(slice, workerId, attempt, CancellationTokenSource.Token);
112+
});
113+
114+
if (!response.IsSuccess && CancellationTokenSource.IsCancellationRequested && _exceptionOccurred.WaitOne(0))
115+
{
116+
return;
117+
}
118+
119+
EnsureSuccessOrThrow(response);
120+
121+
var fileName = _sharedFileDefinition.GetFileName(slice.PartNumber);
122+
var assertSw = Stopwatch.StartNew();
123+
_logger.LogDebug("UploadAvailableSlice: worker {WorkerId} start asserting slice {Number} for {FileName}",
124+
workerId, slice.PartNumber, fileName);
125+
126+
var transferParameters = new TransferParameters
127+
{
128+
SessionId = _sharedFileDefinition.SessionId,
129+
SharedFileDefinition = _sharedFileDefinition,
130+
PartNumber = slice.PartNumber,
131+
PartSizeInBytes = slice.MemoryStream.Length
132+
};
133+
134+
await AssertSliceUploadedAsync(policy, transferParameters, workerId, slice.PartNumber, fileName, assertSw);
135+
assertSw.Stop();
136+
_logger.LogDebug(
137+
"UploadAvailableSlice: worker {WorkerId} finished asserting slice {Number} for {FileName} in {ElapsedMs} ms",
138+
workerId, slice.PartNumber, fileName, assertSw.ElapsedMilliseconds);
139+
140+
await UpdateProgressOnSuccessAsync(progressState, slice, sliceStart);
141+
}
142+
catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested && _exceptionOccurred.WaitOne(0))
143+
{
144+
return;
145+
}
146+
catch (Exception ex)
147+
{
148+
await HandleUploadExceptionAsync(progressState, ex, workerId);
149+
150+
return;
151+
}
152+
finally
153+
{
154+
DisposeSlice(slice);
155+
await DecrementConcurrentAsync(progressState);
156+
}
147157
}
148158
}
159+
catch (OperationCanceledException) when (CancellationTokenSource.IsCancellationRequested)
160+
{
161+
return;
162+
}
149163

150164
await CompleteIfFinishedAsync(progressState);
151165
}
@@ -154,13 +168,23 @@ private async Task<UploadFileResponse> ExecuteUploadAttemptAsync(FileUploaderSli
154168
CancellationToken globalToken)
155169
{
156170
var attemptStart = DateTime.UtcNow;
157-
var timeoutSec = ComputeAttemptTimeoutSeconds(slice);
171+
var currentChunkSizeBytes = _adaptiveUploadController.CurrentChunkSizeBytes;
172+
var timeoutSec = UploadAttemptTimeoutPolicy.ComputeTimeoutSeconds(
173+
slice.MemoryStream.Length,
174+
attempt,
175+
currentChunkSizeBytes);
158176
using var attemptCts = CancellationTokenSource.CreateLinkedTokenSource(globalToken);
159177
attemptCts.CancelAfter(TimeSpan.FromSeconds(timeoutSec));
160178

161179
var beforeWait = _uploadSlots.CurrentCount;
162-
_logger.LogDebug("UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available})",
163-
workerId, beforeWait);
180+
_logger.LogDebug(
181+
"UploadAvailableSlice: worker {WorkerId} waiting for upload slot (available {Available}), attempt {Attempt}, timeout {TimeoutSec}s, slice {SliceKb} KB, currentChunk {CurrentChunkKb} KB",
182+
workerId,
183+
beforeWait,
184+
attempt,
185+
timeoutSec,
186+
Math.Round(slice.MemoryStream.Length / 1024d),
187+
Math.Round(currentChunkSizeBytes / 1024d));
164188

165189
var acquired = false;
166190
try
@@ -269,19 +293,6 @@ private async Task<UploadFileResponse> ExecuteUploadAttemptAsync(FileUploaderSli
269293
}
270294
}
271295

272-
private static int ComputeAttemptTimeoutSeconds(FileUploaderSlice slice)
273-
{
274-
return ComputeAttemptTimeoutSeconds(slice.MemoryStream.Length);
275-
}
276-
277-
private static int ComputeAttemptTimeoutSeconds(long sliceLengthBytes)
278-
{
279-
var sizeMb = Math.Max(1, (int)Math.Ceiling(sliceLengthBytes / (1024d * 1024d)));
280-
var timeoutSec = Math.Clamp(SecondsPerMegabyteHeuristic * sizeMb, AttemptTimeoutFloorSeconds, AttemptTimeoutCeilingSeconds);
281-
282-
return timeoutSec;
283-
}
284-
285296
private static UploadFailureKind RefineFailureKind(UploadFailureKind kind, CancellationTokenSource attemptCts,
286297
CancellationToken globalToken)
287298
{
@@ -414,6 +425,7 @@ private async Task HandleUploadExceptionAsync(UploadProgressState progressState,
414425
}
415426

416427
_exceptionOccurred.Set();
428+
await CancellationTokenSource.CancelAsync();
417429
}
418430

419431
private void DisposeSlice(FileUploaderSlice slice)
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System;
2+
3+
namespace ByteSync.Services.Communications.Transfers.Uploading;
4+
5+
public static class UploadAttemptTimeoutPolicy
6+
{
7+
private const int AttemptTimeoutFloorSeconds = 60;
8+
private const int AttemptTimeoutCeilingSeconds = 120;
9+
private const int SecondsPerMegabyteHeuristic = 3;
10+
private const int RetryGrowthSeconds = 15;
11+
private const int StaleChunkPenaltySeconds = 5;
12+
13+
public static int ComputeTimeoutSeconds(long sliceLengthBytes, int attempt, int currentChunkSizeBytes)
14+
{
15+
var timeoutSec = ComputeBaseTimeoutSeconds(sliceLengthBytes);
16+
if (attempt <= 1)
17+
{
18+
return timeoutSec;
19+
}
20+
21+
var staleChunkPenalty = ComputeStaleChunkPenaltySeconds(sliceLengthBytes, currentChunkSizeBytes);
22+
timeoutSec += (attempt - 1) * RetryGrowthSeconds + staleChunkPenalty;
23+
24+
return Math.Clamp(timeoutSec, AttemptTimeoutFloorSeconds, AttemptTimeoutCeilingSeconds);
25+
}
26+
27+
private static int ComputeBaseTimeoutSeconds(long sliceLengthBytes)
28+
{
29+
var sizeMb = Math.Max(1, (int)Math.Ceiling(sliceLengthBytes / (1024d * 1024d)));
30+
31+
return Math.Clamp(
32+
SecondsPerMegabyteHeuristic * sizeMb,
33+
AttemptTimeoutFloorSeconds,
34+
AttemptTimeoutCeilingSeconds);
35+
}
36+
37+
private static int ComputeStaleChunkPenaltySeconds(long sliceLengthBytes, int currentChunkSizeBytes)
38+
{
39+
if (currentChunkSizeBytes <= 0 || sliceLengthBytes <= currentChunkSizeBytes)
40+
{
41+
return 0;
42+
}
43+
44+
var chunkRatio = (int)Math.Ceiling(sliceLengthBytes / (double)currentChunkSizeBytes);
45+
46+
return (chunkRatio - 1) * StaleChunkPenaltySeconds;
47+
}
48+
}

tests/ByteSync.Client.UnitTests/Services/Communications/Transfers/Uploading/FileUploadCoordinatorTests.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,28 @@ public void AvailableSlices_ShouldAllowWritingAndReading()
177177
action.Should().NotThrowAsync();
178178
}
179179

180+
[Test]
181+
public async Task AvailableSlices_ShouldApplyTightBackpressure()
182+
{
183+
// Arrange
184+
var first = new FileUploaderSlice(1, new MemoryStream());
185+
var second = new FileUploaderSlice(2, new MemoryStream());
186+
var third = new FileUploaderSlice(3, new MemoryStream());
187+
188+
// Act
189+
await _coordinator.AvailableSlices.Writer.WriteAsync(first);
190+
await _coordinator.AvailableSlices.Writer.WriteAsync(second);
191+
var thirdWrite = _coordinator.AvailableSlices.Writer.WriteAsync(third).AsTask();
192+
193+
// Assert
194+
thirdWrite.IsCompleted.Should().BeFalse();
195+
196+
var readSlice = await _coordinator.AvailableSlices.Reader.ReadAsync();
197+
readSlice.Should().Be(first);
198+
199+
await thirdWrite.WaitAsync(TimeSpan.FromSeconds(1));
200+
}
201+
180202
[Test]
181203
public void MultipleSetExceptionCalls_ShouldNotThrow()
182204
{

0 commit comments

Comments
 (0)