Skip to content

Commit 236d7bd

Browse files
feat: introduce ComputeStrategy interface and extract AgentCoreComputeStrategy
Refactor the orchestrator to use a strategy pattern for compute backends, as specified in the design docs. Extract AgentCore-specific session logic (invoke/poll/stop) into AgentCoreComputeStrategy, resolved via a factory based on blueprintConfig.compute_type. State transitions remain in the handler. Pure refactor — no behavior or CloudFormation template changes.
1 parent 03b8c83 commit 236d7bd

8 files changed

Lines changed: 363 additions & 76 deletions

File tree

docs/src/content/docs/roadmap/Roadmap.md

Lines changed: 21 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/handlers/orchestrate-task.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import { withDurableExecution, type DurableExecutionHandler } from '@aws/durable-execution-sdk-js';
2121
import { TaskStatus, TERMINAL_STATUSES } from '../constructs/task-status';
22+
import { resolveComputeStrategy } from './shared/compute-strategy';
2223
import {
2324
admissionControl,
2425
emitTaskEvent,
@@ -28,7 +29,7 @@ import {
2829
loadBlueprintConfig,
2930
loadTask,
3031
pollTaskStatus,
31-
startSession,
32+
transitionTask,
3233
type PollState,
3334
} from './shared/orchestrator';
3435

@@ -87,10 +88,22 @@ const durableHandler: DurableExecutionHandler<OrchestrateTaskEvent, void> = asyn
8788
}
8889
});
8990

