Skip to content

Commit a9d4c6f

Browse files
author
bgagent
committed
fix(pr): review comments
1 parent 0aae3a6 commit a9d4c6f

4 files changed

Lines changed: 250 additions & 22 deletions

File tree

cdk/src/handlers/cleanup-pending-uploads.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,22 +190,27 @@ async function cleanupTaskAttachments(task: ExpiredTask): Promise<void> {
190190
let keyMarker: string | undefined;
191191
let versionIdMarker: string | undefined;
192192
let totalDeleted = 0;
193+
let isTruncated = true;
193194

194-
do {
195+
while (isTruncated) {
195196
const listResp = await s3.send(new ListObjectVersionsCommand({
196197
Bucket: ATTACHMENTS_BUCKET,
197198
Prefix: prefix,
198199
KeyMarker: keyMarker,
199200
VersionIdMarker: versionIdMarker,
200201
}));
201202

203+
isTruncated = listResp.IsTruncated ?? false;
204+
keyMarker = listResp.NextKeyMarker;
205+
versionIdMarker = listResp.NextVersionIdMarker;
206+
202207
// Collect all versions and delete markers for deletion
203208
const objects = [
204209
...(listResp.Versions ?? []).map(v => ({ Key: v.Key!, VersionId: v.VersionId })),
205210
...(listResp.DeleteMarkers ?? []).map(d => ({ Key: d.Key!, VersionId: d.VersionId })),
206211
].filter(obj => obj.Key !== undefined);
207212

208-
if (objects.length === 0) break;
213+
if (objects.length === 0) continue;
209214

210215
const deleteResp = await s3.send(new DeleteObjectsCommand({
211216
Bucket: ATTACHMENTS_BUCKET,
@@ -220,10 +225,7 @@ async function cleanupTaskAttachments(task: ExpiredTask): Promise<void> {
220225
}
221226

222227
totalDeleted += objects.length - (deleteResp.Errors?.length ?? 0);
223-
224-
keyMarker = listResp.NextKeyMarker;
225-
versionIdMarker = listResp.NextVersionIdMarker;
226-
} while (keyMarker);
228+
}
227229

228230
if (totalDeleted > 0) {
229231
logger.info('Cleaned up S3 objects for expired pending-upload task', {

cdk/src/handlers/confirm-uploads.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
2525
import { InvokeCommand, LambdaClient } from '@aws-sdk/client-lambda';
26-
import { DeleteObjectsCommand, GetObjectCommand, HeadObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
26+
import { DeleteObjectsCommand, GetObjectCommand, HeadObjectCommand, S3Client } from '@aws-sdk/client-s3';
2727
import { DynamoDBDocumentClient, GetCommand, PutCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
2828
import type { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda';
2929
import { ulid } from 'ulid';
@@ -230,8 +230,10 @@ export async function handler(event: APIGatewayProxyEvent, context: Context): Pr
230230
request_id: requestId,
231231
});
232232
// Fail the entire task
233-
await failTaskOnScreening(task, taskId, att.filename, err.message, requestId);
234-
await cleanupAllAttachments(task, taskId);
233+
const transitioned = await failTaskOnScreening(task, taskId, att.filename, err.message, requestId);
234+
if (transitioned) {
235+
await cleanupAllAttachments(task, taskId);
236+
}
235237
return errorResponse(400, ErrorCode.ATTACHMENT_BLOCKED,
236238
`Attachment '${att.filename}' was rejected: ${err.message}`, requestId);
237239
}
@@ -260,8 +262,10 @@ export async function handler(event: APIGatewayProxyEvent, context: Context): Pr
260262
const categories = blockedAtt.screening.status === 'blocked'
261263
? blockedAtt.screening.categories.join(', ')
262264
: 'content_policy_violation';
263-
await failTaskOnScreening(task, taskId, blockedAtt.filename, categories, requestId);
264-
await cleanupAllAttachments(task, taskId);
265+
const transitioned = await failTaskOnScreening(task, taskId, blockedAtt.filename, categories, requestId);
266+
if (transitioned) {
267+
await cleanupAllAttachments(task, taskId);
268+
}
265269
return errorResponse(400, ErrorCode.ATTACHMENT_BLOCKED,
266270
`Attachment '${blockedAtt.filename}' was blocked by content policy (${categories}).`, requestId);
267271
}
@@ -342,13 +346,7 @@ async function screenSingleAttachment(
342346
});
343347
}
344348

345-
// Screening passed — re-upload cleaned content (EXIF-stripped for images)
346-
const putResult = await s3Client.send(new PutObjectCommand({
347-
Bucket: ATTACHMENTS_BUCKET,
348-
Key: s3Key,
349-
Body: screenResult.content,
350-
ContentType: att.content_type,
351-
}));
349+
// Screening passed — reuse existing S3 object (no transformation needed)
352350

353351
// Estimate token cost for images (using shared utility)
354352
let tokenEstimate: number | undefined;
@@ -362,8 +360,8 @@ async function screenSingleAttachment(
362360
content_type: att.content_type,
363361
filename: att.filename,
364362
s3_key: s3Key,
365-
s3_version_id: putResult.VersionId ?? 'unversioned',
366-
size_bytes: screenResult.content.length,
363+
s3_version_id: versionId ?? 'unversioned',
364+
size_bytes: sizeBytes,
367365
screening: { status: 'passed', screened_at: new Date().toISOString() },
368366
checksum_sha256: screenResult.checksum,
369367
...(tokenEstimate !== undefined && { token_estimate: tokenEstimate }),
@@ -537,7 +535,7 @@ async function failTaskOnScreening(
537535
filename: string,
538536
reason: string,
539537
requestId: string,
540-
): Promise<void> {
538+
): Promise<boolean> {
541539
const now = new Date().toISOString();
542540
try {
543541
await ddb.send(new UpdateCommand({
@@ -566,7 +564,7 @@ async function failTaskOnScreening(
566564
task_id: taskId,
567565
request_id: requestId,
568566
});
569-
return;
567+
return false;
570568
}
571569
throw err;
572570
}
@@ -592,6 +590,8 @@ async function failTaskOnScreening(
592590
request_id: requestId,
593591
});
594592
}
593+
594+
return true;
595595
}
596596

597597
// ---------------------------------------------------------------------------

cdk/test/handlers/cleanup-pending-uploads.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,63 @@ describe('cleanup-pending-uploads handler', () => {
173173
await expect(handler()).rejects.toThrow('All 1 expired PENDING_UPLOADS task(s) failed to process');
174174
});
175175

176+
test('continues pagination when S3 returns empty page with IsTruncated=true', async () => {
177+
const thirtyFiveMinAgo = new Date(Date.now() - 35 * 60 * 1000).toISOString();
178+
179+
mockDdbSend.mockResolvedValueOnce({
180+
Items: [{
181+
task_id: { S: 'TASK001' },
182+
user_id: { S: 'user-123' },
183+
created_at: { S: thirtyFiveMinAgo },
184+
}],
185+
});
186+
187+
// cancelExpiredTask succeeds
188+
mockDdbSend.mockResolvedValueOnce({});
189+
// write event
190+
mockDdbSend.mockResolvedValueOnce({});
191+
192+
// Page 1: empty but IsTruncated=true (S3 scanned past prefix boundary)
193+
mockS3Send.mockResolvedValueOnce({
194+
Versions: [],
195+
DeleteMarkers: [],
196+
IsTruncated: true,
197+
NextKeyMarker: 'attachments/user-123/TASK001/ATT001/image.png',
198+
NextVersionIdMarker: 'v1',
199+
});
200+
// Page 2: has objects, not truncated
201+
mockS3Send.mockResolvedValueOnce({
202+
Versions: [
203+
{ Key: 'attachments/user-123/TASK001/ATT001/image.png', VersionId: 'v1' },
204+
],
205+
DeleteMarkers: [],
206+
IsTruncated: false,
207+
});
208+
// DeleteObjects succeeds
209+
mockS3Send.mockResolvedValueOnce({ Deleted: [{}] });
210+
211+
await handler();
212+
213+
// Verify both ListObjectVersions calls were made (pagination continued past empty page)
214+
const listCalls = mockS3Send.mock.calls.filter(
215+
(call: any[]) => call[0]?._type === 'ListObjectVersions',
216+
);
217+
expect(listCalls).toHaveLength(2);
218+
219+
// Verify second list call used the marker from first response
220+
expect(listCalls[1][0].input.KeyMarker).toBe('attachments/user-123/TASK001/ATT001/image.png');
221+
expect(listCalls[1][0].input.VersionIdMarker).toBe('v1');
222+
223+
// Verify delete was called with the objects from page 2
224+
const deleteCalls = mockS3Send.mock.calls.filter(
225+
(call: any[]) => call[0]?._type === 'DeleteObjects',
226+
);
227+
expect(deleteCalls).toHaveLength(1);
228+
expect(deleteCalls[0][0].input.Delete.Objects).toEqual([
229+
{ Key: 'attachments/user-123/TASK001/ATT001/image.png', VersionId: 'v1' },
230+
]);
231+
});
232+
176233
test('does not throw on partial success (some cancelled, some errored)', async () => {
177234
const thirtyFiveMinAgo = new Date(Date.now() - 35 * 60 * 1000).toISOString();
178235

cdk/test/handlers/confirm-uploads.test.ts

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,173 @@ describe('confirm-uploads handler', () => {
312312
const body = JSON.parse(result.body);
313313
expect(body.error.code).toBe('ATTACHMENT_BLOCKED');
314314
});
315+
316+
test('skips S3 cleanup when failTaskOnScreening loses the race (ConditionalCheckFailedException)', async () => {
317+
const { screenImage, AttachmentScreeningError } = jest.requireMock('../../src/handlers/shared/attachment-screening');
318+
319+
ddbSend.mockResolvedValueOnce({ Item: PENDING_TASK });
320+
s3Send
321+
.mockResolvedValueOnce({ VersionId: 'v1', ContentLength: 1024 })
322+
.mockResolvedValueOnce({ VersionId: 'v2', ContentLength: 512 });
323+
324+
// Pre-check passes
325+
ddbSend.mockResolvedValueOnce({ Item: { active_count: 0 } });
326+
327+
// GetObject for first attachment
328+
const pngContent = Buffer.alloc(1024);
329+
s3Send.mockResolvedValueOnce({ Body: { transformToByteArray: () => pngContent } });
330+
331+
// Screening blocks the image
332+
screenImage.mockRejectedValueOnce(new AttachmentScreeningError('Inappropriate content detected'));
333+
334+
// failTaskOnScreening conditional write fails — another caller already transitioned
335+
const condErr = new Error('The conditional request failed');
336+
condErr.name = 'ConditionalCheckFailedException';
337+
ddbSend.mockRejectedValueOnce(condErr);
338+
339+
const result = await handler(makeEvent('task-1'), makeContext(180_000));
340+
expect(result.statusCode).toBe(400);
341+
342+
// S3 DeleteObjectsCommand should NOT have been called (only Head + Get calls)
343+
const s3DeleteCalls = s3Send.mock.calls.filter(
344+
(call: any[]) => call[0]?._type === 'S3Delete',
345+
);
346+
expect(s3DeleteCalls).toHaveLength(0);
347+
});
348+
349+
test('does not re-upload content to S3 after screening passes (no redundant PUT)', async () => {
350+
const { screenImage, screenTextFile } = jest.requireMock('../../src/handlers/shared/attachment-screening');
351+
352+
const pngContent = Buffer.alloc(1024);
353+
pngContent.set([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]);
354+
const textContent = Buffer.alloc(512);
355+
textContent.write('hello world');
356+
357+
screenImage.mockResolvedValue({
358+
content: pngContent,
359+
contentType: 'image/png',
360+
checksum: 'abc123',
361+
screening: { status: 'passed' },
362+
});
363+
screenTextFile.mockResolvedValue({
364+
content: textContent,
365+
contentType: 'text/plain',
366+
checksum: 'def456',
367+
screening: { status: 'passed' },
368+
});
369+
370+
let ddbCallCount = 0;
371+
ddbSend.mockImplementation(() => {
372+
ddbCallCount++;
373+
switch (ddbCallCount) {
374+
case 1: return Promise.resolve({ Item: PENDING_TASK });
375+
case 2: return Promise.resolve({ Item: { active_count: 1 } });
376+
case 3: return Promise.resolve({});
377+
case 4: return Promise.resolve({});
378+
case 5: return Promise.resolve({});
379+
default: return Promise.resolve({});
380+
}
381+
});
382+
383+
s3Send.mockImplementation((cmd: any) => {
384+
if (cmd._type === 'S3Head') {
385+
const isAtt1 = cmd.input.Key?.includes('att-1');
386+
return Promise.resolve({
387+
VersionId: isAtt1 ? 'v1' : 'v2',
388+
ContentLength: isAtt1 ? 1024 : 512,
389+
});
390+
}
391+
if (cmd._type === 'S3Get') {
392+
const isAtt1 = cmd.input.Key?.includes('att-1');
393+
return Promise.resolve({
394+
Body: { transformToByteArray: () => (isAtt1 ? pngContent : textContent) },
395+
});
396+
}
397+
if (cmd._type === 'S3Put') {
398+
return Promise.resolve({ VersionId: 'v-screened' });
399+
}
400+
return Promise.resolve({});
401+
});
402+
403+
lambdaSend.mockResolvedValueOnce({});
404+
405+
const result = await handler(makeEvent('task-1'), makeContext(180_000));
406+
expect(result.statusCode).toBe(200);
407+
408+
// Verify NO S3 PutObject calls were made
409+
const s3PutCalls = s3Send.mock.calls.filter(
410+
(call: any[]) => call[0]?._type === 'S3Put',
411+
);
412+
expect(s3PutCalls).toHaveLength(0);
413+
});
414+
415+
test('uses original versionId and size from HeadObject in attachment record after screening', async () => {
416+
const { screenImage, screenTextFile } = jest.requireMock('../../src/handlers/shared/attachment-screening');
417+
418+
const pngContent = Buffer.alloc(1024);
419+
pngContent.set([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]);
420+
const textContent = Buffer.alloc(512);
421+
textContent.write('hello world');
422+
423+
screenImage.mockResolvedValue({
424+
content: pngContent,
425+
contentType: 'image/png',
426+
checksum: 'abc123',
427+
screening: { status: 'passed' },
428+
});
429+
screenTextFile.mockResolvedValue({
430+
content: textContent,
431+
contentType: 'text/plain',
432+
checksum: 'def456',
433+
screening: { status: 'passed' },
434+
});
435+
436+
let ddbCallCount = 0;
437+
ddbSend.mockImplementation(() => {
438+
ddbCallCount++;
439+
switch (ddbCallCount) {
440+
case 1: return Promise.resolve({ Item: PENDING_TASK });
441+
case 2: return Promise.resolve({ Item: { active_count: 0 } });
442+
case 3: return Promise.resolve({});
443+
case 4: return Promise.resolve({});
444+
case 5: return Promise.resolve({});
445+
default: return Promise.resolve({});
446+
}
447+
});
448+
449+
s3Send.mockImplementation((cmd: any) => {
450+
if (cmd._type === 'S3Head') {
451+
const isAtt1 = cmd.input.Key?.includes('att-1');
452+
return Promise.resolve({
453+
VersionId: isAtt1 ? 'original-v1' : 'original-v2',
454+
ContentLength: isAtt1 ? 1024 : 512,
455+
});
456+
}
457+
if (cmd._type === 'S3Get') {
458+
const isAtt1 = cmd.input.Key?.includes('att-1');
459+
return Promise.resolve({
460+
Body: { transformToByteArray: () => (isAtt1 ? pngContent : textContent) },
461+
});
462+
}
463+
return Promise.resolve({});
464+
});
465+
466+
lambdaSend.mockResolvedValueOnce({});
467+
468+
const result = await handler(makeEvent('task-1'), makeContext(180_000));
469+
expect(result.statusCode).toBe(200);
470+
471+
// Check the DDB UpdateCommand (transition to SUBMITTED) includes original versionIds
472+
const updateCall = ddbSend.mock.calls.find(
473+
(call: any[]) => call[0]?.input?.UpdateExpression?.includes('attachments'),
474+
);
475+
expect(updateCall).toBeDefined();
476+
const attachments = updateCall![0].input.ExpressionAttributeValues[':atts'];
477+
const att1 = attachments.find((a: any) => a.attachment_id === 'att-1');
478+
const att2 = attachments.find((a: any) => a.attachment_id === 'att-2');
479+
expect(att1.s3_version_id).toBe('original-v1');
480+
expect(att1.size_bytes).toBe(1024);
481+
expect(att2.s3_version_id).toBe('original-v2');
482+
expect(att2.size_bytes).toBe(512);
483+
});
315484
});

0 commit comments

Comments
 (0)