Skip to content

Commit f604f9c

Browse files
TD-P012 (#370): fail deterministically on codec-decode errors at task boundary
Previously a worker task whose inbound payload could not be decoded left the lease to time out instead of reporting a structured failure. The three paths that bypassed fail_{workflow,activity}_task were: 1. Activity argument decode — decode_envelope() was called outside any try/except, so AvroNotInstalledError (ImportError subclass) or malformed-payload ValueError bubbled to the dispatcher's log-and-drop catch block. 2. Workflow start-input decode — try/except caught ValueError only; AvroNotInstalledError slipped past because ImportError is not a ValueError. 3. Replay history-result decode — serializer.decode() was called with no codec override, so Avro-encoded activity results were read as JSON. Any decode failure propagated to the dispatcher. Changes: - workflow.replay() accepts payload_codec and threads it through the ActivityCompleted / SideEffectRecorded / ChildRunCompleted result decodes so Avro-pinned runs replay correctly. - Worker wraps each decode site in targeted try/except for AvroNotInstalledError and (ValueError, TypeError); reports fail_* with a clear codec/install hint, failure_type, and stack trace. Activity decode failures are marked non_retryable — bad bytes won't become good on retry. Regression tests cover: - activity with invalid JSON bytes -> fail_activity_task, non_retryable - activity with invalid Avro bytes -> fail_activity_task, non_retryable - activity with avro extra missing (monkeypatched) -> fail_activity_task - workflow with invalid JSON start -> fail_workflow_task - workflow with avro extra missing on start -> fail_workflow_task - workflow replay with avro extra missing on history -> fail_workflow_task Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8fc211a commit f604f9c

3 files changed

Lines changed: 254 additions & 7 deletions

File tree

src/durable_workflow/worker.py

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from . import serializer
1212
from .activity import ActivityContext, ActivityInfo, _set_context
1313
from .client import Client
14-
from .errors import ActivityCancelled, NonRetryableError
14+
from .errors import ActivityCancelled, AvroNotInstalledError, NonRetryableError
1515
from .workflow import replay
1616

1717
log = logging.getLogger("durable_workflow.worker")
@@ -120,8 +120,40 @@ async def _run_workflow_task(self, task: dict[str, Any]) -> list[dict[str, Any]]
120120
decoded = serializer.decode_envelope(raw_args, codec=codec)
121121
if decoded is not None:
122122
start_input = decoded if isinstance(decoded, list) else [decoded]
123-
except ValueError as e:
124-
log.warning("task %s start input unreadable (%s); falling back to [].", task_id, e)
123+
except AvroNotInstalledError as e:
124+
log.exception("task %s start input Avro decode failed (extra not installed)", task_id)
125+
try:
126+
await self.client.fail_workflow_task(
127+
task_id=task_id,
128+
lease_owner=self.worker_id,
129+
workflow_task_attempt=attempt,
130+
message=(
131+
f"cannot decode workflow start input with codec 'avro': {e}. "
132+
f"Install the avro extra: pip install 'durable-workflow[avro]'."
133+
),
134+
failure_type=type(e).__name__,
135+
stack_trace=traceback.format_exc(),
136+
)
137+
except Exception as fe:
138+
log.warning("failed to report Avro-missing start input failure: %s", fe)
139+
return None
140+
except (ValueError, TypeError) as e:
141+
log.exception("task %s start input decode failed (codec=%r)", task_id, codec)
142+
try:
143+
await self.client.fail_workflow_task(
144+
task_id=task_id,
145+
lease_owner=self.worker_id,
146+
workflow_task_attempt=attempt,
147+
message=(
148+
f"cannot decode workflow start input with codec {codec!r}: {e}. "
149+
f"Verify the start input bytes match the declared codec and writer schema."
150+
),
151+
failure_type=type(e).__name__,
152+
stack_trace=traceback.format_exc(),
153+
)
154+
except Exception as fe:
155+
log.warning("failed to report start input decode failure: %s", fe)
156+
return None
125157

126158
run_id: str = task.get("run_id", "")
127159

@@ -140,7 +172,24 @@ async def _run_workflow_task(self, task: dict[str, Any]) -> list[dict[str, Any]]
140172
return None
141173

142174
try:
143-
outcome = replay(cls, history, start_input, run_id=run_id)
175+
outcome = replay(cls, history, start_input, run_id=run_id, payload_codec=codec)
176+
except AvroNotInstalledError as e:
177+
log.exception("replay failed: Avro extra not installed")
178+
try:
179+
await self.client.fail_workflow_task(
180+
task_id=task_id,
181+
lease_owner=self.worker_id,
182+
workflow_task_attempt=attempt,
183+
message=(
184+
f"cannot replay workflow history with codec 'avro': {e}. "
185+
f"Install the avro extra: pip install 'durable-workflow[avro]'."
186+
),
187+
failure_type=type(e).__name__,
188+
stack_trace=traceback.format_exc(),
189+
)
190+
except Exception as fe:
191+
log.warning("failed to report replay Avro-missing failure: %s", fe)
192+
return None
144193
except Exception as e:
145194
log.exception("replay failed")
146195
try:
@@ -182,7 +231,44 @@ async def _run_activity_task(self, task: dict[str, Any]) -> None:
182231
raw_args = task.get("arguments")
183232
inbound_codec = task.get("payload_codec") or serializer.JSON_CODEC
184233
result_codec = inbound_codec if inbound_codec in serializer.SUPPORTED_CODECS else serializer.JSON_CODEC
185-
args = serializer.decode_envelope(raw_args, codec=inbound_codec) or []
234+
try:
235+
args = serializer.decode_envelope(raw_args, codec=inbound_codec) or []
236+
except AvroNotInstalledError as e:
237+
log.exception("activity %s arguments Avro decode failed (extra not installed)", task_id)
238+
try:
239+
await self.client.fail_activity_task(
240+
task_id=task_id,
241+
activity_attempt_id=attempt_id,
242+
lease_owner=self.worker_id,
243+
message=(
244+
f"cannot decode activity arguments with codec 'avro': {e}. "
245+
f"Install the avro extra: pip install 'durable-workflow[avro]'."
246+
),
247+
failure_type=type(e).__name__,
248+
stack_trace=traceback.format_exc(),
249+
non_retryable=True,
250+
)
251+
except Exception as fe:
252+
log.warning("failed to report Avro-missing activity decode failure: %s", fe)
253+
return
254+
except (ValueError, TypeError) as e:
255+
log.exception("activity %s arguments decode failed (codec=%r)", task_id, inbound_codec)
256+
try:
257+
await self.client.fail_activity_task(
258+
task_id=task_id,
259+
activity_attempt_id=attempt_id,
260+
lease_owner=self.worker_id,
261+
message=(
262+
f"cannot decode activity arguments with codec {inbound_codec!r}: {e}. "
263+
f"Verify the argument bytes match the declared codec and writer schema."
264+
),
265+
failure_type=type(e).__name__,
266+
stack_trace=traceback.format_exc(),
267+
non_retryable=True,
268+
)
269+
except Exception as fe:
270+
log.warning("failed to report activity decode failure: %s", fe)
271+
return
186272
if not isinstance(args, list):
187273
args = [args]
188274

src/durable_workflow/workflow.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ def replay(
279279
start_input: list[Any],
280280
*,
281281
run_id: str = "",
282+
payload_codec: str | None = None,
282283
) -> ReplayOutcome:
283284
events = list(history_events)
284285

@@ -300,14 +301,14 @@ def replay(
300301
etype = ev.get("event_type")
301302
payload = ev.get("payload") or {}
302303
if etype in ("ActivityCompleted", "activity_completed"):
303-
resolved_results.append(serializer.decode(payload.get("result")))
304+
resolved_results.append(serializer.decode(payload.get("result"), codec=payload_codec))
304305
elif etype in ("TimerFired", "timer_fired"):
305306
resolved_results.append(None)
306307
elif etype in (
307308
"SideEffectRecorded", "side_effect_recorded",
308309
"ChildRunCompleted", "child_run_completed",
309310
):
310-
resolved_results.append(serializer.decode(payload.get("result")))
311+
resolved_results.append(serializer.decode(payload.get("result"), codec=payload_codec))
311312
elif etype in ("ChildRunFailed", "child_run_failed"):
312313
resolved_results.append(ChildWorkflowFailed(
313314
payload.get("message", "child workflow failed")

tests/test_worker.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,166 @@ async def test_workflow_with_envelope_arguments(self, mock_client: AsyncMock) ->
292292
mock_client.complete_workflow_task.assert_called_once()
293293

294294

295+
class TestCodecDecodeFailures:
296+
"""TD-P012 / #370 regression: codec decode failures at the task boundary
297+
must turn into a deterministic fail_{workflow,activity}_task call so the
298+
lease does not sit until timeout."""
299+
300+
@pytest.mark.asyncio
301+
async def test_activity_json_decode_failure_fails_task(self, mock_client: AsyncMock) -> None:
302+
worker = Worker(mock_client, task_queue="q1", workflows=[], activities=[echo_activity])
303+
task = {
304+
"task_id": "at-bad-json",
305+
"activity_attempt_id": "aa-bad-json",
306+
"activity_type": "test-act",
307+
"arguments": "{not valid json",
308+
"payload_codec": "json",
309+
}
310+
await worker._run_activity_task(task)
311+
mock_client.fail_activity_task.assert_called_once()
312+
call_kwargs = mock_client.fail_activity_task.call_args.kwargs
313+
assert "decode" in call_kwargs["message"].lower()
314+
assert "json" in call_kwargs["message"]
315+
assert call_kwargs["non_retryable"] is True
316+
mock_client.complete_activity_task.assert_not_called()
317+
318+
@pytest.mark.asyncio
319+
async def test_activity_avro_decode_failure_fails_task(self, mock_client: AsyncMock) -> None:
320+
pytest.importorskip("avro", reason="avro extra not installed")
321+
worker = Worker(mock_client, task_queue="q1", workflows=[], activities=[echo_activity])
322+
task = {
323+
"task_id": "at-bad-avro",
324+
"activity_attempt_id": "aa-bad-avro",
325+
"activity_type": "test-act",
326+
"arguments": "!!!not-valid-base64!!!",
327+
"payload_codec": "avro",
328+
}
329+
await worker._run_activity_task(task)
330+
mock_client.fail_activity_task.assert_called_once()
331+
call_kwargs = mock_client.fail_activity_task.call_args.kwargs
332+
assert "decode" in call_kwargs["message"].lower()
333+
assert "avro" in call_kwargs["message"]
334+
assert call_kwargs["non_retryable"] is True
335+
mock_client.complete_activity_task.assert_not_called()
336+
337+
@pytest.mark.asyncio
338+
async def test_activity_avro_missing_extra_fails_task(
339+
self, mock_client: AsyncMock, monkeypatch: pytest.MonkeyPatch
340+
) -> None:
341+
from durable_workflow import _avro
342+
from durable_workflow.errors import AvroNotInstalledError
343+
344+
def _raise_missing(_blob: str) -> None:
345+
raise AvroNotInstalledError(
346+
"The 'avro' package is required to encode/decode payloads with the 'avro' "
347+
"codec. Install with: pip install 'durable-workflow[avro]'"
348+
)
349+
350+
monkeypatch.setattr(_avro, "decode", _raise_missing)
351+
352+
worker = Worker(mock_client, task_queue="q1", workflows=[], activities=[echo_activity])
353+
task = {
354+
"task_id": "at-no-avro",
355+
"activity_attempt_id": "aa-no-avro",
356+
"activity_type": "test-act",
357+
"arguments": "anything",
358+
"payload_codec": "avro",
359+
}
360+
await worker._run_activity_task(task)
361+
mock_client.fail_activity_task.assert_called_once()
362+
call_kwargs = mock_client.fail_activity_task.call_args.kwargs
363+
assert "avro extra" in call_kwargs["message"].lower() or "pip install" in call_kwargs["message"]
364+
assert call_kwargs["failure_type"] == "AvroNotInstalledError"
365+
assert call_kwargs["non_retryable"] is True
366+
mock_client.complete_activity_task.assert_not_called()
367+
368+
@pytest.mark.asyncio
369+
async def test_workflow_json_decode_failure_fails_task(self, mock_client: AsyncMock) -> None:
370+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
371+
task = {
372+
"task_id": "t-bad-json",
373+
"workflow_type": "test-wf",
374+
"workflow_task_attempt": 1,
375+
"history_events": [],
376+
"arguments": "{not valid json",
377+
"payload_codec": "json",
378+
}
379+
await worker._run_workflow_task(task)
380+
mock_client.fail_workflow_task.assert_called_once()
381+
call_kwargs = mock_client.fail_workflow_task.call_args.kwargs
382+
assert "decode" in call_kwargs["message"].lower()
383+
assert "json" in call_kwargs["message"]
384+
mock_client.complete_workflow_task.assert_not_called()
385+
386+
@pytest.mark.asyncio
387+
async def test_workflow_avro_missing_extra_fails_task(
388+
self, mock_client: AsyncMock, monkeypatch: pytest.MonkeyPatch
389+
) -> None:
390+
from durable_workflow import _avro
391+
from durable_workflow.errors import AvroNotInstalledError
392+
393+
def _raise_missing(_blob: str) -> None:
394+
raise AvroNotInstalledError(
395+
"The 'avro' package is required to encode/decode payloads with the 'avro' "
396+
"codec. Install with: pip install 'durable-workflow[avro]'"
397+
)
398+
399+
monkeypatch.setattr(_avro, "decode", _raise_missing)
400+
401+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
402+
task = {
403+
"task_id": "t-no-avro",
404+
"workflow_type": "test-wf",
405+
"workflow_task_attempt": 1,
406+
"history_events": [],
407+
"arguments": "anything",
408+
"payload_codec": "avro",
409+
}
410+
await worker._run_workflow_task(task)
411+
mock_client.fail_workflow_task.assert_called_once()
412+
call_kwargs = mock_client.fail_workflow_task.call_args.kwargs
413+
assert "avro extra" in call_kwargs["message"].lower() or "pip install" in call_kwargs["message"]
414+
assert call_kwargs["failure_type"] == "AvroNotInstalledError"
415+
mock_client.complete_workflow_task.assert_not_called()
416+
417+
@pytest.mark.asyncio
418+
async def test_workflow_replay_avro_missing_extra_fails_task(
419+
self, mock_client: AsyncMock, monkeypatch: pytest.MonkeyPatch
420+
) -> None:
421+
"""Avro-encoded history result that cannot be decoded (extra missing)
422+
surfaces as fail_workflow_task, not an unhandled dispatcher exception."""
423+
from durable_workflow import _avro
424+
from durable_workflow.errors import AvroNotInstalledError
425+
426+
def _raise_missing(_blob: str) -> None:
427+
raise AvroNotInstalledError(
428+
"The 'avro' package is required to encode/decode payloads with the 'avro' "
429+
"codec. Install with: pip install 'durable-workflow[avro]'"
430+
)
431+
432+
monkeypatch.setattr(_avro, "decode", _raise_missing)
433+
434+
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
435+
# JSON envelope for start args bypasses the Avro path so the replay
436+
# decode of history result (under the run's avro codec) is the site
437+
# that triggers AvroNotInstalledError.
438+
task = {
439+
"task_id": "t-replay-no-avro",
440+
"workflow_type": "test-wf",
441+
"workflow_task_attempt": 1,
442+
"history_events": [
443+
{"event_type": "ActivityCompleted", "payload": {"result": "anything"}},
444+
],
445+
"arguments": {"codec": "json", "blob": '["hello"]'},
446+
"payload_codec": "avro",
447+
}
448+
await worker._run_workflow_task(task)
449+
mock_client.fail_workflow_task.assert_called_once()
450+
call_kwargs = mock_client.fail_workflow_task.call_args.kwargs
451+
assert call_kwargs["failure_type"] == "AvroNotInstalledError"
452+
mock_client.complete_workflow_task.assert_not_called()
453+
454+
295455
class TestWorkerStop:
296456
@pytest.mark.asyncio
297457
async def test_stop_sets_event(self, mock_client: AsyncMock) -> None:

0 commit comments

Comments
 (0)