Skip to content

Commit 1b8ffa6

Browse files
author
bgagent
committed
fix(fanout): partial-batch retry + github-comment defense-in-depth (krokoko review aws-samples#1, aws-samples#5, aws-samples#9, aws-samples#12)
Four related fixes on the fanout + github-comment surface, from the code review on PR aws-samples#52. Grouped because they share the narrative "defense-in-depth on the fanout dispatcher" — any one landing without the others leaves a hole. ## Findings addressed **aws-samples#1 — Fanout handler returns void despite reportBatchItemFailures: true** The ``FanOutConsumer`` construct (``cdk/src/constructs/fanout-consumer.ts:146``) has ``reportBatchItemFailures: true`` on its DDB Stream event-source mapping. The handler returned ``void``, so Lambda retried the entire batch on any unhandled throw instead of isolating the poisonous record. Combined with aws-samples#5 this could cascade into retry storms and violated the per-task ordering guarantee we rely on (§6.4, AD-9). Fix: handler return type becomes ``Promise<DynamoDBBatchResponse>``. Per-record processing is wrapped in try/catch; caught throws push ``{ itemIdentifier: record.eventID }`` to ``batchItemFailures`` and emit ``fanout.record.failed`` warn. Final ``fanout.batch.complete`` log grows a ``failed`` count. Note: ``DynamoDBStreamHandler`` constrains return to ``void | Promise<void>``, so the handler is typed as a plain 3-arg async function. Lambda's runtime accepts either shape; existing tests (passing ``event, context, cb``) work unchanged. **aws-samples#5 — Unhandled exception in routeEvent crashes batch** ``routeEvent`` uses ``Promise.allSettled`` internally, but ``resolveTokenSecretArn`` can throw ``AccessDeniedException`` SYNCHRONOUSLY before the ``allSettled`` guard is reached. The new per-record try/catch from aws-samples#1 catches these too. **aws-samples#9 — renderCommentBody not self-defending against uncoerced DDB strings** The ``.toFixed(4)`` call on ``costUsd`` is the same bug class as the ``toFixed is not a function`` crash we fixed at the fanout boundary in commit 9fe704e. Today the sole call site coerces via the shared helper; a future caller that forgets to would crash. Fix: ``renderCommentBody`` coerces ``durationS`` and ``costUsd`` internally via the shared ``coerceNumericOrNull`` helper (second line of defense; caller's coercion remains the first). Widened ``CommentBodyInput`` fields to ``number | string | null`` to honestly model the DDB Document-client boundary. **aws-samples#12 — Markdown injection possible via prUrl in GitHub comment body** ``prUrl`` was interpolated directly into a Markdown link target (``[link](${input.prUrl})``). A crafted URL containing ``)`` / ``|`` / ``\n`` could break the table layout or inject content, and a ``javascript:`` scheme could produce a click-to-execute link on some Markdown renderers. Fix: new exported ``sanitizeMarkdownLinkTarget`` helper in ``shared/github-comment.ts`` rejects URLs containing ``\r\n\t\s)|]"<>`` characters, validates via ``new URL()``, and rejects non-http(s) schemes. Returns ``null`` on rejection so ``renderCommentBody`` omits the Pull-request row entirely rather than emitting a broken or unsafe link. ## Tests +22 regression tests net (fanout 7 for aws-samples#1+aws-samples#5 + 3 for aws-samples#9; github-comment 12 for aws-samples#12): - Fanout partial-batch: poison-pill isolation, mixed-batch (good record NOT in failures), observability warn, empty-failures regression guard, baseline pin that today's ``Promise.allSettled`` containment still works. - renderCommentBody numeric self-defense: string-typed values render correctly; non-finite strings collapse to null with warn; null does NOT warn. - sanitizeMarkdownLinkTarget unit tests: accept clean http/https, reject 9 injection patterns, reject 4 non-http schemes (``javascript:``, ``data:``, ``file:``, ``ftp:``), reject malformed, handle null/undefined. Plus end-to-end assertions on ``renderCommentBody`` proving the PR row is omitted on rejection. CDK suite: 1029 passing (was 1001). Refs: krokoko code review on PR aws-samples#52 (findings 1, 5, 9, 12)
1 parent c779016 commit 1b8ffa6

4 files changed

Lines changed: 680 additions & 51 deletions

File tree

cdk/src/handlers/fanout-task-events.ts

Lines changed: 102 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@
4646

4747
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
4848
import { DynamoDBDocumentClient, GetCommand, UpdateCommand } from '@aws-sdk/lib-dynamodb';
49-
import type { DynamoDBStreamEvent, DynamoDBStreamHandler, DynamoDBRecord } from 'aws-lambda';
49+
import type {
50+
DynamoDBBatchItemFailure,
51+
DynamoDBBatchResponse,
52+
DynamoDBRecord,
53+
DynamoDBStreamEvent,
54+
} from 'aws-lambda';
5055
import { clearTokenCache, resolveGitHubToken } from './shared/context-hydration';
5156
import { renderCommentBody, upsertTaskComment } from './shared/github-comment';
5257
import { logger } from './shared/logger';
@@ -693,9 +698,53 @@ export async function routeEvent(
693698
/**
694699
* Lambda entry point. Invoked by the DynamoDB Streams event source
695700
* mapping with batches of NEW_IMAGE records from `TaskEventsTable`.
701+
*
702+
* Returns a ``DynamoDBBatchResponse`` so the event-source-mapping's
703+
* ``reportBatchItemFailures: true`` setting (see
704+
* ``constructs/fanout-consumer.ts``) can honor partial-batch semantics.
705+
* Without a structured return, a single poisonous record would cause
706+
* Lambda to retry the **entire batch** from the stream checkpoint,
707+
* replaying every sibling event and defeating the per-task ordering
708+
* guarantee promised by ``ParallelizationFactor: 1`` upstream.
709+
*
710+
* Partial-failure surface (per-record try/catch below):
711+
* - ``routeEvent`` wraps each dispatcher in ``Promise.allSettled``, so
712+
* dispatcher rejections are already caught at the channel granularity
713+
* and do not reach here. What DOES reach here is a throw BEFORE the
714+
* ``allSettled`` — e.g. ``resolveTokenSecretArn`` throwing
715+
* ``AccessDeniedException`` on an IAM misconfig (deliberate hard fail
716+
* inside ``dispatchToGitHubComment``), a synchronous throw in
717+
* ``loadTaskForComment`` on a broken DDB env, or any future writer
718+
* that opens a non-``allSettled`` code path.
719+
* - Parse / filter / rate-limit errors are defensive — today they
720+
* cannot throw, but catching them keeps one stray ``throw`` in a
721+
* future refactor (e.g. a stricter ``parseStreamRecord``) from
722+
* crashing the whole batch.
723+
*
724+
* On any caught throw we push ``{ itemIdentifier: record.eventID }`` so
725+
* Lambda retries ONLY that record, isolating the poison pill per
726+
* design §6 + §8.9 expectations. Successful records are NOT in
727+
* ``batchItemFailures`` and advance the stream checkpoint normally.
728+
*
729+
* Refs: PR #52 krokoko code review findings #1 and #5 (the fanout
730+
* handler returned ``void`` despite ``reportBatchItemFailures: true``,
731+
* and a ``routeEvent`` throw from ``resolveTokenSecretArn`` could crash
732+
* the whole batch).
696733
*/
697-
export const handler: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent) => {
734+
// ``DynamoDBStreamHandler`` constrains the return to ``void | Promise<void>``,
735+
// which blocks the ``DynamoDBBatchResponse`` we must return for
736+
// ``reportBatchItemFailures: true`` to work (finding #1). Typing the
737+
// handler as a plain async function preserves the 3-arg Lambda
738+
// signature for existing call sites (tests pass ``event, context, cb``)
739+
// while allowing a structured return. Lambda's runtime accepts either
740+
// shape.
741+
export const handler = async (
742+
event: DynamoDBStreamEvent,
743+
_context?: unknown,
744+
_callback?: unknown,
745+
): Promise<DynamoDBBatchResponse> => {
698746
const perTaskCounts = new Map<string, number>();
747+
const batchItemFailures: DynamoDBBatchItemFailure[] = [];
699748
let processed = 0;
700749
let dispatched = 0;
701750
let dropped = 0;
@@ -706,33 +755,57 @@ export const handler: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent)
706755

707756
for (const record of event.Records) {
708757
processed++;
709-
const ev = parseStreamRecord(record);
710-
if (!ev) {
711-
dropped++;
712-
continue;
713-
}
714-
if (!shouldFanOut(ev, overrides)) {
715-
dropped++;
716-
continue;
717-
}
758+
try {
759+
const ev = parseStreamRecord(record);
760+
if (!ev) {
761+
dropped++;
762+
continue;
763+
}
764+
if (!shouldFanOut(ev, overrides)) {
765+
dropped++;
766+
continue;
767+
}
718768

719-
const seen = perTaskCounts.get(ev.task_id) ?? 0;
720-
if (seen >= MAX_EVENTS_PER_TASK_PER_INVOCATION) {
721-
logger.warn('[fanout] per-task cap hit — dropping event', {
722-
event: 'fanout.rate_limit.hit',
723-
task_id: ev.task_id,
724-
event_id: ev.event_id,
725-
event_type: ev.event_type,
726-
effective_event_type: effectiveEventType(ev),
727-
cap: MAX_EVENTS_PER_TASK_PER_INVOCATION,
769+
const seen = perTaskCounts.get(ev.task_id) ?? 0;
770+
if (seen >= MAX_EVENTS_PER_TASK_PER_INVOCATION) {
771+
logger.warn('[fanout] per-task cap hit — dropping event', {
772+
event: 'fanout.rate_limit.hit',
773+
task_id: ev.task_id,
774+
event_id: ev.event_id,
775+
event_type: ev.event_type,
776+
effective_event_type: effectiveEventType(ev),
777+
cap: MAX_EVENTS_PER_TASK_PER_INVOCATION,
778+
});
779+
dropped++;
780+
continue;
781+
}
782+
perTaskCounts.set(ev.task_id, seen + 1);
783+
784+
const channels = await routeEvent(ev, overrides);
785+
if (channels.length > 0) dispatched++;
786+
} catch (err) {
787+
// Poison-pill isolation: one record's unhandled throw must not
788+
// crash the batch. See the handler doc block for the full list of
789+
// paths that can reach here (notably AccessDeniedException from
790+
// ``resolveTokenSecretArn``, finding #5).
791+
//
792+
// ``eventID`` is the stream-record identifier Lambda uses for the
793+
// retry cursor; on Kinesis-style event-source-mappings with
794+
// ``reportBatchItemFailures: true`` the service retries all
795+
// records at-or-after the lowest-sequence failure. Returning even
796+
// one failed itemIdentifier is enough to preserve ordering across
797+
// the whole batch for that task.
798+
const eventID = record.eventID;
799+
logger.warn('[fanout] record threw — flagging for partial-batch retry', {
800+
event: 'fanout.record.failed',
801+
event_id: eventID,
802+
error: err instanceof Error ? err.message : String(err),
803+
error_name: err instanceof Error ? err.name : undefined,
728804
});
729-
dropped++;
730-
continue;
805+
if (eventID !== undefined) {
806+
batchItemFailures.push({ itemIdentifier: eventID });
807+
}
731808
}
732-
perTaskCounts.set(ev.task_id, seen + 1);
733-
734-
const channels = await routeEvent(ev, overrides);
735-
if (channels.length > 0) dispatched++;
736809
}
737810

738811
logger.info('[fanout] batch complete', {
@@ -741,6 +814,9 @@ export const handler: DynamoDBStreamHandler = async (event: DynamoDBStreamEvent)
741814
processed,
742815
dispatched,
743816
dropped,
817+
failed: batchItemFailures.length,
744818
unique_tasks: perTaskCounts.size,
745819
});
820+
821+
return { batchItemFailures };
746822
};

cdk/src/handlers/shared/github-comment.ts

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
*/
4848

4949
import { logger } from './logger';
50+
import { coerceNumericOrNull } from './numeric';
5051

5152
/** GitHub REST v3 media type — required on writes for stable behavior. */
5253
const GITHUB_ACCEPT = 'application/vnd.github.v3+json';
@@ -284,31 +285,122 @@ function sanitizeEventType(eventType: string): string {
284285
return eventType.replace(/[`|\r\n]/g, '');
285286
}
286287

288+
/**
289+
* Sanitize a ``prUrl`` before interpolating it into a Markdown link
290+
* target (``[link](<here>)``). Without this guard a crafted URL could
291+
* break the table layout or inject trailing Markdown: ``)`` closes the
292+
* link prematurely, ``|`` starts a new table column, and CR / LF / ``]``
293+
* / ``"`` each let an attacker extend the comment body past the link.
294+
*
295+
* Strategy: strip the six characters that are meaningful inside
296+
* ``[text](target)`` Markdown AND reject anything that does not parse
297+
* as an ``http://`` or ``https://`` URL after the strip. The strip is
298+
* deliberately conservative — a legitimate GitHub PR URL
299+
* (``https://github.com/owner/repo/pull/42``) never contains any of the
300+
* listed characters, so false positives are effectively zero. Any URL
301+
* that fails validation returns ``null`` and the caller must omit the
302+
* Pull-request row entirely rather than risk a broken-layout comment.
303+
*
304+
* Refs: PR #52 krokoko code review finding #12 (Markdown injection
305+
* possible via ``prUrl`` in GitHub comment body).
306+
*/
307+
export function sanitizeMarkdownLinkTarget(url: string | null | undefined): string | null {
308+
if (url === null || url === undefined) return null;
309+
// Reject anything carrying a Markdown-significant character. We do
310+
// NOT attempt to URL-encode these — encoded ``)`` (``%29``) would
311+
// render correctly, but encoding opens a harder surface to reason
312+
// about (e.g. an attacker who gets ``%0A`` past this function could
313+
// still break the table on some Markdown renderers). A flat reject
314+
// keeps the contract simple and the comment row trustworthy.
315+
if (/[\r\n\t\s)|\]"<>`]/.test(url)) return null;
316+
// Validate the parsed URL is http(s). A ``javascript:`` link target
317+
// is also attacker-controlled content and has no place in a status
318+
// comment. ``new URL`` throws on malformed input.
319+
let parsed: URL;
320+
try {
321+
parsed = new URL(url);
322+
} catch {
323+
return null;
324+
}
325+
if (parsed.protocol !== 'https:' && parsed.protocol !== 'http:') return null;
326+
return url;
327+
}
328+
287329
function bgagentMarker(taskId: string): string {
288330
return `<!-- ${BGAGENT_COMMENT_MARKER_PREFIX}${taskId} -->`;
289331
}
290332

291-
/** A compact terminal-friendly summary the GitHub comment displays as
292-
* the task progresses. Kept small on purpose — GitHub truncates long
293-
* comments in mobile / email notifications and the PR activity log
294-
* accumulates the full history anyway. */
333+
/**
334+
* A compact terminal-friendly summary the GitHub comment displays as
335+
* the task progresses. Kept small on purpose — GitHub truncates long
336+
* comments in mobile / email notifications and the PR activity log
337+
* accumulates the full history anyway.
338+
*
339+
* The numeric fields accept ``string`` alongside ``number | null`` to
340+
* honestly model the DynamoDB Document-client boundary: ``Number``
341+
* attributes deserialize as JS ``string`` in some code paths (see
342+
* ``shared/numeric.ts``). ``renderCommentBody`` coerces defensively so
343+
* a future caller that forgets to run the shared coercion helper at
344+
* its own boundary does not crash the way the fanout dispatcher did in
345+
* Scenario 7-ext (``costUsd.toFixed is not a function``, commit
346+
* ``9fe704e``).
347+
*/
295348
export interface CommentBodyInput {
296349
readonly taskId: string;
297350
readonly status: string;
298351
readonly repo: string;
299352
readonly latestEventType: string;
300353
readonly latestEventAt: string;
301354
readonly prUrl: string | null;
302-
readonly durationS: number | null;
303-
readonly costUsd: number | null;
355+
readonly durationS: number | string | null;
356+
readonly costUsd: number | string | null;
304357
}
305358

306359
/**
307-
* Render the Markdown body for the in-place comment. Pure: no logger,
308-
* no timing, no side effects — callers can snapshot-test the exact
309-
* bytes without monkey-patching anything.
360+
* Render the Markdown body for the in-place comment. Pure: no timing,
361+
* no network — callers can snapshot-test the exact bytes. The only
362+
* side effect is a ``logger.warn`` via ``coerceNumericOrNull`` if a
363+
* numeric field arrives with a non-finite value (e.g. ``'NaN'``), which
364+
* surfaces upstream writer bugs instead of silently dropping the row.
365+
*
366+
* Defense-in-depth vs. the caller's coercion (finding #9):
367+
* - Callers SHOULD coerce DDB numerics at their own boundary using
368+
* ``coerceNumericOrNull`` so the warn log carries their context
369+
* (task_id, event_id). The fanout dispatcher does this today.
370+
* - ``renderCommentBody`` coerces again internally so a future caller
371+
* that forgets the boundary step (e.g. a Chunk K reconciler
372+
* reading raw DDB items) still degrades to a null-omitted row
373+
* instead of throwing ``TypeError: .toFixed is not a function``.
374+
* Non-finite values (``NaN``, ``Infinity``) collapse to null and
375+
* omit the row; finite values (including parseable strings) render
376+
* normally.
377+
*
378+
* Markdown-link target sanitization (finding #12):
379+
* - ``prUrl`` is interpolated into a Markdown link (``[link](<here>)``).
380+
* Without sanitization a crafted URL containing ``)`` / ``|`` / CR
381+
* / LF could break the table layout or inject trailing content.
382+
* ``sanitizeMarkdownLinkTarget`` strips the injection surface and
383+
* validates the URL is http(s); a rejected URL omits the row
384+
* rather than rendering a broken or misleading link.
310385
*/
311386
export function renderCommentBody(input: CommentBodyInput): string {
387+
// Coerce DDB-string numerics defensively — see doc block above. The
388+
// context object gives the ``numeric.coercion_failed`` warn enough
389+
// breadcrumbs (field + task_id) to trace back to the upstream writer.
390+
const durationS = coerceNumericOrNull(
391+
input.durationS,
392+
{ field: 'duration_s', task_id: input.taskId },
393+
logger,
394+
);
395+
const costUsd = coerceNumericOrNull(
396+
input.costUsd,
397+
{ field: 'cost_usd', task_id: input.taskId },
398+
logger,
399+
);
400+
// Sanitize the PR link target before interpolation. A rejected URL
401+
// returns null and the row is omitted.
402+
const safePrUrl = sanitizeMarkdownLinkTarget(input.prUrl);
403+
312404
const lines: string[] = [];
313405
lines.push(bgagentMarker(input.taskId));
314406
lines.push(`### Background agent — ${input.status}`);
@@ -319,14 +411,14 @@ export function renderCommentBody(input: CommentBodyInput): string {
319411
lines.push(`| Repo | \`${input.repo}\` |`);
320412
lines.push(`| Status | **${input.status}** |`);
321413
lines.push(`| Last event | \`${sanitizeEventType(input.latestEventType)}\` @ ${input.latestEventAt} |`);
322-
if (input.prUrl) {
323-
lines.push(`| Pull request | [link](${input.prUrl}) |`);
414+
if (safePrUrl) {
415+
lines.push(`| Pull request | [link](${safePrUrl}) |`);
324416
}
325-
if (input.durationS !== null) {
326-
lines.push(`| Duration | ${input.durationS}s |`);
417+
if (durationS !== null) {
418+
lines.push(`| Duration | ${durationS}s |`);
327419
}
328-
if (input.costUsd !== null) {
329-
lines.push(`| Cost | $${input.costUsd.toFixed(4)} |`);
420+
if (costUsd !== null) {
421+
lines.push(`| Cost | $${costUsd.toFixed(4)} |`);
330422
}
331423
const rendered = lines.join('\n');
332424
if (rendered.length <= MAX_COMMENT_BODY_CHARS) return rendered;

0 commit comments

Comments
 (0)