-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathshell.py
More file actions
190 lines (165 loc) · 7.21 KB
/
Copy pathshell.py
File metadata and controls
190 lines (165 loc) · 7.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Shell utilities: logging, command execution, and text helpers."""
import contextlib
import os
import re
import subprocess
import threading
import time
def log(prefix: str, text: str):
    """Write one timestamped, secret-redacted log line to stdout (fd 1).

    The emitted line is ``[HH:MM:SS] <prefix> <text>`` followed by a newline,
    where ``text`` has first been passed through ``redact_secrets``.

    The bytes go out through ``os.write(1, ...)`` instead of ``print`` to stay
    in step with ``server._emit_stdout_line``: because the content is always
    sanitized before it is written, the fd-level sink stops CodeQL's
    cleartext-logging query (which models print/TextIOWrapper.write) from
    flagging the line. As a consequence, tests that want to observe this
    output have to capture with ``capfd`` rather than ``capsys``.

    Args:
        prefix: Short tag placed after the timestamp (e.g. ``"CMD"``).
        text: Message body; redacted before being written.
    """
    stamp = time.strftime("%H:%M:%S")
    pending = f"[{stamp}] {prefix} {redact_secrets(text)}\n".encode("utf-8", errors="replace")
    # os.write may accept only part of the buffer, so keep writing the
    # remainder. Logging is best-effort: an OSError drops whatever is left.
    with contextlib.suppress(OSError):
        while pending:
            written = os.write(1, pending)
            pending = pending[written:]
def log_error_cw(message: str, *, task_id: str | None = None) -> None:
    """Log an ERROR line to stdout and mirror it to the APPLICATION_LOGS group.

    Background (Chunk 10 observability gap): a plain ``log("ERROR", ...)``
    only reaches container stdout, which AgentCore routes to
    ``/aws/bedrock-agentcore/runtimes/<runtime>-DEFAULT`` and not to the
    APPLICATION_LOGS group read by the ``TaskDashboard`` LogQueryWidgets and
    ``bgagent status``. Agent-fatal errors were therefore invisible in the
    two places operators normally look; this was found during E2E
    2026-05-11 T2.2, when a ``missing built-in hard-deny policies`` crash
    showed up on the CLI only as a cryptic "unknown".

    The stdout line is always written. The CloudWatch copy is attempted only
    when the ``LOG_GROUP_NAME`` environment variable is set, and is handed to
    a fire-and-forget daemon thread so it cannot block the failing code path
    (same writer pattern as ``server.py::_warn_cw``). Delivery failures are
    swallowed silently; a caller that needs strict delivery should use
    ``server._warn_cw`` directly from the server-only code paths.

    Args:
        message: Error text. Redacted before it is sent to either sink.
        task_id: Optional task identifier forwarded to the CloudWatch writer,
            which uses it to pick the log stream.
    """
    # stdout first, unconditionally, for local / docker-compose parity with
    # the regular ``log()`` path.
    log("ERROR", message)

    group_name = os.environ.get("LOG_GROUP_NAME")
    if group_name:
        threading.Thread(
            target=_log_error_cw_blocking,
            args=(group_name, task_id, f"[agent/error] {redact_secrets(message)}"),
            name="agent-error-cw-write",
            daemon=True,
        ).start()
def _log_error_cw_blocking(log_group: str, task_id: str | None, stamped: str) -> None:
    """Synchronously put one event into CloudWatch Logs for ``log_error_cw``.

    Intended to run only on the daemon thread started by ``log_error_cw``.
    It follows ``server.py::_warn_cw_write_blocking`` but writes to a
    dedicated ``agent_error/<task_id>`` stream (``agent_error/unknown`` when
    no task id is given) so operators can alarm on agent-runtime fatal
    errors separately from server-layer warnings.

    Every failure is swallowed: any surfaceable alarm should fire on the
    absence of the expected stream, not on this helper.

    Args:
        log_group: Name of the target CloudWatch log group.
        task_id: Task identifier used in the stream name, or ``None``.
        stamped: Fully formatted, already-redacted message to record.
    """
    try:
        from aws_session import platform_client

        logs = platform_client(
            "logs",
            region_name=os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION"),
        )
        stream_name = f"agent_error/{task_id or 'unknown'}"
        # The stream may already exist from an earlier error for this task.
        with contextlib.suppress(logs.exceptions.ResourceAlreadyExistsException):
            logs.create_log_stream(logGroupName=log_group, logStreamName=stream_name)
        logs.put_log_events(
            logGroupName=log_group,
            logStreamName=stream_name,
            logEvents=[{"timestamp": int(time.time() * 1000), "message": stamped}],
        )
    except Exception:  # noqa: S110 - best-effort telemetry; stdout path already logged
        # Deliberately silent. ``log_error_cw`` has already emitted the same
        # message on stdout through ``log("ERROR", ...)``, so a CloudWatch
        # failure (IAM, network, quota) does not lose the signal; it only
        # leaves it in the runtime-DEFAULT log group. Raising here would
        # unwind the daemon thread mid-shutdown and print a confusing
        # secondary traceback on top of the primary failure.
        pass
def truncate(text: str, max_len: int = 200) -> str:
    """Flatten ``text`` to a single line and shorten it for log display.

    Newlines become spaces and surrounding whitespace is stripped. If the
    result is longer than ``max_len`` characters, it is cut to ``max_len``
    characters and ``"..."`` is appended (so the returned string can be up
    to three characters longer than ``max_len``).

    Args:
        text: Text to shorten; any falsy value yields ``""``.
        max_len: Number of characters kept before the ellipsis.

    Returns:
        The flattened, possibly shortened text.
    """
    if not text:
        return ""
    flattened = text.replace("\n", " ").strip()
    return flattened if len(flattened) <= max_len else flattened[:max_len] + "..."
def slugify(text: str, max_len: int = 40) -> str:
    """Turn free text into a URL-safe slug suitable for branch names.

    The text is lower-cased and stripped; every character other than
    ``a-z``, ``0-9``, whitespace and ``-`` is removed; each run of whitespace
    and/or hyphens collapses to a single ``-``; leading and trailing hyphens
    are dropped. A slug longer than ``max_len`` is cut to ``max_len``
    characters and any hyphen left dangling at the end is removed.

    Args:
        text: Source text.
        max_len: Maximum length of the returned slug.

    Returns:
        The slug, or ``"task"`` when nothing usable remains.
    """
    kept = re.sub(r"[^a-z0-9\s-]", "", text.lower().strip())
    slug = re.sub(r"[\s-]+", "-", kept).strip("-")
    if len(slug) > max_len:
        slug = slug[:max_len].rstrip("-")
    return slug if slug else "task"
def redact_secrets(text: str) -> str:
    """Mask tokens and other secret-looking values before text is logged.

    The substitutions are applied in order:

    1. GitHub tokens starting with ``ghp_``, ``github_pat_``, ``gho_``,
       ``ghs_`` or ``ghr_`` keep their prefix and the rest becomes ``***``.
    2. The credential following ``x-access-token:`` (up to whitespace or
       ``@``) becomes ``***``.
    3. The value of an ``authorization: bearer|token ...`` header becomes
       ``***`` (case-insensitive).
    4. URL query values for ``token``, ``access_token``, ``api_key``,
       ``apikey`` and ``password`` become ``***`` (case-insensitive).
    5. Any remaining ``gh[opusr]_...`` token is replaced entirely by ``***``.

    Args:
        text: Text that may contain secrets.

    Returns:
        ``text`` with every match masked.
    """
    # (pattern, replacement, flags), applied strictly in this order.
    rules = (
        # GitHub and generic token-like values.
        (r"(ghp_|github_pat_|gho_|ghs_|ghr_)[A-Za-z0-9_]+", r"\1***", 0),
        (r"(x-access-token:)[^\s@]+", r"\1***", 0),
        (r"(authorization:\s*(?:bearer|token)\s+)[^\s]+", r"\1***", re.I),
        (
            r"([?&](?:token|access_token|api_key|apikey|password)=)[^&\s]+",
            r"\1***",
            re.I,
        ),
        # Catch-all for GitHub prefixes the first rule does not list.
        (r"(gh[opusr]_[A-Za-z0-9_]+)", "***", 0),
    )
    for pattern, replacement, flags in rules:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text
def _clean_env() -> dict[str, str]:
    """Build a copy of ``os.environ`` without OTEL auto-instrumentation settings.

    The ``opentelemetry-instrument`` wrapper injects PYTHONPATH and OTEL_*
    variables. Left in place, they make child Python processes (e.g. mise
    run build -> semgrep in the target repo) attempt OTEL
    auto-instrumentation and fail, because the target repo's Python
    environment does not have the OTEL packages installed. Removing them
    isolates target-repo commands from the agent's own instrumentation.

    Returns:
        A new dict with every ``OTEL_``-prefixed variable removed and with
        any ``PYTHONPATH`` entry containing ``"opentelemetry"`` dropped.
        ``PYTHONPATH`` is removed altogether when nothing is left of it.
    """
    env = dict(os.environ)
    for name in [key for key in env if key.startswith("OTEL_")]:
        del env[name]

    current_path = env.get("PYTHONPATH", "")
    if not current_path:
        return env

    # Drop only the OTEL-injected components (the sitecustomize.py
    # directory) and keep any entries the target repo's toolchain may need.
    remaining = os.pathsep.join(
        entry for entry in current_path.split(os.pathsep) if "opentelemetry" not in entry
    )
    if remaining:
        env["PYTHONPATH"] = remaining
    else:
        del env["PYTHONPATH"]
    return env
def run_cmd(
    cmd: list[str],
    label: str,
    cwd: str | None = None,
    timeout: int = 600,
    check: bool = True,
) -> subprocess.CompletedProcess:
    """Run a command, logging its start and its outcome.

    The command line is logged (redacted) before execution. The child runs
    with captured, text-mode stdout/stderr and with the environment returned
    by ``_clean_env()``. Afterwards exactly one outcome line is logged:
    ``OK``, ``FAILED (exit N)`` followed by up to 20 stderr lines, or
    ``TIMED OUT``.

    Args:
        cmd: Argument vector passed to ``subprocess.run`` (no shell).
        label: Human-readable name used in log lines and error messages.
        cwd: Working directory for the child, or ``None`` to inherit.
        timeout: Seconds to wait before the child is killed.
        check: When true, a non-zero exit status raises ``RuntimeError``.

    Returns:
        The ``subprocess.CompletedProcess`` for the finished command. With
        ``check=False`` this is returned even for a non-zero exit status.

    Raises:
        RuntimeError: The command exited non-zero and ``check`` is true. The
            message carries the first 500 characters of stderr, redacted.
        subprocess.TimeoutExpired: The command did not finish within
            ``timeout`` seconds.
    """
    log("CMD", redact_secrets(f"{label}: {' '.join(cmd)}"))
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=timeout,
            env=_clean_env(),
        )
    except subprocess.TimeoutExpired:
        # Without this line a hung command leaves only the start entry in the
        # log. The original exception is re-raised unchanged so callers that
        # catch TimeoutExpired keep working.
        log("CMD", f"{label}: TIMED OUT after {timeout}s")
        raise

    if result.returncode == 0:
        log("CMD", f"{label}: OK")
        return result

    log("CMD", f"{label}: FAILED (exit {result.returncode})")
    if result.stderr:
        # Cap the echo at 20 lines; log() redacts each line.
        for line in result.stderr.strip().splitlines()[:20]:
            log("CMD", f"  {line}")
    if check:
        stderr_snippet = redact_secrets(result.stderr.strip()[:500]) if result.stderr else ""
        raise RuntimeError(f"{label} failed (exit {result.returncode}): {stderr_snippet}")
    return result