@@ -145,13 +145,19 @@ def __init__(
145145 * ,
146146 fan_out_internal_save_batching : FanOutInternalSaveBatching | None = None ,
147147 abort_after_instance : int | None = None ,
148+ abort_after_instance_node : str | None = None ,
148149 abort_after_node : str | None = None ,
149150 ) -> None :
150151 self ._inner = InMemoryCheckpointer (
151152 fan_out_internal_save_batching = fan_out_internal_save_batching ,
152153 )
153154 self .saves : list [CheckpointRecord ] = []
154155 self ._abort_after_instance = abort_after_instance
156+ # The fan-out node ``abort_after_instance`` targets. ``None`` (the
157+ # legacy fan_out.abort_after_instance path) matches any fan-out;
158+ # crash_injection.after_fan_out_instance sets it to scope the abort to
159+ # the named node in a multi-fan-out graph.
160+ self ._abort_after_instance_node = abort_after_instance_node
155161 self ._abort_after_node = abort_after_node
156162 self ._aborted = False
157163 # Per proposal 0029 (fixture 056): mutating the saved record's
@@ -205,6 +211,14 @@ def _maybe_abort(self, record: CheckpointRecord) -> None:
205211 if self ._abort_after_instance is not None :
206212 target_idx = self ._abort_after_instance
207213 for fp in record .fan_out_progress :
214+ # Scope to the targeted fan-out node when one is named
215+ # (crash_injection.after_fan_out_instance); the legacy path
216+ # leaves it None and matches any fan-out.
217+ if (
218+ self ._abort_after_instance_node is not None
219+ and fp .fan_out_node_name != self ._abort_after_instance_node
220+ ):
221+ continue
208222 if target_idx < len (fp .instances ) and fp .instances [target_idx ].state == "completed" :
209223 # Subsequent instances must NOT be completed — otherwise
210224 # we'd abort after a later instance's save instead.
@@ -299,13 +313,18 @@ def _build_capturing(spec: Mapping[str, Any]) -> _CapturingCheckpointer:
299313 )
300314 flush_every = int (batching_cfg .get ("flush_every" , 0 ))
301315 batching = FanOutInternalSaveBatching (flush_every = flush_every )
302- abort_after = _find_abort_after_instance (spec )
303- ci_instance , ci_node = _find_crash_injection (spec )
304- if ci_instance is not None :
316+ ci_instance , ci_instance_node , ci_node = _find_crash_injection (spec )
317+ if ci_instance is not None or ci_node is not None :
318+ # crash_injection defines the crash boundary exclusively; the legacy
319+ # fan_out.abort_after_instance directive is ignored when it is set, so
320+ # an instance-boundary and a node-boundary abort can't both be active.
305321 abort_after = ci_instance
322+ else :
323+ abort_after = _find_abort_after_instance (spec )
306324 return _CapturingCheckpointer (
307325 fan_out_internal_save_batching = batching ,
308326 abort_after_instance = abort_after ,
327+ abort_after_instance_node = ci_instance_node ,
309328 abort_after_node = ci_node ,
310329 )
311330
@@ -325,22 +344,27 @@ def _find_abort_after_instance(spec: Mapping[str, Any]) -> int | None:
325344
326345# Conformance-adapter §5.6 ``crash_injection`` (proposal 0070): a simulated
327346# crash at a checkpoint boundary, independent of an instance failure.
328- def _find_crash_injection (spec : Mapping [str , Any ]) -> tuple [int | None , str | None ]:
347+ def _find_crash_injection (spec : Mapping [str , Any ]) -> tuple [int | None , str | None , str | None ]:
329348 """Parse the top-level ``crash_injection`` directive. Returns
330- ``(after_fan_out_instance_index, after_node_name)`` with at most one set.
331- Pairs with ``resume:`` the way ``first_run_expected_error`` does, but the
332- first run has no asserted outcome (it "crashed")."""
349+ ``(after_fan_out_instance_index, after_fan_out_instance_node,
350+ after_node_name)``: the index + node identify the
351+ ``after_fan_out_instance`` boundary, ``after_node_name`` the ``after_node``
352+ boundary; at most one boundary is set. Pairs with ``resume:`` the way
353+ ``first_run_expected_error`` does, but the first run has no asserted
354+ outcome (it "crashed")."""
333355 ci = spec .get ("crash_injection" )
334356 if not isinstance (ci , dict ):
335- return None , None
357+ return None , None , None
336358 ci_dict = cast ("Mapping[str, Any]" , ci )
337359 after_instance = ci_dict .get ("after_fan_out_instance" )
338360 if isinstance (after_instance , dict ):
339- return int (cast ("Mapping[str, Any]" , after_instance )["index" ]), None
361+ ai = cast ("Mapping[str, Any]" , after_instance )
362+ node = ai .get ("node" )
363+ return int (ai ["index" ]), (str (node ) if node is not None else None ), None
340364 after_node = ci_dict .get ("after_node" )
341365 if after_node is not None :
342- return None , str (after_node )
343- return None , None
366+ return None , None , str (after_node )
367+ return None , None , None
344368
345369
346370def _strip_abort_directive (spec : Mapping [str , Any ]) -> Mapping [str , Any ]:
@@ -445,7 +469,12 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any]
445469 # crash_injection (proposal 0070): a simulated crash at a checkpoint
446470 # boundary with NO asserted first-run outcome (it "crashed"). When set,
447471 # the abort is expected and swallowed without a first_run_expected_error.
448- crash_injection = spec .get ("crash_injection" )
472+ # Coerced to None when not a mapping, matching _find_crash_injection, so a
473+ # malformed directive parses to no boundary rather than swallowing aborts
474+ # or tripping the "configured but no crash fired" assertion.
475+ crash_injection : Any = spec .get ("crash_injection" )
476+ if not isinstance (crash_injection , dict ):
477+ crash_injection = None
449478 invocation_id_first_run : str | None = None
450479 final_first_run : State | None = None
451480 trace .clear ()
@@ -613,6 +642,7 @@ async def _run_one_case(spec: Mapping[str, Any], *, top_level: Mapping[str, Any]
613642 # ``_maybe_abort`` is also a no-op on the resume path.
614643 capturing ._aborted = False # noqa: SLF001 — test driver intentional
615644 capturing ._abort_after_instance = None # noqa: SLF001
645+ capturing ._abort_after_instance_node = None # noqa: SLF001
616646 capturing ._abort_after_node = None # noqa: SLF001
617647 # Clear the trace so post-resume execution capture is isolated.
618648 trace .clear ()
0 commit comments