-
Notifications
You must be signed in to change notification settings - Fork 30
Expand file tree
/
Copy pathhooks.py
More file actions
1441 lines (1279 loc) · 58.8 KB
/
Copy pathhooks.py
File metadata and controls
1441 lines (1279 loc) · 58.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""PreToolUse, PostToolUse, and Stop hook callbacks.
- PreToolUse: three-outcome Cedar policy enforcement (ALLOW / DENY /
REQUIRE_APPROVAL). The REQUIRE_APPROVAL path writes a pending approval
row + transitions the task to AWAITING_APPROVAL atomically, polls for a
human decision, then resumes / denies per the user's input. See
``docs/design/CEDAR_HITL_GATES.md`` §6.5.
- PostToolUse: output scanner for secrets/PII.
- Stop: between-turns hook dispatcher. Cancel → nudge → denial injection
in that order (cancel-wins semantics, finding #2). Each producer
appends synthetic user-message strings that get reinjected via the
SDK's ``decision: "block"`` mechanism.
A module-level registry ``between_turns_hooks`` lets phases (Phase 2
nudges, Phase 3 denial injections) append additional synthetic-message
producers without touching the Stop hook callback itself.
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import time
from collections.abc import Callable
from datetime import UTC
from typing import TYPE_CHECKING, Any
import nudge_reader
import task_state
from nudge_reader import _xml_escape
from output_scanner import scan_tool_output
from policy import APPROVAL_RATE_LIMIT, FLOOR_TIMEOUT_S, Outcome
from progress_writer import _generate_ulid
from shell import log, log_error_cw
if TYPE_CHECKING:
from policy import PolicyEngine
from telemetry import _TrajectoryWriter
# ---------------------------------------------------------------------------
# Chunk 3 constants (§6.5 pseudocode)
# ---------------------------------------------------------------------------
# Minimum effective approval timeout, in seconds. The value comes from
# ``policy.FLOOR_TIMEOUT_S`` (§6 decision #6: sourced from
# contracts/constants.json); the ``_30S`` suffix reflects the expected value
# but is not enforced here.
FLOOR_30S: int = FLOOR_TIMEOUT_S
# Seconds reserved at the end of the task's maxLifetime for cleanup (§6.5).
# Subtracted from the remaining lifetime before it is used as a timeout
# ceiling, and added to the approval row's ``ttl``.
CLEANUP_MARGIN_120S: int = 120
# Poll cadence per §3 decision #3 and IMPL-12: 2s for the first 30s, 5s
# thereafter. Exact counts vary with ``timeout_s``; these pin the
# user-observable "fast for a bit, then slack off" behavior.
POLL_FAST_INTERVAL_S: float = 2.0  # sleep between polls during the fast phase
POLL_FAST_DURATION_S: float = 30.0  # length of the fast phase, measured from poll start
POLL_SLOW_INTERVAL_S: float = 5.0  # sleep between polls once the fast phase is over
POLL_DEGRADED_FAILS: int = 3  # emit approval_poll_degraded at this count (§13.2)
POLL_MAX_CONSECUTIVE_FAILS: int = 10  # treat as TIMED_OUT at this count (§13.2)
TOOL_INPUT_PREVIEW_MAX: int = 256  # §6.5: max chars of the strip-ANSI, truncated preview
ELLIPSIS_LEN: int = 3  # chars reserved for the "..." truncation marker
# ANSI CSI / OSC escape sequence stripper for ``tool_input_preview`` +
# ``permissionDecisionReason`` fields (§12.7). Re-derives the pattern from
# the canonical definition; kept local to avoid adding a cross-module
# dependency for one regex.
# Alternatives, after the leading ESC (\x1B):
#   1. CSI:  ``[`` + parameter bytes (0x30-0x3F) + intermediate bytes
#      (0x20-0x2F) + one final byte (0x40-0x7E).
#   2. OSC:  ``]`` + payload up to a BEL (\x07) or ST (ESC ``\``) terminator.
#   3. Two-character escapes: one byte in 0x40-0x5A or 0x5C-0x5F.
_ANSI_ESCAPE_RE: re.Pattern[str] = re.compile(r"\x1B(?:\[[0-?]*[ -/]*[@-~]|\][^\x07\x1B]*(?:\x07|\x1B\\)|[@-Z\\-_])")
def _strip_ansi(text: str) -> str:
    """Return ``text`` with all ANSI escape sequences removed (§12.7).

    Uses the module-level ``_ANSI_ESCAPE_RE`` (CSI, OSC and two-character
    ESC sequences) so terminal-injection payloads cannot survive into the
    approval-row preview or the hook's deny reason.
    """
    cleaned = re.sub(_ANSI_ESCAPE_RE, "", text)
    return cleaned
def _truncate(text: str | None, max_len: int) -> str:
    """Truncate ``text`` to at most ``max_len`` characters.

    When truncation is needed and there is room, the tail is replaced by a
    ``"..."`` marker (``ELLIPSIS_LEN`` chars), so the result is exactly
    ``max_len`` characters long.

    Args:
        text: String to truncate; ``None`` is treated as an empty string.
        max_len: Maximum length of the result. Negative values are treated
            as 0.

    Returns:
        ``text`` unchanged if it already fits; otherwise a prefix of it,
        ending in ``"..."`` when ``max_len > ELLIPSIS_LEN``.
    """
    if text is None:
        return ""
    # A negative bound would make the slices below mean "drop N chars off
    # the END", returning a string longer than the caller asked for.
    max_len = max(0, max_len)
    if len(text) <= max_len:
        return text
    # Reserve room for the ellipsis so the result never exceeds
    # ``max_len``. When there is no room for it, fall back to a plain
    # prefix instead of slicing with ``max_len - ELLIPSIS_LEN`` (negative).
    if max_len <= ELLIPSIS_LEN:
        return text[:max_len]
    return text[: max_len - ELLIPSIS_LEN] + "..."
def _tool_input_preview(tool_input: Any, max_len: int = TOOL_INPUT_PREVIEW_MAX) -> str:
    """Render ``tool_input`` as a short, terminal-safe preview for the approval row.

    Strings are used verbatim. Anything else is rendered with
    ``json.dumps(..., default=str)`` so dicts/lists appear as stable JSON
    rather than Python repr (no ``OrderedDict(...)`` wrappers etc.). If
    that raises ``TypeError``/``ValueError``, ``str()`` is used instead.
    The rendering is then ANSI-stripped and truncated to ``max_len``.
    """
    if isinstance(tool_input, str):
        rendered = tool_input
    else:
        try:
            rendered = json.dumps(tool_input, default=str)
        except (TypeError, ValueError):
            rendered = str(tool_input)
    without_escapes = _strip_ansi(rendered)
    return _truncate(without_escapes, max_len)
def _deny_response(reason: str) -> dict:
    """Build the PreToolUse hook payload that denies the tool call.

    An empty or ``None`` ``reason`` becomes ``"denied"``. The reason is then
    ANSI-stripped and capped at 500 characters, so it cannot carry
    terminal-escape injection or overflow a log line (§6.5, §12.7).
    """
    sanitized_reason = _truncate(_strip_ansi(reason or "denied"), 500)
    specific_output = {
        "hookEventName": "PreToolUse",
        "permissionDecision": "deny",
        "permissionDecisionReason": sanitized_reason,
    }
    return {"hookSpecificOutput": specific_output}
def _allow_response(reason: str = "permitted") -> dict:
    """Build the PreToolUse hook payload that allows the tool call.

    Unlike the deny builder, ``reason`` is passed through unmodified.
    """
    specific_output = dict(
        hookEventName="PreToolUse",
        permissionDecision="allow",
        permissionDecisionReason=reason,
    )
    return {"hookSpecificOutput": specific_output}
async def pre_tool_use_hook(
    hook_input: Any,
    tool_use_id: str | None,
    hook_context: Any,
    *,
    engine: PolicyEngine,
    trajectory: _TrajectoryWriter | None = None,
    task_id: str | None = None,
    user_id: str | None = None,
    progress: Any = None,
    task_state_module: Any = None,
) -> dict:
    """Enforce Cedar policy on a tool call before it runs (§6.5).

    Malformed input is denied before the engine is consulted. This covers a
    non-dict ``hook_input``, a string ``tool_input`` that is not valid JSON,
    and a ``tool_input`` that is not a JSON object. The engine's decision is
    then mapped as follows:

    * ALLOW: an ``allow`` response carrying the decision reason.
    * DENY: a sanitized ``deny`` response. When the deny came from the
      recent-decision cache (IMPL-23) and ``progress`` is given, a cached
      ``policy_decision`` milestone is emitted first.
    * REQUIRE_APPROVAL: delegated to ``_handle_require_approval``. It writes
      the pending approval row, transitions the task to AWAITING_APPROVAL,
      polls for the human decision and maps it to allow / deny. This
      includes the IMPL-24 late-decision re-read after a TIMED_OUT.

    ``task_id`` / ``user_id`` / ``progress`` are optional so the Phase 1
    call shape keeps working. Without a ``task_id`` the approval path fails
    closed with a DENY. ``task_state_module`` is a test seam that replaces
    the ``task_state`` namespace; production callers leave it unset.
    ``tool_use_id`` and ``hook_context`` are not read here (presumably
    required by the SDK hook signature).

    Returns:
        A dict whose ``hookSpecificOutput`` holds ``permissionDecision``
        (``"allow"`` or ``"deny"``) and ``permissionDecisionReason``.
    """
    state_ns = task_state if task_state_module is None else task_state_module

    if not isinstance(hook_input, dict):
        log("WARN", "PreToolUse hook received non-dict input — denying")
        return _deny_response("invalid hook input")

    tool_name = hook_input.get("tool_name", "unknown")
    payload = hook_input.get("tool_input", {})

    # String tool input is decoded as JSON; undecodable input is denied.
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except (json.JSONDecodeError, TypeError):
            log("WARN", f"PreToolUse hook failed to parse tool_input — denying {tool_name}")
            return _deny_response("unparseable tool input")

    # Fail-closed contract: Cedar evaluation, the approval-row builder and
    # the SHA-256 cache key all assume a JSON object. Rejecting lists and
    # scalars here gives a named deny reason instead of an AttributeError
    # deep inside the engine.
    if not isinstance(payload, dict):
        log("WARN", f"PreToolUse hook received non-dict tool_input — denying {tool_name}")
        return _deny_response("tool input is not an object")

    decision = engine.evaluate_tool_use(tool_name, payload)

    # Only the quiet ALLOW "permitted" path skips the trajectory event.
    # REQUIRE_APPROVAL is reported with the legacy ``allowed`` flag; the
    # specific outcome is already carried by ``reason``.
    if trajectory and decision.reason != "permitted":
        trajectory.write_policy_decision(
            tool_name, decision.allowed, decision.reason, decision.duration_ms
        )

    if decision.outcome == Outcome.ALLOW:
        return _allow_response(decision.reason or "permitted")

    if decision.outcome == Outcome.DENY:
        # IMPL-23: cache-driven denies get a ``policy_decision`` milestone
        # (decision_source="recent_decision_cache") for the live stream and
        # the audit record (§12.8). No approval row is written and the gate
        # counter is untouched, because the original gate already counted it.
        if progress is not None and decision.cache_hit_metadata is not None:
            _try_progress(
                progress,
                "write_policy_decision_cached",
                **decision.cache_hit_metadata,
            )
        log("POLICY", f"DENIED: {tool_name} — {decision.reason}")
        return _deny_response(decision.reason)

    # Anything else is REQUIRE_APPROVAL (§6.5).
    return await _handle_require_approval(
        decision=decision,
        tool_name=tool_name,
        tool_input=payload,
        engine=engine,
        task_id=task_id,
        user_id=user_id,
        progress=progress,
        ts=state_ns,
    )
async def _handle_require_approval(
    *,
    decision: Any,
    tool_name: str,
    tool_input: dict,
    engine: PolicyEngine,
    task_id: str | None,
    user_id: str | None,
    progress: Any,
    ts: Any,
) -> dict:
    """REQUIRE_APPROVAL branch of ``pre_tool_use_hook`` (§6.5).

    Split out of the main hook for readability. The §6.5 control flow is
    long enough that inlining it would obscure the hook's top-level
    three-way branch.

    The steps run in this order:

    1. Per-task cap check.
    2. Per-minute rate-limit check.
    3. Effective-timeout computation.
    4. Lifetime floor check.
    5. Approval-row build.
    6. Counter bumps.
    7. Atomic row write plus AWAITING_APPROVAL transition.
    8. ``approval_requested`` milestone.
    9. Poll for the decision.
    10. IMPL-24 late-decision reconciliation.
    11. Resume to RUNNING.
    12. Terminal allow / deny.

    Args:
        decision: Policy decision providing ``reason``, ``severity``,
            ``timeout_s`` and ``matching_rule_ids``.
        tool_name: Name of the gated tool.
        tool_input: The gated tool input, already validated as a dict by
            the caller.
        engine: Policy engine holding the gate counters, allowlist and
            recent-decision cache.
        task_id: Task identifier; falsy values fail closed.
        user_id: Requesting user; stored on the approval row ("" if unset).
        progress: Optional progress writer for milestones.
        ts: ``task_state``-like namespace (test seam).

    Returns:
        A PreToolUse allow/deny response dict. Every handled failure
        branch ends in DENY (§13.15).
    """
    # Missing task infrastructure → fail closed with a clear reason. This
    # lines up with §13.15: every exceptional branch ends in DENY.
    if not task_id:
        log("WARN", "REQUIRE_APPROVAL hit without task_id — fail-closed deny")
        return _deny_response("approval system unavailable (no task_id)")

    request_id = _generate_ulid()

    # Step 1 — per-task cap. §12.9: cap exceeded fails closed and the
    # cap_exceeded milestone carries the configured cap so dashboards
    # reflect the blueprint override.
    if engine.approval_gate_count >= engine.approval_gate_cap:
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_cap_exceeded",
                request_id=request_id,
                count=engine.approval_gate_count,
                cap=engine.approval_gate_cap,
            )
        return _deny_response(f"approval-gate cap exceeded ({engine.approval_gate_cap}/task)")

    # Step 2 — per-minute rate limit. ``approvals_in_last_minute`` prunes
    # on read so the comparison is against the current sliding window.
    rate = engine.approvals_in_last_minute
    if rate >= APPROVAL_RATE_LIMIT:
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_rate_limit_exceeded",
                request_id=request_id,
                rate=rate,
                limit=APPROVAL_RATE_LIMIT,
            )
        return _deny_response(f"approval-gate rate limit exceeded ({APPROVAL_RATE_LIMIT}/min)")

    # Step 3 — effective timeout with floor/ceiling math (§6.5). Emit
    # ``approval_timeout_capped`` when the caller's ask is clipped so the
    # user can see why (IMPL-26).
    remaining_lifetime = _remaining_maxlifetime_s()
    effective_timeout, clip_reason, requested_timeout = _compute_effective_timeout(
        decision_timeout_s=decision.timeout_s,
        task_default_timeout_s=engine.task_default_timeout_s,
        remaining_lifetime_s=remaining_lifetime,
    )
    if clip_reason is not None and progress is not None:
        _try_progress(
            progress,
            "write_approval_timeout_capped",
            request_id=request_id,
            requested_timeout_s=requested_timeout,
            effective_timeout_s=effective_timeout,
            reason=clip_reason,
            matching_rule_ids=(
                list(decision.matching_rule_ids) if clip_reason == "rule_annotation" else None
            ),
        )

    # IMPL-26: once per task, surface the "gates will have small windows
    # from here on" ceiling-shrinking milestone when the remaining
    # lifetime is approaching the 2x-task-default threshold.
    if (
        remaining_lifetime is not None
        and remaining_lifetime - CLEANUP_MARGIN_120S < 2 * engine.task_default_timeout_s
        and engine.mark_ceiling_shrinking_emitted()
        and progress is not None
    ):
        _try_progress(
            progress,
            "write_approval_ceiling_shrinking",
            request_id=request_id,
            max_lifetime_remaining_s=remaining_lifetime,
            cleanup_margin_s=CLEANUP_MARGIN_120S,
            task_default_timeout_s=engine.task_default_timeout_s,
        )

    # Step 4 — insufficient lifetime remaining for a valid approval
    # (§13.7). Below the floor we DENY immediately without writing the
    # approval row; no point waking the user to a guaranteed-dead gate.
    if remaining_lifetime is not None and remaining_lifetime - CLEANUP_MARGIN_120S < FLOOR_30S:
        return _deny_response(
            f"insufficient maxLifetime remaining ({remaining_lifetime}s) for approval"
        )

    # Step 5 — build the approval row per §10.1 schema.
    tool_input_sha256 = _sha256_tool_input_for_row(tool_input)
    row = {
        "task_id": task_id,
        "request_id": request_id,
        "tool_name": tool_name,
        "tool_input_preview": _tool_input_preview(tool_input),
        "tool_input_sha256": tool_input_sha256,
        "reason": decision.reason,
        "severity": decision.severity or "medium",
        "matching_rule_ids": list(decision.matching_rule_ids),
        "status": "PENDING",
        "created_at": _iso_now(),
        "timeout_s": effective_timeout,
        "ttl": int(time.time()) + effective_timeout + CLEANUP_MARGIN_120S,
        "user_id": user_id or "",
        "repo": engine.repo,
    }

    # Step 6 — bump counters BEFORE the write so cap/rate checks on
    # subsequent gates reflect the attempt even if the DDB write itself
    # fails. The session counter survives within the task; the failure
    # path below emits ``approval_write_failed`` so the lost row is
    # visible.
    engine.increment_approval_gate_count()
    engine.record_approval_gate_timestamp()
    # Chunk 7 (§13.6): best-effort atomic increment of the persisted
    # ``approval_gate_count`` on TaskTable. The session counter enforces
    # the cap within THIS container. The persisted counter lets a
    # restarted container re-seed from a non-zero value instead of
    # re-exposing the user to another ``approval_gate_cap`` worth of
    # gates. Failure is best-effort per §13.6 ("counter is a safety bound,
    # not a correctness bound"). Any error is therefore logged and
    # swallowed here rather than escaping the hook, accepting the
    # (bounded) restart-retry amplification.
    try:
        await asyncio.to_thread(ts.increment_approval_gate_count_in_ddb, task_id)
    except Exception as exc:
        log(
            "WARN",
            "approval_gate_count persisted increment failed (best-effort): "
            f"{type(exc).__name__}: {exc}",
        )

    # Step 7 — cross-table atomic transition.
    try:
        await asyncio.to_thread(ts.transact_write_approval_request, task_id, request_id, row)
    except ts.ApprovalWriteError as exc:
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_write_failed",
                request_id=request_id,
                error=f"cancelled: {exc.cancellation_reasons}",
            )
        return _deny_response("approval system unavailable (write cancelled)")
    except ts.ApprovalTablesUnavailable as exc:
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_write_failed",
                request_id=request_id,
                error=f"tables unavailable: {exc}",
            )
        return _deny_response("approval system unavailable (tables unconfigured)")
    except Exception as exc:
        log(
            "ERROR",
            f"approval request write failed: {type(exc).__name__}: {exc}",
        )
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_write_failed",
                request_id=request_id,
                error=f"{type(exc).__name__}: {exc}",
            )
        return _deny_response("approval system unavailable")

    # Step 8 — ``approval_requested`` milestone so the user's stream
    # shows the gate immediately.
    if progress is not None:
        _try_progress(
            progress,
            "write_approval_requested",
            request_id=request_id,
            tool_name=tool_name,
            input_preview=row["tool_input_preview"],
            reason=decision.reason,
            severity=row["severity"],
            timeout_s=effective_timeout,
            matching_rule_ids=list(decision.matching_rule_ids),
        )

    # Step 9 — poll for a decision.
    outcome = await _poll_for_decision(
        task_id=task_id,
        request_id=request_id,
        timeout_s=effective_timeout,
        progress=progress,
        ts=ts,
    )

    # Step 10 — IMPL-24 VM-throttle + late-approval race. Best-effort
    # flip to TIMED_OUT; if ConditionCheckFailed, the user beat us — read
    # and honor.
    if outcome["status"] == "TIMED_OUT":
        try:
            wrote = await asyncio.to_thread(
                ts.best_effort_update_approval_status,
                task_id,
                request_id,
                "TIMED_OUT",
                reason=outcome.get("reason"),
            )
        except Exception as exc:
            log("WARN", f"approval TIMED_OUT write raised: {type(exc).__name__}: {exc}")
            # A transient write error MUST NOT bypass the late-approval
            # check: the user's APPROVED decision may already be on the
            # row. ``wrote = False`` forces the ConsistentRead below. If
            # the row still says PENDING, the TIMED_OUT outcome is kept.
            wrote = False
        if not wrote:
            # User's decision may have beaten our timer; re-read with
            # ConsistentRead.
            try:
                row_reread = await asyncio.to_thread(
                    ts.get_approval_row,
                    task_id,
                    request_id,
                    consistent_read=True,
                )
            except Exception as exc:
                log("WARN", f"approval re-read raised: {type(exc).__name__}: {exc}")
                row_reread = None
            outcome = _reconcile_late_decision(outcome, row_reread, progress, request_id)

    # Step 11 — resume transition (RUNNING). The ``awaiting_approval_request_id``
    # condition prevents resuming a cancelled task or racing with another
    # approval.
    try:
        await asyncio.to_thread(ts.transact_resume_from_approval, task_id, request_id)
    except ts.ApprovalResumeError as exc:
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_resume_failed",
                request_id=request_id,
                error=f"cancelled: {exc.cancellation_reasons}",
            )
        return _deny_response("task no longer awaiting approval")
    except Exception as exc:
        log("WARN", f"approval resume raised: {type(exc).__name__}: {exc}")
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_resume_failed",
                request_id=request_id,
                error=f"{type(exc).__name__}: {exc}",
            )
        return _deny_response("approval resume failed")

    # Step 12 — terminal branches.
    status = outcome.get("status")
    if status == "APPROVED":
        scope = outcome.get("scope") or "this_call"
        if scope != "this_call":
            try:
                engine.allowlist.add(scope)
            except ValueError as exc:
                # Malformed scope from the API — log loudly but still
                # allow this one call (the user did approve it).
                log("WARN", f"invalid approved scope {scope!r}: {exc}")
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_granted",
                request_id=request_id,
                scope=scope,
                decided_at=outcome.get("decided_at"),
                # Chunk 8a: propagate the row's ``created_at`` so the
                # ApprovalMetricsPublisher can compute decision latency.
                created_at=row.get("created_at"),
            )
        return _allow_response(f"User approved ({scope})")

    # DENIED or TIMED_OUT — cache + queue injection.
    cache_decision = "DENIED" if status == "DENIED" else "TIMED_OUT"
    # IMPL-23: thread the user's ``decided_at`` into the cache entry so
    # subsequent cache-hit events surface the ORIGINAL decision timestamp,
    # not the wall-clock time the cache was populated. ``decided_at`` for
    # TIMED_OUT is the agent-side clock moment the timeout fired; for
    # DENIED it's the user's deny timestamp from the Lambda audit row.
    engine.recent_decisions.record(
        tool_name,
        tool_input_sha256,
        decision=cache_decision,
        reason=outcome.get("reason", ""),
        original_decision_ts=outcome.get("decided_at"),
    )

    if status == "DENIED":
        # Rule-level cache (§12.8 extension): record an entry per
        # matching_rule_id so semantic retries (same rule, different
        # input) get fast-denied without a new approval round-trip.
        # TIMED_OUT is deliberately excluded because it is ambiguous (the
        # user was away, not actively refusing), so its cache entries stay
        # input-hash-scoped.
        for rule_id in decision.matching_rule_ids:
            engine.recent_decisions.record_rule_decision(
                tool_name,
                rule_id,
                decision="DENIED",
                reason=outcome.get("reason", ""),
                original_decision_ts=outcome.get("decided_at"),
            )
        engine.queue_denial_injection(
            request_id=request_id,
            reason=outcome.get("reason", ""),
            decided_at=outcome.get("decided_at"),
        )
        if progress is not None:
            _try_progress(
                progress,
                "write_approval_denied",
                request_id=request_id,
                reason=outcome.get("reason", ""),
                decided_at=outcome.get("decided_at"),
                # Chunk 8a: propagate the row's ``created_at`` so the
                # ApprovalMetricsPublisher can compute decision latency.
                created_at=row.get("created_at"),
            )
    elif progress is not None:
        _try_progress(
            progress,
            "write_approval_timed_out",
            request_id=request_id,
            timeout_s=effective_timeout,
            # Chunk 8a: propagate the row's ``created_at``, the
            # ``matching_rule_ids`` and the post-clip effective timeout.
            # The ApprovalMetricsPublisher uses them to emit the decision
            # latency and the ``ApprovalTimeoutBreakdown`` histogram with a
            # normalized ``rule_id`` dimension.
            created_at=row.get("created_at"),
            effective_timeout_s=effective_timeout,
            matching_rule_ids=list(decision.matching_rule_ids),
        )

    # Guaranteed surface (§6.5): truncated reason even when denial
    # injection is pre-empted by a concurrent cancel. The user's reason is
    # wrapped in authoritative stop-language. E2E Phase 4 observed the
    # agent treating a bare "User denied" as "try a different approach"
    # and burning through max_turns retrying the same rule with trivial
    # variations. The explicit wording, combined with the rule-level
    # recent-deny cache (§12.8), makes retries fail fast with clear
    # feedback.
    raw_reason = outcome.get("reason") or f"User {status.lower() if status else 'denied'}"
    if status == "DENIED":
        rule_hint = (
            f" (matching rule{'s' if len(decision.matching_rule_ids) != 1 else ''}: "
            f"{', '.join(decision.matching_rule_ids)})"
            if decision.matching_rule_ids
            else ""
        )
        reason_text = (
            f"AUTHORITATIVE DENY from human reviewer: {raw_reason}{rule_hint}. "
            "Do NOT retry this class of action with trivial variations; "
            "the same rule will fast-deny subsequent attempts. Find an "
            "alternative task strategy or report back to the user explaining "
            "why progress is blocked."
        )
    else:
        reason_text = raw_reason
    return _deny_response(reason_text)
