-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathtask-orchestrator.ts
More file actions
320 lines (285 loc) · 11.4 KB
/
task-orchestrator.ts
File metadata and controls
320 lines (285 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
/**
* MIT No Attribution
*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* the Software without restriction, including without limitation the rights to
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
* the Software, and to permit persons to whom the Software is furnished to do so.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import * as path from 'path';
import { Duration, Stack } from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as iam from 'aws-cdk-lib/aws-iam';
import { Runtime, Architecture } from 'aws-cdk-lib/aws-lambda';
import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
import * as secretsmanager from 'aws-cdk-lib/aws-secretsmanager';
import { NagSuppressions } from 'cdk-nag';
import { Construct } from 'constructs';
/**
 * Properties for TaskOrchestrator construct.
 */
export interface TaskOrchestratorProps {
  /**
   * The DynamoDB task table. The orchestrator function is granted read/write data access.
   */
  readonly taskTable: dynamodb.ITable;

  /**
   * The DynamoDB task events table. The orchestrator function is granted read/write data access.
   */
  readonly taskEventsTable: dynamodb.ITable;

  /**
   * The DynamoDB user concurrency table. The orchestrator function is granted read/write data access.
   */
  readonly userConcurrencyTable: dynamodb.ITable;

  /**
   * ARN of the AgentCore runtime.
   *
   * The orchestrator may call `InvokeAgentRuntime` / `StopRuntimeSession` on this ARN
   * and on its sub-resources (`<arn>/*`).
   */
  readonly runtimeArn: string;

  /**
   * The DynamoDB repo config table. When provided, the orchestrator loads
   * per-repo blueprint configuration at the start of each task.
   * The orchestrator function is granted read-only data access.
   */
  readonly repoTable?: dynamodb.ITable;

  /**
   * Maximum concurrent tasks per user. Must be a positive integer.
   * @default 3
   */
  readonly maxConcurrentTasksPerUser?: number;

  /**
   * Number of days to retain completed task and event records before DynamoDB TTL deletes them.
   * Must be a positive integer.
   * @default 90
   */
  readonly taskRetentionDays?: number;

  /**
   * ARN of the Secrets Manager secret containing the GitHub token.
   * When provided, the orchestrator fetches issue context from GitHub during hydration.
   *
   * Must be a *complete* secret ARN (including the random suffix Secrets Manager
   * appends to the secret name); partial ARNs are rejected at synth time.
   */
  readonly githubTokenSecretArn?: string;

  /**
   * Additional AgentCore runtime ARNs the orchestrator may invoke.
   * Required when Blueprints specify per-repo runtime ARN overrides.
   * Each ARN is granted with the same actions and `/*` sub-resource wildcard as `runtimeArn`.
   */
  readonly additionalRuntimeArns?: readonly string[];

  /**
   * Additional Secrets Manager ARNs the orchestrator may read.
   * Required when Blueprints specify per-repo GitHub token secrets.
   * Each entry must be a *complete* secret ARN, as for `githubTokenSecretArn`.
   */
  readonly additionalSecretArns?: readonly string[];

  /**
   * Maximum token budget for the assembled user prompt. Must be a positive integer.
   * When omitted, no `USER_PROMPT_TOKEN_BUDGET` variable is set and the handler's own default applies.
   * @default 100000
   */
  readonly userPromptTokenBudget?: number;

  /**
   * AgentCore Memory resource ID for cross-task learning.
   * When provided, the orchestrator reads memory context during hydration
   * and writes fallback episodes during finalization.
   *
   * NOTE(review): this construct only passes the ID to the function as `MEMORY_ID`;
   * it does not grant AgentCore Memory IAM permissions itself. Presumably the caller
   * grants them on `fn` — confirm.
   */
  readonly memoryId?: string;

  /**
   * Bedrock Guardrail ID used by the orchestrator to screen assembled PR prompts
   * for prompt injection during context hydration. The same guardrail is also
   * used by the Task API for submission-time task description screening.
   *
   * This must be the guardrail identifier, not a full ARN. `bedrock:ApplyGuardrail`
   * is granted on `guardrail/<id>` in this stack's region and account.
   * Must be provided together with `guardrailVersion`.
   */
  readonly guardrailId?: string;

  /**
   * Bedrock Guardrail version. Required when `guardrailId` is provided, and
   * rejected when `guardrailId` is absent.
   */
  readonly guardrailVersion?: string;

  /**
   * ECS Fargate compute strategy configuration.
   * When provided, ECS-related env vars and IAM policies are added to the orchestrator.
   * All fields are required — this makes the all-or-nothing constraint self-evident at the type level.
   */
  readonly ecsConfig?: {
    /** Cluster ARN; `ecs:RunTask`/`DescribeTasks`/`StopTask` are conditioned on this cluster. */
    readonly clusterArn: string;
    /** Task definition ARN passed to the function as `ECS_TASK_DEFINITION_ARN`. */
    readonly taskDefinitionArn: string;
    /** Subnets passed as a single string (`ECS_SUBNETS`) — presumably comma-separated IDs; confirm with the handler. */
    readonly subnets: string;
    /** Security group passed to the function as `ECS_SECURITY_GROUP`. */
    readonly securityGroup: string;
    /** Container name passed to the function as `ECS_CONTAINER_NAME`. */
    readonly containerName: string;
    /** Task role ARN the orchestrator may pass to `ecs-tasks.amazonaws.com`. */
    readonly taskRoleArn: string;
    /** Execution role ARN the orchestrator may pass to `ecs-tasks.amazonaws.com`. */
    readonly executionRoleArn: string;
  };
}
/**
 * CDK construct that creates the orchestrator Lambda function with durable execution
 * for managing the task lifecycle (admission → hydration → session → poll → finalize).
 *
 * Besides the function itself, the construct wires up:
 * - DynamoDB, Secrets Manager, AgentCore runtime, and optional ECS / Bedrock Guardrail permissions;
 * - a `live` alias with Lambda-level async retries disabled;
 * - a CloudWatch alarm on the function's `Errors` metric;
 * - cdk-nag suppressions for the managed-policy and wildcard findings this produces.
 *
 * @throws Error when only one of `guardrailId` / `guardrailVersion` is provided, or when
 *   `maxConcurrentTasksPerUser`, `taskRetentionDays` or `userPromptTokenBudget` is not a positive integer.
 */
export class TaskOrchestrator extends Construct {
  /**
   * The orchestrator Lambda function.
   */
  public readonly fn: lambda.NodejsFunction;

  /**
   * The `live` Lambda alias (required for durable function invocation).
   */
  public readonly alias: iam.IGrantable & { functionArn: string };

  /**
   * CloudWatch alarm that fires when the orchestrator Lambda errors exceed threshold.
   */
  public readonly errorAlarm: cloudwatch.Alarm;

  constructor(scope: Construct, id: string, props: TaskOrchestratorProps) {
    super(scope, id);
    TaskOrchestrator.validateProps(props);

    this.fn = this.createFunction(props);

    // Grants are applied in a fixed order so the statements in the generated
    // role policy keep a stable order in the synthesized template.
    this.grantTableAccess(props);
    this.attachDurableExecutionPolicy();
    this.grantGitHubTokenSecret(props);
    this.grantRuntimeInvocation(props);
    this.grantEcsAccess(props);
    this.grantAdditionalSecrets(props);
    this.grantGuardrail(props);

    this.alias = this.createLiveAlias();
    this.errorAlarm = this.createErrorAlarm();
    this.addNagSuppressions();
  }

  /** Rejects inconsistent guardrail settings and invalid numeric limits before any resource is created. */
  private static validateProps(props: TaskOrchestratorProps): void {
    if (props.guardrailId && !props.guardrailVersion) {
      throw new Error('guardrailVersion is required when guardrailId is provided');
    }
    if (!props.guardrailId && props.guardrailVersion) {
      throw new Error('guardrailId is required when guardrailVersion is provided');
    }
    // These values are stringified into env vars with String(); 0, negatives, NaN
    // and fractional values would otherwise deploy silently.
    TaskOrchestrator.assertPositiveInteger('maxConcurrentTasksPerUser', props.maxConcurrentTasksPerUser);
    TaskOrchestrator.assertPositiveInteger('taskRetentionDays', props.taskRetentionDays);
    TaskOrchestrator.assertPositiveInteger('userPromptTokenBudget', props.userPromptTokenBudget);
  }

  /** Throws when an optional numeric prop is provided but is not an integer >= 1. */
  private static assertPositiveInteger(name: string, value: number | undefined): void {
    if (value === undefined) {
      return;
    }
    if (!Number.isInteger(value) || value < 1) {
      throw new Error(`${name} must be a positive integer, got ${value}`);
    }
  }

  /** Creates the durable orchestrator function with its environment configuration. */
  private createFunction(props: TaskOrchestratorProps): lambda.NodejsFunction {
    const handlersDir = path.join(__dirname, '..', 'handlers');
    const maxConcurrent = props.maxConcurrentTasksPerUser ?? 3;
    return new lambda.NodejsFunction(this, 'OrchestratorFn', {
      entry: path.join(handlersDir, 'orchestrate-task.ts'),
      handler: 'handler',
      runtime: Runtime.NODEJS_24_X,
      architecture: Architecture.ARM_64,
      timeout: Duration.seconds(60),
      memorySize: 256,
      durableConfig: {
        executionTimeout: Duration.hours(9),
        retentionPeriod: Duration.days(14),
      },
      environment: {
        TASK_TABLE_NAME: props.taskTable.tableName,
        TASK_EVENTS_TABLE_NAME: props.taskEventsTable.tableName,
        USER_CONCURRENCY_TABLE_NAME: props.userConcurrencyTable.tableName,
        RUNTIME_ARN: props.runtimeArn,
        MAX_CONCURRENT_TASKS_PER_USER: String(maxConcurrent),
        TASK_RETENTION_DAYS: String(props.taskRetentionDays ?? 90),
        // Optional settings are only emitted when provided (empty strings are treated as absent).
        ...(props.repoTable && { REPO_TABLE_NAME: props.repoTable.tableName }),
        ...(props.githubTokenSecretArn && { GITHUB_TOKEN_SECRET_ARN: props.githubTokenSecretArn }),
        ...(props.userPromptTokenBudget !== undefined && {
          USER_PROMPT_TOKEN_BUDGET: String(props.userPromptTokenBudget),
        }),
        ...(props.memoryId && { MEMORY_ID: props.memoryId }),
        ...(props.guardrailId && { GUARDRAIL_ID: props.guardrailId }),
        ...(props.guardrailVersion && { GUARDRAIL_VERSION: props.guardrailVersion }),
        ...(props.ecsConfig && {
          ECS_CLUSTER_ARN: props.ecsConfig.clusterArn,
          ECS_TASK_DEFINITION_ARN: props.ecsConfig.taskDefinitionArn,
          ECS_SUBNETS: props.ecsConfig.subnets,
          ECS_SECURITY_GROUP: props.ecsConfig.securityGroup,
          ECS_CONTAINER_NAME: props.ecsConfig.containerName,
        }),
      },
      bundling: {
        // The AWS SDK v3 is excluded from the bundle rather than shipped with the handler.
        externalModules: ['@aws-sdk/*'],
      },
    });
  }

  /** Grants read/write on the task, event and concurrency tables, and read-only on the repo table. */
  private grantTableAccess(props: TaskOrchestratorProps): void {
    props.taskTable.grantReadWriteData(this.fn);
    props.taskEventsTable.grantReadWriteData(this.fn);
    props.userConcurrencyTable.grantReadWriteData(this.fn);
    if (props.repoTable) {
      props.repoTable.grantReadData(this.fn);
    }
  }

  /** Attaches the AWS managed policy required for durable Lambda execution. */
  private attachDurableExecutionPolicy(): void {
    const role = this.fn.role;
    if (!role) {
      // NodejsFunction creates a role when none is supplied, so this is a defensive check.
      throw new Error('Orchestrator function has no execution role to attach the durable execution policy to');
    }
    role.addManagedPolicy(
      iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicDurableExecutionRolePolicy'),
    );
  }

  /** Grants read access to the GitHub token secret used during context hydration, when configured. */
  private grantGitHubTokenSecret(props: TaskOrchestratorProps): void {
    if (!props.githubTokenSecretArn) {
      return;
    }
    const githubTokenSecret = secretsmanager.Secret.fromSecretCompleteArn(
      this, 'GitHubTokenSecret', props.githubTokenSecretArn,
    );
    githubTokenSecret.grantRead(this.fn);
  }

  /** Grants AgentCore runtime invoke/stop on the primary and any additional runtime ARNs. */
  private grantRuntimeInvocation(props: TaskOrchestratorProps): void {
    // InvokeAgentRuntime targets a sub-resource (runtime-endpoint/DEFAULT), so each
    // runtime ARN is granted both as-is and with a trailing wildcard.
    const runtimeArns = [props.runtimeArn, ...(props.additionalRuntimeArns ?? [])];
    this.fn.addToRolePolicy(new iam.PolicyStatement({
      actions: [
        'bedrock-agentcore:InvokeAgentRuntime',
        'bedrock-agentcore:StopRuntimeSession',
      ],
      resources: runtimeArns.flatMap(arn => [arn, `${arn}/*`]),
    }));
  }

  /** Grants ECS task lifecycle and PassRole permissions, only when the ECS compute strategy is configured. */
  private grantEcsAccess(props: TaskOrchestratorProps): void {
    const ecsConfig = props.ecsConfig;
    if (!ecsConfig) {
      return;
    }
    this.fn.addToRolePolicy(new iam.PolicyStatement({
      actions: [
        'ecs:RunTask',
        'ecs:DescribeTasks',
        'ecs:StopTask',
      ],
      // Resource-level scoping is replaced by a condition pinning the calls to one cluster.
      resources: ['*'],
      conditions: {
        ArnEquals: {
          'ecs:cluster': ecsConfig.clusterArn,
        },
      },
    }));
    this.fn.addToRolePolicy(new iam.PolicyStatement({
      actions: ['iam:PassRole'],
      resources: [ecsConfig.taskRoleArn, ecsConfig.executionRoleArn],
      conditions: {
        StringEquals: {
          'iam:PassedToService': 'ecs-tasks.amazonaws.com',
        },
      },
    }));
  }

  /** Grants read access to per-repo secrets (e.g. per-repo GitHub tokens from Blueprints). */
  private grantAdditionalSecrets(props: TaskOrchestratorProps): void {
    for (const [index, secretArn] of (props.additionalSecretArns ?? []).entries()) {
      const secret = secretsmanager.Secret.fromSecretCompleteArn(
        this, `AdditionalSecret${index}`, secretArn,
      );
      secret.grantRead(this.fn);
    }
  }

  /** Grants ApplyGuardrail on the configured guardrail in this stack's region/account, when configured. */
  private grantGuardrail(props: TaskOrchestratorProps): void {
    if (!props.guardrailId) {
      return;
    }
    this.fn.addToRolePolicy(new iam.PolicyStatement({
      actions: ['bedrock:ApplyGuardrail'],
      resources: [
        Stack.of(this).formatArn({
          service: 'bedrock',
          resource: 'guardrail',
          resourceName: props.guardrailId,
        }),
      ],
    }));
  }

  /** Creates the `live` alias used for durable invocation, with Lambda-level async retries disabled. */
  private createLiveAlias(): iam.IGrantable & { functionArn: string } {
    const fnAlias = this.fn.currentVersion.addAlias('live');
    // Durable execution handles retries itself; Lambda-level async retries would
    // cause duplicate invocations and could lead to double task execution.
    fnAlias.configureAsyncInvoke({
      retryAttempts: 0,
    });
    return fnAlias;
  }

  /**
   * Creates an alarm on the function's `Errors` metric (sum over 5-minute periods,
   * threshold 3, two evaluation periods). Lambda counts invocations that end in a
   * function error here; throttled invocations are reported separately under the
   * `Throttles` metric and do not trip this alarm.
   */
  private createErrorAlarm(): cloudwatch.Alarm {
    return new cloudwatch.Alarm(this, 'OrchestratorErrorAlarm', {
      metric: this.fn.metricErrors({
        period: Duration.minutes(5),
      }),
      threshold: 3,
      evaluationPeriods: 2,
      alarmDescription: 'Orchestrator Lambda errors exceeded threshold — tasks may be stuck in SUBMITTED state',
      treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING,
    });
  }

  /** Suppresses the cdk-nag findings for the managed policy and the justified wildcards, including on children. */
  private addNagSuppressions(): void {
    NagSuppressions.addResourceSuppressions(this.fn, [
      {
        id: 'AwsSolutions-IAM4',
        reason: 'AWSLambdaBasicDurableExecutionRolePolicy is the AWS-recommended managed policy for durable Lambda functions',
      },
      {
        id: 'AwsSolutions-IAM5',
        reason: 'DynamoDB index/* wildcards generated by CDK grantReadWriteData; AgentCore runtime/* required for sub-resource invocation; Secrets Manager wildcards generated by CDK grantRead; AgentCore Memory wildcards generated by CDK grantRead/grantWrite; ECS RunTask/DescribeTasks/StopTask conditioned on cluster ARN; iam:PassRole scoped to ECS task/execution roles and conditioned on ecs-tasks.amazonaws.com',
      },
    ], true);
  }
}