Skip to content

Commit 8202106

Browse files
committed
fix: address review feedback on streaming support
- Replace unsafe `as MemoryStrategyType` casts with Zod parse - Include deliveryType in streamDeliveryResources conflict check - Reject deliveryType without dataStreamArn - Validate streamDeliveryResources JSON eagerly in validator - Include Zod error details in parseStreamDeliveryResources - Extract DEFAULT_DELIVERY_TYPE constant - Tighten createMemory strategies param type
1 parent 505cb43 commit 8202106

File tree

3 files changed

+103
-32
lines changed

3 files changed

+103
-32
lines changed

src/cli/commands/add/__tests__/validate.test.ts

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,13 +1032,23 @@ describe('validate', () => {
10321032
});
10331033

10341034
it('rejects invalid deliveryType', () => {
1035-
const result = validateAddMemoryOptions({ ...validMemoryOptions, deliveryType: 'sqs' });
1035+
const result = validateAddMemoryOptions({
1036+
...validMemoryOptions,
1037+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1038+
deliveryType: 'sqs',
1039+
});
10361040
expect(result.valid).toBe(false);
10371041
expect(result.error).toContain('Invalid delivery type');
10381042
});
10391043

10401044
it('accepts valid deliveryType', () => {
1041-
expect(validateAddMemoryOptions({ ...validMemoryOptions, deliveryType: 'kinesis' })).toEqual({ valid: true });
1045+
expect(
1046+
validateAddMemoryOptions({
1047+
...validMemoryOptions,
1048+
dataStreamArn: 'arn:aws:kinesis:us-west-2:123456789012:stream/test',
1049+
deliveryType: 'kinesis',
1050+
})
1051+
).toEqual({ valid: true });
10421052
});
10431053

10441054
it('rejects dataStreamArn not starting with arn:', () => {
@@ -1059,6 +1069,44 @@ describe('validate', () => {
10591069
expect(result.valid).toBe(false);
10601070
expect(result.error).toContain('cannot be combined');
10611071
});
1072+
1073+
it('rejects combining streamDeliveryResources with deliveryType', () => {
1074+
const result = validateAddMemoryOptions({
1075+
...validMemoryOptions,
1076+
deliveryType: 'kinesis',
1077+
streamDeliveryResources:
1078+
'{"resources":[{"kinesis":{"dataStreamArn":"arn:aws:kinesis:us-west-2:123456789012:stream/test","contentConfigurations":[{"type":"MEMORY_RECORDS","level":"FULL_CONTENT"}]}}]}',
1079+
});
1080+
expect(result.valid).toBe(false);
1081+
expect(result.error).toContain('cannot be combined');
1082+
});
1083+
1084+
it('rejects deliveryType without dataStreamArn', () => {
1085+
const result = validateAddMemoryOptions({
1086+
...validMemoryOptions,
1087+
deliveryType: 'kinesis',
1088+
});
1089+
expect(result.valid).toBe(false);
1090+
expect(result.error).toContain('--data-stream-arn is required');
1091+
});
1092+
1093+
it('rejects invalid streamDeliveryResources JSON', () => {
1094+
const result = validateAddMemoryOptions({
1095+
...validMemoryOptions,
1096+
streamDeliveryResources: 'not json',
1097+
});
1098+
expect(result.valid).toBe(false);
1099+
expect(result.error).toContain('Invalid JSON');
1100+
});
1101+
1102+
it('rejects streamDeliveryResources that fails schema validation', () => {
1103+
const result = validateAddMemoryOptions({
1104+
...validMemoryOptions,
1105+
streamDeliveryResources: '{"resources":[]}',
1106+
});
1107+
expect(result.valid).toBe(false);
1108+
expect(result.error).toContain('does not match the expected schema');
1109+
});
10621110
});
10631111