def _reconcile_late_decision(
    outcome: dict,
    row: dict | None,
    progress: Any,
    request_id: str,
) -> dict:
    """Re-derive the outcome from a fresh row read after a TIMED_OUT race (IMPL-24).

    If the re-read row shows the user already decided, that decision wins:

    * ``APPROVED`` → ``{"status", "scope", "decided_at", "decided_by"}``
      (``decided_by`` taken from the row's ``user_id``).
    * ``DENIED`` → ``{"status", "reason", "decided_at"}``; ``reason`` is the
      row's ``deny_reason`` or ``"denied"``.

    In either case an ``approval_late_win`` milestone is emitted when
    ``progress`` is given, so operators can count these races. A missing
    row or any other status returns ``outcome`` unchanged (the TIMED_OUT
    fail-closed branch, §13.12).
    """
    late_status = None if row is None else row.get("status")
    if late_status not in ("APPROVED", "DENIED"):
        return outcome

    if progress is not None:
        _try_progress(
            progress,
            "write_approval_late_win",
            request_id=request_id,
            outcome=late_status,
            reason="user decision landed during TIMED_OUT write",
        )

    if late_status == "APPROVED":
        return {
            "status": "APPROVED",
            "scope": row.get("scope"),
            "decided_at": row.get("decided_at"),
            "decided_by": row.get("user_id"),
        }
    return {
        "status": "DENIED",
        "reason": row.get("deny_reason") or "denied",
        "decided_at": row.get("decided_at"),
    }
