Skip to content

Commit 5777ed2

Browse files
authored
.NET: fix: Add session support for Handoff-hosted Agents (#5280)
* fix: Add session support for Handoff-hosted Agents In order to better support using `Workflows` hosted as `AIAgents` inside of Handoff workflows, we need to make proper use of AgentSession. This causes potential issues around checkpointing and making sure that we properly compute only the new incoming messages for each agent invocation. * fix: AgentSession checkpointing using AIAgent's Serialize/Deserialize methods We cannot rely on implicit serialization through `HandoffHostState` because we are missing type information. * fix: Thread safety issue in `MultiPartyConversation.AllMessages` * fix: Enable unwrapping of FunctionResultContent when ExternalRequest was wrapped into FunctionCallContent
1 parent 52303a8 commit 5777ed2

13 files changed

Lines changed: 710 additions & 317 deletions

dotnet/src/Microsoft.Agents.AI.Workflows/AIAgentsAbstractionsExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ private static ChatMessage ChatAssistantToUserIfNotFromNamed(this ChatMessage me
4848
/// any that have a different <see cref="ChatMessage.AuthorName"/> from <paramref name="targetAgentName"/> to
4949
/// <see cref="ChatRole.User"/>.
5050
/// </summary>
51-
public static List<ChatMessage>? ChangeAssistantToUserForOtherParticipants(this List<ChatMessage> messages, string targetAgentName)
51+
public static List<ChatMessage>? ChangeAssistantToUserForOtherParticipants(this IEnumerable<ChatMessage> messages, string targetAgentName)
5252
{
5353
List<ChatMessage>? roleChanged = null;
5454
foreach (var m in messages)

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffAgentExecutor.cs

Lines changed: 166 additions & 199 deletions
Large diffs are not rendered by default.

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffEndExecutor.cs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

3+
using System;
34
using System.Collections.Generic;
45
using System.Threading;
56
using System.Threading.Tasks;
@@ -12,23 +13,33 @@ internal sealed class HandoffEndExecutor(bool returnToPrevious) : Executor(Execu
1213
{
1314
public const string ExecutorId = "HandoffEnd";
1415

16+
private readonly StateRef<HandoffSharedState> _sharedStateRef = new(HandoffConstants.HandoffSharedStateKey,
17+
HandoffConstants.HandoffSharedStateScope);
18+
1519
protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBuilder) =>
16-
protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<HandoffState>((handoff, context, cancellationToken) =>
17-
this.HandleAsync(handoff, context, cancellationToken)))
20+
protocolBuilder.ConfigureRoutes(routeBuilder => routeBuilder.AddHandler<HandoffState>(
21+
(handoff, context, cancellationToken) => this.HandleAsync(handoff, context, cancellationToken)))
1822
.YieldsOutput<List<ChatMessage>>();
1923

2024
private async ValueTask HandleAsync(HandoffState handoff, IWorkflowContext context, CancellationToken cancellationToken)
2125
{
22-
if (returnToPrevious)
23-
{
24-
await context.QueueStateUpdateAsync<string?>(HandoffConstants.PreviousAgentTrackerKey,
25-
handoff.PreviousAgentId,
26-
HandoffConstants.PreviousAgentTrackerScope,
27-
cancellationToken)
28-
.ConfigureAwait(false);
29-
}
30-
31-
await context.YieldOutputAsync(handoff.Messages, cancellationToken).ConfigureAwait(false);
26+
await this._sharedStateRef.InvokeWithStateAsync(
27+
async (HandoffSharedState? sharedState, IWorkflowContext context, CancellationToken cancellationToken) =>
28+
{
29+
if (sharedState == null)
30+
{
31+
throw new InvalidOperationException("Handoff Orchestration shared state was not properly initialized.");
32+
}
33+
34+
if (returnToPrevious)
35+
{
36+
sharedState.PreviousAgentId = handoff.PreviousAgentId;
37+
}
38+
39+
await context.YieldOutputAsync(sharedState.Conversation.CloneAllMessages(), cancellationToken).ConfigureAwait(false);
40+
41+
return sharedState;
42+
}, context, cancellationToken).ConfigureAwait(false);
3243
}
3344

3445
public ValueTask ResetAsync() => default;
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Linq;
7+
using Microsoft.Extensions.AI;
8+
9+
namespace Microsoft.Agents.AI.Workflows.Specialized;
10+
11+
[Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)]
12+
internal sealed class HandoffMessagesFilter
13+
{
14+
private readonly HandoffToolCallFilteringBehavior _filteringBehavior;
15+
16+
public HandoffMessagesFilter(HandoffToolCallFilteringBehavior filteringBehavior)
17+
{
18+
this._filteringBehavior = filteringBehavior;
19+
}
20+
21+
[Experimental(DiagnosticConstants.ExperimentalFeatureDiagnostic)]
22+
internal static bool IsHandoffFunctionName(string name)
23+
{
24+
return name.StartsWith(HandoffWorkflowBuilder.FunctionPrefix, StringComparison.Ordinal);
25+
}
26+
27+
public IEnumerable<ChatMessage> FilterMessages(IEnumerable<ChatMessage> messages)
28+
{
29+
if (this._filteringBehavior == HandoffToolCallFilteringBehavior.None)
30+
{
31+
return messages;
32+
}
33+
34+
Dictionary<string, FilterCandidateState> filteringCandidates = new();
35+
List<ChatMessage> filteredMessages = [];
36+
HashSet<int> messagesToRemove = [];
37+
38+
bool filterHandoffOnly = this._filteringBehavior == HandoffToolCallFilteringBehavior.HandoffOnly;
39+
foreach (ChatMessage unfilteredMessage in messages)
40+
{
41+
ChatMessage filteredMessage = unfilteredMessage.Clone();
42+
43+
// .Clone() is shallow, so we cannot modify the contents of the cloned message in place.
44+
List<AIContent> contents = [];
45+
contents.Capacity = unfilteredMessage.Contents?.Count ?? 0;
46+
filteredMessage.Contents = contents;
47+
48+
// Because this runs after the role changes from assistant to user for the target agent, we cannot rely on tool calls
49+
// originating only from messages with the Assistant role. Instead, we need to inspect the contents of all non-Tool (result)
50+
// FunctionCallContent.
51+
if (unfilteredMessage.Role != ChatRole.Tool)
52+
{
53+
for (int i = 0; i < unfilteredMessage.Contents!.Count; i++)
54+
{
55+
AIContent content = unfilteredMessage.Contents[i];
56+
if (content is not FunctionCallContent fcc || (filterHandoffOnly && !IsHandoffFunctionName(fcc.Name)))
57+
{
58+
filteredMessage.Contents.Add(content);
59+
60+
// Track non-handoff function calls so their tool results are preserved in HandoffOnly mode
61+
if (filterHandoffOnly && content is FunctionCallContent nonHandoffFcc)
62+
{
63+
filteringCandidates[nonHandoffFcc.CallId] = new FilterCandidateState(nonHandoffFcc.CallId)
64+
{
65+
IsHandoffFunction = false,
66+
};
67+
}
68+
}
69+
else if (filterHandoffOnly)
70+
{
71+
if (!filteringCandidates.TryGetValue(fcc.CallId, out FilterCandidateState? candidateState))
72+
{
73+
filteringCandidates[fcc.CallId] = new FilterCandidateState(fcc.CallId)
74+
{
75+
IsHandoffFunction = true,
76+
};
77+
}
78+
else
79+
{
80+
candidateState.IsHandoffFunction = true;
81+
(int messageIndex, int contentIndex) = candidateState.FunctionCallResultLocation!.Value;
82+
ChatMessage messageToFilter = filteredMessages[messageIndex];
83+
messageToFilter.Contents.RemoveAt(contentIndex);
84+
if (messageToFilter.Contents.Count == 0)
85+
{
86+
messagesToRemove.Add(messageIndex);
87+
}
88+
}
89+
}
90+
else
91+
{
92+
// All mode: strip all FunctionCallContent
93+
}
94+
}
95+
}
96+
else
97+
{
98+
if (!filterHandoffOnly)
99+
{
100+
continue;
101+
}
102+
103+
for (int i = 0; i < unfilteredMessage.Contents!.Count; i++)
104+
{
105+
AIContent content = unfilteredMessage.Contents[i];
106+
if (content is not FunctionResultContent frc
107+
|| (filteringCandidates.TryGetValue(frc.CallId, out FilterCandidateState? candidateState)
108+
&& candidateState.IsHandoffFunction is false))
109+
{
110+
// Either this is not a function result content, so we should let it through, or it is a FRC that
111+
// we know is not related to a handoff call. In either case, we should include it.
112+
filteredMessage.Contents.Add(content);
113+
}
114+
else if (candidateState is null)
115+
{
116+
// We haven't seen the corresponding function call yet, so add it as a candidate to be filtered later
117+
filteringCandidates[frc.CallId] = new FilterCandidateState(frc.CallId)
118+
{
119+
FunctionCallResultLocation = (filteredMessages.Count, filteredMessage.Contents.Count),
120+
};
121+
}
122+
// else we have seen the corresponding function call and it is a handoff, so we should filter it out.
123+
}
124+
}
125+
126+
if (filteredMessage.Contents.Count > 0)
127+
{
128+
filteredMessages.Add(filteredMessage);
129+
}
130+
}
131+
132+
return filteredMessages.Where((_, index) => !messagesToRemove.Contains(index));
133+
}
134+
135+
private class FilterCandidateState(string callId)
136+
{
137+
public (int MessageIndex, int ContentIndex)? FunctionCallResultLocation { get; set; }
138+
139+
public string CallId => callId;
140+
141+
public bool? IsHandoffFunction { get; set; }
142+
}
143+
}

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/HandoffStartExecutor.cs

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,23 @@ namespace Microsoft.Agents.AI.Workflows.Specialized;
99

1010
internal static class HandoffConstants
1111
{
12+
internal const string HandoffOrchestrationSharedScope = "HandoffOrchestration";
13+
1214
internal const string PreviousAgentTrackerKey = "LastAgentId";
13-
internal const string PreviousAgentTrackerScope = "HandoffOrchestration";
15+
internal const string PreviousAgentTrackerScope = HandoffOrchestrationSharedScope;
16+
17+
internal const string MultiPartyConversationKey = "MultiPartyConversation";
18+
internal const string MultiPartyConversationScope = HandoffOrchestrationSharedScope;
19+
20+
internal const string HandoffSharedStateKey = "SharedState";
21+
internal const string HandoffSharedStateScope = HandoffOrchestrationSharedScope;
22+
}
23+
24+
internal sealed class HandoffSharedState
25+
{
26+
public MultiPartyConversation Conversation { get; } = new();
27+
28+
public string? PreviousAgentId { get; set; }
1429
}
1530

1631
/// <summary>Executor used at the start of a handoffs workflow to accumulate messages and emit them as HandoffState upon receiving a turn token.</summary>
@@ -29,23 +44,25 @@ protected override ProtocolBuilder ConfigureProtocol(ProtocolBuilder protocolBui
2944

3045
protected override ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
3146
{
32-
if (returnToPrevious)
33-
{
34-
return context.InvokeWithStateAsync(
35-
async (string? previousAgentId, IWorkflowContext context, CancellationToken cancellationToken) =>
36-
{
37-
HandoffState handoffState = new(new(emitEvents), null, messages, previousAgentId);
38-
await context.SendMessageAsync(handoffState, cancellationToken).ConfigureAwait(false);
39-
40-
return previousAgentId;
41-
},
42-
HandoffConstants.PreviousAgentTrackerKey,
43-
HandoffConstants.PreviousAgentTrackerScope,
44-
cancellationToken);
45-
}
46-
47-
HandoffState handoff = new(new(emitEvents), null, messages);
48-
return context.SendMessageAsync(handoff, cancellationToken);
47+
return context.InvokeWithStateAsync(
48+
async (HandoffSharedState? sharedState, IWorkflowContext context, CancellationToken cancellationToken) =>
49+
{
50+
sharedState ??= new HandoffSharedState();
51+
sharedState.Conversation.AddMessages(messages);
52+
53+
string? previousAgentId = sharedState.PreviousAgentId;
54+
55+
// If we are configured to return to the previous agent, include the previous agent id in the handoff state.
56+
// If there was no previousAgent, it will still be null.
57+
HandoffState turnState = new(new(emitEvents), null, returnToPrevious ? previousAgentId : null);
58+
59+
await context.SendMessageAsync(turnState, cancellationToken).ConfigureAwait(false);
60+
61+
return sharedState;
62+
},
63+
HandoffConstants.HandoffSharedStateKey,
64+
HandoffConstants.HandoffSharedStateScope,
65+
cancellationToken);
4966
}
5067

5168
public new ValueTask ResetAsync() => base.ResetAsync();
Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

3-
using System.Collections.Generic;
4-
using Microsoft.Extensions.AI;
5-
63
namespace Microsoft.Agents.AI.Workflows.Specialized;
74

85
internal sealed record class HandoffState(
96
TurnToken TurnToken,
107
string? RequestedHandoffTargetAgentId,
11-
List<ChatMessage> Messages,
128
string? PreviousAgentId = null);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using Microsoft.Extensions.AI;
7+
8+
namespace Microsoft.Agents.AI.Workflows.Specialized;
9+
10+
internal sealed class MultiPartyConversation
11+
{
12+
private readonly List<ChatMessage> _history = [];
13+
private readonly object _mutex = new();
14+
15+
public List<ChatMessage> CloneAllMessages()
16+
{
17+
lock (this._mutex)
18+
{
19+
return this._history.ToList();
20+
}
21+
}
22+
23+
public (ChatMessage[], int) CollectNewMessages(int bookmark)
24+
{
25+
lock (this._mutex)
26+
{
27+
int count = this._history.Count - bookmark;
28+
if (count < 0)
29+
{
30+
throw new InvalidOperationException($"Bookmark value too large: {bookmark} vs count={count}");
31+
}
32+
33+
return (this._history.Skip(bookmark).ToArray(), this.CurrentBookmark);
34+
}
35+
}
36+
37+
private int CurrentBookmark => this._history.Count;
38+
39+
public int AddMessages(IEnumerable<ChatMessage> messages)
40+
{
41+
lock (this._mutex)
42+
{
43+
this._history.AddRange(messages);
44+
return this.CurrentBookmark;
45+
}
46+
}
47+
48+
public int AddMessage(ChatMessage message)
49+
{
50+
lock (this._mutex)
51+
{
52+
this._history.Add(message);
53+
return this.CurrentBookmark;
54+
}
55+
}
56+
}

dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowHostingExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ internal static FunctionCallContent ToFunctionCall(this ExternalRequest request)
4141
{
4242
Dictionary<string, object?> parameters = new()
4343
{
44-
{ "data", request.Data}
44+
{ "data", request.Data }
4545
};
4646

4747
return new FunctionCallContent(request.RequestId, request.PortInfo.PortId, parameters);

0 commit comments

Comments
 (0)