10641112
describe('validateAddCredentialOptions', () => {

src/cli/commands/add/validate.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
ProtocolModeSchema,
99
RuntimeAuthorizerTypeSchema,
1010
SDKFrameworkSchema,
11+
StreamDeliveryResourcesSchema,
1112
TARGET_TYPE_AUTH_CONFIG,
1213
TargetLanguageSchema,
1314
getSupportedFrameworksForProtocol,
@@ -36,7 +37,8 @@ export interface ValidationResult {
3637
const MEMORY_OPTIONS = ['none', 'shortTerm', 'longAndShortTerm'] as const;
3738
const VALID_STRATEGIES = ['SEMANTIC', 'SUMMARIZATION', 'USER_PREFERENCE', 'EPISODIC'];
3839
const VALID_STREAM_CONTENT_LEVELS = ['FULL_CONTENT', 'METADATA_ONLY'];
39-
const VALID_DELIVERY_TYPES = ['kinesis'];
40+
const VALID_DELIVERY_TYPES = ['kinesis'] as const;
41+
export const DEFAULT_DELIVERY_TYPE = 'kinesis';
4042

4143
/**
4244
* Validate that a credential name exists in the project spec.
@@ -679,22 +681,30 @@ export function validateAddMemoryOptions(options: AddMemoryOptions): ValidationR
679681
}
680682
}
681683

682-
if (options.streamDeliveryResources && (options.dataStreamArn || options.contentLevel)) {
684+
if (options.streamDeliveryResources && (options.dataStreamArn || options.contentLevel || options.deliveryType)) {
683685
return {
684686
valid: false,
685-
error: '--stream-delivery-resources cannot be combined with --data-stream-arn or --stream-content-level',
687+
error:
688+
'--stream-delivery-resources cannot be combined with --data-stream-arn, --stream-content-level, or --delivery-type',
686689
};
687690
}
688691

689692
if (options.contentLevel && !options.dataStreamArn) {
690693
return { valid: false, error: '--data-stream-arn is required when --stream-content-level is set' };
691694
}
692695

696+
if (options.deliveryType && !options.dataStreamArn) {
697+
return { valid: false, error: '--data-stream-arn is required when --delivery-type is set' };
698+
}
699+
693700
if (options.dataStreamArn && !options.dataStreamArn.startsWith('arn:')) {
694701
return { valid: false, error: '--data-stream-arn must be a valid ARN (starts with arn:)' };
695702
}
696703

697-
if (options.deliveryType && !VALID_DELIVERY_TYPES.includes(options.deliveryType)) {
704+
if (
705+
options.deliveryType &&
706+
!VALID_DELIVERY_TYPES.includes(options.deliveryType as (typeof VALID_DELIVERY_TYPES)[number])
707+
) {
698708
return {
699709
valid: false,
700710
error: `Invalid delivery type. Must be one of: ${VALID_DELIVERY_TYPES.join(', ')}`,
@@ -708,6 +718,20 @@ export function validateAddMemoryOptions(options: AddMemoryOptions): ValidationR
708718
};
709719
}
710720

721+
if (options.streamDeliveryResources) {
722+
try {
723+
StreamDeliveryResourcesSchema.parse(JSON.parse(options.streamDeliveryResources));
724+
} catch (e) {
725+
return {
726+
valid: false,
727+
error:
728+
e instanceof SyntaxError
729+
? 'Invalid JSON in --stream-delivery-resources'
730+
: 'Invalid --stream-delivery-resources: does not match the expected schema',
731+
};
732+
}
733+
}
734+
711735
return { valid: true };
712736
}
713737

src/cli/primitives/MemoryPrimitive.tsx

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,18 @@ import {
1010
DEFAULT_EPISODIC_REFLECTION_NAMESPACES,
1111
DEFAULT_STRATEGY_NAMESPACES,
1212
MemorySchema,
13+
MemoryStrategyTypeSchema,
1314
StreamContentLevelSchema,
1415
StreamDeliveryResourcesSchema,
1516
} from '../../schema';
16-
import { validateAddMemoryOptions } from '../commands/add/validate';
17+
import { DEFAULT_DELIVERY_TYPE, validateAddMemoryOptions } from '../commands/add/validate';
1718
import { getErrorMessage } from '../errors';
1819
import type { RemovalPreview, RemovalResult, SchemaChange } from '../operations/remove/types';
1920
import { DEFAULT_EVENT_EXPIRY } from '../tui/screens/memory/types';
2021
import { BasePrimitive } from './BasePrimitive';
2122
import type { AddResult, AddScreenComponent, RemovableResource } from './types';
2223
import type { Command } from '@commander-js/extra-typings';
24+
import { z } from 'zod';
2325

2426
/**
2527
* Options for adding a memory resource.
@@ -57,14 +59,14 @@ export class MemoryPrimitive extends BasePrimitive<AddMemoryOptions, RemovableMe
5759
.split(',')
5860
.map(s => s.trim())
5961
.filter(Boolean)
60-
.map(type => ({ type: type as MemoryStrategyType }))
62+
.map(type => ({ type: MemoryStrategyTypeSchema.parse(type) }))
6163
: [];
6264

6365
const streamDeliveryResources = options.streamDeliveryResources
6466
? this.parseStreamDeliveryResources(options.streamDeliveryResources)
6567
: options.dataStreamArn
6668
? this.buildStreamDeliveryResources({
67-
deliveryType: options.deliveryType ?? 'kinesis',
69+
deliveryType: options.deliveryType ?? DEFAULT_DELIVERY_TYPE,
6870
dataStreamArn: options.dataStreamArn,
6971
contentLevel: StreamContentLevelSchema.parse(options.contentLevel ?? 'FULL_CONTENT'),
7072
})
@@ -268,7 +270,7 @@ export class MemoryPrimitive extends BasePrimitive<AddMemoryOptions, RemovableMe
268270
private async createMemory(config: {
269271
name: string;
270272
eventExpiryDuration: number;
271-
strategies: { type: string }[];
273+
strategies: { type: MemoryStrategyType }[];
272274
streamDeliveryResources?: StreamDeliveryResources;
273275
}): Promise<Memory> {
274276
const project = await this.readProjectSpec();
@@ -277,12 +279,11 @@ export class MemoryPrimitive extends BasePrimitive<AddMemoryOptions, RemovableMe
277279

278280
// Map strategies with their default namespaces
279281
const strategies: MemoryStrategy[] = config.strategies.map(s => {
280-
const strategyType = s.type as MemoryStrategyType;
281-
const defaultNamespaces = DEFAULT_STRATEGY_NAMESPACES[strategyType];
282+
const defaultNamespaces = DEFAULT_STRATEGY_NAMESPACES[s.type];
282283
return {
283-
type: strategyType,
284+
type: s.type,
284285
...(defaultNamespaces && { namespaces: defaultNamespaces }),
285-
...(strategyType === 'EPISODIC' && { reflectionNamespaces: DEFAULT_EPISODIC_REFLECTION_NAMESPACES }),
286+
...(s.type === 'EPISODIC' && { reflectionNamespaces: DEFAULT_EPISODIC_REFLECTION_NAMESPACES }),
286287
};
287288
});
288289

@@ -304,32 +305,30 @@ export class MemoryPrimitive extends BasePrimitive<AddMemoryOptions, RemovableMe
304305
dataStreamArn: string;
305306
contentLevel: StreamContentLevel;
306307
}): StreamDeliveryResources {
307-
switch (config.deliveryType) {
308-
case 'kinesis':
309-
return {
310-
resources: [
311-
{
312-
kinesis: {
313-
dataStreamArn: config.dataStreamArn,
314-
contentConfigurations: [{ type: 'MEMORY_RECORDS', level: config.contentLevel }],
315-
},
308+
if (config.deliveryType === DEFAULT_DELIVERY_TYPE) {
309+
return {
310+
resources: [
311+
{
312+
kinesis: {
313+
dataStreamArn: config.dataStreamArn,
314+
contentConfigurations: [{ type: 'MEMORY_RECORDS', level: config.contentLevel }],
316315
},
317-
],
318-
};
319-
default:
320-
throw new Error('Unsupported delivery type. Supported types: kinesis');
316+
},
317+
],
318+
};
321319
}
320+
throw new Error(`Unsupported delivery type: ${config.deliveryType}`);
322321
}
323322

324323
private parseStreamDeliveryResources(input: string): StreamDeliveryResources {
325324
try {
326325
return StreamDeliveryResourcesSchema.parse(JSON.parse(input));
327326
} catch (e) {
328-
const message =
329-
e instanceof SyntaxError
330-
? 'Invalid JSON in stream delivery config'
331-
: 'Stream delivery config does not match the expected schema';
332-
throw new Error(message);
327+
if (e instanceof SyntaxError) {
328+
throw new Error('Invalid JSON in stream delivery config');
329+
}
330+
const detail = e instanceof z.ZodError ? `: ${e.issues.map(i => i.message).join(', ')}` : '';
331+
throw new Error(`Stream delivery config does not match the expected schema${detail}`);
333332
}
334333
}
335334
}

0 commit comments

Comments
 (0)