async def _poll_for_decision(
    *,
    task_id: str,
    request_id: str,
    timeout_s: int,
    progress: Any,
    ts: Any,
) -> dict:
    """Poll the approval row until it reaches a terminal decision or times out.

    Each read calls ``ts.get_approval_row(..., consistent_read=True)`` in a
    worker thread.

    Cadence (IMPL-12): poll every ``POLL_FAST_INTERVAL_S`` during the first
    ``POLL_FAST_DURATION_S`` seconds, then every ``POLL_SLOW_INTERVAL_S``.
    Each sleep is clamped to the time left before the deadline.

    Read failures (§13.2):

    * A successful read resets the failure streak.
    * At ``POLL_DEGRADED_FAILS`` consecutive failures, one
      ``approval_poll_degraded`` milestone is emitted per call.
    * At ``POLL_MAX_CONSECUTIVE_FAILS`` the poll gives up and reports
      TIMED_OUT with a reason naming the failure count.

    Returns one of:

    * ``{"status": "APPROVED", "scope", "decided_at", "decided_by"}``
    * ``{"status": "DENIED", "reason", "decided_at"}``
    * ``{"status": "TIMED_OUT", "reason": str | None}``
    """
    deadline = time.monotonic() + timeout_s
    poll_started = time.monotonic()
    failures_in_a_row = 0
    degraded_reported = False

    while time.monotonic() < deadline:
        try:
            approval_row = await asyncio.to_thread(
                ts.get_approval_row,
                task_id,
                request_id,
                consistent_read=True,
            )
        except Exception as exc:
            failures_in_a_row += 1
            log(
                "WARN",
                f"approval poll get_item raised ({failures_in_a_row}/"
                f"{POLL_MAX_CONSECUTIVE_FAILS}): {type(exc).__name__}: {exc}",
            )
            if failures_in_a_row >= POLL_DEGRADED_FAILS and not degraded_reported:
                if progress is not None:
                    _try_progress(
                        progress,
                        "write_approval_poll_degraded",
                        request_id=request_id,
                        consecutive_failures=failures_in_a_row,
                    )
                degraded_reported = True
            if failures_in_a_row >= POLL_MAX_CONSECUTIVE_FAILS:
                return {
                    "status": "TIMED_OUT",
                    "reason": f"poll failed {failures_in_a_row} consecutive times",
                }
            approval_row = None  # nothing to inspect; fall through to the sleep
        else:
            failures_in_a_row = 0

        row_status = None if approval_row is None else approval_row.get("status")
        if row_status == "APPROVED":
            return {
                "status": "APPROVED",
                "scope": approval_row.get("scope"),
                "decided_at": approval_row.get("decided_at"),
                "decided_by": approval_row.get("user_id"),
            }
        if row_status == "DENIED":
            return {
                "status": "DENIED",
                "reason": approval_row.get("deny_reason") or "denied",
                "decided_at": approval_row.get("decided_at"),
            }

        # Fast cadence early on, slow cadence afterwards; never sleep past
        # the deadline.
        since_start = time.monotonic() - poll_started
        if since_start >= POLL_FAST_DURATION_S:
            cadence = POLL_SLOW_INTERVAL_S
        else:
            cadence = POLL_FAST_INTERVAL_S
        nap = min(cadence, max(0.0, deadline - time.monotonic()))
        if nap <= 0:
            return {"status": "TIMED_OUT", "reason": None}
        await asyncio.sleep(nap)

    return {"status": "TIMED_OUT", "reason": None}
