Commit 331e283

bgagent committed
fix(reconciler,agent): reconciler alarming + TaskConfig trace-without-user_id validator (krokoko review aws-samples#7, aws-samples#11)
Two small hardening changes paired under the theme "surface silent failures at construction time / at run time". Both make existing latent bugs visible instead of silently breaking at the worst possible moment.

## Findings addressed

**aws-samples#7: Reconciler silently succeeds when ALL transitions fail**

``reconcile-stranded-tasks`` previously logged per-task failures at WARN and returned success unconditionally. A systemic failure (DDB throttling at the shard level, IAM outage, schema corruption) could strand 100% of candidates, and the only signal was a WARN log per task plus a final INFO that looked like a healthy run. Operators dashboarding "reconciler completed" counts would not notice the outage.

New behaviour classifies the run result into three cases and picks a log level accordingly:

- ``stranded > 0 AND failed == 0 AND errors > 0`` → ERROR with ``error_id: 'RECONCILER_TOTAL_FAILURE'``. Systemic failure; alarm on the error_id string.
- ``errors > 0 AND failed > 0`` → WARN with ``error_id: 'RECONCILER_PARTIAL_FAILURE'``. Dashboard signal; not alarm-worthy on its own.
- Otherwise → INFO, as today.

The handler still does NOT throw; event-source-mapping invocations complete normally. The log-level escalation IS the alarm signal, matching the ``error_id`` convention already used in ``fanout-task-events.ts`` (``FANOUT_GITHUB_PERSIST_FAILED``).

**aws-samples#11: TaskConfig missing @model_validator for trace=True + user_id=""**

The trace trajectory is uploaded to ``traces/<user_id>/<task_id>.jsonl.gz`` (design §10.1), and the ``get-trace-url`` handler refuses presigned keys outside the caller's own ``traces/<user_id>/`` prefix. Pre-fix, a TaskConfig built with ``trace=True`` and an empty ``user_id`` sentinel would construct fine and fail later at S3 upload time, mid-task, when the agent had already paid the cost of running.

Added a ``@model_validator(mode='after')`` on ``TaskConfig`` that raises a descriptive ``ValueError`` when ``trace=True`` and ``user_id`` is empty. Construction fails immediately; local/dev misconfigurations surface before the agent wastes tokens. The error message cites design §10.1 + the get-trace-url handler's prefix guard so the remedy is clear without cross-referencing other files.

## Tests

**CDK reconciler (+4 tests):**

- Total-failure case logs ERROR + ``RECONCILER_TOTAL_FAILURE``.
- Partial-failure case logs WARN + ``RECONCILER_PARTIAL_FAILURE``.
- Full-success case logs INFO (happy-path regression).
- Empty-candidate case logs INFO (not alarming: absence of stranded tasks is the target state).

CDK suite: 1036 passing (was 1032).

**Agent TaskConfig (+3 tests, +2 pipeline realignments):**

- ``test_trace_true_with_empty_user_id_raises_at_construction``: validator fires immediately with the documented message fragment.
- ``test_trace_true_with_valid_user_id_constructs_cleanly``: happy-path regression.
- ``test_trace_false_allows_empty_user_id``: negative control; local/batch runs without an orchestrator still work as long as they do not opt into trace capture.
- Two existing ``test_pipeline.py`` tests constructed ``TaskConfig(trace=True, user_id="")`` directly. These now either (a) pass a real ``user_id`` for the happy path, or (b) assert that construction raises. The tightened contract is strictly stronger than the previous "defensive-at-upload skip".

Agent suite: 500 passing (was 498; +3 new, −1 obsolete, +0 net from pipeline realignment).

Refs: krokoko code review on PR aws-samples#52 (findings 7, 11)
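The three-way classification described in the commit message can be sketched as a pure function. This is an illustrative Python port, not the handler's code (which is TypeScript). The name `classify_reconcile_run` and the tuple return shape are assumptions made for the example, and the reading of `failed` as "candidates that were transitioned to the failed state" is inferred from the tests in this commit.

```python
from typing import Optional, Tuple


def classify_reconcile_run(
    stranded: int, failed: int, errors: int
) -> Tuple[str, Optional[str]]:
    """Return (log_level, error_id) for a finished reconciler run.

    stranded: candidates found by the queries.
    failed:   candidates that were transitioned (assumed meaning).
    errors:   candidates whose transition raised an exception.
    """
    # Every candidate raised and none transitioned: systemic failure.
    if stranded > 0 and failed == 0 and errors > 0:
        return "ERROR", "RECONCILER_TOTAL_FAILURE"
    # Some transitioned, some raised: partial failure.
    if errors > 0 and failed > 0:
        return "WARN", "RECONCILER_PARTIAL_FAILURE"
    # No candidates at all, or every candidate transitioned cleanly.
    return "INFO", None


print(classify_reconcile_run(stranded=2, failed=0, errors=2))
print(classify_reconcile_run(stranded=2, failed=1, errors=1))
print(classify_reconcile_run(stranded=2, failed=2, errors=0))
print(classify_reconcile_run(stranded=0, failed=0, errors=0))
```

The four calls mirror the four CDK test cases listed under "Tests": total failure, partial failure, full success, and the empty-candidate run.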
1 parent db55bfa commit 331e283

5 files changed

Lines changed: 266 additions & 15 deletions

agent/src/models.py

Lines changed: 22 additions & 0 deletions
@@ -127,6 +127,28 @@ class TaskConfig(BaseModel):
     issue: GitHubIssue | None = None
     base_branch: str | None = None

+    @model_validator(mode="after")
+    def _validate_trace_requires_user_id(self) -> Self:
+        """Fail at construction when trace=True without a user_id.
+
+        The trace trajectory is uploaded to
+        ``traces/<user_id>/<task_id>.jsonl.gz`` (design §10.1). An empty
+        ``user_id`` produces ``traces//<task_id>.jsonl.gz``, which the
+        ``get-trace-url`` handler's per-caller-prefix guard refuses.
+        Catching this at construction time surfaces the misconfiguration
+        locally / in CI instead of deferring to runtime S3 upload.
+        """
+        if self.trace and not self.user_id:
+            raise ValueError(
+                "trace=True requires a non-empty user_id. Local/batch runs "
+                "without an orchestrator must either set trace=False (the "
+                "default) or supply user_id explicitly. The trace trajectory "
+                "is uploaded to traces/<user_id>/<task_id>.jsonl.gz (design "
+                "§10.1), and the get-trace-url handler refuses keys outside "
+                "the caller's traces/<user_id>/ prefix."
+            )
+        return self
+

 class RepoSetup(BaseModel):
     model_config = ConfigDict(frozen=True)
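For readers without Pydantic at hand, the same fail-at-construction idea can be sketched with a standard-library dataclass. This deliberately swaps `__post_init__` in for Pydantic's `@model_validator(mode="after")`, so it is an analogy rather than the code in this commit. `MiniTaskConfig` and `trace_key` are hypothetical names used only here; the key layout is the one quoted in the validator's docstring.

```python
from dataclasses import dataclass


@dataclass
class MiniTaskConfig:
    """Hypothetical stand-in for TaskConfig with only the relevant fields."""

    task_id: str
    trace: bool = False
    user_id: str = ""

    def __post_init__(self) -> None:
        # Runs right after __init__, so a bad combination never yields
        # a usable object.
        if self.trace and not self.user_id:
            raise ValueError("trace=True requires a non-empty user_id")


def trace_key(config: MiniTaskConfig) -> str:
    """Build the object key using the layout from the docstring."""
    return f"traces/{config.user_id}/{config.task_id}.jsonl.gz"


ok = MiniTaskConfig(task_id="t-1", trace=True, user_id="u-1")
print(trace_key(ok))

try:
    MiniTaskConfig(task_id="t-2", trace=True)  # user_id defaults to ""
except ValueError as exc:
    print(f"rejected at construction: {exc}")

# trace=False with an empty user_id is still accepted.
print(MiniTaskConfig(task_id="t-3"))
```

Without the check, the second config would have produced the key `traces//t-2.jsonl.gz`, which is the unreachable path the docstring warns about.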

agent/tests/test_models.py

Lines changed: 34 additions & 0 deletions
@@ -280,6 +280,40 @@ def test_validate_assignment(self):
         config.max_turns = 50
         assert config.max_turns == 50

+    def test_trace_true_with_empty_user_id_raises_at_construction(self):
+        """trace=True + user_id='' must fail at construction, not at S3 upload."""
+        with pytest.raises(ValidationError, match="trace=True requires a non-empty user_id"):
+            TaskConfig(
+                repo_url="owner/repo",
+                github_token="ghp_test",
+                aws_region="us-east-1",
+                trace=True,
+                # user_id omitted — defaults to ""
+            )
+
+    def test_trace_true_with_valid_user_id_constructs_cleanly(self):
+        """Happy path: trace=True with a non-empty user_id is accepted."""
+        config = TaskConfig(
+            repo_url="owner/repo",
+            github_token="ghp_test",
+            aws_region="us-east-1",
+            trace=True,
+            user_id="cognito-sub-abc-123",
+        )
+        assert config.trace is True
+        assert config.user_id == "cognito-sub-abc-123"
+
+    def test_trace_false_allows_empty_user_id(self):
+        """Negative control: local batch runs (trace=False, user_id='') still work."""
+        config = TaskConfig(
+            repo_url="owner/repo",
+            github_token="ghp_test",
+            aws_region="us-east-1",
+            # trace defaults to False; user_id defaults to ""
+        )
+        assert config.trace is False
+        assert config.user_id == ""
+

 class TestRepoSetup:
     def test_construction(self):

agent/tests/test_pipeline.py

Lines changed: 26 additions & 13 deletions
@@ -2,6 +2,9 @@

 from unittest.mock import MagicMock, patch

+import pytest
+from pydantic import ValidationError
+
 from models import AgentResult, RepoSetup, TaskConfig
 from pipeline import _chain_prior_agent_error, _resolve_overall_task_status

@@ -439,12 +442,14 @@ async def fake_run_agent(_prompt, _system_prompt, config, cwd=None, trajectory=N
                 aws_region="us-east-1",
                 task_id="t-trace",
                 trace=True,
+                user_id="cognito-sub-trace-user",
             )

         assert captured_config is not None
         # The config reaching run_agent carries trace=True so runner.py's
         # _ProgressWriter(config.task_id, trace=config.trace) picks it up.
         assert captured_config.trace is True
+        assert captured_config.user_id == "cognito-sub-trace-user"

     @patch("pipeline.run_agent")
     @patch("pipeline.build_system_prompt")
@@ -668,9 +673,17 @@ def test_upload_skipped_when_user_id_empty_and_trace_true(
         mock_upload,
         monkeypatch,
     ):
-        """K2 Stage 3 review Finding #1 — empty user_id with trace=True
-        must skip the upload to avoid writing an unreachable
-        ``traces//<task_id>.jsonl.gz`` artifact."""
+        """krokoko review Finding #11 — trace=True with empty user_id now
+        fails at ``TaskConfig`` construction time (pre-flight validation)
+        rather than silently skipping the upload and returning
+        ``trace_s3_uri=None``.
+
+        Previously (rev-5) this was a best-effort defensive skip inside
+        ``pipeline.run_task``'s trace-upload block; shifting the check to
+        the Pydantic model means misconfigured callers surface the error
+        immediately, before any agent work runs. The upload mock is never
+        exercised because we never reach the upload path.
+        """
         monkeypatch.setenv("GITHUB_TOKEN", "ghp_test")
         monkeypatch.setenv("AWS_REGION", "us-east-1")

@@ -700,18 +713,18 @@ async def fake_run_agent(_prompt, _system_prompt, _config, cwd=None, trajectory=
         ):
             from pipeline import run_task

-            result = run_task(
-                repo_url="owner/repo",
-                task_description="trace without user",
-                github_token="ghp_test",
-                aws_region="us-east-1",
-                task_id="t-no-uid",
-                trace=True,
-                user_id="", # empty — must gate upload
-            )
+            with pytest.raises(ValidationError, match="trace=True requires a non-empty user_id"):
+                run_task(
+                    repo_url="owner/repo",
+                    task_description="trace without user",
+                    github_token="ghp_test",
+                    aws_region="us-east-1",
+                    task_id="t-no-uid",
+                    trace=True,
+                    user_id="", # empty — now rejected at TaskConfig construction
+                )

         assert not mock_upload.called
-        assert result["trace_s3_uri"] is None

     @patch("pipeline.upload_trace_to_s3")
     @patch("pipeline.run_agent")

cdk/src/handlers/reconcile-stranded-tasks.ts

Lines changed: 38 additions & 2 deletions
@@ -284,10 +284,46 @@ export async function handler(): Promise<void> {
     }
   }

-  logger.info('Stranded-task reconciler finished', {
+  // Severity escalation for the final log line.
+  //
+  // Per-task failures upstream are caught and swallowed (logged at WARN)
+  // so one flaky DDB call doesn't abort the entire reconcile window. But
+  // a systemic failure — IAM outage, table-level throttling, schema
+  // corruption — can silently strand 100% of candidates while each
+  // individual WARN line looks ignorable. We classify the terminal log
+  // three ways so CloudWatch Log Insights / metric filters can alarm on
+  // the dedicated `error_id` strings:
+  //
+  //   1. totalStranded > 0 AND totalFailed == 0 AND totalErrors > 0
+  //      → SYSTEMIC failure. Every candidate hit an exception. Log ERROR
+  //        with error_id='RECONCILER_TOTAL_FAILURE' (alarm-worthy).
+  //   2. totalErrors > 0 AND totalFailed > 0
+  //      → PARTIAL failure. Some tasks transitioned, some didn't. Log
+  //        WARN with error_id='RECONCILER_PARTIAL_FAILURE' (dashboard
+  //        signal, not an alarm — expected under occasional DDB flakes).
+  //   3. Otherwise (no stranded, or all-success with zero errors)
+  //      → SUCCESS. Log INFO as before.
+  //
+  // We do NOT throw — the EventBridge schedule invocation should still
+  // complete "normally" (no retry storm against an already-degraded
+  // DDB). The log-level escalation IS the alarm signal.
+  const finalPayload = {
     stranded: totalStranded,
     failed: totalFailed,
     skipped: totalSkipped,
     errors: totalErrors,
-  });
+  };
+  if (totalStranded > 0 && totalFailed === 0 && totalErrors > 0) {
+    logger.error('Stranded-task reconciler finished — every candidate failed to transition', {
+      ...finalPayload,
+      error_id: 'RECONCILER_TOTAL_FAILURE',
+    });
+  } else if (totalErrors > 0 && totalFailed > 0) {
+    logger.warn('Stranded-task reconciler finished with partial failures', {
+      ...finalPayload,
+      error_id: 'RECONCILER_PARTIAL_FAILURE',
+    });
+  } else {
+    logger.info('Stranded-task reconciler finished', finalPayload);
+  }
 }
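Because the handler does not throw, any alarm has to key off the final log line. A minimal sketch of that matching rule, assuming the logger emits one JSON object per line with an `error_id` field; the exact log shape is not shown in this diff, and `alarm_worthy` and the sample lines are hypothetical:

```python
import json
from typing import Iterable, List

# Only the total-failure id is treated as alarm-worthy, following the
# handler comment; the partial-failure id is a dashboard signal.
ALARM_ERROR_IDS = {"RECONCILER_TOTAL_FAILURE"}


def alarm_worthy(log_lines: Iterable[str]) -> List[dict]:
    """Return the parsed log records whose error_id should page someone."""
    hits: List[dict] = []
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore non-JSON lines such as runtime banners
        if isinstance(record, dict) and record.get("error_id") in ALARM_ERROR_IDS:
            hits.append(record)
    return hits


sample = [
    '{"level": "INFO", "message": "Stranded-task reconciler finished", "stranded": 0}',
    '{"level": "WARN", "error_id": "RECONCILER_PARTIAL_FAILURE", "failed": 1, "errors": 1}',
    '{"level": "ERROR", "error_id": "RECONCILER_TOTAL_FAILURE", "failed": 0, "errors": 2}',
    "START RequestId: not-json",
]
print(len(alarm_worthy(sample)))
```

In a deployed stack this role would more likely be played by a CloudWatch Logs metric filter or a Logs Insights query, as the handler comment suggests; the Python above only illustrates which lines such a filter should match.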

cdk/test/handlers/reconcile-stranded-tasks.test.ts

Lines changed: 146 additions & 0 deletions
@@ -175,6 +175,152 @@ describe('reconcile-stranded-tasks', () => {
     expect(statusValues).toEqual(expect.arrayContaining(['SUBMITTED', 'HYDRATING']));
   });

+  describe('final log severity escalation', () => {
+    // Spy on the logger module used by the handler. We import the logger
+    // directly and replace the three level methods with jest.fn before
+    // each test so we can assert exactly which level was called.
+    // eslint-disable-next-line @typescript-eslint/no-var-requires
+    const loggerModule = require('../../src/handlers/shared/logger') as {
+      logger: {
+        info: (m: string, d?: Record<string, unknown>) => void;
+        warn: (m: string, d?: Record<string, unknown>) => void;
+        error: (m: string, d?: Record<string, unknown>) => void;
+      };
+    };
+
+    let infoSpy: jest.SpyInstance;
+    let warnSpy: jest.SpyInstance;
+    let errorSpy: jest.SpyInstance;
+
+    beforeEach(() => {
+      infoSpy = jest.spyOn(loggerModule.logger, 'info').mockImplementation(() => { /* silence */ });
+      warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => { /* silence */ });
+      errorSpy = jest.spyOn(loggerModule.logger, 'error').mockImplementation(() => { /* silence */ });
+    });
+
+    afterEach(() => {
+      infoSpy.mockRestore();
+      warnSpy.mockRestore();
+      errorSpy.mockRestore();
+    });
+
+    /**
+     * Find the final reconciler log line (i.e. the one whose message
+     * starts with 'Stranded-task reconciler finished') across all spies
+     * and return its [level, message, payload] triple.
+     */
+    function findFinalLog(): { level: 'INFO' | 'WARN' | 'ERROR'; message: string; payload: Record<string, unknown> } {
+      const match = (spy: jest.SpyInstance, level: 'INFO' | 'WARN' | 'ERROR') => {
+        const call = spy.mock.calls.find(
+          (c: unknown[]) => typeof c[0] === 'string' && (c[0] as string).startsWith('Stranded-task reconciler finished'),
+        );
+        return call ? { level, message: call[0] as string, payload: (call[1] ?? {}) as Record<string, unknown> } : null;
+      };
+      return match(errorSpy, 'ERROR') ?? match(warnSpy, 'WARN') ?? match(infoSpy, 'INFO')
+        ?? (() => { throw new Error('No final reconciler log line found'); })();
+    }
+
+    test('test_logs_ERROR_with_RECONCILER_TOTAL_FAILURE_error_id_when_every_task_fails', async () => {
+      // Two candidates both hit an exception on the first DDB write
+      // (UpdateItem transition). None transition cleanly, so totalFailed=0,
+      // totalStranded=2, totalErrors=2 → systemic failure path.
+      const ancient = new Date(Date.now() - 25 * 60 * 1000).toISOString();
+      const ddbErr = Object.assign(new Error('DDB blew up'), { name: 'InternalServerError' });
+      primeResponses([
+        // SUBMITTED query → two candidates.
+        {
+          Items: [
+            mockTaskRow({ task_id: 't-fail-1', user_id: 'u-1', created_at: ancient }),
+            mockTaskRow({ task_id: 't-fail-2', user_id: 'u-2', created_at: ancient }),
+          ],
+        },
+        ddbErr, // UpdateItem for t-fail-1 → throws
+        ddbErr, // UpdateItem for t-fail-2 → throws
+        { Items: [] }, // HYDRATING query
+      ]);
+
+      await handler();
+
+      const final = findFinalLog();
+      expect(final.level).toBe('ERROR');
+      expect(final.payload.error_id).toBe('RECONCILER_TOTAL_FAILURE');
+      expect(final.payload.stranded).toBe(2);
+      expect(final.payload.failed).toBe(0);
+      expect(final.payload.errors).toBe(2);
+    });
+
+    test('test_logs_WARN_with_RECONCILER_PARTIAL_FAILURE_when_some_tasks_fail', async () => {
+      // One success (4 writes), one failure (throws on UpdateItem).
+      const ancient = new Date(Date.now() - 25 * 60 * 1000).toISOString();
+      const ddbErr = Object.assign(new Error('DDB throttled'), { name: 'ProvisionedThroughputExceededException' });
+      primeResponses([
+        // SUBMITTED query → two candidates.
+        {
+          Items: [
+            mockTaskRow({ task_id: 't-ok', user_id: 'u-a', created_at: ancient }),
+            mockTaskRow({ task_id: 't-fail', user_id: 'u-b', created_at: ancient }),
+          ],
+        },
+        {}, // UpdateItem t-ok (transition) → success
+        {}, // PutItem task_stranded event
+        {}, // PutItem task_failed event
+        {}, // UpdateItem decrement concurrency
+        ddbErr, // UpdateItem t-fail (transition) → throws
+        { Items: [] }, // HYDRATING query
+      ]);
+
+      await handler();
+
+      const final = findFinalLog();
+      expect(final.level).toBe('WARN');
+      expect(final.payload.error_id).toBe('RECONCILER_PARTIAL_FAILURE');
+      expect(final.payload.stranded).toBe(2);
+      expect(final.payload.failed).toBe(1);
+      expect(final.payload.errors).toBe(1);
+    });
+
+    test('test_logs_INFO_on_full_success', async () => {
+      // Two candidates, both transition cleanly.
+      const ancient = new Date(Date.now() - 25 * 60 * 1000).toISOString();
+      primeResponses([
+        {
+          Items: [
+            mockTaskRow({ task_id: 't-1', user_id: 'u-a', created_at: ancient }),
+            mockTaskRow({ task_id: 't-2', user_id: 'u-b', created_at: ancient }),
+          ],
+        },
+        {}, {}, {}, {}, // t-1: transition + 2 events + decrement
+        {}, {}, {}, {}, // t-2: transition + 2 events + decrement
+        { Items: [] }, // HYDRATING
+      ]);
+
+      await handler();
+
+      const final = findFinalLog();
+      expect(final.level).toBe('INFO');
+      expect(final.payload.error_id).toBeUndefined();
+      expect(final.payload.stranded).toBe(2);
+      expect(final.payload.failed).toBe(2);
+      expect(final.payload.errors).toBe(0);
+    });
+
+    test('test_no_stranded_tasks_logs_INFO_not_ERROR', async () => {
+      // Empty-query case: totalStranded=0. Must NOT alarm.
+      primeResponses([
+        { Items: [] }, // SUBMITTED
+        { Items: [] }, // HYDRATING
+      ]);
+
+      await handler();
+
+      const final = findFinalLog();
+      expect(final.level).toBe('INFO');
+      expect(final.payload.stranded).toBe(0);
+      expect(final.payload.errors).toBe(0);
+      expect(errorSpy).not.toHaveBeenCalled();
+    });
+  });
+
   test('query paginates with ExclusiveStartKey when LastEvaluatedKey present', async () => {
     const ancient = new Date(Date.now() - 25 * 60 * 1000).toISOString();
     // findStrandedCandidates paginates internally and returns ALL rows