Skip to content

Commit a6ca453

Browse files
committed
Fix PR #81 review follow-ups
1 parent c00b07c commit a6ca453

File tree

5 files changed

+162
-15
lines changed

5 files changed

+162
-15
lines changed

DotPilot.Runtime.Host/Features/RuntimeFoundation/EmbeddedRuntimeTrafficPolicy.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ internal static class EmbeddedRuntimeTrafficPolicy
99
private const string MermaidHeader = "flowchart LR";
1010
private const string MermaidArrow = " --> ";
1111
private const string MermaidActiveArrow = " ==> ";
12-
private const string ClientTargetMethods = "GetAsync, UpsertAsync";
1312

1413
public static string Summary => PolicySummary;
1514

@@ -109,9 +108,6 @@ private static string CreateMermaidDiagramCore((string Source, string Target, st
109108
transition.SourceMethods.Contains(activeTransition.Value.SourceMethod, StringComparer.Ordinal) &&
110109
transition.TargetMethods.Contains(activeTransition.Value.TargetMethod, StringComparer.Ordinal);
111110
var arrow = isActive ? MermaidActiveArrow : MermaidArrow;
112-
var targetMethods = transition.Target == EmbeddedRuntimeHostNames.ClientSourceName
113-
? ClientTargetMethods
114-
: string.Join(", ", transition.TargetMethods);
115111
lines.Add(
116112
string.Concat(
117113
transition.Source,
@@ -120,7 +116,7 @@ private static string CreateMermaidDiagramCore((string Source, string Target, st
120116
" : ",
121117
string.Join(", ", transition.SourceMethods),
122118
" -> ",
123-
targetMethods));
119+
string.Join(", ", transition.TargetMethods)));
124120
}
125121

126122
return string.Join(Environment.NewLine, lines);

DotPilot.Runtime/Features/RuntimeFoundation/AgentFrameworkRuntimeClient.cs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,17 @@ public sealed class AgentFrameworkRuntimeClient : IAgentRuntimeClient
1818
private const string StartReplayKind = "run-started";
1919
private const string PauseReplayKind = "approval-pending";
2020
private const string ResumeReplayKind = "run-resumed";
21+
private const string RejectedReplayKind = "approval-rejected";
2122
private const string CompletedReplayKind = "run-completed";
23+
private const string ResumeNotAllowedDetailFormat =
24+
"Session {0} is not paused with pending approval and cannot be resumed.";
25+
private static readonly System.Text.CompositeFormat ResumeNotAllowedDetailCompositeFormat =
26+
System.Text.CompositeFormat.Parse(ResumeNotAllowedDetailFormat);
2227
private readonly IGrainFactory _grainFactory;
2328
private readonly RuntimeSessionArchiveStore _archiveStore;
2429
private readonly DeterministicAgentTurnEngine _turnEngine;
2530
private readonly Workflow _workflow;
31+
private readonly TimeProvider _timeProvider;
2632

2733
public AgentFrameworkRuntimeClient(IGrainFactory grainFactory, RuntimeSessionArchiveStore archiveStore)
2834
: this(grainFactory, archiveStore, TimeProvider.System)
@@ -36,6 +42,7 @@ internal AgentFrameworkRuntimeClient(
3642
{
3743
_grainFactory = grainFactory;
3844
_archiveStore = archiveStore;
45+
_timeProvider = timeProvider;
3946
_turnEngine = new DeterministicAgentTurnEngine(timeProvider);
4047
_workflow = BuildWorkflow();
4148
}
@@ -81,12 +88,26 @@ public async ValueTask<Result<AgentTurnResult>> ResumeAsync(AgentTurnResumeReque
8188
{
8289
cancellationToken.ThrowIfCancellationRequested();
8390

84-
var archive = await _archiveStore.LoadAsync(request.SessionId, cancellationToken);
91+
StoredRuntimeSessionArchive? archive;
92+
try
93+
{
94+
archive = await _archiveStore.LoadAsync(request.SessionId, cancellationToken);
95+
}
96+
catch (JsonException)
97+
{
98+
return Result<AgentTurnResult>.Fail(RuntimeCommunicationProblems.SessionArchiveCorrupted(request.SessionId));
99+
}
100+
85101
if (archive is null)
86102
{
87103
return Result<AgentTurnResult>.Fail(RuntimeCommunicationProblems.SessionArchiveMissing(request.SessionId));
88104
}
89105

106+
if (archive.Phase is not SessionPhase.Paused || archive.ApprovalState is not ApprovalState.Pending)
107+
{
108+
return Result<AgentTurnResult>.Fail(CreateResumeNotAllowedProblem(request.SessionId));
109+
}
110+
90111
if (string.IsNullOrWhiteSpace(archive.CheckpointId))
91112
{
92113
return Result<AgentTurnResult>.Fail(RuntimeCommunicationProblems.ResumeCheckpointMissing(request.SessionId));
@@ -115,7 +136,7 @@ await PersistRuntimeStateAsync(
115136
archive.OriginalRequest,
116137
result,
117138
resolvedCheckpoint,
118-
request.ApprovalState is ApprovalState.Approved ? ResumeReplayKind : PauseReplayKind,
139+
ResolveResumeReplayKind(request.ApprovalState),
119140
cancellationToken);
120141

121142
return Result<AgentTurnResult>.Succeed(result);
@@ -232,13 +253,14 @@ private async ValueTask PersistRuntimeStateAsync(
232253
{
233254
var existingArchive = await _archiveStore.LoadAsync(originalRequest.SessionId, cancellationToken);
234255
var replay = existingArchive?.Replay.ToList() ?? [];
256+
var recordedAt = _timeProvider.GetUtcNow();
235257
replay.Add(
236258
new RuntimeSessionReplayEntry(
237259
replayKind,
238260
result.Summary,
239261
result.NextPhase,
240262
result.ApprovalState,
241-
DateTimeOffset.UtcNow));
263+
recordedAt));
242264
if (result.NextPhase is SessionPhase.Execute or SessionPhase.Review or SessionPhase.Failed)
243265
{
244266
replay.Add(
@@ -247,7 +269,7 @@ private async ValueTask PersistRuntimeStateAsync(
247269
result.Summary,
248270
result.NextPhase,
249271
result.ApprovalState,
250-
DateTimeOffset.UtcNow));
272+
recordedAt));
251273
}
252274

253275
var archive = new StoredRuntimeSessionArchive(
@@ -257,16 +279,16 @@ private async ValueTask PersistRuntimeStateAsync(
257279
originalRequest,
258280
result.NextPhase,
259281
result.ApprovalState,
260-
DateTimeOffset.UtcNow,
282+
recordedAt,
261283
replay,
262284
result.ProducedArtifacts);
263285

264286
await _archiveStore.SaveAsync(archive, cancellationToken);
265-
await UpsertSessionStateAsync(originalRequest, result);
287+
await UpsertSessionStateAsync(originalRequest, result, recordedAt);
266288
await UpsertArtifactsAsync(result.ProducedArtifacts);
267289
}
268290

269-
private async ValueTask UpsertSessionStateAsync(AgentTurnRequest request, AgentTurnResult result)
291+
private async ValueTask UpsertSessionStateAsync(AgentTurnRequest request, AgentTurnResult result, DateTimeOffset timestamp)
270292
{
271293
var session = new SessionDescriptor
272294
{
@@ -276,8 +298,8 @@ private async ValueTask UpsertSessionStateAsync(AgentTurnRequest request, AgentT
276298
Phase = result.NextPhase,
277299
ApprovalState = result.ApprovalState,
278300
AgentProfileIds = [request.AgentProfileId],
279-
CreatedAt = DateTimeOffset.UtcNow,
280-
UpdatedAt = DateTimeOffset.UtcNow,
301+
CreatedAt = timestamp,
302+
UpdatedAt = timestamp,
281303
};
282304

283305
await _grainFactory.GetGrain<ISessionGrain>(request.SessionId.ToString()).UpsertAsync(session);
@@ -352,6 +374,27 @@ await context.QueueStateUpdateAsync(
352374
await context.RequestHaltAsync();
353375
}
354376

377+
private static string ResolveResumeReplayKind(ApprovalState approvalState)
378+
{
379+
return approvalState switch
380+
{
381+
ApprovalState.Approved => ResumeReplayKind,
382+
ApprovalState.Rejected => RejectedReplayKind,
383+
_ => PauseReplayKind,
384+
};
385+
}
386+
387+
private static Problem CreateResumeNotAllowedProblem(SessionId sessionId)
388+
{
389+
return Problem.Create(
390+
RuntimeCommunicationProblemCode.ResumeCheckpointMissing,
391+
string.Format(
392+
System.Globalization.CultureInfo.InvariantCulture,
393+
ResumeNotAllowedDetailCompositeFormat,
394+
sessionId),
395+
(int)System.Net.HttpStatusCode.Conflict);
396+
}
397+
355398
private async ValueTask HandleResumeAsync(
356399
RuntimeWorkflowSignal signal,
357400
RuntimeWorkflowState state,

DotPilot.Runtime/Features/RuntimeFoundation/RuntimeFoundationCatalog.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public sealed class RuntimeFoundationCatalog : IRuntimeFoundationCatalog
2525
"Agent Framework orchestrates local runs, approvals, and checkpoints without moving execution logic into the Uno app.";
2626
private const string TrafficPolicyName = "Traffic policy";
2727
private const string TrafficPolicySummary =
28-
"Allowed grain transitions are explicit, testable, and visualized through Orleans.Graph instead of hidden conventions.";
28+
"Allowed grain transitions are explicit, testable, and surfaced through the embedded traffic-policy Mermaid catalog instead of hidden conventions.";
2929
private const string SessionPersistenceName = "Session persistence";
3030
private const string SessionPersistenceSummary =
3131
"Checkpoint, replay, and resume data survive host restarts in local session archives without changing the Orleans storage topology.";

DotPilot.Tests/Features/RuntimeFoundation/AgentFrameworkRuntimeClientTests.cs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ public sealed class AgentFrameworkRuntimeClientTests
99
private const string PlanPrompt = "Plan the embedded runtime rollout.";
1010
private const string ApprovedResumeSummary = "Approved by the operator.";
1111
private const string RejectedResumeSummary = "Rejected by the operator.";
12+
private const string ResumeRejectedKind = "approval-rejected";
1213
private const string ArchiveFileName = "archive.json";
1314
private const string ReplayFileName = "replay.md";
15+
private static readonly DateTimeOffset FixedTimestamp = new(2026, 3, 14, 9, 30, 0, TimeSpan.Zero);
1416

1517
[Test]
1618
public async Task ExecuteAsyncPersistsAReplayArchiveForPlanMode()
@@ -96,9 +98,41 @@ public async Task ResumeAsyncPersistsRejectedApprovalAsFailedReplay()
9698
rejectedResult.Value.ApprovalState.Should().Be(ApprovalState.Rejected);
9799
archiveResult.IsSuccess.Should().BeTrue();
98100
archiveResult.Value!.Phase.Should().Be(SessionPhase.Failed);
101+
archiveResult.Value.Replay.Should().Contain(entry => entry.Kind == ResumeRejectedKind && entry.Phase == SessionPhase.Failed);
99102
archiveResult.Value.Replay.Should().Contain(entry => entry.Kind == "run-completed" && entry.Phase == SessionPhase.Failed);
100103
}
101104

105+
[Test]
106+
public async Task ResumeAsyncRejectsArchivedSessionsThatAreNoLongerPausedForApproval()
107+
{
108+
using var runtimeDirectory = new TemporaryRuntimePersistenceDirectory();
109+
var request = CreateRequest(ApprovalPrompt, AgentExecutionMode.Execute);
110+
111+
{
112+
using var firstHost = CreateHost(runtimeDirectory.Root);
113+
await firstHost.StartAsync();
114+
var firstClient = firstHost.Services.GetRequiredService<IAgentRuntimeClient>();
115+
_ = await firstClient.ExecuteAsync(request, CancellationToken.None);
116+
_ = await firstClient.ResumeAsync(
117+
new AgentTurnResumeRequest(request.SessionId, ApprovalState.Approved, ApprovedResumeSummary),
118+
CancellationToken.None);
119+
}
120+
121+
{
122+
using var secondHost = CreateHost(runtimeDirectory.Root);
123+
await secondHost.StartAsync();
124+
var secondClient = secondHost.Services.GetRequiredService<IAgentRuntimeClient>();
125+
126+
var result = await secondClient.ResumeAsync(
127+
new AgentTurnResumeRequest(request.SessionId, ApprovalState.Approved, ApprovedResumeSummary),
128+
CancellationToken.None);
129+
130+
result.IsFailed.Should().BeTrue();
131+
result.Problem!.HasErrorCode(RuntimeCommunicationProblemCode.ResumeCheckpointMissing).Should().BeTrue();
132+
result.Problem.Detail.Should().Contain("cannot be resumed");
133+
}
134+
}
135+
102136
[Test]
103137
public async Task GetSessionArchiveAsyncReturnsMissingProblemWhenNothingWasPersisted()
104138
{
@@ -133,6 +167,52 @@ public async Task GetSessionArchiveAsyncReturnsCorruptionProblemForInvalidArchiv
133167
result.Problem!.HasErrorCode(RuntimeCommunicationProblemCode.SessionArchiveCorrupted).Should().BeTrue();
134168
}
135169

170+
[Test]
171+
public async Task ResumeAsyncReturnsCorruptionProblemForInvalidArchivePayload()
172+
{
173+
using var runtimeDirectory = new TemporaryRuntimePersistenceDirectory();
174+
var sessionId = SessionId.New();
175+
var sessionDirectory = Path.Combine(runtimeDirectory.Root, sessionId.ToString());
176+
Directory.CreateDirectory(sessionDirectory);
177+
await File.WriteAllTextAsync(Path.Combine(sessionDirectory, ArchiveFileName), "{ invalid json", CancellationToken.None);
178+
179+
using var host = CreateHost(runtimeDirectory.Root);
180+
await host.StartAsync();
181+
var client = host.Services.GetRequiredService<IAgentRuntimeClient>();
182+
183+
var result = await client.ResumeAsync(
184+
new AgentTurnResumeRequest(sessionId, ApprovalState.Approved, ApprovedResumeSummary),
185+
CancellationToken.None);
186+
187+
result.IsFailed.Should().BeTrue();
188+
result.Problem!.HasErrorCode(RuntimeCommunicationProblemCode.SessionArchiveCorrupted).Should().BeTrue();
189+
}
190+
191+
[Test]
192+
public async Task AgentFrameworkRuntimeClientUsesTheInjectedTimeProviderForReplayArchiveAndSessionTimestamps()
193+
{
194+
using var runtimeDirectory = new TemporaryRuntimePersistenceDirectory();
195+
using var host = CreateHost(runtimeDirectory.Root);
196+
await host.StartAsync();
197+
var client = CreateClient(host.Services, runtimeDirectory.Root, new FixedTimeProvider(FixedTimestamp));
198+
var request = CreateRequest(PlanPrompt, AgentExecutionMode.Plan);
199+
200+
var result = await client.ExecuteAsync(request, CancellationToken.None);
201+
var archiveResult = await client.GetSessionArchiveAsync(request.SessionId, CancellationToken.None);
202+
var session = await host.Services
203+
.GetRequiredService<IGrainFactory>()
204+
.GetGrain<ISessionGrain>(request.SessionId.ToString())
205+
.GetAsync();
206+
207+
result.IsSuccess.Should().BeTrue();
208+
archiveResult.IsSuccess.Should().BeTrue();
209+
archiveResult.Value!.UpdatedAt.Should().Be(FixedTimestamp);
210+
archiveResult.Value.Replay.Should().OnlyContain(entry => entry.RecordedAt == FixedTimestamp);
211+
session.Should().NotBeNull();
212+
session!.CreatedAt.Should().Be(FixedTimestamp);
213+
session.UpdatedAt.Should().Be(FixedTimestamp);
214+
}
215+
136216
[Test]
137217
public async Task ExtractCheckpointReturnsNullWhenRunHasNoCheckpointData()
138218
{
@@ -212,6 +292,24 @@ private static AgentTurnRequest CreateRequest(string prompt, AgentExecutionMode
212292
return new AgentTurnRequest(SessionId.New(), AgentProfileId.New(), prompt, mode, ProviderConnectionStatus.Available);
213293
}
214294

295+
private static AgentFrameworkRuntimeClient CreateClient(IServiceProvider services, string rootDirectory, TimeProvider timeProvider)
296+
{
297+
return (AgentFrameworkRuntimeClient)Activator.CreateInstance(
298+
typeof(AgentFrameworkRuntimeClient),
299+
BindingFlags.Instance | BindingFlags.NonPublic,
300+
binder: null,
301+
args:
302+
[
303+
services.GetRequiredService<IGrainFactory>(),
304+
new RuntimeSessionArchiveStore(new RuntimePersistenceOptions
305+
{
306+
RootDirectoryPath = rootDirectory,
307+
}),
308+
timeProvider,
309+
],
310+
culture: null)!;
311+
}
312+
215313
private static Microsoft.Agents.AI.Workflows.Workflow CreateNoCheckpointWorkflow()
216314
{
217315
var executor = new Microsoft.Agents.AI.Workflows.FunctionExecutor<string>(
@@ -248,6 +346,11 @@ private static int GetFreeTcpPort()
248346
}
249347
}
250348

349+
internal sealed class FixedTimeProvider(DateTimeOffset timestamp) : TimeProvider
350+
{
351+
public override DateTimeOffset GetUtcNow() => timestamp;
352+
}
353+
251354
internal sealed class TemporaryRuntimePersistenceDirectory : IDisposable
252355
{
253356
public TemporaryRuntimePersistenceDirectory()

DotPilot.Tests/RuntimeFoundationCatalogTests.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ public void CatalogGroupsEpicTwelveIntoFourSequencedSlices()
2525
RuntimeFoundationIssues.AgentFrameworkRuntime,
2626
RuntimeFoundationIssues.GrainTrafficPolicy,
2727
RuntimeFoundationIssues.SessionPersistence);
28+
snapshot.Slices.Single(slice => slice.IssueNumber == RuntimeFoundationIssues.GrainTrafficPolicy)
29+
.Summary
30+
.Should()
31+
.Contain("Mermaid")
32+
.And.NotContain("Orleans.Graph");
2833
}
2934

3035
[Test]

0 commit comments

Comments
 (0)