
Commit 3cd4b3b

feat(cedar-hitl): TaskApprovals + AWAITING_APPROVAL transition primitives
Adds the four agent-side DDB primitives §6.5 + IMPL-24 need for the three-outcome hook integration in the next commit:

- ``transact_write_approval_request``: cross-table TransactWriteItems with Put(TaskApprovalsTable) guarded by ``attribute_not_exists(request_id)`` + Update(TaskTable) gated on ``status = RUNNING``. Atomic per §12.3 so a concurrent cancel cannot land the task in AWAITING_APPROVAL with no matching approval row (or vice versa).
- ``transact_resume_from_approval``: Update(TaskTable) gated on ``status = AWAITING_APPROVAL AND awaiting_approval_request_id = :rid``. The ``request_id`` condition prevents resuming with a stale ID after a reconciler race (§13.9).
- ``best_effort_update_approval_status``: conditional UpdateItem on the approval row with ``status = :pending`` guard. Returns False on ``ConditionalCheckFailedException``; this is the signal IMPL-24's re-read path fires on (§6.5 pseudocode lines 846-879, §13.12).
- ``get_approval_row``: GetItem with ``ConsistentRead=True`` by default. Required by IMPL-24's re-read; kept opt-out (bool flag) for future cold-path callers that don't need the strong read.

Errors:

- ``ApprovalTablesUnavailable`` for env-var-missing: raised loud so a pre-Chunk-4 deploy fails closed (hook will map to DENY) rather than silently no-op'ing the gate.
- ``ApprovalWriteError`` / ``ApprovalResumeError`` wrap ``TransactionCanceledException`` with the cancellation reasons list. The hook uses these to distinguish the "concurrent cancel" branch from real DDB outages.
- ``ConditionalCheckFailedException`` on ``update_item`` is consumed and returned as ``False`` from ``best_effort_update_approval_status``: the caller (hook) needs the boolean to decide whether to re-read, not to propagate.
- All other DDB errors propagate so the hook's outer try/except can classify fail-closed with a specific reason.

Implementation notes:

- Uses ``boto3.client("dynamodb")`` low-level API (not resource). ``transact_write_items`` lives on the client, and marshalling the approval row attributes explicitly gives deterministic DDB shapes that the tests can assert on. ``_py_to_ddb_attr`` covers the subset of Python types §10.1 actually uses (str/int/bool/None/list of str); any other type raises TypeError loudly rather than silently writing something unexpected.
- ``_extract_error_code`` / ``_extract_cancellation_reasons`` duck-type on ``exc.response`` so we don't need botocore at import time (tests use a minimal exception class).
- Errors from unsupported types (floats, dicts, etc.) are caught BEFORE the DDB round-trip so the unit-test asserts ``transact_write_items`` was not called, which catches schema drift early.
- Status constants (``_STATUS_RUNNING`` / ``_STATUS_AWAITING_APPROVAL``) named so a rename in CDK cannot silently diverge the Python path.

Tests: +20 total.

- 5 on TransactWriteApprovalRequest: env-missing, happy-path shape assertion (both items + conditions), TransactionCanceled → ApprovalWriteError with reasons preserved, other errors propagate, unsupported type rejected before any DDB call.
- 3 on TransactResumeFromApproval: env-missing, happy-path expression shape (includes REMOVE awaiting_approval_request_id), cancel → ApprovalResumeError.
- 4 on BestEffortUpdateApprovalStatus: happy path returns True, ``reason`` kwarg attaches ``deny_reason``, ConditionalCheckFailed returns False (IMPL-24's signal), other errors propagate.
- 4 on GetApprovalRow: ConsistentRead default True, opt-out False, row-not-found returns None, row unmarshalling through every supported DDB attribute type.
- 4 on helpers: error-code extraction with and without ClientError-shape, cancellation-reasons extraction with and without.

No runtime callers yet; hook integration lands in commit C. Physical TaskApprovalsTable lands in Chunk 4; Python side is wire-compatible so the hook work can be unit-tested today with mocked clients.
1 parent 4c51a16 commit 3cd4b3b
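
The conditional-update-then-re-read flow described in the commit message can be sketched as follows. This is a minimal, self-contained illustration rather than the hook from this repository: ``StubDdb``, ``FakeClientError``, ``mark_timed_out``, ``resolve_timeout``, the table name ``approvals`` and the ``ROW_GONE`` label are all hypothetical names introduced here. Only the ``update_item`` / ``get_item`` keyword arguments and the ``ConditionalCheckFailedException`` error code come from the DynamoDB API.

```python
class FakeClientError(Exception):
    """Minimal stand-in for botocore's ClientError (hypothetical test double)."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


class StubDdb:
    """In-memory stub exposing only the two client calls this sketch needs."""

    def __init__(self, row_status):
        # Current approval status, or None when the row no longer exists.
        self.row_status = row_status
        self.reads = []

    def update_item(self, **kwargs):
        # Emulate the ``#s = :pending`` guard: any non-PENDING row fails it.
        if self.row_status != "PENDING":
            raise FakeClientError("ConditionalCheckFailedException")
        self.row_status = kwargs["ExpressionAttributeValues"][":new"]["S"]

    def get_item(self, **kwargs):
        self.reads.append(kwargs["ConsistentRead"])
        if self.row_status is None:
            return {}
        return {"Item": {"status": {"S": self.row_status}}}


def mark_timed_out(ddb, key) -> bool:
    """Return True if the PENDING -> TIMED_OUT write won, False if it lost."""
    try:
        ddb.update_item(
            TableName="approvals",  # hypothetical table name
            Key=key,
            UpdateExpression="SET #s = :new",
            ConditionExpression="#s = :pending",
            ExpressionAttributeNames={"#s": "status"},
            ExpressionAttributeValues={
                ":new": {"S": "TIMED_OUT"},
                ":pending": {"S": "PENDING"},
            },
        )
        return True
    except Exception as exc:
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "ConditionalCheckFailedException":
            return False
        raise  # anything else is a real failure and propagates


def resolve_timeout(ddb, key) -> str:
    """Hypothetical caller: decide the final outcome after a timeout."""
    if mark_timed_out(ddb, key):
        return "TIMED_OUT"
    # The guard failed, so another writer decided first (or the row is gone).
    # Re-read with a strongly consistent read to see the decision.
    resp = ddb.get_item(TableName="approvals", Key=key, ConsistentRead=True)
    item = resp.get("Item")
    return item["status"]["S"] if item else "ROW_GONE"
```

Returning a boolean from the conditional write, instead of raising, keeps the lost-race case on the normal control path, which is where the re-read belongs.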

2 files changed

Lines changed: 675 additions & 0 deletions

File tree

agent/src/task_state.py

Lines changed: 391 additions & 0 deletions
@@ -408,3 +408,394 @@ def get_task(task_id: str) -> dict | None:
         print(f"[task_state] get_task failed: {type(e).__name__}: {e}")
         raise TaskFetchError(f"{type(e).__name__}: {e}") from e
     return resp.get("Item")
+
+
+# ---------------------------------------------------------------------------
+# Cedar HITL approval primitives (§6.5, §9.1, IMPL-24)
+# ---------------------------------------------------------------------------
+#
+# ``TaskApprovalsTable`` and the AWAITING_APPROVAL status transitions land
+# physically in Chunk 4 (CDK). The agent-side helpers below are written to
+# that contract and exposed so Chunk 3's ``pre_tool_use_hook`` can be
+# implemented + unit-tested now (via mocked boto3 clients); Chunk 4 sets
+# ``TASK_APPROVALS_TABLE_NAME`` + grants IAM and the same helpers start
+# making real DDB calls with no further code change on the agent side.
+#
+# Primitives exposed:
+#   - ``transact_write_approval_request``: atomic Put(TaskApprovals) +
+#     Update(TaskTable: RUNNING → AWAITING_APPROVAL). Raises
+#     ``ApprovalWriteError`` on ``TransactionCanceledException`` so the
+#     hook can return DENY + ``approval_write_failed`` (§13.1).
+#   - ``transact_resume_from_approval``: atomic Update(TaskTable:
+#     AWAITING_APPROVAL → RUNNING) gated on
+#     ``awaiting_approval_request_id = request_id``. Raises
+#     ``ApprovalResumeError`` on cancellation (§13.9).
+#   - ``best_effort_update_approval_status``: conditional Update on the
+#     approval row (``status = :pending`` guard). Returns ``False`` on
+#     ``ConditionalCheckFailedException`` so IMPL-24's re-read path fires.
+#   - ``get_approval_row``: strongly-consistent GetItem; default
+#     ``consistent_read=True`` because IMPL-24's race fix relies on it.
+#
+# A missing env var raises ``ApprovalTablesUnavailable``, which the hook
+# maps to DENY so a pre-Chunk-4 deploy cannot silently bypass gates. Other
+# failures (unreachable DDB, IAM drift) propagate unchanged so the hook's
+# outer handler can fail CLOSED with a specific reason.
+
+TASK_APPROVALS_TABLE_ENV = "TASK_APPROVALS_TABLE_NAME"
+TASK_TABLE_ENV = "TASK_TABLE_NAME"
+
+# TaskTable status values referenced by the approval primitives. Kept as
+# constants so a rename in CDK cannot silently diverge the Python path.
+_STATUS_RUNNING = "RUNNING"
+_STATUS_AWAITING_APPROVAL = "AWAITING_APPROVAL"
+
+
+class ApprovalTablesUnavailable(RuntimeError):
+    """Either ``TASK_APPROVALS_TABLE_NAME`` or ``TASK_TABLE_NAME`` is unset.
+
+    Hook maps to DENY (fail-closed); see §13.15. Distinct from
+    ``TaskFetchError`` so callers do not collapse a config problem with a
+    transient read failure.
+    """
+
+
+class ApprovalWriteError(RuntimeError):
+    """``transact_write_approval_request`` TransactionCanceledException.
+
+    Fired when the cross-table atomic write is cancelled: either the
+    TaskTable precondition fails (task already cancelled / advanced past
+    RUNNING) or the approval row already exists. Hook maps to DENY +
+    ``approval_write_failed`` (§13.1). The underlying cancellation reasons
+    are stashed on ``.cancellation_reasons`` for triage.
+    """
+
+    def __init__(self, message: str, cancellation_reasons: list | None = None) -> None:
+        super().__init__(message)
+        self.cancellation_reasons = cancellation_reasons or []
+
+
+class ApprovalResumeError(RuntimeError):
+    """``transact_resume_from_approval`` TransactionCanceledException.
+
+    Fired when the resume transition fails, typically because the user
+    cancelled the task mid-approval (§13.9). Hook maps to DENY +
+    ``approval_resume_failed``.
+    """
+
+    def __init__(self, message: str, cancellation_reasons: list | None = None) -> None:
+        super().__init__(message)
+        self.cancellation_reasons = cancellation_reasons or []
+
+
+def _get_ddb_client(*, client=None):
+    """Return a boto3 DDB low-level client, or the injected ``client`` for tests.
+
+    Tests inject a mock client rather than relying on moto because the
+    primitives here touch ``transact_write_items`` with cross-table
+    conditions, which moto's older versions do not fully emulate.
+    """
+    if client is not None:
+        return client
+    import boto3
+
+    region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
+    return boto3.client("dynamodb", region_name=region)
+
+
+def _require_tables() -> tuple[str, str]:
+    """Return ``(task_table, approvals_table)`` or raise.
+
+    Kept as a single guard so every caller surfaces the same error class.
+    """
+    task_table = os.environ.get(TASK_TABLE_ENV)
+    approvals_table = os.environ.get(TASK_APPROVALS_TABLE_ENV)
+    if not task_table or not approvals_table:
+        raise ApprovalTablesUnavailable(
+            f"{TASK_TABLE_ENV}/{TASK_APPROVALS_TABLE_ENV} unset; approval gates cannot be recorded"
+        )
+    return task_table, approvals_table
+
+
+def _py_to_ddb_attr(value):
+    """Translate a Python value into the DDB low-level attribute shape.
+
+    Handles the subset we actually write: ``str``, ``int``, ``bool``,
+    ``None``, lists-of-str. More exotic types would need marshalling
+    support; ``approval_row`` values are constrained to the §10.1 schema
+    which falls entirely inside this subset.
+    """
+    if value is None:
+        return {"NULL": True}
+    if isinstance(value, bool):
+        return {"BOOL": value}
+    if isinstance(value, int):
+        return {"N": str(value)}
+    if isinstance(value, str):
+        return {"S": value}
+    if isinstance(value, list):
+        # Lists of strings (matching_rule_ids); other shapes are rejected
+        # loudly so a future schema drift surfaces in tests rather than
+        # silently writing an empty list.
+        if all(isinstance(v, str) for v in value):
+            return {"L": [{"S": v} for v in value]}
+        raise TypeError(f"unsupported list element types in approval row: {value!r}")
+    raise TypeError(f"unsupported approval-row attribute type: {type(value).__name__}")
+
+
+def _ddb_attr_to_py(attr):
+    """Inverse of ``_py_to_ddb_attr``: enough to rehydrate an approval row."""
+    if attr is None:
+        return None
+    if "NULL" in attr:
+        return None
+    if "BOOL" in attr:
+        return attr["BOOL"]
+    if "N" in attr:
+        raw = attr["N"]
+        # Keep integers integer-shaped for downstream arithmetic (ttl,
+        # timeout_s). ``decided_at`` is a string so no floats to worry
+        # about.
+        try:
+            return int(raw)
+        except ValueError:
+            return raw
+    if "S" in attr:
+        return attr["S"]
+    if "L" in attr:
+        return [_ddb_attr_to_py(item) for item in attr["L"]]
+    # Unsupported shape; return raw to aid debugging rather than losing data.
+    return attr
+
+
+def transact_write_approval_request(
+    task_id: str,
+    request_id: str,
+    approval_row: dict,
+    *,
+    client=None,
+) -> None:
+    """Atomically record a pending approval + transition the task to AWAITING_APPROVAL.
+
+    Two items:
+      1. Put on ``TaskApprovalsTable`` with ``ConditionExpression:
+         attribute_not_exists(request_id)``, which guards against ULID
+         collisions and duplicate writes on retry.
+      2. Update on ``TaskTable`` with ``ConditionExpression: status =
+         :running``, which fails if the task has already been cancelled,
+         failed, or is still pre-RUNNING. On success sets
+         ``status = AWAITING_APPROVAL`` and
+         ``awaiting_approval_request_id = <request_id>`` so the resume
+         transition can verify it's resuming the right approval.
+
+    Raises ``ApprovalTablesUnavailable`` if either env var is unset;
+    ``ApprovalWriteError`` on ``TransactionCanceledException``; other
+    DDB-layer exceptions propagate so the hook's outer try/except can
+    fail-closed with a specific reason.
+    """
+    task_table, approvals_table = _require_tables()
+    ddb = _get_ddb_client(client=client)
+
+    approval_item = {k: _py_to_ddb_attr(v) for k, v in approval_row.items()}
+    # Belt-and-braces: ensure the keys we rely on downstream are present.
+    approval_item.setdefault("task_id", {"S": task_id})
+    approval_item.setdefault("request_id", {"S": request_id})
+    approval_item.setdefault("status", {"S": "PENDING"})
+
+    try:
+        ddb.transact_write_items(
+            TransactItems=[
+                {
+                    "Put": {
+                        "TableName": approvals_table,
+                        "Item": approval_item,
+                        "ConditionExpression": "attribute_not_exists(request_id)",
+                    }
+                },
+                {
+                    "Update": {
+                        "TableName": task_table,
+                        "Key": {"task_id": {"S": task_id}},
+                        "UpdateExpression": (
+                            "SET #s = :awaiting, awaiting_approval_request_id = :rid"
+                        ),
+                        "ConditionExpression": "#s = :running",
+                        "ExpressionAttributeNames": {"#s": "status"},
+                        "ExpressionAttributeValues": {
+                            ":awaiting": {"S": _STATUS_AWAITING_APPROVAL},
+                            ":running": {"S": _STATUS_RUNNING},
+                            ":rid": {"S": request_id},
+                        },
+                    }
+                },
+            ]
+        )
+    except Exception as exc:
+        # TransactionCanceledException carries per-item reasons. Keep the
+        # detection structural (duck-typed on ``response``) so we do not
+        # need botocore at import time.
+        reasons = _extract_cancellation_reasons(exc)
+        code = _extract_error_code(exc)
+        if code == "TransactionCanceledException":
+            raise ApprovalWriteError(
+                f"approval write cancelled: reasons={reasons}",
+                cancellation_reasons=reasons,
+            ) from exc
+        # Otherwise propagate so outer handler classifies it fail-closed.
+        raise
+
+
+def transact_resume_from_approval(
+    task_id: str,
+    request_id: str,
+    *,
+    client=None,
+) -> None:
+    """Atomically resume RUNNING from AWAITING_APPROVAL for ``request_id``.
+
+    The condition ``status = AWAITING_APPROVAL AND
+    awaiting_approval_request_id = :rid`` prevents:
+      - resuming a task that's been cancelled mid-approval (§13.9);
+      - resuming with a stale request_id after a race with the
+        reconciler / a concurrent approval.
+
+    Raises ``ApprovalResumeError`` on ``TransactionCanceledException`` so
+    the hook can emit ``approval_resume_failed`` + DENY.
+    """
+    task_table, _ = _require_tables()
+    ddb = _get_ddb_client(client=client)
+
+    try:
+        ddb.transact_write_items(
+            TransactItems=[
+                {
+                    "Update": {
+                        "TableName": task_table,
+                        "Key": {"task_id": {"S": task_id}},
+                        "UpdateExpression": (
+                            "SET #s = :running REMOVE awaiting_approval_request_id"
+                        ),
+                        "ConditionExpression": (
+                            "#s = :awaiting AND awaiting_approval_request_id = :rid"
+                        ),
+                        "ExpressionAttributeNames": {"#s": "status"},
+                        "ExpressionAttributeValues": {
+                            ":running": {"S": _STATUS_RUNNING},
+                            ":awaiting": {"S": _STATUS_AWAITING_APPROVAL},
+                            ":rid": {"S": request_id},
+                        },
+                    }
+                }
+            ]
+        )
+    except Exception as exc:
+        reasons = _extract_cancellation_reasons(exc)
+        code = _extract_error_code(exc)
+        if code == "TransactionCanceledException":
+            raise ApprovalResumeError(
+                f"approval resume cancelled: reasons={reasons}",
+                cancellation_reasons=reasons,
+            ) from exc
+        raise
+
+
+def best_effort_update_approval_status(
+    task_id: str,
+    request_id: str,
+    new_status: str,
+    *,
+    reason: str | None = None,
+    client=None,
+) -> bool:
+    """Conditionally flip ``status`` on an approval row.
+
+    The condition ``status = :pending`` is the design-doc guard from §6.5.
+    Used on the TIMED_OUT write path: if the row has already transitioned
+    to APPROVED or DENIED, the update fails and the caller (the hook) must
+    re-read the row with ConsistentRead (IMPL-24).
+
+    Returns ``True`` on successful write, ``False`` on
+    ``ConditionalCheckFailedException``. All other errors propagate.
+    """
+    _, approvals_table = _require_tables()
+    ddb = _get_ddb_client(client=client)
+
+    update_expr_parts = ["#s = :new"]
+    expr_values = {
+        ":new": {"S": new_status},
+        ":pending": {"S": "PENDING"},
+    }
+    if reason is not None:
+        update_expr_parts.append("deny_reason = :reason")
+        expr_values[":reason"] = {"S": reason}
+
+    try:
+        ddb.update_item(
+            TableName=approvals_table,
+            Key={"task_id": {"S": task_id}, "request_id": {"S": request_id}},
+            UpdateExpression="SET " + ", ".join(update_expr_parts),
+            ConditionExpression="#s = :pending",
+            ExpressionAttributeNames={"#s": "status"},
+            ExpressionAttributeValues=expr_values,
+        )
+        return True
+    except Exception as exc:
+        code = _extract_error_code(exc)
+        if code == "ConditionalCheckFailedException":
+            return False
+        raise
+
+
+def get_approval_row(
+    task_id: str,
+    request_id: str,
+    *,
+    consistent_read: bool = True,
+    client=None,
+) -> dict | None:
+    """Fetch an approval row. Defaults to strongly-consistent read (IMPL-24).
+
+    Returns a Python dict with unmarshalled attribute values, or ``None`` if
+    the row does not exist (TTL reaped, wrong IDs, etc.). Callers use the
+    ``None`` return to detect the row-gone branch in §13.12.
+    """
+    _, approvals_table = _require_tables()
+    ddb = _get_ddb_client(client=client)
+    resp = ddb.get_item(
+        TableName=approvals_table,
+        Key={"task_id": {"S": task_id}, "request_id": {"S": request_id}},
+        ConsistentRead=consistent_read,
+    )
+    item = resp.get("Item")
+    if item is None:
+        return None
+    return {k: _ddb_attr_to_py(v) for k, v in item.items()}
+
+
+# ---- Exception-introspection helpers ------------------------------------
+
+
+def _extract_error_code(exc: BaseException) -> str | None:
+    """Pull the AWS error code off a ``ClientError``-shaped exception.
+
+    Duck-typed so tests (and environments without botocore at import time)
+    stay decoupled from the concrete exception type.
+    """
+    response = getattr(exc, "response", None)
+    if not isinstance(response, dict):
+        return None
+    error_block = response.get("Error") or {}
+    if not isinstance(error_block, dict):
+        return None
+    code = error_block.get("Code")
+    return code if isinstance(code, str) else None
+
+
+def _extract_cancellation_reasons(exc: BaseException) -> list:
+    """Pull CancellationReasons (best-effort) off a TransactionCanceledException."""
+    response = getattr(exc, "response", None)
+    if not isinstance(response, dict):
+        return []
+    reasons = response.get("CancellationReasons")
+    if isinstance(reasons, list):
+        return reasons
+    return []
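
The duck-typed helpers above make it possible to fake a ``TransactionCanceledException`` in a test without importing botocore. The sketch below is one way that could look. ``FakeClientError``, ``error_code``, ``cancellation_reasons`` and ``classify`` are hypothetical names written for this illustration, and the returned labels are not part of the commit. It assumes, as DynamoDB documents, that ``CancellationReasons`` is ordered like ``TransactItems`` and that a failed condition is reported with the code ``ConditionalCheckFailed``.

```python
class FakeClientError(Exception):
    """Minimal ClientError-shaped exception (hypothetical test double)."""

    def __init__(self, code, reasons=None):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}
        if reasons is not None:
            self.response["CancellationReasons"] = reasons


def error_code(exc):
    """Compact mirror of the duck-typed error-code lookup."""
    response = getattr(exc, "response", None)
    if not isinstance(response, dict):
        return None
    error = response.get("Error")
    code = error.get("Code") if isinstance(error, dict) else None
    return code if isinstance(code, str) else None


def cancellation_reasons(exc):
    """Compact mirror of the cancellation-reasons lookup."""
    response = getattr(exc, "response", None)
    reasons = response.get("CancellationReasons") if isinstance(response, dict) else None
    return reasons if isinstance(reasons, list) else []


def classify(exc) -> str:
    """Hypothetical triage of a failed approval-request transaction.

    Assumes the two-item order used in the diff: index 0 is the approval
    Put, index 1 is the TaskTable Update.
    """
    if error_code(exc) != "TransactionCanceledException":
        return "propagate"
    codes = [r.get("Code") if isinstance(r, dict) else None
             for r in cancellation_reasons(exc)]
    if len(codes) > 1 and codes[1] == "ConditionalCheckFailed":
        return "task_not_running"  # e.g. a concurrent cancel won the race
    if codes and codes[0] == "ConditionalCheckFailed":
        return "duplicate_request"  # the approval row already exists
    return "cancelled_other"
```

Reading the per-item reasons by position is what lets a caller tell a concurrent cancel apart from a duplicate write; exceptions without a ``response`` attribute fall through to ``propagate``.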
