Skip to content

Commit bbc0157

Browse files
committed
.NET: fix fan-in checkpoint edge state
1 parent ed4ff18 commit bbc0157

3 files changed

Lines changed: 126 additions & 6 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ private InProcessRunner(Workflow workflow, ICheckpointManager? checkpointManager
6060
this._knownValidInputTypes = knownValidInputTypes != null
6161
? [.. knownValidInputTypes]
6262
: [];
63-
64-
// Initialize the runners for each of the edges, along with the state for edges that need it.
65-
this.EdgeMap = new EdgeMap(this.RunContext, this.Workflow.Edges, this.Workflow.Ports.Values, this.Workflow.StartExecutorId, this.StepTracer);
6663
}
6764

6865
/// <inheritdoc cref="ISuperStepRunner.SessionId"/>
@@ -154,7 +151,6 @@ ValueTask ISuperStepRunner.EnqueueResponseAsync(ExternalResponse response, Cance
154151
private Workflow Workflow { get; init; }
155152
internal InProcessRunnerContext RunContext { get; init; }
156153
private ICheckpointManager? CheckpointManager { get; }
157-
private EdgeMap EdgeMap { get; init; }
158154

159155
public ConcurrentEventSink OutgoingEvents { get; } = new();
160156

@@ -358,7 +354,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
358354
// Create a representation of the current workflow if it does not already exist.
359355
this._workflowInfoCache ??= this.Workflow.ToWorkflowInfo();
360356

361-
Dictionary<EdgeId, PortableValue> edgeData = await this.EdgeMap.ExportStateAsync().ConfigureAwait(false);
357+
Dictionary<EdgeId, PortableValue> edgeData = await this.RunContext.ExportEdgeStateAsync().ConfigureAwait(false);
362358

363359
await prepareTask.ConfigureAwait(false);
364360
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);
@@ -422,7 +418,7 @@ private async ValueTask RestoreCheckpointCoreAsync(CheckpointInfo checkpointInfo
422418

423419
Task executorNotifyTask = this.RunContext.NotifyCheckpointLoadedAsync(cancellationToken);
424420

425-
await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
421+
await this.RunContext.ImportEdgeStateAsync(checkpoint).ConfigureAwait(false);
426422
await Task.WhenAll(executorNotifyTask,
427423
restoreCheckpointIndexTask.AsTask()).ConfigureAwait(false);
428424

dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunnerContext.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,20 @@ internal ValueTask<RunnerStateData> ExportStateAsync()
411411
return new(result);
412412
}
413413

414+
internal ValueTask<Dictionary<EdgeId, PortableValue>> ExportEdgeStateAsync()
415+
{
416+
this.CheckEnded();
417+
418+
return this._edgeMap.ExportStateAsync();
419+
}
420+
421+
internal ValueTask ImportEdgeStateAsync(Checkpoint checkpoint)
422+
{
423+
this.CheckEnded();
424+
425+
return this._edgeMap.ImportStateAsync(checkpoint);
426+
}
427+
414428
internal async ValueTask RepublishUnservicedRequestsAsync(CancellationToken cancellationToken = default)
415429
{
416430
this.CheckEnded();

dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointResumeTests.cs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,72 @@ internal async Task Checkpoint_Restore_ClearsQueuedExternalResponsesBeforeImport
321321
"the workflow should finish once the replayed request receives a fresh response");
322322
}
323323

