Commit 5c42d2c

Author: bgagent
fix(fanout): generalize Slack dedup to cover agent_error + log Retry-After (aws-samples#79 review aws-samples#4)
PR aws-samples#79 review aws-samples#4 surfaced a sibling-channel-failure hazard: when GitHub or Email rate-limits, the record lands in ``batchItemFailures``. On the Lambda retry, every Slack-subscribed event for that record runs again. Terminal events were already guarded by ``slack_notified_terminal``; ``agent_error`` was not, so operators would page twice on a single agent failure if a sibling channel happened to fail.

Generalize the dedup mechanism. ``TERMINAL_EVENTS`` is replaced by a ``SLACK_DEDUP_ATTRIBUTE`` map that marks each event type with the ``channel_metadata`` attribute that should guard the post:

- 5 terminals share ``slack_notified_terminal`` (any first-arriving terminal claims the right; subsequent terminals dedup against it)
- ``agent_error`` gets its own ``slack_dispatched_agent_error`` so a duplicate agent_error doesn't reuse the terminal slot
- ``task_created`` / ``session_started`` map to ``null`` because they already use the per-event ``slack_*_msg_ts`` conditional persists from review #1; the conditional already provides full idempotency (a separate marker would be redundant)

Also surfaces Slack's ``Retry-After`` header on rate-limited responses through a dedicated ``fanout.slack.retryable_api_error`` warn so operators reading CloudWatch can see the recovery window instead of guessing from sustained warn rate.

Tests:

- logs Retry-After header on rate-limited Slack responses (new): asserts ``retry_after_seconds`` propagates from Slack's response header into the warn metadata
- existing terminal-codes parametrized test untouched (terminal branch doesn't read headers)
- existing retryable test gains a ``headers: { get: () => null }`` stub on the fetch mock so the headers.get call doesn't crash

Reviewer suggested a per-channel dispatch bitmap as the alternative. Rejected as premature: the duplicate-GitHub-PATCH is harmless (idempotent), Email is still a stub, and the dedup map covers the specific agent_error pain identified above. A bitmap would add a new table + IAM grants + per-dispatch DDB cost for a hypothetical problem (Slack rate-limiting AND a sibling channel failure).
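
The claim-once behaviour described in the message can be sketched without DynamoDB. This is a minimal illustration, not the handler's code: ``InMemoryTaskTable`` and ``shouldPost`` are hypothetical names, and the in-memory ``claim`` only imitates a conditional write guarded by ``attribute_not_exists``. The map itself is copied from the commit.

```typescript
// Map copied from the commit: event type -> guarding channel_metadata attribute.
const SLACK_DEDUP_ATTRIBUTE: Record<string, string | null> = {
  task_completed: 'slack_notified_terminal',
  task_failed: 'slack_notified_terminal',
  task_cancelled: 'slack_notified_terminal',
  task_timed_out: 'slack_notified_terminal',
  task_stranded: 'slack_notified_terminal',
  agent_error: 'slack_dispatched_agent_error',
  task_created: null,
  session_started: null,
};

// Hypothetical stand-in for the task table: task_id -> channel_metadata.
class InMemoryTaskTable {
  private readonly rows = new Map<string, Record<string, unknown>>();

  // Imitates a conditional SET: succeeds only if the marker is absent.
  claim(taskId: string, attr: string): boolean {
    const meta = this.rows.get(taskId) ?? {};
    if (attr in meta) return false; // marker already present: duplicate
    meta[attr] = true;
    this.rows.set(taskId, meta);
    return true;
  }
}

// Hypothetical helper: true when this delivery may post to Slack.
function shouldPost(table: InMemoryTaskTable, taskId: string, eventType: string): boolean {
  const attr = SLACK_DEDUP_ATTRIBUTE[eventType];
  if (!attr) return true; // null or unmapped: no marker-based dedup in this sketch
  return table.claim(taskId, attr);
}

const table = new InMemoryTaskTable();
const results = [
  shouldPost(table, 't1', 'agent_error'),    // true: first delivery claims the marker
  shouldPost(table, 't1', 'agent_error'),    // false: replay after a sibling-channel failure
  shouldPost(table, 't1', 'task_failed'),    // true: terminals use a separate marker
  shouldPost(table, 't1', 'task_completed'), // false: all terminals share one marker
];
console.log(results);
```

In this sketch the ``null`` entries simply fall through; in the commit those event types rely on the ``slack_*_msg_ts`` conditional persists instead.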
1 parent b7bc393 commit 5c42d2c

2 files changed

Lines changed: 92 additions & 7 deletions

cdk/src/handlers/slack-notify.ts

Lines changed: 54 additions & 7 deletions
@@ -67,6 +67,32 @@ const TERMINAL_EVENTS = new Set<string>([
   'task_stranded',
 ]);
 
+/**
+ * Map an event type to the ``channel_metadata`` attribute that should
+ * guard against double-posting on a partial-batch retry. PR #79 review
+ * #4 surfaced the gap: when GitHub or Email rate-limits and the record
+ * is replayed, every Slack-subscribed event for that record runs again.
+ * Terminals were already dedup-protected by ``slack_notified_terminal``
+ * but ``agent_error`` was not — operators would page twice on a single
+ * agent failure if a sibling channel happened to fail.
+ *
+ * Each entry is an attribute name; ``null`` means the event type is
+ * intentionally NOT deduped (lifecycle events ``task_created`` /
+ * ``session_started`` use the per-event ``slack_*_msg_ts`` conditional
+ * persists instead, which is the right shape since they need to store
+ * a value, not just a presence marker).
+ */
+const SLACK_DEDUP_ATTRIBUTE: Record<string, string | null> = {
+  task_completed: 'slack_notified_terminal',
+  task_failed: 'slack_notified_terminal',
+  task_cancelled: 'slack_notified_terminal',
+  task_timed_out: 'slack_notified_terminal',
+  task_stranded: 'slack_notified_terminal',
+  agent_error: 'slack_dispatched_agent_error',
+  task_created: null,
+  session_started: null,
+};
+
 /** Event types this dispatcher renders. Must stay in sync with the
  * Slack entries in ``CHANNEL_DEFAULTS`` (see fanout-task-events.ts) —
  * drift means the router subscribes Slack to events that the
@@ -207,24 +233,32 @@ export async function dispatchSlackEvent(
   const task = taskResult.Item as TaskRecord | undefined;
   if (!task || task.channel_source !== 'slack') return;
 
-  // Dedup terminals — the orchestrator can write multiple completion /
-  // failure events (retries, reconciler). A conditional update on
-  // ``channel_metadata.slack_notified_terminal`` claims the right to post.
-  if (TERMINAL_EVENTS.has(eventType)) {
+  // Dedup any event that should only ever post once per task even
+  // under partial-batch retry (terminals, agent_error). The orchestrator
+  // can also write multiple events of the same kind (retries,
+  // reconciler), so the ``ADD`` on the ``channel_metadata.<attr>``
+  // marker claims the right to post for the whole event class.
+  // ``slack_notified_terminal`` covers all 5 terminals collectively;
+  // ``slack_dispatched_agent_error`` covers agent_error separately so
+  // the operator gets the first agent_error but not duplicates from
+  // sibling-channel-failure retries (PR #79 review #4).
+  const dedupAttr = SLACK_DEDUP_ATTRIBUTE[eventType];
+  if (dedupAttr) {
     try {
       await ddb.send(new UpdateCommand({
         TableName: tableName,
         Key: { task_id: taskId },
-        UpdateExpression: 'SET channel_metadata.slack_notified_terminal = :t',
-        ConditionExpression: 'attribute_not_exists(channel_metadata.slack_notified_terminal)',
+        UpdateExpression: `SET channel_metadata.${dedupAttr} = :t`,
+        ConditionExpression: `attribute_not_exists(channel_metadata.${dedupAttr})`,
         ExpressionAttributeValues: { ':t': true },
       }));
     } catch (err) {
       if ((err as Error)?.name === 'ConditionalCheckFailedException') {
-        logger.info('[fanout/slack] terminal notification already sent, skipping duplicate', {
+        logger.info('[fanout/slack] notification already sent, skipping duplicate', {
           event: 'fanout.slack.dedup_hit',
           task_id: taskId,
           event_type: eventType,
+          dedup_attr: dedupAttr,
         });
         return;
       }
@@ -296,6 +330,19 @@ export async function dispatchSlackEvent(
     // ``invalid_blocks``) are wrapped in SlackApiError so the router
     // swallows them — retrying ``channel_not_found`` won't help.
     if (classifySlackError(errorCode) === 'retryable') {
+      // Surface ``Retry-After`` (Slack's rate-limit header, in seconds)
+      // so operators reading CloudWatch can see when the next retry
+      // should succeed rather than guessing from sustained warn rate
+      // (PR #79 review #4 mitigation). Header is a string per fetch
+      // Headers spec; coerce defensively for the log.
+      const retryAfter = response.headers.get('retry-after');
+      logger.warn('[fanout/slack] retryable Slack API error', {
+        event: 'fanout.slack.retryable_api_error',
+        task_id: taskId,
+        event_type: eventType,
+        slack_error_code: errorCode,
+        retry_after_seconds: retryAfter ?? undefined,
+      });
       throw new Error(failureMessage);
     }
     throw new SlackApiError(failureMessage);
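
The hunk above logs ``retry_after_seconds`` as the raw header string (the test asserts ``'30'``). If a numeric value were wanted in the log, one possible normalization is sketched below. ``parseRetryAfterSeconds`` is a hypothetical helper, not part of this commit; it assumes the header is either delay-seconds (the form Slack documents) or an HTTP-date, the two forms the HTTP specification allows for ``Retry-After``.

```typescript
// Hypothetical helper: normalize a Retry-After header value to whole seconds.
// Returns undefined when the header is absent or unparseable.
function parseRetryAfterSeconds(
  header: string | null,
  nowMs: number = Date.now(),
): number | undefined {
  if (header === null) return undefined;
  const trimmed = header.trim();

  // delay-seconds form, e.g. "30"
  if (/^\d+$/.test(trimmed)) return Number(trimmed);

  // HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
  const dateMs = Date.parse(trimmed);
  if (Number.isNaN(dateMs)) return undefined;

  // Clamp to zero so a date in the past never yields a negative wait.
  return Math.max(0, Math.ceil((dateMs - nowMs) / 1000));
}

// Usage: the values a fetch Headers.get('retry-after') call could return.
const fromSeconds = parseRetryAfterSeconds('30');
const fromMissing = parseRetryAfterSeconds(null);
const fromDate = parseRetryAfterSeconds(
  'Wed, 21 Oct 2015 07:28:00 GMT',
  Date.UTC(2015, 9, 21, 7, 27, 30), // 30 seconds before the date above
);
console.log(fromSeconds, fromMissing, fromDate);
```

Keeping the raw string, as the commit does, avoids any parsing risk in the error path; a helper like this only matters if the value feeds a metric or a computed backoff.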

cdk/test/handlers/slack-notify.test.ts

Lines changed: 38 additions & 0 deletions
@@ -193,6 +193,7 @@ describe('dispatchSlackEvent', () => {
     });
     fetchMock.mockResolvedValueOnce({
       ok: true,
+      headers: { get: () => null },
       json: () => Promise.resolve({ ok: false, error: slackErrorCode }),
     });
 
@@ -207,6 +208,43 @@ describe('dispatchSlackEvent', () => {
     expect((caught as Error).message).toContain(slackErrorCode);
   });
 
+  test('logs Retry-After header on rate-limited Slack responses (PR #79 review #4)', async () => {
+    // Slack returns the Retry-After header (in seconds) on
+    // ``ratelimited`` so callers know when to retry. Surfacing it in
+    // the warn log means operators reading CloudWatch can see the
+    // expected recovery time instead of guessing from sustained warn
+    // rate.
+    ddbSend.mockResolvedValueOnce({
+      Item: {
+        task_id: 't1',
+        channel_source: 'slack',
+        channel_metadata: { slack_team_id: 'T1', slack_channel_id: 'C1' },
+      },
+    });
+    fetchMock.mockResolvedValueOnce({
+      ok: true,
+      headers: { get: (name: string) => (name.toLowerCase() === 'retry-after' ? '30' : null) },
+      json: () => Promise.resolve({ ok: false, error: 'ratelimited' }),
+    });
+
+    const loggerModule = await import('../../src/handlers/shared/logger');
+    const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined);
+    try {
+      await expect(
+        dispatchSlackEvent(mkEvent('t1', 'task_created'), ddb),
+      ).rejects.toThrow(/ratelimited/);
+
+      const retryWarn = warnSpy.mock.calls.find(
+        c => (c[1] as Record<string, unknown> | undefined)?.event === 'fanout.slack.retryable_api_error',
+      );
+      expect(retryWarn).toBeDefined();
+      expect((retryWarn?.[1] as Record<string, unknown>).retry_after_seconds).toBe('30');
+      expect((retryWarn?.[1] as Record<string, unknown>).slack_error_code).toBe('ratelimited');
+    } finally {
+      warnSpy.mockRestore();
+    }
+  });
+
   test('rethrows infra errors so the router records a dispatcher-rejected warn', async () => {
     // DDB throttling and Secrets Manager outages must surface to the
     // router — Promise.allSettled records them as rejections and batch
