Skip to content

Commit a8a1f79

Browse files
authored
feat: add streamDeliveryResources schema, CLI flags, and validation for memory record streaming (#531)
* feat: add streamDeliveryResources schema, CLI flags, and validation for memory record streaming * fix: address review feedback on streaming support - Replace unsafe `as MemoryStrategyType` casts with Zod parse - Include deliveryType in streamDeliveryResources conflict check - Reject deliveryType without dataStreamArn - Validate streamDeliveryResources JSON eagerly in validator - Include Zod error details in parseStreamDeliveryResources - Extract DEFAULT_DELIVERY_TYPE constant - Tighten createMemory strategies param type * fix: update create-memory test for strict strategy validation CUSTOM is not a valid MemoryStrategyType. The previous test relied on an unsafe `as` cast to pass an invalid strategy through. Now that we use Zod parse, invalid strategies are correctly rejected.
1 parent 3164310 commit a8a1f79

File tree

8 files changed

+518
-70
lines changed

8 files changed

+518
-70
lines changed

docs/memory.md

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,66 @@ Memory events expire after a configurable duration (7-365 days, default 30):
222222
}
223223
```
224224

225+
## Memory Record Streaming
226+
227+
Memory record streaming delivers real-time events when memory records are created, updated, or deleted. Events are
228+
pushed to a delivery target in your account, enabling event-driven architectures without polling.
229+
230+
### Enabling Streaming
231+
232+
Via CLI flags:
233+
234+
```bash
235+
agentcore add memory \
236+
--name MyMemory \
237+
--strategies SEMANTIC \
238+
--data-stream-arn arn:aws:kinesis:us-west-2:123456789012:stream/my-stream \
239+
--stream-content-level FULL_CONTENT
240+
```
241+
242+
For advanced configurations (e.g. multiple delivery targets), pass the full JSON:
243+
244+
```bash
245+
agentcore add memory \
246+
--name MyMemory \
247+
--strategies SEMANTIC \
248+
--stream-delivery-resources '{"resources":[{"kinesis":{"dataStreamArn":"arn:aws:kinesis:us-west-2:123456789012:stream/my-stream","contentConfigurations":[{"type":"MEMORY_RECORDS","level":"FULL_CONTENT"}]}}]}'
249+
```
250+
251+
### Configuration
252+
253+
```json
254+
{
255+
"type": "AgentCoreMemory",
256+
"name": "MyMemory",
257+
"eventExpiryDuration": 30,
258+
"strategies": [{ "type": "SEMANTIC" }],
259+
"streamDeliveryResources": {
260+
"resources": [
261+
{
262+
"kinesis": {
263+
"dataStreamArn": "arn:aws:kinesis:us-west-2:123456789012:stream/my-stream",
264+
"contentConfigurations": [{ "type": "MEMORY_RECORDS", "level": "FULL_CONTENT" }]
265+
}
266+
}
267+
]
268+
}
269+
}
270+
```
271+
272+
### Content Level
273+
274+
| Level | Description |
275+
| --------------- | ---------------------------------------------------------- |
276+
| `FULL_CONTENT` | Events include memory record text and all metadata |
277+
| `METADATA_ONLY` | Events include only metadata (IDs, timestamps, namespaces) |
278+
279+
The CDK construct automatically grants the memory execution role permission to publish to the configured delivery
280+
target.
281+
282+
For more details, see the
283+
[Memory Record Streaming documentation](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/memory-record-streaming.html).
284+
225285
## Using Memory in Code
226286

227287
The memory ID is available via environment variable:

src/cli/commands/add/__tests__/validate.test.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,119 @@ describe('validate', () => {
994994
valid: true,
995995
});
996996
});
997+
998+
// Streaming validation
999+
it('accepts valid streaming options', () => {
1000+
expect(
1001+
validateAddMemoryOptions({
1002+
...validMemoryOptions,
1003+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1004+
contentLevel: 'FULL_CONTENT',
1005+
})
1006+
).toEqual({ valid: true });
1007+
});
1008+
1009+
it('accepts dataStreamArn without contentLevel (defaults to FULL_CONTENT)', () => {
1010+
expect(
1011+
validateAddMemoryOptions({
1012+
...validMemoryOptions,
1013+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1014+
})
1015+
).toEqual({ valid: true });
1016+
});
1017+
1018+
it('rejects contentLevel without dataStreamArn', () => {
1019+
const result = validateAddMemoryOptions({ ...validMemoryOptions, contentLevel: 'FULL_CONTENT' });
1020+
expect(result.valid).toBe(false);
1021+
expect(result.error).toContain('--data-stream-arn is required');
1022+
});
1023+
1024+
it('rejects invalid contentLevel', () => {
1025+
const result = validateAddMemoryOptions({
1026+
...validMemoryOptions,
1027+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1028+
contentLevel: 'INVALID',
1029+
});
1030+
expect(result.valid).toBe(false);
1031+
expect(result.error).toContain('Invalid content level');
1032+
});
1033+
1034+
it('rejects invalid deliveryType', () => {
1035+
const result = validateAddMemoryOptions({
1036+
...validMemoryOptions,
1037+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1038+
deliveryType: 'sqs',
1039+
});
1040+
expect(result.valid).toBe(false);
1041+
expect(result.error).toContain('Invalid delivery type');
1042+
});
1043+
1044+
it('accepts valid deliveryType', () => {
1045+
expect(
1046+
validateAddMemoryOptions({
1047+
...validMemoryOptions,
1048+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1049+
deliveryType: 'kinesis',
1050+
})
1051+
).toEqual({ valid: true });
1052+
});
1053+
1054+
it('rejects dataStreamArn not starting with arn:', () => {
1055+
const result = validateAddMemoryOptions({
1056+
...validMemoryOptions,
1057+
dataStreamArn: 'not-an-arn',
1058+
});
1059+
expect(result.valid).toBe(false);
1060+
expect(result.error).toContain('valid ARN');
1061+
});
1062+
1063+
it('rejects combining streamDeliveryResources with flat flags', () => {
1064+
const result = validateAddMemoryOptions({
1065+
...validMemoryOptions,
1066+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1067+
streamDeliveryResources: '{"resources":[]}',
1068+
});
1069+
expect(result.valid).toBe(false);
1070+
expect(result.error).toContain('cannot be combined');
1071+
});
1072+
1073+
it('rejects combining streamDeliveryResources with deliveryType', () => {
1074+
const result = validateAddMemoryOptions({
1075+
...validMemoryOptions,
1076+
deliveryType: 'kinesis',
1077+
streamDeliveryResources:
1078+
'{"resources":[{"kinesis":{"dataStreamArn":"arn:aws:kinesis:us-west-2:123456789012:stream/test","contentConfigurations":[{"type":"MEMORY_RECORDS","level":"FULL_CONTENT"}]}}]}',
1079+
});
1080+
expect(result.valid).toBe(false);
1081+
expect(result.error).toContain('cannot be combined');
1082+
});
1083+
1084+
it('rejects deliveryType without dataStreamArn', () => {
1085+
const result = validateAddMemoryOptions({
1086+
...validMemoryOptions,
1087+
deliveryType: 'kinesis',
1088+
});
1089+
expect(result.valid).toBe(false);
1090+
expect(result.error).toContain('--data-stream-arn is required');
1091+
});
1092+
1093+
it('rejects invalid streamDeliveryResources JSON', () => {
1094+
const result = validateAddMemoryOptions({
1095+
...validMemoryOptions,
1096+
streamDeliveryResources: 'not json',
1097+
});
1098+
expect(result.valid).toBe(false);
1099+
expect(result.error).toContain('Invalid JSON');
1100+
});
1101+
1102+
it('rejects streamDeliveryResources that fails schema validation', () => {
1103+
const result = validateAddMemoryOptions({
1104+
...validMemoryOptions,
1105+
streamDeliveryResources: '{"resources":[]}',
1106+
});
1107+
expect(result.valid).toBe(false);
1108+
expect(result.error).toContain('does not match the expected schema');
1109+
});
9971110
});
9981111

9991112
describe('validateAddCredentialOptions', () => {

src/cli/commands/add/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ export interface AddMemoryOptions {
109109
name?: string;
110110
strategies?: string;
111111
expiry?: number;
112+
deliveryType?: string;
113+
dataStreamArn?: string;
114+
contentLevel?: string;
115+
streamDeliveryResources?: string;
112116
json?: boolean;
113117
}
114118

src/cli/commands/add/validate.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
ProtocolModeSchema,
99
RuntimeAuthorizerTypeSchema,
1010
SDKFrameworkSchema,
11+
StreamDeliveryResourcesSchema,
1112
TARGET_TYPE_AUTH_CONFIG,
1213
TargetLanguageSchema,
1314
getSupportedFrameworksForProtocol,
@@ -35,6 +36,9 @@ export interface ValidationResult {
3536
// Constants
3637
const MEMORY_OPTIONS = ['none', 'shortTerm', 'longAndShortTerm'] as const;
3738
const VALID_STRATEGIES = ['SEMANTIC', 'SUMMARIZATION', 'USER_PREFERENCE', 'EPISODIC'];
39+
const VALID_STREAM_CONTENT_LEVELS = ['FULL_CONTENT', 'METADATA_ONLY'];
40+
const VALID_DELIVERY_TYPES = ['kinesis'] as const;
41+
export const DEFAULT_DELIVERY_TYPE = 'kinesis';
3842

3943
/**
4044
* Validate that a credential name exists in the project spec.
@@ -677,6 +681,57 @@ export function validateAddMemoryOptions(options: AddMemoryOptions): ValidationR
677681
}
678682
}
679683

684+
if (options.streamDeliveryResources && (options.dataStreamArn || options.contentLevel || options.deliveryType)) {
685+
return {
686+
valid: false,
687+
error:
688+
'--stream-delivery-resources cannot be combined with --data-stream-arn, --stream-content-level, or --delivery-type',
689+
};
690+
}
691+
692+
if (options.contentLevel && !options.dataStreamArn) {
693+
return { valid: false, error: '--data-stream-arn is required when --stream-content-level is set' };
694+
}
695+
696+
if (options.deliveryType && !options.dataStreamArn) {
697+
return { valid: false, error: '--data-stream-arn is required when --delivery-type is set' };
698+
}
699+
700+
if (options.dataStreamArn && !options.dataStreamArn.startsWith('arn:')) {
701+
return { valid: false, error: '--data-stream-arn must be a valid ARN (starts with arn:)' };
702+
}
703+
704+
if (
705+
options.deliveryType &&
706+
!VALID_DELIVERY_TYPES.includes(options.deliveryType as (typeof VALID_DELIVERY_TYPES)[number])
707+
) {
708+
return {
709+
valid: false,
710+
error: `Invalid delivery type. Must be one of: ${VALID_DELIVERY_TYPES.join(', ')}`,
711+
};
712+
}
713+
714+
if (options.contentLevel && !VALID_STREAM_CONTENT_LEVELS.includes(options.contentLevel)) {
715+
return {
716+
valid: false,
717+
error: `Invalid content level. Must be one of: ${VALID_STREAM_CONTENT_LEVELS.join(', ')}`,
718+
};
719+
}
720+
721+
if (options.streamDeliveryResources) {
722+
try {
723+
StreamDeliveryResourcesSchema.parse(JSON.parse(options.streamDeliveryResources));
724+
} catch (e) {
725+
return {
726+
valid: false,
727+
error:
728+
e instanceof SyntaxError
729+
? 'Invalid JSON in --stream-delivery-resources'
730+
: 'Invalid --stream-delivery-resources: does not match the expected schema',
731+
};
732+
}
733+
}
734+
680735
return { valid: true };
681736
}
682737

src/cli/operations/memory/__tests__/create-memory.test.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,20 +76,18 @@ describe('add', () => {
7676
expect(addedMemory.strategies[0]!.namespaces).toEqual(['/users/{actorId}/facts']);
7777
});
7878

79-
it('creates memory with strategy without default namespaces', async () => {
79+
it('rejects invalid strategy type', async () => {
8080
const project = makeProject([]);
8181
mockReadProjectSpec.mockResolvedValue(project);
82-
mockWriteProjectSpec.mockResolvedValue(undefined);
8382

84-
await primitive.add({
83+
const result = await primitive.add({
8584
name: 'NewMem',
8685
strategies: 'CUSTOM',
8786
expiry: 30,
8887
});
8988

90-
const writtenSpec = mockWriteProjectSpec.mock.calls[0]![0];
91-
const addedMemory = writtenSpec.memories.find((m: { name: string }) => m.name === 'NewMem');
92-
expect(addedMemory.strategies[0]!.namespaces).toBeUndefined();
89+
expect(result).toEqual(expect.objectContaining({ success: false, error: expect.any(String) }));
90+
expect(mockWriteProjectSpec).not.toHaveBeenCalled();
9391
});
9492

9593
it('returns error on duplicate memory name', async () => {

0 commit comments

Comments
 (0)