-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Expand file tree
/
Copy pathFunctionTriggers.cs
More file actions
335 lines (300 loc) · 13.1 KB
/
Copy pathFunctionTriggers.cs
File metadata and controls
335 lines (300 loc) · 13.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.DurableTask;
using Microsoft.Agents.AI.Hosting.AzureFunctions;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
namespace ReliableStreaming;
/// <summary>
/// HTTP trigger functions for reliable streaming of durable agent responses.
/// </summary>
/// <remarks>
/// This class exposes two endpoints:
/// <list type="bullet">
/// <item>
/// <term>Create</term>
/// <description>Starts an agent run and streams responses. The response format depends on the
/// <c>Accept</c> header: <c>text/plain</c> returns raw text (ideal for terminals), while
/// <c>text/event-stream</c> or any other value returns Server-Sent Events (SSE).</description>
/// </item>
/// <item>
/// <term>Stream</term>
/// <description>Resumes a stream from a cursor position, enabling reliable message delivery</description>
/// </item>
/// </list>
/// </remarks>
public sealed class FunctionTriggers
{
private readonly RedisStreamResponseHandler _streamHandler;
private readonly ILogger<FunctionTriggers> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="FunctionTriggers"/> class.
/// </summary>
/// <param name="streamHandler">The Redis stream handler for reading/writing agent responses.</param>
/// <param name="logger">The logger instance.</param>
public FunctionTriggers(RedisStreamResponseHandler streamHandler, ILogger<FunctionTriggers> logger)
{
this._streamHandler = streamHandler;
this._logger = logger;
}
/// <summary>
/// Creates a new agent session, starts an agent run with the provided prompt,
/// and streams the response back to the client.
/// </summary>
/// <remarks>
/// <para>
/// The response format depends on the <c>Accept</c> header:
/// <list type="bullet">
/// <item><c>text/plain</c>: Returns raw text output, ideal for terminal display with curl</item>
/// <item><c>text/event-stream</c> or other: Returns Server-Sent Events (SSE) with cursor support</item>
/// </list>
/// </para>
/// <para>
/// The response includes an <c>x-conversation-id</c> header containing the conversation ID.
/// For SSE responses, clients can use this conversation ID to resume the stream if disconnected
/// by calling the <see cref="StreamAsync"/> endpoint with the conversation ID and the last received cursor.
/// </para>
/// <para>
/// Each SSE event contains the following fields:
/// <list type="bullet">
/// <item><c>id</c>: The Redis stream entry ID (use as cursor for resumption)</item>
/// <item><c>event</c>: Either "message" for content or "done" for stream completion</item>
/// <item><c>data</c>: The text content of the response chunk</item>
/// </list>
/// </para>
/// </remarks>
/// <param name="request">The HTTP request containing the prompt in the body.</param>
/// <param name="durableClient">The Durable Task client for signaling agents.</param>
/// <param name="context">The function invocation context.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A streaming response in the format specified by the Accept header.</returns>
[Function(nameof(CreateAsync))]
public async Task<IActionResult> CreateAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "agent/create")] HttpRequest request,
[DurableClient] DurableTaskClient durableClient,
FunctionContext context,
CancellationToken cancellationToken)
{
// Read the prompt from the request body
string prompt = await new StreamReader(request.Body).ReadToEndAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(prompt))
{
return new BadRequestObjectResult("Request body must contain a prompt.");
}
AIAgent agentProxy = durableClient.AsDurableAgentProxy(context, "TravelPlanner");
// Create a new agent session
AgentSession session = await agentProxy.CreateSessionAsync(cancellationToken);
string agentSessionId = session.GetService<AgentSessionId>().ToString();
this._logger.LogInformation("Creating new agent session: {AgentSessionId}", agentSessionId);
// Run the agent in the background (fire-and-forget)
DurableAgentRunOptions options = new() { IsFireAndForget = true };
await agentProxy.RunAsync(prompt, session, options, cancellationToken);
this._logger.LogInformation("Agent run started for session: {AgentSessionId}", agentSessionId);
// Check Accept header to determine response format
// text/plain = raw text output (ideal for terminals)
// text/event-stream or other = SSE format (supports resumption)
string? acceptHeader = request.Headers.Accept.FirstOrDefault();
bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true;
return await this.StreamToClientAsync(
conversationId: agentSessionId, cursor: null, useSseFormat, request.HttpContext, cancellationToken);
}
/// <summary>
/// Resumes streaming from a specific cursor position for an existing session.
/// </summary>
/// <remarks>
/// <para>
/// Use this endpoint to resume a stream after disconnection. Pass the conversation ID
/// (from the <c>x-conversation-id</c> response header) and the last received cursor
/// (Redis stream entry ID) to continue from where you left off.
/// </para>
/// <para>
/// If no cursor is provided, streaming starts from the beginning of the stream.
/// This allows clients to replay the entire response if needed.
/// </para>
/// <para>
/// The response format depends on the <c>Accept</c> header:
/// <list type="bullet">
/// <item><c>text/plain</c>: Returns raw text output, ideal for terminal display with curl</item>
/// <item><c>text/event-stream</c> or other: Returns Server-Sent Events (SSE) with cursor support</item>
/// </list>
/// </para>
/// </remarks>
/// <param name="request">The HTTP request. Use the <c>cursor</c> query parameter to specify the cursor position.</param>
/// <param name="conversationId">The conversation ID to stream from.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A streaming response in the format specified by the Accept header.</returns>
[Function(nameof(StreamAsync))]
public async Task<IActionResult> StreamAsync(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "agent/stream/{conversationId}")] HttpRequest request,
string conversationId,
CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(conversationId))
{
return new BadRequestObjectResult("Conversation ID is required.");
}
// Get the cursor from query string (optional)
string? cursor = request.Query["cursor"].FirstOrDefault();
this._logger.LogInformation(
"Resuming stream for conversation {ConversationId} from cursor: {Cursor}",
SanitizeLogValue(conversationId),
SanitizeLogValue(cursor) ?? "(beginning)");
// Check Accept header to determine response format
// text/plain = raw text output (ideal for terminals)
// text/event-stream or other = SSE format (supports cursor-based resumption)
string? acceptHeader = request.Headers.Accept.FirstOrDefault();
bool useSseFormat = acceptHeader?.Contains("text/plain", StringComparison.OrdinalIgnoreCase) != true;
return await this.StreamToClientAsync(conversationId, cursor, useSseFormat, request.HttpContext, cancellationToken);
}
/// <summary>
/// Streams chunks from the Redis stream to the HTTP response.
/// </summary>
/// <param name="conversationId">The conversation ID to stream from.</param>
/// <param name="cursor">Optional cursor to resume from. If null, streams from the beginning.</param>
/// <param name="useSseFormat">True to use SSE format, false for plain text.</param>
/// <param name="httpContext">The HTTP context for writing the response.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>An empty result after streaming completes.</returns>
private async Task<IActionResult> StreamToClientAsync(
string conversationId,
string? cursor,
bool useSseFormat,
HttpContext httpContext,
CancellationToken cancellationToken)
{
// Set response headers based on format
httpContext.Response.Headers.ContentType = useSseFormat
? "text/event-stream"
: "text/plain; charset=utf-8";
httpContext.Response.Headers.CacheControl = "no-cache";
httpContext.Response.Headers.Connection = "keep-alive";
httpContext.Response.Headers["x-conversation-id"] = conversationId;
// Disable response buffering if supported
httpContext.Features.Get<IHttpResponseBodyFeature>()?.DisableBuffering();
try
{
await foreach (StreamChunk chunk in this._streamHandler.ReadStreamAsync(
conversationId,
cursor,
cancellationToken))
{
if (chunk.Error != null)
{
this._logger.LogWarning("Stream error for conversation {ConversationId}: {Error}", SanitizeLogValue(conversationId), chunk.Error);
await WriteErrorAsync(httpContext.Response, chunk.Error, useSseFormat, cancellationToken);
break;
}
if (chunk.IsDone)
{
await WriteEndOfStreamAsync(httpContext.Response, chunk.EntryId, useSseFormat, cancellationToken);
break;
}
if (chunk.Text != null)
{
await WriteChunkAsync(httpContext.Response, chunk, useSseFormat, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
this._logger.LogInformation("Client disconnected from stream {ConversationId}", SanitizeLogValue(conversationId));
}
return new EmptyResult();
}
/// <summary>
/// Writes a text chunk to the response.
/// </summary>
private static async Task WriteChunkAsync(
HttpResponse response,
StreamChunk chunk,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "message", chunk.Text!, chunk.EntryId);
}
else
{
await response.WriteAsync(chunk.Text!, cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes an end-of-stream marker to the response.
/// </summary>
private static async Task WriteEndOfStreamAsync(
HttpResponse response,
string entryId,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "done", "[DONE]", entryId);
}
else
{
await response.WriteAsync("\n", cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes an error message to the response.
/// </summary>
private static async Task WriteErrorAsync(
HttpResponse response,
string error,
bool useSseFormat,
CancellationToken cancellationToken)
{
if (useSseFormat)
{
await WriteSSEEventAsync(response, "error", error, null);
}
else
{
await response.WriteAsync($"\n[Error: {error}]\n", cancellationToken);
}
await response.Body.FlushAsync(cancellationToken);
}
/// <summary>
/// Writes a Server-Sent Event to the response stream.
/// </summary>
private static async Task WriteSSEEventAsync(
HttpResponse response,
string eventType,
string data,
string? id)
{
StringBuilder sb = new();
// Include the ID if provided (used as cursor for resumption)
if (!string.IsNullOrEmpty(id))
{
sb.AppendLine($"id: {id}");
}
sb.AppendLine($"event: {eventType}");
sb.AppendLine($"data: {data}");
sb.AppendLine(); // Empty line marks end of event
await response.WriteAsync(sb.ToString());
}
/// <summary>
/// Sanitizes a user-provided value for safe inclusion in log entries
/// by removing control characters that could be used for log forging.
/// </summary>
private static string? SanitizeLogValue(string? value)
{
if (value is null)
{
return null;
}
return value
.Replace("\r", string.Empty, StringComparison.Ordinal)
.Replace("\n", string.Empty, StringComparison.Ordinal);
}
}