90-
// Step 4: Start agent session — invoke runtime and transition to RUNNING
91+
// Step 4: Start agent session — resolve compute strategy, invoke runtime, transition to RUNNING
9192
await context.step('start-session', async () => {
9293
try {
93-
return await startSession(task, payload, blueprintConfig);
94+
const strategy = resolveComputeStrategy(blueprintConfig);
95+
const handle = await strategy.startSession({ taskId, payload, blueprintConfig });
96+
97+
await transitionTask(taskId, TaskStatus.HYDRATING, TaskStatus.RUNNING, {
98+
session_id: handle.sessionId,
99+
started_at: new Date().toISOString(),
100+
});
101+
await emitTaskEvent(taskId, 'session_started', {
102+
session_id: handle.sessionId,
103+
strategy_type: handle.strategyType,
104+
});
105+
106+
return handle.sessionId;
94107
} catch (err) {
95108
await failTask(taskId, TaskStatus.HYDRATING, `Session start failed: ${String(err)}`, task.user_id, true);
96109
throw err;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* MIT No Attribution
3+
*
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
* the Software without restriction, including without limitation the rights to
8+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9+
* the Software, and to permit persons to whom the Software is furnished to do so.
10+
*
11+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
16+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17+
* SOFTWARE.
18+
*/
19+
20+
import type { BlueprintConfig } from './repo-config';
21+
import { AgentCoreComputeStrategy } from './strategies/agentcore-strategy';
22+
23+
export interface SessionHandle {
24+
readonly sessionId: string;
25+
readonly strategyType: string;
26+
readonly metadata: Record<string, unknown>;
27+
}
28+
29+
export type SessionStatus =
30+
| { readonly status: 'running' }
31+
| { readonly status: 'completed' }
32+
| { readonly status: 'failed'; readonly error: string };
33+
34+
export interface ComputeStrategy {
35+
readonly type: string;
36+
startSession(input: {
37+
taskId: string;
38+
payload: Record<string, unknown>;
39+
blueprintConfig: BlueprintConfig;
40+
}): Promise<SessionHandle>;
41+
pollSession(handle: SessionHandle): Promise<SessionStatus>;
42+
stopSession(handle: SessionHandle): Promise<void>;
43+
}
44+
45+
export function resolveComputeStrategy(blueprintConfig: BlueprintConfig): ComputeStrategy {
46+
const computeType = blueprintConfig.compute_type;
47+
switch (computeType) {
48+
case 'agentcore':
49+
return new AgentCoreComputeStrategy({ runtimeArn: blueprintConfig.runtime_arn });
50+
default:
51+
throw new Error(`Unknown compute_type: '${computeType}'`);
52+
}
53+
}

src/handlers/shared/orchestrator.ts

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
* SOFTWARE.
1818
*/
1919

20-
import { randomUUID } from 'crypto';
21-
import { InvokeAgentRuntimeCommand, BedrockAgentCoreClient } from '@aws-sdk/client-bedrock-agentcore';
2220
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
2321
import { DynamoDBDocumentClient, GetCommand, PutCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
2422
import { ulid } from 'ulid';
@@ -32,7 +30,6 @@ import { computeTtlEpoch, DEFAULT_MAX_TURNS } from './validation';
3230
import { TaskStatus, TERMINAL_STATUSES, VALID_TRANSITIONS, type TaskStatusType } from '../../constructs/task-status';
3331

3432
const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
35-
const agentCoreClient = new BedrockAgentCoreClient({});
3633

3734
const TABLE_NAME = process.env.TASK_TABLE_NAME!;
3835
const EVENTS_TABLE_NAME = process.env.TASK_EVENTS_TABLE_NAME!;
@@ -305,43 +302,6 @@ export async function hydrateAndTransition(task: TaskRecord, blueprintConfig?: B
305302
return payload;
306303
}
307304

308-
/**
309-
* Start an AgentCore runtime session and transition task to RUNNING.
310-
* @param task - the task record.
311-
* @param payload - the hydrated invocation payload.
312-
* @param blueprintConfig - optional per-repo blueprint config for runtime ARN override.
313-
* @returns the session ID.
314-
*/
315-
export async function startSession(
316-
task: TaskRecord,
317-
payload: Record<string, unknown>,
318-
blueprintConfig?: BlueprintConfig,
319-
): Promise<string> {
320-
// AgentCore requires runtimeSessionId >= 33 chars; UUID v4 is 36 chars.
321-
const sessionId = randomUUID();
322-
const runtimeArn = blueprintConfig?.runtime_arn ?? RUNTIME_ARN;
323-
324-
const command = new InvokeAgentRuntimeCommand({
325-
agentRuntimeArn: runtimeArn,
326-
runtimeSessionId: sessionId,
327-
contentType: 'application/json',
328-
accept: 'application/json',
329-
payload: new TextEncoder().encode(JSON.stringify({ input: payload })),
330-
});
331-
332-
await agentCoreClient.send(command);
333-
334-
await transitionTask(task.task_id, TaskStatus.HYDRATING, TaskStatus.RUNNING, {
335-
session_id: sessionId,
336-
started_at: new Date().toISOString(),
337-
});
338-
await emitTaskEvent(task.task_id, 'session_started', { session_id: sessionId });
339-
340-
logger.info('Session started', { task_id: task.task_id, session_id: sessionId });
341-
342-
return sessionId;
343-
}
344-
345305
/**
346306
* Poll the task record in DynamoDB to check if the agent wrote a terminal status.
347307
* Returns the updated PollState; the waitStrategy decides whether to continue.
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* MIT No Attribution
3+
*
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
* the Software without restriction, including without limitation the rights to
8+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9+
* the Software, and to permit persons to whom the Software is furnished to do so.
10+
*
11+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
16+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17+
* SOFTWARE.
18+
*/
19+
20+
import { randomUUID } from 'crypto';
21+
import { BedrockAgentCoreClient, InvokeAgentRuntimeCommand, StopRuntimeSessionCommand } from '@aws-sdk/client-bedrock-agentcore';
22+
import type { ComputeStrategy, SessionHandle, SessionStatus } from '../compute-strategy';
23+
import { logger } from '../logger';
24+
import type { BlueprintConfig } from '../repo-config';
25+
26+
export class AgentCoreComputeStrategy implements ComputeStrategy {
27+
readonly type = 'agentcore';
28+
private readonly client: BedrockAgentCoreClient;
29+
private readonly runtimeArn: string;
30+
31+
constructor(options: { runtimeArn: string }) {
32+
this.runtimeArn = options.runtimeArn;
33+
this.client = new BedrockAgentCoreClient({});
34+
}
35+
36+
async startSession(input: {
37+
taskId: string;
38+
payload: Record<string, unknown>;
39+
blueprintConfig: BlueprintConfig;
40+
}): Promise<SessionHandle> {
41+
// AgentCore requires runtimeSessionId >= 33 chars; UUID v4 is 36 chars.
42+
const sessionId = randomUUID();
43+
const runtimeArn = input.blueprintConfig.runtime_arn ?? this.runtimeArn;
44+
45+
const command = new InvokeAgentRuntimeCommand({
46+
agentRuntimeArn: runtimeArn,
47+
runtimeSessionId: sessionId,
48+
contentType: 'application/json',
49+
accept: 'application/json',
50+
payload: new TextEncoder().encode(JSON.stringify({ input: input.payload })),
51+
});
52+
53+
await this.client.send(command);
54+
55+
logger.info('AgentCore session invoked', { task_id: input.taskId, session_id: sessionId, runtime_arn: runtimeArn });
56+
57+
return {
58+
sessionId,
59+
strategyType: this.type,
60+
metadata: { runtimeArn },
61+
};
62+
}
63+
64+
async pollSession(_handle: SessionHandle): Promise<SessionStatus> {
65+
// Polling is currently done at the orchestrator level via DDB reads.
66+
// This method exists for PR 2 where different strategies may poll differently.
67+
return { status: 'running' };
68+
}
69+
70+
async stopSession(handle: SessionHandle): Promise<void> {
71+
const runtimeArn = handle.metadata.runtimeArn as string | undefined;
72+
if (!runtimeArn) {
73+
logger.warn('No runtimeArn in session handle, cannot stop session', { session_id: handle.sessionId });
74+
return;
75+
}
76+
77+
try {
78+
await this.client.send(new StopRuntimeSessionCommand({
79+
agentRuntimeArn: runtimeArn,
80+
runtimeSessionId: handle.sessionId,
81+
}));
82+
logger.info('AgentCore session stopped', { session_id: handle.sessionId });
83+
} catch (err) {
84+
logger.warn('Failed to stop AgentCore session (best-effort)', {
85+
session_id: handle.sessionId,
86+
error: err instanceof Error ? err.message : String(err),
87+
});
88+
}
89+
}
90+
}

test/handlers/orchestrate-task.test.ts

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ jest.mock('@aws-sdk/lib-dynamodb', () => ({
2727
UpdateCommand: jest.fn((input: unknown) => ({ _type: 'Update', input })),
2828
}));
2929

30-
const mockAgentCoreSend = jest.fn();
3130
jest.mock('@aws-sdk/client-bedrock-agentcore', () => ({
32-
BedrockAgentCoreClient: jest.fn(() => ({ send: mockAgentCoreSend })),
31+
BedrockAgentCoreClient: jest.fn(() => ({ send: jest.fn() })),
3332
InvokeAgentRuntimeCommand: jest.fn((input: unknown) => ({ _type: 'InvokeAgentRuntime', input })),
33+
StopRuntimeSessionCommand: jest.fn(),
3434
}));
3535

3636
const mockHydrateContext = jest.fn();
@@ -75,7 +75,6 @@ import {
7575
loadBlueprintConfig,
7676
loadTask,
7777
pollTaskStatus,
78-
startSession,
7978
transitionTask,
8079
} from '../../src/handlers/shared/orchestrator';
8180

@@ -192,18 +191,6 @@ describe('hydrateAndTransition', () => {
192191
});
193192
});
194193

195-
describe('startSession', () => {
196-
test('invokes agent runtime and transitions to RUNNING', async () => {
197-
mockAgentCoreSend.mockResolvedValueOnce({}); // InvokeAgentRuntime
198-
mockDdbSend.mockResolvedValue({}); // transitionTask + emitTaskEvent
199-
200-
const sessionId = await startSession(baseTask as any, { repo_url: 'org/repo', task_id: 'TASK001' });
201-
// Session ID is a UUID v4 (36 chars), not a ULID
202-
expect(sessionId).toMatch(/^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/);
203-
expect(mockAgentCoreSend).toHaveBeenCalledTimes(1);
204-
});
205-
});
206-
207194
describe('pollTaskStatus', () => {
208195
test('increments attempt count and reads status', async () => {
209196
mockDdbSend.mockResolvedValueOnce({ Item: { status: 'RUNNING' } });
@@ -385,21 +372,6 @@ describe('hydrateAndTransition with blueprint config', () => {
385372
});
386373
});
387374

388-
describe('startSession with blueprint config', () => {
389-
test('uses blueprint runtime ARN override', async () => {
390-
mockAgentCoreSend.mockResolvedValueOnce({});
391-
mockDdbSend.mockResolvedValue({});
392-
393-
await startSession(baseTask as any, { repo_url: 'org/repo', task_id: 'TASK001' }, {
394-
compute_type: 'agentcore',
395-
runtime_arn: 'arn:aws:bedrock-agentcore:us-east-1:123:runtime/custom',
396-
});
397-
398-
const invokeCall = mockAgentCoreSend.mock.calls[0][0];
399-
expect(invokeCall.input.agentRuntimeArn).toBe('arn:aws:bedrock-agentcore:us-east-1:123:runtime/custom');
400-
});
401-
});
402-
403375
describe('finalizeTask', () => {
404376
test('handles already-terminal task', async () => {
405377
mockDdbSend
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* MIT No Attribution
3+
*
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
* the Software without restriction, including without limitation the rights to
8+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
9+
* the Software, and to permit persons to whom the Software is furnished to do so.
10+
*
11+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
16+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17+
* SOFTWARE.
18+
*/
19+
20+
jest.mock('@aws-sdk/client-bedrock-agentcore', () => ({
21+
BedrockAgentCoreClient: jest.fn(() => ({ send: jest.fn() })),
22+
InvokeAgentRuntimeCommand: jest.fn(),
23+
StopRuntimeSessionCommand: jest.fn(),
24+
}));
25+
26+
import { resolveComputeStrategy } from '../../../src/handlers/shared/compute-strategy';
27+
import { AgentCoreComputeStrategy } from '../../../src/handlers/shared/strategies/agentcore-strategy';
28+
29+
describe('resolveComputeStrategy', () => {
30+
test('returns AgentCoreComputeStrategy for compute_type agentcore', () => {
31+
const strategy = resolveComputeStrategy({
32+
compute_type: 'agentcore',
33+
runtime_arn: 'arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/test',
34+
});
35+
expect(strategy).toBeInstanceOf(AgentCoreComputeStrategy);
36+
expect(strategy.type).toBe('agentcore');
37+
});
38+
39+
test('throws for unknown compute_type', () => {
40+
expect(() => resolveComputeStrategy({
41+
compute_type: 'unknown-type',
42+
runtime_arn: 'arn:test',
43+
})).toThrow("Unknown compute_type: 'unknown-type'");
44+
});
45+
});

0 commit comments

Comments
 (0)