Skip to content

Commit 4c1b9ef

Browse files
authored
.NET: fix: filter filesystem checkpoint index by session (#6132)
* fix: filter filesystem checkpoint index by session * fix: filter checkpoint index by parent * .NET: preserve legacy checkpoint index discovery
1 parent e793794 commit 4c1b9ef

2 files changed

Lines changed: 155 additions & 3 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Workflows/Checkpointing/FileSystemJsonCheckpointStore.cs

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System;
44
using System.Collections.Generic;
55
using System.IO;
6+
using System.Linq;
67
using System.Text;
78
using System.Text.Json;
89
using System.Text.Json.Serialization.Metadata;
@@ -11,7 +12,11 @@
1112

1213
namespace Microsoft.Agents.AI.Workflows.Checkpointing;
1314

14-
internal record CheckpointFileIndexEntry(CheckpointInfo CheckpointInfo, string FileName);
15+
internal record CheckpointFileIndexEntry(
16+
CheckpointInfo CheckpointInfo,
17+
string FileName,
18+
string? ParentCheckpointId = null,
19+
bool HasParentMetadata = false);
1520

1621
/// <summary>
1722
/// Provides a file system-based implementation of a JSON checkpoint store that persists checkpoint data and index
@@ -30,6 +35,8 @@ public sealed class FileSystemJsonCheckpointStore : JsonCheckpointStore, IDispos
3035

3136
internal DirectoryInfo Directory { get; }
3237
internal HashSet<CheckpointInfo> CheckpointIndex { get; }
38+
private Dictionary<CheckpointInfo, string?> CheckpointParents { get; } = [];
39+
private HashSet<CheckpointInfo> CheckpointsWithKnownParent { get; } = [];
3340

3441
private static JsonTypeInfo<CheckpointFileIndexEntry> EntryTypeInfo => WorkflowsJsonUtilities.JsonContext.Default.CheckpointFileIndexEntry;
3542

@@ -74,6 +81,11 @@ public FileSystemJsonCheckpointStore(DirectoryInfo directory)
7481
// We never actually use the file names from the index entries since they can be derived from the CheckpointInfo, but it is useful to
7582
// have the UrlEncoded file names in the index file for human readability
7683
this.CheckpointIndex.Add(entry.CheckpointInfo);
84+
this.CheckpointParents[entry.CheckpointInfo] = entry.ParentCheckpointId;
85+
if (entry.HasParentMetadata)
86+
{
87+
this.CheckpointsWithKnownParent.Add(entry.CheckpointInfo);
88+
}
7789
}
7890
}
7991
}
@@ -137,7 +149,11 @@ public override async ValueTask<CheckpointInfo> CreateCheckpointAsync(string ses
137149
using Utf8JsonWriter jsonWriter = new(checkpointStream, new JsonWriterOptions() { Indented = false });
138150
value.WriteTo(jsonWriter);
139151

140-
CheckpointFileIndexEntry entry = new(key, fileName);
152+
string? parentCheckpointId = parent?.CheckpointId;
153+
this.CheckpointParents[key] = parentCheckpointId;
154+
this.CheckpointsWithKnownParent.Add(key);
155+
156+
CheckpointFileIndexEntry entry = new(key, fileName, parentCheckpointId, HasParentMetadata: true);
141157
JsonSerializer.Serialize(this._indexFile!, entry, EntryTypeInfo);
142158
byte[] bytes = Encoding.UTF8.GetBytes(Environment.NewLine);
143159
await this._indexFile!.WriteAsync(bytes, 0, bytes.Length, CancellationToken.None).ConfigureAwait(false);
@@ -148,6 +164,8 @@ public override async ValueTask<CheckpointInfo> CreateCheckpointAsync(string ses
148164
catch (Exception ex)
149165
{
150166
this.CheckpointIndex.Remove(key);
167+
this.CheckpointParents.Remove(key);
168+
this.CheckpointsWithKnownParent.Remove(key);
151169

152170
try
153171
{
@@ -184,6 +202,12 @@ public override ValueTask<IEnumerable<CheckpointInfo>> RetrieveIndexAsync(string
184202
{
185203
this.CheckDisposed();
186204

187-
return new(this.CheckpointIndex);
205+
return new(this.CheckpointIndex
206+
.Where(checkpoint => checkpoint.SessionId == sessionId &&
207+
(withParent is null ||
208+
!this.CheckpointsWithKnownParent.Contains(checkpoint) ||
209+
(this.CheckpointParents.TryGetValue(checkpoint, out string? parentCheckpointId) &&
210+
parentCheckpointId == withParent.CheckpointId)))
211+
.ToArray());
188212
}
189213
}

dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/FileSystemJsonCheckpointStoreTests.cs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
using System;
44
using System.IO;
5+
using System.Linq;
56
using System.Text.Json;
67
using System.Threading.Tasks;
78
using FluentAssertions;
@@ -197,4 +198,131 @@ public async Task RetrieveCheckpointAsync_ShouldReturnPersistedDataAsync()
197198
retrieved.GetProperty("name").GetString().Should().Be("test");
198199
retrieved.GetProperty("value").GetInt32().Should().Be(42);
199200
}
201+
202+
[Fact]
203+
public async Task RetrieveIndexAsync_ShouldOnlyReturnCheckpointsForRequestedSessionAsync()
204+
{
205+
// Arrange
206+
using TempDirectory tempDirectory = new();
207+
string firstSessionId = Guid.NewGuid().ToString("N");
208+
string secondSessionId = Guid.NewGuid().ToString("N");
209+
CheckpointInfo firstCheckpoint;
210+
CheckpointInfo secondCheckpoint;
211+
212+
using (FileSystemJsonCheckpointStore store = new(tempDirectory))
213+
{
214+
firstCheckpoint = await store.CreateCheckpointAsync(firstSessionId, TestData);
215+
secondCheckpoint = await store.CreateCheckpointAsync(secondSessionId, TestData);
216+
217+
// Act
218+
CheckpointInfo[] firstSessionIndex = (await store.RetrieveIndexAsync(firstSessionId)).ToArray();
219+
220+
// Assert
221+
firstSessionIndex.Should().ContainSingle().Which.Should().Be(firstCheckpoint);
222+
firstSessionIndex.Should().NotContain(secondCheckpoint);
223+
}
224+
225+
using (FileSystemJsonCheckpointStore reopenedStore = new(tempDirectory))
226+
{
227+
CheckpointInfo[] secondSessionIndex = (await reopenedStore.RetrieveIndexAsync(secondSessionId)).ToArray();
228+
229+
secondSessionIndex.Should().ContainSingle().Which.Should().Be(secondCheckpoint);
230+
secondSessionIndex.Should().NotContain(firstCheckpoint);
231+
}
232+
}
233+
234+
[Fact]
235+
public async Task RetrieveIndexAsync_ShouldFilterByParentCheckpointAsync()
236+
{
237+
// Arrange
238+
using TempDirectory tempDirectory = new();
239+
string sessionId = Guid.NewGuid().ToString("N");
240+
CheckpointInfo parentCheckpoint;
241+
CheckpointInfo childCheckpoint;
242+
CheckpointInfo unrelatedCheckpoint;
243+
244+
using (FileSystemJsonCheckpointStore store = new(tempDirectory))
245+
{
246+
parentCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData);
247+
childCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData, parentCheckpoint);
248+
unrelatedCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData);
249+
250+
// Act
251+
CheckpointInfo[] childIndex = (await store.RetrieveIndexAsync(sessionId, parentCheckpoint)).ToArray();
252+
253+
// Assert
254+
childIndex.Should().ContainSingle().Which.Should().Be(childCheckpoint);
255+
childIndex.Should().NotContain(parentCheckpoint);
256+
childIndex.Should().NotContain(unrelatedCheckpoint);
257+
}
258+
259+
using (FileSystemJsonCheckpointStore reopenedStore = new(tempDirectory))
260+
{
261+
CheckpointInfo[] childIndex = (await reopenedStore.RetrieveIndexAsync(sessionId, parentCheckpoint)).ToArray();
262+
263+
childIndex.Should().ContainSingle().Which.Should().Be(childCheckpoint);
264+
childIndex.Should().NotContain(parentCheckpoint);
265+
childIndex.Should().NotContain(unrelatedCheckpoint);
266+
}
267+
}
268+
269+
[Fact]
270+
public async Task RetrieveIndexAsync_ShouldKeepLegacyEntriesDiscoverableWithParentFilterAsync()
271+
{
272+
// Arrange
273+
using TempDirectory tempDirectory = new();
274+
string sessionId = Guid.NewGuid().ToString("N");
275+
CheckpointInfo parentCheckpoint;
276+
CheckpointInfo childCheckpoint;
277+
string childFileName;
278+
279+
using (FileSystemJsonCheckpointStore store = new(tempDirectory))
280+
{
281+
parentCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData);
282+
childCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData, parentCheckpoint);
283+
childFileName = store.GetFileNameForCheckpoint(sessionId, childCheckpoint);
284+
}
285+
286+
string indexPath = Path.Combine(tempDirectory.FullName, "index.jsonl");
287+
string legacyEntry = JsonSerializer.Serialize(new CheckpointFileIndexEntry(childCheckpoint, childFileName));
288+
File.WriteAllText(indexPath, legacyEntry + Environment.NewLine);
289+
290+
// Act
291+
using FileSystemJsonCheckpointStore reopenedStore = new(tempDirectory);
292+
CheckpointInfo[] childIndex = (await reopenedStore.RetrieveIndexAsync(sessionId, parentCheckpoint)).ToArray();
293+
294+
// Assert
295+
childIndex.Should().ContainSingle().Which.Should().Be(childCheckpoint);
296+
}
297+
298+
[Fact]
299+
public async Task RetrieveIndexAsync_ShouldKeepLegacyChildDiscoverableWithUnrelatedParentFilterAsync()
300+
{
301+
// Arrange
302+
using TempDirectory tempDirectory = new();
303+
string sessionId = Guid.NewGuid().ToString("N");
304+
CheckpointInfo parentCheckpoint;
305+
CheckpointInfo childCheckpoint;
306+
CheckpointInfo unrelatedCheckpoint;
307+
string childFileName;
308+
309+
using (FileSystemJsonCheckpointStore store = new(tempDirectory))
310+
{
311+
parentCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData);
312+
childCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData, parentCheckpoint);
313+
unrelatedCheckpoint = await store.CreateCheckpointAsync(sessionId, TestData);
314+
childFileName = store.GetFileNameForCheckpoint(sessionId, childCheckpoint);
315+
}
316+
317+
string indexPath = Path.Combine(tempDirectory.FullName, "index.jsonl");
318+
string legacyEntry = JsonSerializer.Serialize(new CheckpointFileIndexEntry(childCheckpoint, childFileName));
319+
File.WriteAllText(indexPath, legacyEntry + Environment.NewLine);
320+
321+
// Act
322+
using FileSystemJsonCheckpointStore reopenedStore = new(tempDirectory);
323+
CheckpointInfo[] childIndex = (await reopenedStore.RetrieveIndexAsync(sessionId, unrelatedCheckpoint)).ToArray();
324+
325+
// Assert
326+
childIndex.Should().ContainSingle().Which.Should().Be(childCheckpoint);
327+
}
200328
}

0 commit comments

Comments
 (0)