-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathtask_lifecycle_gate.py
More file actions
1796 lines (1648 loc) · 89.4 KB
/
Copy pathtask_lifecycle_gate.py
File metadata and controls
1796 lines (1648 loc) · 89.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Location: pact-plugin/hooks/task_lifecycle_gate.py
Summary: PostToolUse hook (matcher='TaskCreate|TaskUpdate') enforcing PACT
lifecycle invariants. Cannot DENY (post-action); emits structural
advisory via additionalContext, plus a metadata writeback for
self-completion violations.
Used by: hooks.json PostToolUse matcher='TaskCreate|TaskUpdate' (per the
unified Task-mutating-tool matcher convention shared with
agent_handoff_emitter).
Self-completion writeback recursion mitigation: the metadata writeback marks
metadata.gate_writeback=true. The gate's first check skips on this marker
so the gate's own write does not re-trigger the self-completion advisory
on itself.
Safety: fail-closed-as-advisory on module-load failure (mirrors the
bootstrap_gate fail-closed-as-deny pattern, adapted for PostToolUse —
cannot DENY, emits advisory + exit 0).
hookEventName always emitted on every output path (per the
hookSpecificOutput schema-rejection defense — missing hookEventName
triggers silent platform-layer rejection).
Rule coverage:
- teachback_addblocks_missing — Teachback Task owner-wiring
TaskUpdate landed without addBlocks=[<work_task_id>]. Fires at the
canonical Step-3 wiring boundary (lead sets owner on a teachback
Task) rather than at TaskCreate — the historical TaskCreate-time
check was structurally unsatisfiable because the work-task id did
not exist yet at TaskCreate(A).
Fire condition (4-clause AND): subject is teachback-shaped AND
tool_input.owner is set AND tool_input.addBlocks is absent/empty
AND task_a.blocks is empty (benign late-wiring guard).
- work_addblockedby_missing — pact-* work Task created without
addBlockedBy=[<teachback_id>]
- self_completion — Teammate self-completed a Task without carve-out
→ advisory + completion_disputed writeback
- teachback_submit_missing — Teachback Task completed without
metadata.teachback_submit payload
- teachback_submit_schema_invalid — metadata.teachback_submit present
but malformed against the 5-field canonical schema (disjoint with
teachback_submit_missing)
- reasoning_reconstruction_missing_at_required_band — Teachback
submitted at REQUIRED band (Task B variety.total >= 11) without
reasoning_reconstruction
- reasoning_reconstruction_band_unresolvable — band traversal failed
(missing blocks, missing Task B, missing variety, or variety present
but no resolvable total); fail-open advisory documents the gap without
blocking lifecycle
- variety_missing_on_dispatch_task — pact-* work Task created with
metadata.variety present but malformed per-dimension rationales OR no
resolvable variety total. (The ABSENT-stamp arm was moved to the
dispatch-boundary gate in handoff_ordering_gate.py per the #865 surgical
split — this PostToolUse rule keeps only the present-but-malformed checks.)
- variety_acknowledgment_missing — Teachback submitted without
variety_acknowledgment field (D10 teammate verification)
- variety_acknowledgment_schema_invalid_at_write_time — Teachback
submitted with variety_acknowledgment present but malformed
(STRING instead of OBJECT; invalid enum; missing concern)
- reasoning_reconstruction_in_handoff — reasoning_reconstruction
placed inside metadata.handoff (wrong slot — belongs on
teachback_submit)
- reasoning_reconstruction_subkeys_invalid — reasoning_reconstruction
present on teachback_submit but the 3 sub-keys are wrong-shape
(non-canonical names, missing keys, or empty/non-string values)
- intentional_wait_nested_in_teachback_submit — intentional_wait
placed inside teachback_submit instead of as a sibling top-level
metadata key (Step 3 of the canonical 3-step shape)
- Every gate decision emits a session_journal lifecycle_decision event
"""
from __future__ import annotations
# ─── stdlib first (used by _emit_load_failure_advisory BEFORE wrapped imports) ─
import json
import os
import sys
from typing import NoReturn
_SUPPRESS_OUTPUT = json.dumps({"suppressOutput": True})
# Cap on exception text interpolated into context-bound output (warning
# strings reaching Claude's context and the user banner). Exception
# messages can embed attacker-influencable content (file contents, paths,
# crafted payloads in tracebacks) — bound + sanitize before interpolation.
# The stderr diagnostic line keeps the full text (debug channel).
_ERROR_TEXT_MAX = 200
# Cap on the crash-path stdin read in _emit_gate_health_event. Generous:
# real PostToolUse frames embed tool_response payloads (file contents,
# command output) and stay well under this; anything larger is not a
# realistic hook frame and must not be slurped unbounded by a best-effort
# emitter. An over-cap frame truncates mid-JSON → JSONDecodeError → the
# guard's stderr disposition (marker-only outcome, never a raise).
# This cap bounds MEMORY only — it does NOT reject sub-cap input: a frame
# with a valid JSON prefix still parses (harmless — degraded never grants
# allow, primary fails-open).
# VALUE MUST EQUAL bootstrap_gate._STDIN_READ_MAX (twin-VALUE discipline).
_STDIN_READ_MAX = 8 * 1024 * 1024 # 8 MB
def _bounded_error_text(error: BaseException) -> str:
"""Sanitized, length-bounded rendering of an exception for embedding in
context-bound warning text: control/non-printable characters become
spaces, and the result is truncated to _ERROR_TEXT_MAX chars with an
explicit marker. Full text still goes to stderr at the call site.
Total over hostile exceptions, structurally: the type name is captured
first — a metaclass can make __name__ a property that raises (caught;
falls back to a literal) or return any non-str value, INCLUDING a str
subclass whose own __str__/__format__ raises. The exact-type check below
(type(...) is str, which rejects str subclasses too) reduces type_name to
an EXACT str, whose __format__/__str__ are str's own built-ins and cannot
be overridden — so neither f-string branch below can raise on type_name
regardless of the original __name__ value. The only exception-owned code
left is the message render (error's own __str__), isolated to the main
branch and guarded by the fallback. The function therefore returns a
string for ANY exception object."""
try:
type_name = type(error).__name__
except BaseException: # noqa: BLE001 — hostile metaclass __name__ must not escape
type_name = "exception"
# __name__ can also RETURN (not raise) a non-str value — including a str
# SUBCLASS whose own __str__/__format__ raises, which an isinstance check
# would wave through. An EXACT-type check (type(...) is str) rejects
# subclasses too, so type_name is provably an exact str whose formatting
# uses str's own unpatchable built-ins → both f-string branches below
# (incl. the fallback, which re-interpolates type_name) cannot raise on it.
if type(type_name) is not str:
type_name = "exception"
try:
text = f"{type_name}: {error}"
except BaseException: # noqa: BLE001 — hostile __str__ must not escape the renderer
text = f"{type_name}: <exception str() raised>"
truncated = len(text) > _ERROR_TEXT_MAX
if truncated:
# MemoryError-safe by STRUCTURE: bounding first keeps the sanitize
# join O(cap) not O(n) — a multi-GB input never materializes a
# sanitized copy; asserted structurally, not via a runtime test.
text = text[:_ERROR_TEXT_MAX] # bound BEFORE the O(n) sanitize join
text = "".join(ch if ch.isprintable() else " " for ch in text)
if truncated:
text = text + "...[truncated]"
return text
def _emit_gate_health_event(
stage: str, error_text: str, input_data: dict | None
) -> None:
"""Best-effort durable journal emit for a crash-path gate_health event.
Lazy-imports pact_context + session_journal so it stays functional on
the import-stage crash path (works unless the breakage hits those very
modules or shared/__init__ — then the lazy import raises into the guard
below and the stdout marker remains the only record; prep §2.1 table).
On the import-stage path stdin is still unconsumed: read (capped at
_STDIN_READ_MAX) + init here.
Never raises; never load-bearing (tmux teammate fires self-drop, #877).
"""
try:
import shared.pact_context as _lazy_pact_context
import shared.session_journal as _lazy_session_journal
if input_data is None:
input_data = json.loads(sys.stdin.read(_STDIN_READ_MAX))
if not isinstance(input_data, dict):
return
if not _lazy_pact_context.is_initialized():
_lazy_pact_context.init(input_data)
# tool_name is attacker-set stdin on the import-stage path (main()'s
# TaskCreate/TaskUpdate allowlist short-circuit never ran) — apply
# the same sanitize+bound discipline as the error text before it
# reaches the durable journal.
tool_name = input_data.get("tool_name", "")
if not isinstance(tool_name, str):
tool_name = f"<non-str {type(tool_name).__name__}>"
tool_name = "".join(ch if ch.isprintable() else " " for ch in tool_name)
if len(tool_name) > _ERROR_TEXT_MAX:
tool_name = tool_name[:_ERROR_TEXT_MAX] + "...[truncated]"
event = _lazy_session_journal.make_event(
"gate_health",
hook="task_lifecycle_gate",
status="failed",
stage=stage,
error=error_text,
tool_name=tool_name,
)
written = _lazy_session_journal.append_event(event)
if not written:
try:
print(
"task_lifecycle_gate: gate_health journal emit skipped "
"(append_event returned False)",
file=sys.stderr,
)
except BaseException: # noqa: BLE001 — a diagnostic-write raise must not flip the exit code
pass
except BaseException: # noqa: BLE001 — the crash handler must not crash:
# mirror the import gauntlet's breadth (except BaseException at the
# wrapped-import block). The lazy imports execute arbitrary module
# bodies — a module-level sys.exit or KeyboardInterrupt surfacing
# here would exit nonzero AFTER the floor marker printed, and stdout
# JSON is only honored on exit 0.
try:
print(
"task_lifecycle_gate: gate_health journal emit unavailable "
"(late import or init failed)",
file=sys.stderr,
)
except BaseException: # noqa: BLE001 — a diagnostic-write raise must not flip the exit code
pass
def _emit_load_failure_advisory(
stage: str, error: BaseException, input_data: dict | None = None
) -> NoReturn:
"""Stdlib-only fail-advisory (PostToolUse cannot DENY).
Mirrors bootstrap_gate._emit_load_failure_deny but for PostToolUse —
advisory output + exit 0 since deny is not a valid PostToolUse verdict.
Uses ONLY stdlib (json, sys) so it remains functional even when every
wrapped import below fails.
Crash-path health surfacing: a top-level pactGateHealth key (stripped by
the platform's non-strict output schema; assertable on RAW stdout only)
plus a systemMessage mirror of the advisory, plus a best-effort
gate_health journal event. Error text is bounded/sanitized for all
context-bound output; the stderr diagnostic keeps the full text.
"""
# Thin call-site fallback (defense-in-depth over the now-total helper):
# the FLOOR below must print no matter what the renderer does. The
# fallback is a raise-proof CONSTANT — type(error).__name__ would
# re-invoke the same attribute access (hostile metaclass __name__) that
# is the helper's one remaining fall-through path.
try:
error_text = _bounded_error_text(error)
except BaseException: # noqa: BLE001 — floor must survive any renderer defect
error_text = "<error text unavailable>"
advisory = (
f"PACT task_lifecycle_gate {stage} failure — lifecycle "
f"rule enforcement skipped this turn. "
f"{error_text}. Investigate hook "
"installation and shared module availability."
)
output = {
"hookSpecificOutput": {
"hookEventName": "PostToolUse",
"additionalContext": advisory,
},
"systemMessage": advisory,
"pactGateHealth": {
"v": 1,
"hook": "task_lifecycle_gate",
"status": "failed",
"stage": stage,
"error": error_text,
},
}
print(json.dumps(output)) # floor FIRST
# Guarded full-text rendering: this line runs AFTER the floor printed,
# but an unguarded str(error) raising here would exit nonzero — and
# stdout JSON is only honored on exit 0, voiding the floor retroactively.
try:
error_full = f"{error}"
except BaseException: # noqa: BLE001 — hostile __str__; keep the exit-0 path
error_full = "<exception str() raised>"
try:
print(
f"Hook load error (task_lifecycle_gate / {stage}): {error_full}", # full text
file=sys.stderr,
)
except BaseException: # noqa: BLE001 — a diagnostic-write raise must not flip the exit code
pass
_emit_gate_health_event(stage, error_text, input_data) # bonus LAST
sys.exit(0)
# ─── fail-closed wrapper around cross-package imports ─────────────────────────
try:
import re
from pathlib import Path
import shared.pact_context as pact_context
from shared.paths import get_claude_config_dir
from shared.agent_handoff_marker import (
already_emitted,
is_signal_task,
occupant_hash,
sanitize_path_component,
unclaim,
)
from shared.dispatch_helpers import trustworthy_actor_name
from shared.intentional_wait import is_self_complete_exempt, is_teachback_exempt
from shared.session_journal import (
append_event,
get_journal_path,
make_event,
read_events,
)
from shared.task_utils import is_teachback_subject as _is_teachback_subject
from shared.task_utils import read_task_json
from shared.teachback_schema import (
DISPATCH_VARIETY_KEYS,
TEACHBACK_OBJECT_FIELDS,
TEACHBACK_RECOMMENDED_BAND_MIN,
TEACHBACK_REASONING_RECONSTRUCTION_REQUIRED_MIN,
TEACHBACK_REQUIRED_FIELDS,
TEACHBACK_REQUIRED_SUBKEYS,
TEACHBACK_SCHEMA_ECHO,
TEACHBACK_VARIETY_ACK_VALID_VALUES,
resolve_variety_total,
validate_reasoning_reconstruction,
)
from shared.tool_response import extract_tool_response
except BaseException as _module_load_error: # noqa: BLE001 — fail-closed catch-all
_emit_load_failure_advisory("module imports", _module_load_error)
# ─── constants ────────────────────────────────────────────────────────────────
# Self-completion carve-out resolution is delegated entirely to
# is_self_complete_exempt(task, team_name) in shared.intentional_wait
# (the SSOT). The predicate keys on team-config agentType (NOT owner
# name) — secretaries spawned under any name reach the carve-out as
# long as the team-config records agentType=pact-secretary. team_name
# is resolved via pact_context at the call site below. The dispatch_gate
# RESERVED_NAMES set still reserves the `secretary`/`pact-secretary`
# literals as a defense-in-depth name perimeter — see
# dispatch_gate.RESERVED_NAMES comment block for the name-perimeter
# rationale (the SSOT for self-completion exemption is agentType, but
# the name perimeter blocks teammates from spawning under reserved
# names that could shadow legitimate secretary spawn).
# Teachback-exempt dispatch carve-out resolution is delegated entirely to
# is_teachback_exempt(owner, team_name) in shared.intentional_wait.
# Required handoff schema fields (advisory if present-but-malformed).
_HANDOFF_REQUIRED_FIELDS = (
"produced",
"decisions",
"reasoning_chain",
"uncertainty",
"integration",
"open_questions",
)
# Teachback schema constants (TEACHBACK_REQUIRED_FIELDS,
# TEACHBACK_VARIETY_ACK_VALID_VALUES, TEACHBACK_REASONING_RECONSTRUCTION_REQUIRED_MIN)
# and the reasoning_reconstruction validator are imported from
# shared.teachback_schema (SSOT). TEACHBACK_REQUIRED_SUBKEYS and
# validate_reasoning_reconstruction are consumed by the write-time advisory
# rules below. TEACHBACK_SCHEMA_ECHO (also from the SSOT) is appended to the
# teachback_submit_schema_invalid advisory so the deny message echoes the full
# canonical schema, not only the offending field(s).
# Required per-dimension rationale fields on metadata.variety (D11).
# 4-tuple. Each rationale is one sentence explaining THIS dispatch's score
# on THAT dimension. Cargo-cult-via-single-rationale (D4 legacy) is no longer
# schema-conformant; four distinct rationales force four fresh articulations.
_VARIETY_REQUIRED_RATIONALES = (
"novelty_rationale",
"scope_rationale",
"uncertainty_rationale",
"risk_rationale",
)
# Sentinel problem string returned by _validate_variety_schema when the
# rationales are well-formed but no resolvable variety total exists. The R4
# rule keys on this exact value to select the distinct unresolvable-total
# advisory message (vs. the rationale-malformed message).
_NO_RESOLVABLE_TOTAL = (
"no resolvable total (need an integer 4-16 under key 'total', "
"or a recoverable fallback)"
)
# ─── artifact_paths emit backstop (durability nudge) ─────────────────────────
#
# Maps a phase-task subject PREFIX to its artifact_paths `workflow` tag, but
# ONLY for the phases that MUST emit a durable artifact pointer — PREPARE and
# ARCHITECT, which write recoverable disk artifacts (docs/preparation/,
# docs/architecture/) that the git-immune, GC-durable journal event protects.
# CODE:/TEST:/ATOMIZE:/CONSOLIDATE: phases are DELIBERATELY ABSENT: their output
# is git-tracked (CODE/TEST) or non-artifact (ATOMIZE/CONSOLIDATE), so a missing
# artifact_paths event is expected, not an anomaly — leaving them out makes the
# backstop silently skip them. The `workflow` values mirror the lowercase enum
# in session_journal's artifact_paths registration (plan-mode / prepare /
# architect / peer-review / code-auditor); only the two phase-task-backed
# workflows appear here (plan-mode / peer-review are command syntheses with no
# phase-completion TaskUpdate for this PostToolUse hook to observe).
_ARTIFACT_EMIT_PHASE_WORKFLOWS = {
"PREPARE:": "prepare",
"ARCHITECT:": "architect",
}
def _phase_artifact_requirement(subject: str) -> "tuple[str, str] | None":
"""Map a phase-task subject to its (workflow, feature) artifact key, or
None when this subject does not require an artifact_paths emit.
Returns None (skip the backstop nudge) for:
- a non-PREPARE/ARCHITECT subject (CODE/TEST/ATOMIZE/CONSOLIDATE/other →
exempt by design — their artifacts are git-tracked or absent), and
- a malformed phase subject with no ": " feature separator (rare; the
orchestrate command creates phase tasks as exactly "PREPARE: {feature}"
/ "ARCHITECT: {feature}", so a missing separator is degenerate — fail
open as a no-op rather than nudging on an unresolvable feature slug).
The feature slug is the subject suffix after the "{PHASE}: " prefix; it must
equal the orchestrate {feature-slug} that the lead-frame emit site writes as
the event's `feature` field, so the (workflow, feature) presence check
aligns with the dedup SSOT key. Pure; never raises.
"""
if not isinstance(subject, str):
return None
for prefix, workflow in _ARTIFACT_EMIT_PHASE_WORKFLOWS.items():
if subject.startswith(prefix):
# Split on the canonical "{PHASE}: " separator; the suffix is the
# feature slug. A subject that startswith the bare prefix but has no
# ": " (e.g. "PREPAREthing") cannot match prefix anyway; one that is
# exactly the prefix with nothing after yields an empty slug → skip.
_, sep, suffix = subject.partition(": ")
feature = suffix.strip()
if not sep or not feature:
return None
return workflow, feature
return None
def _artifact_paths_event_present(workflow: str, feature: str) -> bool:
"""Return True iff an artifact_paths journal event exists for this
(workflow, feature) in the CURRENT session's journal.
Uses the IMPLICIT read_events() (lead-frame): this backstop is is_lead-gated
at its single call site, so get_session_dir() resolves the canonical journal
in the lead's process. The off-lead masked-read hazard (read_events()
false-returning [] for a teammate frame) is strictly the SECRETARY HARVEST
reader's concern — that reader runs off-lead and MUST use
read_events_from(session_dir); this backstop does not, because it never runs
off-lead. The hook frame carries no worktree/session_dir to pass anyway.
Fail-OPEN by construction: read_events swallows its own errors and returns []
(treated as "absent" → the nudge fires). A false nudge is non-blocking and
self-corrects; a false-silence (missing the forgotten emit) is the dangerous
error this presence check exists to prevent, so absent-on-error is safe-side.
Pure read; never raises (read_events is internally fail-open).
"""
events = read_events("artifact_paths")
for event in events:
if not isinstance(event, dict):
continue
if event.get("workflow") == workflow and event.get("feature") == feature:
return True
return False
# `_is_teachback_subject` (the canonical Teachback Task subject predicate) was
# HOISTED to shared/task_utils.py so the handoff_ordering_gate PreToolUse hook
# can reuse it without importing this PostToolUse module. It is re-imported at
# the top of this file as `_is_teachback_subject` (private-name alias preserving
# the gate's existing call sites). SINGLE definition lives in task_utils; do NOT
# re-introduce the regex here — duplication reopens the drift class.
# ─── metadata.completion_disputed writeback (direct FS) ──────────────────────
def _writeback_dispute(task_id: str) -> bool:
"""Set metadata.completion_disputed=true and metadata.gate_writeback=true
on the task JSON, directly via filesystem write (no harness round-trip).
Per user decision: direct FS write (no CLI shim). Reads via
shared.task_utils.read_task_json (path-traversal safe), mutates metadata,
writes back atomically (.tmp + os.replace).
The gate_writeback marker is a recursion guard: if anything (the harness
or a future tool) replays the metadata change as a TaskUpdate, this
gate's step ① recursion check will skip it.
Fail-OPEN by design: any IOError swallowed and returns False — advisory
is still emitted by the caller (per the writeback-failure convention:
the advisory's user-facing surface is the load-bearing signal; the
metadata writeback is best-effort accounting). Logged via
lifecycle_decision journal event.
Returns True iff the writeback succeeded.
"""
if not task_id or not isinstance(task_id, str):
return False
# M2 (security): sanitize_path_component strips C0 / \x00 control chars too —
# a bare `re.sub(r'[/\\]|\.\.')` lets a NUL byte through to task_file.exists(),
# which raises an uncaught ValueError ('embedded null byte') that propagates to
# the gate catch-all and suppresses rule-enforcement for the turn (advisory-
# suppression DoS). read_task_json's ValueError catch backstops other callers.
sanitized_id = sanitize_path_component(task_id)
if not sanitized_id:
return False
try:
team_name = pact_context.get_pact_context().get("team_name", "")
except Exception:
team_name = ""
if not team_name or not re.fullmatch(r"[A-Za-z0-9_\-]+", team_name):
return False
task = read_task_json(sanitized_id, team_name)
if not task:
return False
metadata = task.get("metadata")
if not isinstance(metadata, dict):
metadata = {}
task["metadata"] = metadata
metadata["completion_disputed"] = True
metadata["gate_writeback"] = True
tasks_root = get_claude_config_dir() / "tasks" / team_name
target = tasks_root / f"{sanitized_id}.json"
tmp = tasks_root / f".{sanitized_id}.json.tmp"
try:
tasks_root.mkdir(parents=True, exist_ok=True, mode=0o700)
tmp.write_text(json.dumps(task), encoding="utf-8")
os.replace(str(tmp), str(target))
return True
except OSError:
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
return False
# #906: auditor verdict severity ladder for destructive-downgrade detection.
# Higher rank = more severe. A lead overwrite that LOWERS the rank (e.g.
# RED->GREEN) is a destructive downgrade that escalates the advisory SEVERITY
# only — ranks are NEVER used to gate preservation (always-preserve regardless
# of direction, per the architect ruling). Unknown / non-dict signals yield
# None → no escalation (the advisory still fires, without the downgrade
# emphasis).
_AUDIT_SIGNAL_RANK = {"GREEN": 0, "YELLOW": 1, "RED": 2}
def _audit_signal_rank(audit_summary: object) -> "int | None":
"""Return the severity rank of an audit_summary's `signal`, or None if the
shape/signal is unrankable. Pure; never raises."""
if not isinstance(audit_summary, dict):
return None
signal = audit_summary.get("signal")
if not isinstance(signal, str):
return None
return _AUDIT_SIGNAL_RANK.get(signal.strip().upper())
def _is_destructive_audit_downgrade(prior: object, incoming: object) -> bool:
"""Return True iff `incoming` LOWERS `prior`'s verdict severity (e.g.
RED->GREEN). Unknown signals on either side → False (cannot rank → no
escalation). Pure; never raises. Used ONLY for advisory wording — preservation
is unconditional regardless of the return value."""
prior_rank = _audit_signal_rank(prior)
incoming_rank = _audit_signal_rank(incoming)
if prior_rank is None or incoming_rank is None:
return False
return incoming_rank < prior_rank
def _writeback_audit_recovery(task_id: str, updates: dict) -> bool:
"""#906 codified-mirror writeback — durable, direct-FS metadata update for
auditor-verdict overwrite-protection. Mirrors _writeback_dispute exactly
(read via shared.task_utils.read_task_json [path-traversal safe] → mutate
metadata → atomic .tmp + os.replace), and sets the gate_writeback recursion
guard so a replayed metadata change cannot re-fire the gate.
Used by BOTH branches of the codified mirror:
- MIRROR : updates={"audit_summary_authored": <authored verdict>}
- RECOVER : updates={"lead_close_note": <lead's overwriting value>}
Task JSON lives in the team dir (~/.claude/tasks/{team}/), which is writable
from ANY teammate process — unlike the session journal, which self-drops in
a teammate context (#877). That is precisely why the auditor's verdict can
be captured at AUTHOR time (the MIRROR), sidestepping the post-overwrite
read-of-a-clobbered-value problem.
Fail-OPEN by design: any IOError swallowed → returns False. The advisory is
the load-bearing user-facing signal; the metadata writeback is best-effort
accounting (same convention as _writeback_dispute). Returns True iff the
writeback succeeded.
"""
if not task_id or not isinstance(task_id, str):
return False
# M2 (security): sanitize_path_component strips C0 / \x00 control chars too —
# a bare `re.sub(r'[/\\]|\.\.')` lets a NUL byte through to task_file.exists(),
# which raises an uncaught ValueError ('embedded null byte') that propagates to
# the gate catch-all and suppresses rule-enforcement for the turn (advisory-
# suppression DoS). read_task_json's ValueError catch backstops other callers.
sanitized_id = sanitize_path_component(task_id)
if not sanitized_id:
return False
try:
team_name = pact_context.get_pact_context().get("team_name", "")
except Exception:
team_name = ""
if not team_name or not re.fullmatch(r"[A-Za-z0-9_\-]+", team_name):
return False
task = read_task_json(sanitized_id, team_name)
if not task:
return False
metadata = task.get("metadata")
if not isinstance(metadata, dict):
metadata = {}
task["metadata"] = metadata
for key, value in updates.items():
metadata[key] = value
metadata["gate_writeback"] = True
tasks_root = get_claude_config_dir() / "tasks" / team_name
target = tasks_root / f"{sanitized_id}.json"
tmp = tasks_root / f".{sanitized_id}.json.tmp"
try:
tasks_root.mkdir(parents=True, exist_ok=True, mode=0o700)
tmp.write_text(json.dumps(task), encoding="utf-8")
os.replace(str(tmp), str(target))
return True
except OSError:
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
return False
def _emit_lead_side_agent_handoff(
team_name: str,
task_id: str,
owner: str,
subject: str,
task_metadata: dict,
) -> None:
"""Fix A (#869): emit a single agent_handoff event at the lead's
acceptance-commit — the lead's TaskUpdate(status="completed") on a work
task carrying a populated metadata.handoff.
WHY HERE (the b2 emit point): agent_handoff is TaskCompleted-keyed; a
stage-ready task completes mid-turn, so it is already "completed" at the
lead's Stop-sweep and swept over → the TaskCompleted-keyed
agent_handoff_emitter (b1) never fires for it. The lead's process has a
populated context, so append_event writes the canonical journal via
get_session_dir() with no resolver.
EMIT-ELIGIBILITY MIRRORS agent_handoff_emitter (b1) — that hook is the
CANONICAL source for the emit-shape (owner non-empty, not a teachback
task, not a signal task, handoff present). The divergence-critical atoms
(is_signal_task, occupant_hash, already_emitted) are SHARED via
shared.agent_handoff_marker so the two paths cannot drift on signal-task
exclusion or the dedup key (#887 class). The b2-specific topology gate
(is_lead) is applied by the caller. Eligibility keys on handoff PRESENCE
(owner / not-teachback / not-signal / handoff-present), never on an
owner-name prefix — bare owner names are the convention, so a `pact-`
prefix gate would no-op. (A now-retired completion-time branch above once
carried such a prefix gate; it was permanently dormant and was removed.)
Shares the occupant-keyed marker with b1: if a mid-turn TaskUpdate ALSO
dispatches TaskCompleted (R2 — open until the real-tmux smoke), b1 and b2
test-and-set the SAME marker and dedup to exactly one event.
Best-effort: fail-open on any error (matches append_event's policy and
this hook's livelock-safe exit-0 posture; never raises to the caller).
"""
try:
# Emit-eligibility (mirrors b1). owner-empty / teachback / signal-task
# / handoff-absent all suppress, same as the emitter's bypass gates.
# #917 R2 (validate-before-claim): also reject a WHITESPACE-only owner —
# it passes a bare falsy check but FAILS the journal's non-empty-str
# `agent` schema, so it would claim the O_EXCL marker then fail
# append_event (claim-without-write poison). Mirrors b1's owner guard.
if not owner or not owner.strip() or _is_teachback_subject(subject):
return
if is_signal_task(task_metadata):
return
# M1: handoff must be a dict (the journal schema requires it). A
# truthy-but-non-dict handoff (str/list) would pass a bare presence
# check, claim the O_EXCL marker, then FAIL append_event's schema
# validation — an orphaned/poisoned marker. isinstance makes a
# malformed handoff DEFER (claim nothing). Mirrors b1's gate.
handoff = task_metadata.get("handoff")
if not isinstance(handoff, dict) or not handoff:
return
# #917 R2 (validate-before-claim): substitute the sentinel for a
# whitespace-only / empty subject (mirrors b1's falsy+whitespace
# substitution) so a degenerate subject is schema-valid BEFORE the
# claim rather than poisoning the marker. _is_teachback_subject above
# already ran on the original subject (a blank subject is not a
# teachback), so substituting here does not change the teachback gate.
if not subject or not subject.strip():
subject = "(no subject)"
# #917 symmetry: same writability precondition as b1
# (agent_handoff_emitter). In the lead's gate process this is a no-op
# (the lead's context is persisted -> get_journal_path() resolves), but
# keeping both emit paths' marker-claim preconditions IDENTICAL prevents
# the b1/b2 divergence class (#887/#901): a future change that makes b2
# reachable from a non-lead / unresolvable context cannot silently
# claim-without-write and poison the shared marker. Pure read; the
# mark-then-write order below is unchanged.
# F3: this gate is the TWIN of agent_handoff_emitter.main — keep both
# in parity. Mark-then-write / O_EXCL contract:
# shared/agent_handoff_marker.already_emitted.
if not get_journal_path():
return
occupant = occupant_hash(owner, subject)
if already_emitted(team_name, task_id, occupant):
return
# #917 R1 (compensating-unclaim): we OWN the marker here (already_emitted
# returned False = fresh O_EXCL claim). Roll the claim back if the write
# returns False (schema rejection / unwritable dir — the residual paths
# the writability gate does NOT cover) OR raises, so a later writable
# fire can re-emit instead of being permanently suppressed by the
# poisoned marker. Best-effort + fail-safe (worst case reverts to
# today's behavior). F3 twin of agent_handoff_emitter.main.
try:
written = append_event(
make_event(
"agent_handoff",
agent=owner,
task_id=task_id,
task_subject=subject,
handoff=handoff,
)
)
except Exception:
written = False
if not written:
unclaim(team_name, task_id, occupant)
except Exception:
# Fail-open: a journal-emit failure must never break the gate's
# advisory evaluation or its exit-0 contract.
pass
# ─── core evaluation ─────────────────────────────────────────────────────────
def _validate_handoff_schema(handoff: object) -> str | None:
"""Return None if handoff is well-formed, or a short reason string
describing the schema problem (suitable for advisory text).
"""
if not isinstance(handoff, dict):
return f"metadata.handoff must be object, got {type(handoff).__name__}"
missing = [f for f in _HANDOFF_REQUIRED_FIELDS if f not in handoff]
if missing:
return f"metadata.handoff missing required fields: {', '.join(missing)}"
return None
def _validate_variety_acknowledgment(ack: object) -> str | None:
"""Return None if variety_acknowledgment is well-formed per D10, or a
short reason string. Pure function; never raises.
Schema:
- must be dict
- rationale_articulates_this_dispatch: enum 'yes' | 'no' | 'concern'
- concern: non-empty string when value != 'yes'; optional/empty when 'yes'
"""
if not isinstance(ack, dict):
return f"must be object, got {type(ack).__name__}"
value = ack.get("rationale_articulates_this_dispatch")
if value not in TEACHBACK_VARIETY_ACK_VALID_VALUES:
return (
f"rationale_articulates_this_dispatch must be one of "
f"{TEACHBACK_VARIETY_ACK_VALID_VALUES}, got {value!r}"
)
if value != "yes":
concern = ack.get("concern")
if not isinstance(concern, str) or not concern.strip():
return (
"concern field required (non-empty string) when "
"rationale_articulates_this_dispatch != 'yes'"
)
return None
def _validate_teachback_submit_schema(teachback: object) -> str | None:
"""Return None if teachback_submit is well-formed, or a short reason
string. Mirrors _validate_handoff_schema.
Validates the 5 canonical fields per pact-teachback skill (4 string
fields + variety_acknowledgment dict per D10). reasoning_reconstruction
is checked separately at R3 dispatch time, not here.
"""
if not isinstance(teachback, dict):
return (
f"metadata.teachback_submit must be object, "
f"got {type(teachback).__name__}"
)
missing = [f for f in TEACHBACK_REQUIRED_FIELDS if f not in teachback]
if missing:
return (
f"metadata.teachback_submit missing required fields: "
f"{', '.join(missing)}"
)
# Non-empty-string check on the string fields; the object fields
# (variety_acknowledgment) are validated by the dedicated sub-validator
# below. The carve-out derives from TEACHBACK_OBJECT_FIELDS (SSOT) so this
# string/object partition stays in lockstep with the schema-echo derivation.
string_fields = tuple(
f for f in TEACHBACK_REQUIRED_FIELDS if f not in TEACHBACK_OBJECT_FIELDS
)
empty = [
f for f in string_fields
if not isinstance(teachback.get(f), str) or not teachback[f].strip()
]
if empty:
return (
f"metadata.teachback_submit fields empty/non-string: "
f"{', '.join(empty)}"
)
ack_problem = _validate_variety_acknowledgment(
teachback.get("variety_acknowledgment")
)
if ack_problem:
return (
f"metadata.teachback_submit.variety_acknowledgment "
f"{ack_problem}"
)
return None
def _validate_variety_schema(
variety: object, metadata: object = None
) -> str | None:
"""Return None if metadata.variety is well-formed per D11, or a short
reason string. Pure function; never raises.
Validates the four per-dimension rationale fields (presence +
non-empty string) AND that the stamp carries a resolvable variety
total. The total check consults the same shared resolver the
read-time band resolver uses (resolve_variety_total) — this is the
cross-rule consistency property: any variety shape that passes
write-time validation MUST resolve at read-time. Dimension score
range checks (1-4) are the orchestrator's authority; this hook is
defense-in-depth for the cargo-cult-prevention property D11 codifies.
The optional `metadata` argument is forwarded to the resolver so the
non-canonical top-level `variety_score` sibling is reachable. Callers
that only have the variety dict may omit it (the resolver simply skips
that candidate). Rationale problems are checked first (cheap dict +
string checks), then the total (one resolver call); the first problem
found is returned, preserving the single-string-return contract.
"""
if not isinstance(variety, dict):
return f"must be object, got {type(variety).__name__}"
missing = [
r for r in _VARIETY_REQUIRED_RATIONALES if r not in variety
]
if missing:
return (
f"missing required per-dimension rationales: "
f"{', '.join(missing)}"
)
empty = [
r for r in _VARIETY_REQUIRED_RATIONALES
if not isinstance(variety.get(r), str) or not variety[r].strip()
]
if empty:
return (
f"per-dimension rationales empty/non-string: "
f"{', '.join(empty)}"
)
if resolve_variety_total(variety, metadata) is None:
return _NO_RESOLVABLE_TOTAL
return None
def _band_from_total(total: int) -> str:
"""Map a resolved variety total to a reasoning_reconstruction band
string ("required" / "recommended" / "skipped"). Shared by the direct
Task-B resolution path and the C2 parent-inheritance fallback so both
apply the identical threshold logic (one band-cut SSOT)."""
if total >= TEACHBACK_REASONING_RECONSTRUCTION_REQUIRED_MIN:
return "required"
if total >= TEACHBACK_RECOMMENDED_BAND_MIN:
return "recommended"
return "skipped"
def _inherit_band_from_parent(task_b: dict, team_name: str) -> str | None:
"""#891 Opt2 parent-inheritance fallback: when Task B carries no
resolvable variety, inherit the band from its PARENT task's variety
total. Returns a band string when the parent resolves, else None
(caller treats None as "unresolvable" — the preserved floor).
Modular/separable: this is the entire Opt2 surface. Deleting this
function + its two call sites reverts to Opt1-alone behavior.
Parent resolution + guardrail (don't inherit a WRONG parent's band —
a wrong inherit mis-resolves the band, the exact bug being fixed):
- Task B blocks the parent (Plan/feature/umbrella) task. The parent
pointer is task_b.blocks[0], but only when blocks is UNAMBIGUOUS:
a singleton list. Empty, multi-entry, or non-list blocks → fail
open (None) rather than guess among candidates.
- The parent must itself carry a resolvable variety total (via the
shared resolve_variety_total, reused unchanged). A non-parent task
(phase / teachback gate) does not carry variety, so a mis-pointed
blocks[0] fails open here rather than inheriting a wrong band. This
is the structural "looks like a Plan/feature task" guardrail: the
defining property of an inheritable parent is that it is stamped.
"""
blocks = task_b.get("blocks")
# Singleton-only: >1 entry is ambiguous (which is the parent?); 0/None
# has no parent pointer. Either way fail open.
if not isinstance(blocks, list) or len(blocks) != 1:
return None
parent_id = blocks[0]
if not isinstance(parent_id, str) or not parent_id:
return None
parent = read_task_json(parent_id, team_name)
if not parent:
return None
parent_metadata = parent.get("metadata")
if not isinstance(parent_metadata, dict):
return None
parent_variety = parent_metadata.get("variety")
if not isinstance(parent_variety, dict):
return None
parent_total = resolve_variety_total(parent_variety, parent_metadata)
if parent_total is None:
return None
return _band_from_total(parent_total)
def _resolve_required_band_via_blocks(
task_a: dict, team_name: str
) -> str:
"""Resolve the REQUIRED band for reasoning_reconstruction from Task A
via blocks traversal to Task B.
Returns one of:
- "required": resolved total >= TEACHBACK_REASONING_RECONSTRUCTION_REQUIRED_MIN
- "recommended": TEACHBACK_RECOMMENDED_BAND_MIN <= total < required-min
- "skipped": total < TEACHBACK_RECOMMENDED_BAND_MIN
- "unresolvable": blocks link missing, Task B file missing, or
variety absent/malformed/untotaled on Task B AND the parent
inheritance fallback also failed (fail-open: caller emits a separate
band_unresolvable advisory documenting the gap)
The total is resolved via the shared resolve_variety_total helper, so a
non-canonical stamp (score / top-level variety_score / dimension-sum)
resolves rather than reading as "unresolvable" — the same resolver the
write-time validator consults (the cross-rule consistency property).
#891 Opt2: when Task B has no resolvable variety, the band is inherited
from the parent (Plan/feature/umbrella) task before returning
"unresolvable" — see _inherit_band_from_parent. This keeps an unstamped
Task B's band resolvable (consultations are frequently 11-13) instead of
silently mis-resolving as "skipped".
"""
blocks = task_a.get("blocks")
if not isinstance(blocks, list) or not blocks:
return "unresolvable"
# Convention: Task A blocks Task B (the work task). The first blocked
# ID is the canonical work-task pointer in the Teachback-Gated Dispatch
# shape — multi-block teachback tasks are not in current convention.
task_b_id = blocks[0]
if not isinstance(task_b_id, str) or not task_b_id:
return "unresolvable"
if not team_name:
return "unresolvable"
task_b = read_task_json(task_b_id, team_name)
if not task_b:
return "unresolvable"
metadata = task_b.get("metadata")
# Opt2 broadened the inherit trigger: a metadata-not-dict OR variety-not-dict
# Task B (not only an absent variety) now resolves total=None and flows to
# the parent-inherit fallback below, rather than returning "unresolvable"
# immediately as the pre-Opt2 code did. The floor is preserved (an
# unresolvable parent still yields "unresolvable").
variety = metadata.get("variety") if isinstance(metadata, dict) else None
total = (
resolve_variety_total(variety, metadata)
if isinstance(variety, dict)
else None
)
if total is None:
# Task B variety absent/malformed/untotaled → try parent inheritance
# (Opt2) before conceding "unresolvable". Floor preserved: a parent
# that also fails to resolve returns None → "unresolvable".
return _inherit_band_from_parent(task_b, team_name) or "unresolvable"
return _band_from_total(total)
def evaluate_lifecycle(input_data: dict) -> list[tuple[str, str]]:
"""Return list of (rule, message) advisory tuples. Empty list → ALLOW
silently.
The ``rule`` element is a behavioral identifier (e.g.
``"teachback_addblocks_missing"``, ``"self_completion"``) used in
journal events; the ``message`` element is the human-readable