324+
/// <summary>
325+
/// Verifies that fan-in edge state buffered before a checkpoint is still present after resume.
326+
/// </summary>
327+
[Theory]
328+
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
329+
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
330+
internal async Task Checkpoint_Resume_PreservesFanInBarrierBufferedMessagesAsync(ExecutionEnvironment environment)
331+
{
332+
// Arrange
333+
const string RequestPortId = "Approval";
334+
const string SinkId = "Sink";
335+
336+
ExecutorBinding beforePause = new PreCheckpointBarrierSource("BeforePause", RequestPortId, SinkId);
337+
ExecutorBinding afterResume = new PostCheckpointBarrierSource("AfterResume", SinkId);
338+
ExecutorBinding sink = new BarrierSink(SinkId);
339+
RequestPort<ApprovalRequest, ApprovalReply> requestPort = RequestPort.Create<ApprovalRequest, ApprovalReply>(RequestPortId);
340+
341+
Workflow workflow = new WorkflowBuilder(beforePause)
342+
.AddEdge(beforePause, requestPort)
343+
.AddEdge(requestPort, afterResume)
344+
.AddFanInBarrierEdge([beforePause, afterResume], sink)
345+
.Build();
346+
347+
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
348+
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
349+
350+
ExternalRequest pendingRequest;
351+
CheckpointInfo checkpoint;
352+
353+
await using (StreamingRun firstRun = await env.WithCheckpointing(checkpointManager)
354+
.RunStreamingAsync(workflow, "start"))
355+
{
356+
(pendingRequest, checkpoint) = await CapturePendingRequestAndCheckpointAsync(firstRun);
357+
}
358+
359+
// Act
360+
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager)
361+
.ResumeStreamingAsync(workflow, checkpoint);
362+
363+
List<WorkflowEvent> resumedEvents = await ReadToHaltAsync(resumed);
364+
ExternalRequest replayedRequest = resumedEvents.OfType<RequestInfoEvent>()
365+
.Select(evt => evt.Request)
366+
.Should()
367+
.ContainSingle("resume should replay the request captured after the first fan-in source")
368+
.Subject;
369+
370+
await resumed.SendResponseAsync(replayedRequest.CreateResponse(new ApprovalReply("yes")));
371+
372+
List<WorkflowEvent> completionEvents = await ReadToHaltAsync(resumed);
373+
374+
// Assert
375+
completionEvents.OfType<WorkflowErrorEvent>().Should().BeEmpty(
376+
"resuming across a partially satisfied fan-in barrier should not raise workflow errors");
377+
378+
string[] outputs = [.. completionEvents.OfType<BarrierReleasedEvent>().Select(evt => evt.Source)];
379+
outputs.Should().BeEquivalentTo(["before", "after"],
380+
"the barrier should release the contribution buffered before the checkpoint and the one produced after resume");
381+
382+
RunStatus status = await resumed.GetStatusAsync();
383+
status.Should().Be(RunStatus.Idle,
384+
"the fan-in target should run after the post-resume source contributes");
385+
386+
pendingRequest.RequestId.Should().Be(replayedRequest.RequestId,
387+
"the replayed request should be the one from the checkpointed superstep");
388+
}
389+
324390
/// <summary>
325391
/// Verifies that a resumed parent workflow re-emits pending requests that originated in a subworkflow.
326392
/// </summary>
@@ -484,4 +550,48 @@ private static async ValueTask<List<WorkflowEvent>> ReadToHaltAsync(StreamingRun
484550

485551
return events;
486552
}
553+
554+
private sealed record BarrierContribution(string Source);
555+
556+
private sealed record ApprovalRequest(string Prompt);
557+
558+
private sealed record ApprovalReply(string Value);
559+
560+
private sealed class BarrierReleasedEvent(string source) : WorkflowEvent
561+
{
562+
public string Source { get; } = source;
563+
}
564+
565+
private sealed class PreCheckpointBarrierSource(string id, string requestPortId, string sinkId) : Executor(id)
566+
{
567+
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
568+
=> protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<string>(this.HandleAsync))
569+
.SendsMessage<BarrierContribution>()
570+
.SendsMessage<ApprovalRequest>();
571+
572+
private async ValueTask HandleAsync(string input, IWorkflowContext ctx)
573+
{
574+
await ctx.SendMessageAsync(new BarrierContribution("before"), sinkId).ConfigureAwait(false);
575+
await ctx.SendMessageAsync(new ApprovalRequest("continue?"), requestPortId).ConfigureAwait(false);
576+
}
577+
}
578+
579+
private sealed class PostCheckpointBarrierSource(string id, string sinkId) : Executor(id)
580+
{
581+
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
582+
=> protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<ApprovalReply>(this.HandleAsync))
583+
.SendsMessage<BarrierContribution>();
584+
585+
private ValueTask HandleAsync(ApprovalReply reply, IWorkflowContext ctx)
586+
=> ctx.SendMessageAsync(new BarrierContribution("after"), sinkId);
587+
}
588+
589+
private sealed class BarrierSink(string id) : Executor(id)
590+
{
591+
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder)
592+
=> protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<BarrierContribution>(this.HandleAsync));
593+
594+
private ValueTask HandleAsync(BarrierContribution contribution, IWorkflowContext ctx)
595+
=> ctx.AddEventAsync(new BarrierReleasedEvent(contribution.Source));
596+
}
487597
}

0 commit comments

Comments
 (0)