Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Handler entry tests: `cdk/test/handlers/orchestrate-task.test.ts`, `create-task.
- **`prek install`** fails if Git **`core.hooksPath`** is set — another hook manager owns hooks; see [CONTRIBUTING.md](./CONTRIBUTING.md).
- **Editing on `main` directly** — ALWAYS create a worktree with a feature branch for changes, even trivial ones. Main should stay clean; all work flows through worktree → branch → PR → merge.
- **Git worktrees** — Always **`git fetch origin main`** before creating a new worktree to ensure you branch from the latest remote state. `node_modules/` and `agent/.venv/` are per-tree (not shared). Run **`mise run install`** in each new worktree before building. All CDK path references (`__dirname`-relative) and mise `config_roots` resolve correctly without extra setup.
- **Instantiating AWS SDK clients without the ABCA solution User-Agent (#319)** — every outbound AWS API call must carry the solution-tracking UA segments. Never write naked `boto3.client(...)` / `new XClient({})`: in `agent/src/` use `aws_session.tenant_client`/`tenant_resource` (tenant data) or `aws_session.platform_client` (ambient-chain calls); in `cdk/src/handlers/` pass `abcaUserAgent()` and wrap with `withAbcaTrace()` from `shared/ua.ts`; in `cli/src/` same via `cli/src/ua.ts`. The three `ua` modules (`agent/src/ua.py`, `cdk/src/handlers/shared/ua.ts`, `cli/src/ua.ts`) must stay identical in solution id, wire format, and sanitization.
- **Bumping Cedar engines in isolation** — `cedarpy` (Python, `agent/pyproject.toml`) and `@cedar-policy/cedar-wasm` (TypeScript, `cdk/package.json`) are two language bindings over the same Cedar Rust core. They MUST move together; even patch-version drift between bindings can yield divergent `(decision, matching_rule_ids)` on the same `(policy, input)` — invisible to per-side unit tests, caught (only) by `contracts/cedar-parity/` golden fixtures in CI. If you bump one engine you MUST bump the other to a tested-compatible version AND refresh the parity fixtures in the same commit. Both pins are EXACT (no `^`/`~`). See `docs/design/CEDAR_HITL_GATES.md` §15.6 (decision #23) and the parity-contract banner in `mise.toml`. **DO NOT** accept upstream's "Update branch" or auto-merge suggestions on cedarpy without verifying parity with cedar-wasm.

### Tech stack
Expand Down
83 changes: 73 additions & 10 deletions agent/src/aws_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,23 @@ def configure_session(user_id: str, repo: str, task_id: str) -> None:
for key, value in (("user_id", user_id), ("repo", repo), ("task_id", task_id))
if value
}
# The task id doubles as the UA trace handle (#319): every AWS call made
# while this task runs carries md/...#agent#{task_id}.
import ua

ua.set_trace(task_id or None)


def reset_session_cache() -> None:
"""Drop the cached session and tags. For tests that toggle config."""
"""Drop the cached session, tags, and UA trace. For tests that toggle config."""
global _session, _scoped, _tags
with _lock:
_session = None
_scoped = None
_tags = {}
import ua

ua.set_trace(None)


def _session_tags() -> list[dict[str, str]]:
Expand All @@ -128,6 +136,8 @@ def _build_scoped_session(role_arn: str) -> Any:
)
from botocore.session import get_session as get_botocore_session

import ua

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
task_id = _tags.get("task_id", "")
# Role session name must be <=64 chars and match [\w+=,.@-]. task_id is a
Expand All @@ -139,7 +149,8 @@ def _build_scoped_session(role_arn: str) -> Any:
# A dedicated STS client built from the *ambient* (compute-role) chain.
# This is the role-chaining caller; the assumed SessionRole credentials it
# returns must NOT be used to build it, or refresh would recurse.
sts_client = boto3.client("sts", region_name=region)
sts_client = boto3.client("sts", region_name=region, config=ua.client_config())
ua.register_trace_appender(sts_client.meta.events)

def _refresh() -> dict[str, str]:
resp = sts_client.assume_role(
Expand Down Expand Up @@ -167,6 +178,12 @@ def _refresh() -> dict[str, str]:
)
if region:
botocore_session.set_config_variable("region", region)
# Outbound UA solution tracking (#319): session-level so every client and
# resource derived from this singleton carries the static segments; the
# per-request #{TRACE} appender mutates only the header, preserving the
# session's connection pool across trace changes.
botocore_session.user_agent_extra = ua.static_user_agent_extra()
ua.register_trace_appender(botocore_session.get_component("event_emitter"))
return boto3.Session(botocore_session=botocore_session)


Expand Down Expand Up @@ -209,10 +226,19 @@ def get_session() -> Any:
) from exc
else:
# Scoping not requested (local/dev/tests, or pre-provisioning):
# plain ambient session, behaviorally identical to pre-feature code.
_session = boto3.Session(
region_name=os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
)
# plain ambient session, behaviorally identical to pre-feature code
# apart from the UA solution-tracking segments (#319).
from botocore.session import get_session as get_botocore_session

import ua

botocore_session = get_botocore_session()
region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
if region:
botocore_session.set_config_variable("region", region)
botocore_session.user_agent_extra = ua.static_user_agent_extra()
ua.register_trace_appender(botocore_session.get_component("event_emitter"))
_session = boto3.Session(botocore_session=botocore_session)
_scoped = False
return _session

Expand All @@ -235,9 +261,7 @@ def tenant_client(service_name: str, **kwargs: Any) -> Any:
session = get_session()
if is_scoped():
return session.client(service_name, **kwargs)
import boto3

return boto3.client(service_name, **kwargs)
return platform_client(service_name, **kwargs)


def tenant_resource(service_name: str, **kwargs: Any) -> Any:
Expand All @@ -247,4 +271,43 @@ def tenant_resource(service_name: str, **kwargs: Any) -> Any:
return session.resource(service_name, **kwargs)
import boto3

return boto3.resource(service_name, **kwargs)
resource = boto3.resource(service_name, **_with_ua(kwargs))
# Guarded like platform_client: test doubles may lack the meta chain.
inner = getattr(getattr(resource, "meta", None), "client", None)
events = getattr(getattr(inner, "meta", None), "events", None)
if events is not None:
import ua

ua.register_trace_appender(events)
return resource


def platform_client(service_name: str, **kwargs: Any) -> Any:
"""boto3 client for platform (non-tenant) calls, with the ABCA UA (#319).

For call sites that intentionally use the ambient compute-role chain
(CloudWatch Logs debug writers, Secrets Manager, AgentCore memory) rather
than the tenant-scoped session. Same signature as ``boto3.client`` plus
the solution-tracking User-Agent and per-request trace appender.
"""
import boto3

client = boto3.client(service_name, **_with_ua(kwargs))
# Real clients always expose meta.events; test doubles (MagicMock, or the
# bare fakes some suites install as a stub boto3 module) may not — the
# appender is solution telemetry, never worth failing a call site over.
events = getattr(getattr(client, "meta", None), "events", None)
if events is not None:
import ua

ua.register_trace_appender(events)
return client


def _with_ua(kwargs: dict[str, Any]) -> dict[str, Any]:
"""Merge the ABCA UA config into a boto3 client/resource kwargs dict."""
import ua

supplied = kwargs.get("config")
config = supplied.merge(ua.client_config()) if supplied is not None else ua.client_config()
return {**kwargs, "config": config}
10 changes: 6 additions & 4 deletions agent/src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def resolve_github_token() -> str:
return cached
secret_arn = os.environ.get("GITHUB_TOKEN_SECRET_ARN")
if secret_arn:
import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("secretsmanager", region_name=region)
client = platform_client("secretsmanager", region_name=region)
resp = client.get_secret_value(SecretId=secret_arn)
token = resp["SecretString"]
# Cache in env so downstream tools (git, gh CLI) work unchanged
Expand Down Expand Up @@ -101,14 +101,16 @@ def resolve_linear_api_token(channel_metadata: dict[str, str] | None = None) ->
import json
from datetime import datetime, timedelta

import boto3
import boto3 # noqa: F401 — availability probe; client built via platform_client

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the import boto3 still here once and ignored via "noqa" to ensure it is available? Why not let it fail at the platform_client?

from botocore.exceptions import BotoCoreError, ClientError
except ImportError as e:
log("WARN", f"resolve_linear_api_token: boto3 unavailable ({e}); skipping")
# nosemgrep: py-silent-success-masking -- optional Linear MCP; boto3 unavailable
return ""

sm = boto3.client("secretsmanager", region_name=region)
from aws_session import platform_client

sm = platform_client("secretsmanager", region_name=region)

def _fetch_token() -> dict | None:
"""Fetch + parse the per-workspace OAuth secret.
Expand Down
4 changes: 2 additions & 2 deletions agent/src/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ def _get_client():
global _client
if _client is not None:
return _client
import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
if not region:
raise ValueError("AWS_REGION or AWS_DEFAULT_REGION must be set for memory operations")
_client = boto3.client("bedrock-agentcore", region_name=region)
_client = platform_client("bedrock-agentcore", region_name=region)
return _client


Expand Down
8 changes: 4 additions & 4 deletions agent/src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,10 @@ def _warn_cw_write_blocking(log_group: str, task_id: str | None, stamped: str) -
covers both writers.
"""
try:
import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("logs", region_name=region)
client = platform_client("logs", region_name=region)

stream = f"server_warn/{task_id or 'server'}"
with _ctx_for_debug.suppress(client.exceptions.ResourceAlreadyExistsException):
Expand All @@ -193,10 +193,10 @@ def _warn_cw_write_blocking(log_group: str, task_id: str | None, stamped: str) -
def _debug_cw_write_blocking(log_group: str, task_id: str | None, stamped: str) -> None:
"""Blocking CloudWatch write — only called from a background thread."""
try:
import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("logs", region_name=region)
client = platform_client("logs", region_name=region)

stream = f"server_debug/{task_id or 'server'}"
with _ctx_for_debug.suppress(client.exceptions.ResourceAlreadyExistsException):
Expand Down
4 changes: 2 additions & 2 deletions agent/src/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def _log_error_cw_blocking(log_group: str, task_id: str | None, stamped: str) ->
fire on the absence of the expected stream, not on this helper).
"""
try:
import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("logs", region_name=region)
client = platform_client("logs", region_name=region)
stream = f"agent_error/{task_id or 'unknown'}"
with contextlib.suppress(client.exceptions.ResourceAlreadyExistsException):
client.create_log_stream(logGroupName=log_group, logStreamName=stream)
Expand Down
8 changes: 4 additions & 4 deletions agent/src/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ def _emit_metrics_to_cloudwatch(json_payload: dict) -> None:
try:
import contextlib

import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
client = boto3.client("logs", region_name=region)
client = platform_client("logs", region_name=region)

task_id = json_payload.get("task_id", "unknown")
log_stream = f"metrics/{task_id}"
Expand Down Expand Up @@ -164,10 +164,10 @@ def _ensure_client(self):

import contextlib

import boto3
from aws_session import platform_client

region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")
self._client = boto3.client("logs", region_name=region)
self._client = platform_client("logs", region_name=region)

log_stream = f"trajectory/{self._task_id}"
with contextlib.suppress(self._client.exceptions.ResourceAlreadyExistsException):
Expand Down
Loading
Loading