Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d1f90f1
fix: Retry after partial file reads.
kinyoklion Jan 9, 2026
53d2f62
Merge branch 'main' into rlamb/file-data-source-json-retry
kinyoklion Feb 6, 2026
f724861
Merge branch 'main' into rlamb/file-data-source-json-retry
kinyoklion May 26, 2026
2859064
test: stop pinning segment version in flaky FileDataSource tests
kinyoklion May 26, 2026
f9eb455
ci: trigger flake check run 1/19
kinyoklion May 26, 2026
d1add2a
ci: trigger flake check run 2/19
kinyoklion May 26, 2026
9924c10
ci: trigger flake check run 3/19
kinyoklion May 26, 2026
a13e6da
ci: trigger flake check run 4/19
kinyoklion May 26, 2026
e35db5d
ci: trigger flake check run 5/19
kinyoklion May 26, 2026
ce9dd18
ci: trigger flake check run 6/19
kinyoklion May 26, 2026
8151abf
ci: trigger flake check run 7/19
kinyoklion May 26, 2026
c04a01d
ci: trigger flake check run 8/19
kinyoklion May 26, 2026
1baf7ec
ci: trigger flake check run 9/19
kinyoklion May 26, 2026
a458d00
ci: trigger flake check run 10/19
kinyoklion May 26, 2026
81cf9ac
ci: trigger flake check run 11/19
kinyoklion May 26, 2026
d330eee
ci: trigger flake check run 12/19
kinyoklion May 26, 2026
98c80ec
ci: trigger flake check run 13/19
kinyoklion May 26, 2026
907e9bc
ci: trigger flake check run 14/19
kinyoklion May 26, 2026
50215b5
ci: trigger flake check run 15/19
kinyoklion May 26, 2026
75ab3bf
ci: trigger flake check run 16/19
kinyoklion May 26, 2026
9ca9b0e
ci: trigger flake check run 17/19
kinyoklion May 26, 2026
9d4913b
ci: trigger flake check run 18/19
kinyoklion May 26, 2026
0066cd9
ci: trigger flake check run 19/19
kinyoklion May 26, 2026
99a9cd0
ci: trigger flake check run 20
kinyoklion May 26, 2026
b0b31e1
ci: trigger flake check run 21
kinyoklion May 26, 2026
764e7d3
ci: trigger flake check run 22
kinyoklion May 26, 2026
dc58101
ci: trigger flake check run 23
kinyoklion May 26, 2026
22c836c
ci: trigger flake check run 24
kinyoklion May 26, 2026
72cc40a
ci: trigger flake check run 25
kinyoklion May 26, 2026
f90b545
ci: trigger flake check run 26
kinyoklion May 26, 2026
ee1bf7e
ci: trigger flake check run 27
kinyoklion May 26, 2026
824e929
ci: trigger flake check run 28
kinyoklion May 26, 2026
ba42d22
ci: trigger flake check run 29
kinyoklion May 26, 2026
fc32955
ci: trigger flake check run 30
kinyoklion May 26, 2026
5bc5f9b
ci: trigger flake check run 31
kinyoklion May 26, 2026
956bd47
ci: trigger flake check run 32
kinyoklion May 26, 2026
97bacd2
ci: trigger flake check run 33
kinyoklion May 26, 2026
2de17ef
ci: trigger flake check run 34
kinyoklion May 26, 2026
0a14226
ci: trigger flake check run 35
kinyoklion May 26, 2026
c709ee4
ci: trigger flake check run 36
kinyoklion May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkgs/sdk/server/src/Internal/DataSources/FileDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ internal sealed class FileDataSource : IDataSource
private readonly Logger _logger;
private volatile bool _started;
private volatile bool _loadedValidData;
private volatile bool _disposed;
private volatile int _lastVersion;
private object _updateLock = new object();

private const int MaxRetries = 5;
private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(600);
// Per-path JSON-parse retry counters. Only touched inside _updateLock.
private readonly Dictionary<string, int> _retryCounts = new Dictionary<string, int>();