def _compute_effective_timeout(
    *,
    decision_timeout_s: int | None,
    task_default_timeout_s: int,
    remaining_lifetime_s: int | None,
) -> tuple[int, str | None, int]:
    """Compute the approval gate's effective timeout (§6.5).

    The target is ``min(rule annotation, task default, remaining lifetime -
    cleanup margin)``, floored at ``FLOOR_30S``. ``decision_timeout_s``
    arrives already clipped against the task default by the engine's
    ``_merge_annotations``; ``None`` means "use the task default". This
    function applies the remaining-lifetime ceiling and the floor, and
    reports which input clipped the value so the user is told why rather
    than seeing a silent clip.

    Returns:
        ``(effective, clip_reason, requested)`` where:

        - ``requested`` is the task default (the value shown as "asked for").
        - ``clip_reason`` is ``"rule_annotation"`` when the decision value
          is below the task default, ``"maxLifetime_ceiling"`` when the
          lifetime ceiling is tighter still, otherwise ``None``.
        - ``effective`` is never below ``FLOOR_30S``. The floor is a safety
          net and does not change ``clip_reason``.
    """
    requested = task_default_timeout_s
    effective = requested if decision_timeout_s is None else decision_timeout_s

    clip_reason: str | None = "rule_annotation" if effective < requested else None

    # The lifetime ceiling overrides the rule-annotation reason because it
    # is the more specific (tighter) bound.
    if remaining_lifetime_s is not None:
        lifetime_ceiling = remaining_lifetime_s - CLEANUP_MARGIN_120S
        if lifetime_ceiling < effective:
            effective, clip_reason = lifetime_ceiling, "maxLifetime_ceiling"

    return max(effective, FLOOR_30S), clip_reason, requested
