forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathProgram.cs
More file actions
134 lines (116 loc) · 5.47 KB
/
Copy pathProgram.cs
File metadata and controls
134 lines (116 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) Microsoft. All rights reserved.
using Microsoft.Agents.AI.Workflows;
namespace WorkflowSharedStatesSample;
/// <summary>
/// This sample introduces the concept of shared states within a workflow.
/// It demonstrates how multiple executors can read from and write to shared states,
/// allowing for more complex data sharing and coordination between tasks.
/// </summary>
/// <remarks>
/// Pre-requisites:
/// - Foundational samples should be completed first.
/// - This sample also uses the fan-out and fan-in patterns to achieve parallel processing.
/// </remarks>
public static class Program
{
private static async Task Main()
{
// Create the executors
var fileRead = new FileReadExecutor();
var wordCount = new WordCountingExecutor();
var paragraphCount = new ParagraphCountingExecutor();
var aggregate = new AggregationExecutor();
// Build the workflow by connecting executors sequentially
var workflow = new WorkflowBuilder(fileRead)
.AddFanOutEdge(fileRead, [wordCount, paragraphCount])
.AddFanInBarrierEdge([wordCount, paragraphCount], aggregate)
.WithOutputFrom(aggregate)
.Build();
// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Lorem_Ipsum.txt");
foreach (WorkflowEvent evt in run.NewEvents)
{
if (evt is WorkflowOutputEvent outputEvent)
{
Console.WriteLine(outputEvent.Data);
}
else if (evt is WorkflowErrorEvent workflowError)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine(workflowError.Exception?.ToString() ?? "Unknown workflow error occurred.");
Console.ResetColor();
}
else if (evt is ExecutorFailedEvent executorFailed)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Executor '{executorFailed.ExecutorId}' failed with {(executorFailed.Data == null ? "unknown error" : $"exception {executorFailed.Data}")}.");
Console.ResetColor();
}
}
}
}
/// <summary>
/// Constants for shared state scopes.
/// </summary>
internal static class FileContentStateConstants
{
public const string FileContentStateScope = "FileContentState";
}
internal sealed class FileReadExecutor() : Executor<string, string>("FileReadExecutor")
{
public override async ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Read file content from embedded resource
string fileContent = Resources.Read(message);
// Store file content in a shared state for access by other executors
string fileID = Guid.NewGuid().ToString("N");
await context.QueueStateUpdateAsync(fileID, fileContent, scopeName: FileContentStateConstants.FileContentStateScope, cancellationToken);
return fileID;
}
}
internal sealed class FileStats
{
public int ParagraphCount { get; set; }
public int WordCount { get; set; }
}
internal sealed class WordCountingExecutor() : Executor<string, FileStats>("WordCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Retrieve the file content from the shared state
var fileContent = await context.ReadStateAsync<string>(message, scopeName: FileContentStateConstants.FileContentStateScope, cancellationToken)
?? throw new InvalidOperationException("File content state not found");
int wordCount = fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { WordCount = wordCount };
}
}
internal sealed class ParagraphCountingExecutor() : Executor<string, FileStats>("ParagraphCountingExecutor")
{
public override async ValueTask<FileStats> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Retrieve the file content from the shared state
var fileContent = await context.ReadStateAsync<string>(message, scopeName: FileContentStateConstants.FileContentStateScope, cancellationToken)
?? throw new InvalidOperationException("File content state not found");
int paragraphCount = fileContent.Split(['\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
return new FileStats { ParagraphCount = paragraphCount };
}
}
/// <summary>
/// The aggregation executor collects results from both executors and yields the final output.
/// </summary>
[YieldsOutput(typeof(string))]
internal sealed class AggregationExecutor() : Executor<FileStats>("AggregationExecutor")
{
private readonly List<FileStats> _messages = [];
public override async ValueTask HandleAsync(FileStats message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.Add(message);
if (this._messages.Count == 2)
{
// Aggregate the results from both executors
var totalParagraphCount = this._messages.Sum(m => m.ParagraphCount);
var totalWordCount = this._messages.Sum(m => m.WordCount);
await context.YieldOutputAsync($"Total Paragraphs: {totalParagraphCount}, Total Words: {totalWordCount}", cancellationToken);
}
}
}