public FileDataSource(IDataSourceUpdates dataSourceUpdates, FileDataTypes.IFileReader fileReader,
List<string> paths, bool autoUpdate, Func<string, object> alternateParser, bool skipMissingPaths,
FileDataTypes.DuplicateKeysHandling duplicateKeysHandling,
Expand Down Expand Up @@ -83,6 +89,7 @@ private void Dispose(bool disposing)
{
if (disposing)
{
_disposed = true;
_reloader?.Dispose();
}
}
Expand All @@ -91,6 +98,10 @@ private void LoadAll()
{
lock (_updateLock)
{
if (_disposed)
{
return;
}
var version = Interlocked.Increment(ref _lastVersion);
var flags = new Dictionary<string, ItemDescriptor>();
var segments = new Dictionary<string, ItemDescriptor>();
Expand All @@ -102,11 +113,43 @@ private void LoadAll()
_logger.Debug("file data: {0}", content);
var data = _parser.Parse(content, version);
_dataMerger.AddToData(data, flags, segments);
_retryCounts.Remove(path);
}
catch (FileNotFoundException) when (_skipMissingPaths)
{
_logger.Debug("{0}: {1}", path, "File not found");
}
catch (System.Text.Json.JsonException)
{
// A file-change notification can fire while the file is mid-write, so we may read an
// empty or partially written file. Retry up to MaxRetries times before giving up; the
// counter is cleared on the next successful load.
if (!_retryCounts.ContainsKey(path))
{
_retryCounts[path] = 0;
}
_retryCounts[path]++;

if (_retryCounts[path] < MaxRetries)
{
_logger.Warn("{0}: Failed to parse file, retrying in {1} ms", path, RetryDelay.TotalMilliseconds);
Task.Run(async () =>
{
await Task.Delay(RetryDelay).ConfigureAwait(false);
if (_disposed)
{
return;
}
LoadAll();
});
}
else
{
_logger.Error("{0}: Failed to parse file after {1} retries", path, MaxRetries);
}

return;
}
catch (Exception e)
{
LogHelpers.LogException(_logger, "Failed to load " + path, e);
Expand Down
102 changes: 38 additions & 64 deletions pkgs/sdk/server/test/Internal/DataSources/FileDataSourceTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,50 +151,7 @@ public void ModifiedFileIsReloadedIfAutoUpdateIsOn()

file.SetContentFromPath(TestUtils.TestFilePath("segment-only.json"));

AssertHelpers.ExpectPredicate(_updateSink.Inits, actual =>
{
var segments = actual.Data.First(item => item.Key == DataModel.Segments);
var features = actual.Data.First(item => item.Key == DataModel.Features);
if (!features.Value.Items.IsNullOrEmpty())
{
return false;
}

var segmentItems = segments.Value.Items.ToList();

if (segmentItems.Count != 1)
{
return false;
}

var segmentDescriptor = segmentItems[0];
if (segmentDescriptor.Key != "seg1")
{
return false;
}

if (segmentDescriptor.Value.Version == 1)
{
return false;
}

if (!(segmentDescriptor.Value.Item is Segment segment))
{
return false;
}

if (segment.Deleted)
{
return false;
}

if (segment.Included.Count != 1)
{
return false;
}

return segment.Included[0] == "user1";
},
AssertHelpers.ExpectPredicate(_updateSink.Inits, IsSegmentOnlyDataAfterReload,
"Did not receive expected update from the file data source.",
TimeSpan.FromSeconds(30));
}
Expand Down Expand Up @@ -270,16 +227,9 @@ public void ModifiedFileIsReloadedEvenIfOneFileIsMissingIfSkipMissingPathsIsSet(

file1.SetContentFromPath(TestUtils.TestFilePath("segment-only.json"));

// Use ExpectJsonValue to handle potential race conditions where the file watcher
// may trigger multiple reload events. This keeps checking events until one matches
// the expected JSON or the timeout expires.
var newData = AssertHelpers.ExpectJsonValue(
_updateSink.Inits,
DataSetAsJson(ExpectedDataSetForSegmentOnlyFile(2)),
DataSetAsJson,
AssertHelpers.ExpectPredicate(_updateSink.Inits, IsSegmentOnlyDataAfterReload,
"Did not receive expected update from the file data source.",
TimeSpan.FromSeconds(30));

AssertJsonEqual(DataSetAsJson(ExpectedDataSetForSegmentOnlyFile(2)), DataSetAsJson(newData));
}
}
}
Expand All @@ -298,18 +248,9 @@ public void IfFlagsAreBadAtStartTimeAutoUpdateCanStillLoadGoodDataLater()

file.SetContentFromPath(TestUtils.TestFilePath("segment-only.json"));

// Use ExpectJsonValue to handle potential race conditions where the file watcher
// may trigger multiple reload events. This keeps checking events until one matches
// the expected JSON or the timeout expires.
// Note that the expected version is 2 because we increment the version on each
// *attempt* to load the files, not on each successful load.
var newData = AssertHelpers.ExpectJsonValue(
_updateSink.Inits,
DataSetAsJson(ExpectedDataSetForSegmentOnlyFile(2)),
DataSetAsJson,
AssertHelpers.ExpectPredicate(_updateSink.Inits, IsSegmentOnlyDataAfterReload,
"Did not receive expected update from the file data source.",
TimeSpan.FromSeconds(30));

AssertJsonEqual(DataSetAsJson(ExpectedDataSetForSegmentOnlyFile(2)), DataSetAsJson(newData));
}
}
}
Expand Down Expand Up @@ -365,5 +306,38 @@ private static FullDataSet<ItemDescriptor> ExpectedDataSetForSegmentOnlyFile(int
new SegmentBuilder("seg1").Version(version).Included("user1").Build()
)
.Build();

// Predicate that matches the structure of segment-only.json reloaded after the initial load.
// We deliberately don't pin the exact version: with the file watcher firing on truncate-then-write
// and the JsonException retry, the version of the successful load is non-deterministic, so we
// only require that it isn't the initial version 1.
private static bool IsSegmentOnlyDataAfterReload(FullDataSet<ItemDescriptor> actual)
{
var features = actual.Data.First(item => item.Key == DataModel.Features);
if (!features.Value.Items.IsNullOrEmpty())
{
return false;
}

var segments = actual.Data.First(item => item.Key == DataModel.Segments);
var segmentItems = segments.Value.Items.ToList();
if (segmentItems.Count != 1)
{
return false;
}

var segmentDescriptor = segmentItems[0];
if (segmentDescriptor.Key != "seg1" || segmentDescriptor.Value.Version == 1)
{
return false;
}

if (!(segmentDescriptor.Value.Item is Segment segment) || segment.Deleted)
{
return false;
}

return segment.Included.Count == 1 && segment.Included[0] == "user1";
}
}
}
Loading