def _remaining_maxlifetime_s() -> int | None:
"""Compute remaining AgentCore maxLifetime seconds.
Reads ``AGENTCORE_MAX_LIFETIME_S`` (default 8h) and ``TASK_STARTED_AT``
(ISO 8601, optional). Returns ``None`` if the start timestamp is
unavailable; the hook treats this as "unknown, don't clip" so the
gate still fires with the task default (fail-open on the optional
signal rather than pre-DENY when unknown). A future Chunk wires
these from the task launch path; for now they are optional hints.
"""
try:
max_lifetime = int(os.environ.get("AGENTCORE_MAX_LIFETIME_S", "28800"))
except ValueError:
max_lifetime = 28800
started_at = os.environ.get("TASK_STARTED_AT")
if not started_at:
return None
try:
# Support both ISO 8601 (YYYY-MM-DDTHH:MM:SSZ) and raw epoch seconds.
if started_at.isdigit():
started_epoch = int(started_at)
else:
from datetime import datetime
# The trailing Z means UTC; strptime returns a naive datetime whose
# .timestamp() would otherwise be interpreted in the container's
# local TZ, skewing remaining-lifetime math by the UTC offset.
started_epoch = int(
datetime.strptime(started_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC).timestamp()
)
except (ValueError, AttributeError):
return None
elapsed = int(time.time()) - started_epoch
remaining = max_lifetime - elapsed
return max(0, remaining)
def _sha256_tool_input_for_row(tool_input: Any) -> str:
"""Stable SHA-256 of ``tool_input`` for the approval row + cache key.
Re-derives hashing here (rather than importing ``policy._sha256_tool_input``)
so the hook's failure-mode is independent of the engine's internals and
to keep import graphs shallow. The engine's own cache uses the same
algorithm; §6.5 row + ``RecentDecisionCache`` need the same key shape.
"""
import hashlib
try:
serialized = (
tool_input if isinstance(tool_input, str) else json.dumps(tool_input, sort_keys=True)
)
except (TypeError, ValueError):
serialized = str(tool_input)
return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
def _iso_now() -> str:
"""ISO 8601 UTC timestamp in the ``YYYY-MM-DDTHH:MM:SSZ`` form."""
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
# S10: per-method consecutive-failure counters for ``_try_progress``.
# Progress writes are best-effort, but a hard infrastructure failure
# (DDB throttle, IAM regression, missing table) would otherwise be
# masked as a stream of WARN lines with no escalation. After
# ``_TRY_PROGRESS_ESCALATE_AFTER`` consecutive failures of the same
# method the next failure is logged at ERROR via ``log_error_cw`` so
# it lands in APPLICATION_LOGS / TaskDashboard, and the counter
# resets so escalations don't spam. Per-method state because a
# single failing method shouldn't suppress visibility into other
# kinds of progress writes that might still be working.
_TRY_PROGRESS_ESCALATE_AFTER = 5
_try_progress_consecutive_failures: dict[str, int] = {}
def _try_progress(progress: Any, method_name: str, /, **kwargs: Any) -> None:
    """Best-effort call of ``progress.<method_name>(**kwargs)``; never raises.

    Progress is observability only: a throttled DDB write must not break the
    approval flow (``_emit_nudge_milestone`` follows a similar pattern for the
    Phase 2 nudges).

    * If ``progress._disabled`` is literally ``True`` (circuit breaker open),
      the call is skipped with a WARN log.
    * If ``progress`` has no attribute ``method_name``, the call is skipped
      with a DEBUG log.
    * Any exception from the method is swallowed. S10 escalation: consecutive
      failures are counted per ``method_name`` in
      ``_try_progress_consecutive_failures``. Failures below
      ``_TRY_PROGRESS_ESCALATE_AFTER`` log at WARN; reaching it reports at
      ERROR via ``log_error_cw`` and resets the count, so a silent
      live-stream outage becomes operator-visible without spamming.
    * A successful call clears any recorded failure run for the method.
    """
    breaker_open = getattr(progress, "_disabled", False) is True
    if breaker_open:
        log("WARN", f"progress {method_name!r} skipped: circuit breaker open")
        return

    writer = getattr(progress, method_name, None)
    if writer is None:
        log("DEBUG", f"progress missing {method_name!r}; skipping")
        return

    try:
        writer(**kwargs)
    except Exception as exc:  # pragma: no cover — defensive
        streak = _try_progress_consecutive_failures.get(method_name, 0) + 1
        if streak < _TRY_PROGRESS_ESCALATE_AFTER:
            log("WARN", f"progress {method_name!r} raised: {type(exc).__name__}: {exc}")
            _try_progress_consecutive_failures[method_name] = streak
        else:
            log_error_cw(
                f"progress {method_name!r} has failed {streak} consecutive times; "
                f"latest: {type(exc).__name__}: {exc}",
            )
            # Restart the run so the next escalation needs a fresh streak.
            _try_progress_consecutive_failures[method_name] = 0
    else:
        # Only touch the dict when there is a non-zero run to clear.
        if _try_progress_consecutive_failures.get(method_name):
            _try_progress_consecutive_failures[method_name] = 0
