Skip to content

Commit aa2ff67

Browse files
authored
.NET: Fix to emit WorkflowStartedEvent during workflow execution (#4514)
* Fix bug to emit WorkflowStartedEvent during workflow execution * Updated based on PR comments
1 parent bcb55b4 commit aa2ff67

3 files changed

Lines changed: 79 additions & 2 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ public async IAsyncEnumerable<WorkflowEvent> TakeEventStreamAsync(bool blockOnPe
7272
this.RunStatus = RunStatus.Running;
7373
runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
7474

75+
// Emit WorkflowStartedEvent to the event stream for consumers
76+
eventSink.Enqueue(new WorkflowStartedEvent());
77+
7578
do
7679
{
7780
while (this._stepRunner.HasUnprocessedMessages &&

dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
8888

8989
// Run all available supersteps continuously
9090
// Events are streamed out in real-time as they happen via the event handler
91-
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
91+
if (this._stepRunner.HasUnprocessedMessages)
9292
{
93-
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
93+
// Emit WorkflowStartedEvent only when there's actual work to process
94+
// This avoids spurious events on timeout-only loop iterations
95+
await this._eventChannel.Writer.WriteAsync(new WorkflowStartedEvent(), linkedSource.Token).ConfigureAwait(false);
96+
97+
while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
98+
{
99+
await this._stepRunner.RunSuperStepAsync(linkedSource.Token).ConfigureAwait(false);
100+
}
94101
}
95102

96103
// Update status based on what's waiting

dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AgentEventsTests.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System.Collections.Generic;
4+
using System.Linq;
45
using System.Threading.Tasks;
6+
using FluentAssertions;
57
using Microsoft.Extensions.AI;
68

79
namespace Microsoft.Agents.AI.Workflows.UnitTests;
@@ -88,4 +90,69 @@ public void AgentResponseEvent_IsWorkflowOutputEvent()
8890
Assert.Same(response, evt.Response);
8991
Assert.Same(response, evt.Data);
9092
}
93+
94+
/// <summary>
95+
/// Verifies that WorkflowStartedEvent is emitted first before any SuperStepStartedEvent.
96+
/// </summary>
97+
[Fact]
98+
public async Task StreamingRun_WorkflowStartedEvent_ShouldBeEmittedBefore_SuperStepStartedAsync()
99+
{
100+
// Arrange
101+
TestEchoAgent agent = new("test-agent");
102+
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
103+
ChatMessage inputMessage = new(ChatRole.User, "Hello");
104+
105+
// Act
106+
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
107+
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
108+
109+
List<WorkflowEvent> events = [];
110+
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
111+
{
112+
events.Add(evt);
113+
}
114+
115+
// Assert
116+
events.Should().NotBeEmpty();
117+
118+
List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
119+
startedEvents.Should().NotBeEmpty();
120+
121+
WorkflowStartedEvent? firstStartedEvent = startedEvents.FirstOrDefault();
122+
SuperStepStartedEvent? firstSuperStepEvent = events.OfType<SuperStepStartedEvent>().FirstOrDefault();
123+
firstSuperStepEvent.Should().NotBeNull();
124+
125+
int startedIndex = events.IndexOf(firstStartedEvent!);
126+
int superStepIndex = events.IndexOf(firstSuperStepEvent!);
127+
128+
startedIndex.Should().BeLessThan(superStepIndex);
129+
}
130+
131+
/// <summary>
132+
/// Verifies that WorkflowStartedEvent is emitted using Lockstep execution mode.
133+
/// </summary>
134+
[Fact]
135+
public async Task StreamingRun_LockstepExecution_ShouldEmit_WorkflowStartedEventAsync()
136+
{
137+
// Arrange
138+
TestEchoAgent agent = new("test-agent");
139+
Workflow workflow = AgentWorkflowBuilder.BuildSequential(agent);
140+
ChatMessage inputMessage = new(ChatRole.User, "Hello");
141+
142+
// Act: Use Lockstep execution mode
143+
await using StreamingRun run = await InProcessExecution.Lockstep.RunStreamingAsync(workflow, new List<ChatMessage> { inputMessage });
144+
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
145+
146+
List<WorkflowEvent> events = [];
147+
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
148+
{
149+
events.Add(evt);
150+
}
151+
152+
// Assert
153+
events.Should().NotBeEmpty();
154+
155+
List<WorkflowStartedEvent> startedEvents = events.OfType<WorkflowStartedEvent>().ToList();
156+
startedEvents.Should().NotBeEmpty();
157+
}
91158
}

0 commit comments

Comments
 (0)