async def post_tool_use_hook(
    hook_input: Any,
    tool_use_id: str | None,
    hook_context: Any,
    *,
    trajectory: _TrajectoryWriter | None = None,
) -> dict:
    """PostToolUse hook: screen tool output for secrets/PII.

    Returns a dict with ``hookSpecificOutput``. When sensitive content is
    detected the response includes ``updatedMCPToolOutput`` containing the
    redacted version (steered enforcement — content is sanitized, not
    blocked). If screening itself fails (including converting a non-string
    response to text), the output is replaced with a fixed fail-closed
    placeholder. Input that is not a dict, or that lacks ``tool_response``,
    passes through unchanged with a WARN log.

    Args:
        hook_input: Hook payload; ``tool_name`` and ``tool_response`` are read.
        tool_use_id: Not used by this hook (presumably required by the SDK
            callback signature).
        hook_context: Not used by this hook (presumably required by the SDK
            callback signature).
        trajectory: Optional writer that records redaction decisions.
            Recording is best-effort: a failing write is logged and never
            prevents the redaction / fail-closed response from being returned.

    NOTE(review): the field name ``updatedMCPToolOutput`` suggests the SDK
    may apply it only to MCP tool output — confirm redaction also takes
    effect for built-in tools.
    """
    _PASS_THROUGH: dict = {"hookSpecificOutput": {"hookEventName": "PostToolUse"}}
    _FAIL_CLOSED: dict = {
        "hookSpecificOutput": {
            "hookEventName": "PostToolUse",
            "updatedMCPToolOutput": "[Output redacted: screening error — fail-closed]",
        }
    }
    if not isinstance(hook_input, dict):
        log("WARN", "PostToolUse hook received non-dict input — passing through")
        return _PASS_THROUGH
    tool_name = hook_input.get("tool_name", "unknown")
    if "tool_response" not in hook_input:
        log("WARN", f"PostToolUse hook: missing 'tool_response' key for {tool_name}")
        return _PASS_THROUGH

    def _record_decision(findings: Any, duration_ms: float) -> None:
        # Trajectory recording is observability only. If it raised out of the
        # hook, the redaction path would lose its redacted output and the
        # scanner-error path would lose its fail-closed response, so failures
        # are logged and swallowed here.
        if not trajectory:
            return
        try:
            trajectory.write_output_screening_decision(
                tool_name, findings, redacted=True, duration_ms=duration_ms
            )
        except Exception as write_exc:
            log(
                "WARN",
                f"PostToolUse trajectory write failed for {tool_name}: "
                f"{type(write_exc).__name__}: {write_exc}",
            )

    tool_response = hook_input["tool_response"]
    try:
        # Normalise non-string responses inside the try so a failing
        # ``__str__`` is handled fail-closed like any other screening error.
        if not isinstance(tool_response, str):
            tool_response = str(tool_response)
        result = scan_tool_output(tool_response)
    except Exception as exc:
        log("ERROR", f"Output scanner failed for {tool_name}: {type(exc).__name__}: {exc}")
        _record_decision([f"SCANNER_ERROR: {type(exc).__name__}"], 0.0)
        return _FAIL_CLOSED
    if result.has_sensitive_content:
        _record_decision(result.findings, result.duration_ms)
        # str() each finding so a non-string entry cannot make the join raise
        # after a positive detection and drop the redaction.
        findings_text = ", ".join(str(finding) for finding in result.findings)
        log("POLICY", f"OUTPUT REDACTED: {tool_name} — {findings_text}")
        return {
            "hookSpecificOutput": {
                "hookEventName": "PostToolUse",
                "updatedMCPToolOutput": result.redacted_content,
            }
        }
    return _PASS_THROUGH
# ---------------------------------------------------------------------------
# Between-turns hook registry (Phase 2 nudges, extensible for Phase 3)
# ---------------------------------------------------------------------------
# A hook takes a context dict (currently ``{"task_id": str}``) and returns a
# list of synthetic user-message strings to inject before the agent's next
# turn. An empty list means "no injection — allow normal stop".
BetweenTurnsHook = Callable[[dict], list[str]]
# Process-lifetime dedup map: task_id -> set of nudge_ids already injected in
# this process. Guards against infinite re-injection if ``mark_consumed``
# persistently fails (DDB throttling, IAM drift) — without this, the same
# nudge would be re-injected every Stop hook firing until ``max_turns`` is
# exhausted. Lives for the duration of the process (== task) so it doesn't
# leak across tasks in the same runtime.
_INJECTED_NUDGES: dict[str, set[str]] = {}
# Preview length for the first nudge message in the milestone details
# string. Kept short so the whole details line stays under ~120 chars
# (single terminal line in ``bgagent watch``).
_NUDGE_PREVIEW_LEN = 60