Skip to content

Commit 352eaaa

Browse files
fix(streams): cut SSE error-log volume and add Redis pool headroom (#340)
## What Short-term mitigations for unbounded log volume produced by the task-event SSE endpoint when concurrent streams exceed the Redis connection pool size. ### Background The SSE streaming path (`StreamsUseCase.stream_task_events`) holds one **blocking `XREAD` connection per connected client**. On a high enough number of concurrent streams the pool is exhausted, so every client's read fails on each cycle. Two things turned that into a log firehose: 1. **The ~20× multiplier was a log-format problem, not a code problem.** The error handler already used `exc_info=True` (it always has). Under a **plain-text** formatter, `exc_info=True` renders the traceback as ~20 physical lines, and the cluster log collector ingests one entry per line — so every error became ~20 entries. "Switch to exc_info=True" would have been a no-op; the real cause was the formatter. 2. The handler retried with a flat `sleep(1)`, so failures repeated ~once/sec per client. ## Changes **`utils/logging.py` — structured JSON logs by default (this removes the 20×)** - Default to the (already-existing) `CustomJSONFormatter` whenever `ENVIRONMENT != "development"`. With JSON, the same always-on `exc_info=True` traceback is captured as a **single** log entry — the newlines live inside the quoted `exc_info` field — instead of one entry per line. This is what eliminates the ~20× multiplier; `exc_info` itself is unchanged. - The JSON formatter already existed but was gated behind Datadog configuration, so non-Datadog clusters (e.g. plain-text Azure) silently fell back to per-line tracebacks. Local development still uses plain text for readable console output. - Verified: a full traceback emits **1 physical line** under JSON vs ~20 under plain text; env matrix confirmed (development → plain text, production/unset → JSON). **`streams_use_case.py` — throttle the error loop** - Replace the flat 1s retry with **capped exponential backoff** (1→2→4→8→16→30s), reset on a healthy read, so a tight per-client loop can't hammer Redis or flood logs. - Full tracebacks are still logged on **every** failure (nothing swallowed), with a failure counter for context. Volume is bounded by the backoff and by the single-entry JSON logging above. **`environment_variables.py` — pool headroom** - Bump in-code `REDIS_MAX_CONNECTIONS` default 50 → 200. ## What this is NOT A mitigation, not a root-cause fix. Connections still scale 1:1 with clients, so a large enough concurrent-stream count will still exhaust the pool — these changes keep that from becoming a log flood and raise the threshold. The durable fix (tracked separately) is a **shared per-pod reader that fans out to in-process queues**, so connection count becomes O(distinct streams) instead of O(clients). OTel duplicate-handler de-dup is also a separate follow-up. ## Testing - `ruff check` on all changed files — passes. - Verified JSON formatter emits a full traceback as 1 physical line; env matrix confirmed. - Backoff sequence verified (1, 2, 4, 8, 16, 30, 30… cap). - No unit test covers the streaming path; the integration test requires testcontainers. The streaming integration tests (`00-sync-020-streaming`, `10-async-00-base-020-streaming`) pass in CI. 🤖 Generated with [Claude Code](https://claude.com/claude-code) <!-- greptile_comment --> <h3>Greptile Summary</h3> This PR reduces SSE error-log volume and raises Redis pool headroom. The main changes are: - Production and unset environments now use JSON logging by default. - JSON log fields are capped to limit oversized structured entries. - SSE stream read failures now use capped exponential backoff. - The default Redis connection pool size is raised to 200. <details><summary><h3>Confidence Score: 3/5</h3></summary> The logging mitigation is useful, but request payload fields still need attention before merging because structured JSON output can retain small request data. The changed files are narrow and the Redis/backoff behavior is straightforward, while the logging change leaves an important data-exposure path active for non-development environments. agentex/src/utils/logging.py </details> <details><summary><h3><a href="https://www.greptile.com/trex"><img alt="T-Rex" src="https://greptile-static-assets.s3.amazonaws.com/trex/trex_green.svg" height="20" align="absmiddle"></a> T-Rex Logs</h3></summary> **What T-Rex did** - The focused runtime harness was executed to reproduce logging behavior using the real log\_request path and CustomJSONFormatter with ENVIRONMENT unset for JSON logging. - Tests showed plain-text logging emitted multi-line output for unset/production/dev, and after enabling CustomJSONFormatter for unset and production, head logs emitted JSON lines, while development head stayed plain text. - The SSE error backoff and logging were exercised: the before artifact showed flat sleeps and eight SSE error events with exc\_info on every error, and the after artifact showed capped exponential sleeps after a healthy read, with the eighth error indicating a reset and all logger calls still using exc\_info. - Defaults and env-loading behavior were compared: before, base class/direct unset default was 50, env-loading unset default 100, and refresh override 123; after, head class/direct unset default and refresh/env-loading unset default both became 200, with the refresh override still 123, and direct construction with an env var set continues to return the class default since os.environ is only read by the env-loading path via refresh. <a href="https://app.greptile.com/trex/runs/12306315/artifacts"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/ViewAllArtifactsDark.svg?v=1"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/ViewAllArtifacts.svg?v=1"><img alt="View all artifacts" src="https://greptile-static-assets.s3.amazonaws.com/badges/ViewAllArtifacts.svg?v=1" height="32"></picture></a> <sub><a href="https://www.greptile.com/trex"><img alt="T-Rex" src="https://greptile-static-assets.s3.amazonaws.com/trex/trex_green.svg" height="14" align="absmiddle"></a> Ran code and verified through T-Rex</sub> </details> <!-- greptile_failed_comments --> <details open><summary><h3>Comments Outside Diff (2)</h3></summary> 1. `agentex/src/config/environment_variables.py`, line 172-174 ([link](https://github.com/scaleapi/scale-agentex/blob/45b91ff88a67b80901f42ef51916f27dc20cfb18/agentex/src/config/environment_variables.py#L172-L174)) <a href="#"><img alt="P1" src="https://greptile-static-assets.s3.amazonaws.com/badges/p1.svg?v=9" align="top"></a> **Default is bypassed** The new `REDIS_MAX_CONNECTIONS = 200` class default is not used by the normal app startup path. `GlobalDependencies` calls `EnvironmentVariables.refresh()`, and this constructor argument still falls back to `"100"` when the env var is absent. A deployment without an explicit `REDIS_MAX_CONNECTIONS` still initializes the Redis pool with 100 connections, so the intended pool-headroom mitigation does not apply. <details><summary><strong>Artifacts</strong></summary><br /> **[Repro: focused EnvironmentVariables.refresh harness](https://app.greptile.com/trex/artifacts/4a0a9e95-7656-4a33-af65-cdd344baf65a)** - Contains supporting evidence from the run (text/x-python; charset=utf-8). **[Repro: refresh output showing effective Redis max connections is 100](https://app.greptile.com/trex/artifacts/1c073cf8-bd30-4be7-9732-0cf0fe219e26)** - Keeps the command output available without making the summary code-heavy. <a href="https://app.greptile.com/trex/runs/12300895/artifacts?artifact=4a0a9e95-7656-4a33-af65-cdd344baf65a"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/ViewArtifactsDark.svg?v=1"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/ViewArtifacts.svg?v=1"><img alt="View artifacts" src="https://greptile-static-assets.s3.amazonaws.com/badges/ViewArtifacts.svg?v=1" height="32"></picture></a> </details> <sub><a href="https://www.greptile.com/trex"><img alt="T-Rex" src="https://greptile-static-assets.s3.amazonaws.com/trex/trex_green.svg" height="14" align="absmiddle"></a> Ran code and verified through T-Rex</sub> <details><summary>Prompt To Fix With AI</summary> `````markdown This is a comment left during a code review. Path: agentex/src/config/environment_variables.py Line: 172-174 Comment: **Default is bypassed** The new `REDIS_MAX_CONNECTIONS = 200` class default is not used by the normal app startup path. `GlobalDependencies` calls `EnvironmentVariables.refresh()`, and this constructor argument still falls back to `"100"` when the env var is absent. A deployment without an explicit `REDIS_MAX_CONNECTIONS` still initializes the Redis pool with 100 connections, so the intended pool-headroom mitigation does not apply. How can I resolve this? If you propose a fix, please make it concise. ````` </details> <a href="https://app.greptile.com/api/ide/cursor?prompt=This%20is%20a%20comment%20left%20during%20a%20code%20review.%0APath%3A%20agentex%2Fsrc%2Fconfig%2Fenvironment_variables.py%0ALine%3A%20172-174%0A%0AComment%3A%0A**Default%20is%20bypassed**%0A%0AThe%20new%20%60REDIS_MAX_CONNECTIONS%20%3D%20200%60%20class%20default%20is%20not%20used%20by%20the%20normal%20app%20startup%20path.%20%60GlobalDependencies%60%20calls%20%60EnvironmentVariables.refresh%28%29%60%2C%20and%20this%20constructor%20argument%20still%20falls%20back%20to%20%60%22100%22%60%20when%20the%20env%20var%20is%20absent.%20A%20deployment%20without%20an%20explicit%20%60REDIS_MAX_CONNECTIONS%60%20still%20initializes%20the%20Redis%20pool%20with%20100%20connections%2C%20so%20the%20intended%20pool-headroom%20mitigation%20does%20not%20apply.%0A%0AHow%20can%20I%20resolve%20this%3F%20If%20you%20propose%20a%20fix%2C%20please%20make%20it%20concise.&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCursorDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCursor.svg?v=3"><img alt="Fix in Cursor" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCursor.svg?v=3" height="20"></picture></a> <a href="https://app.greptile.com/ide/claude-code?prompt=This%20is%20a%20comment%20left%20during%20a%20code%20review.%0APath%3A%20agentex%2Fsrc%2Fconfig%2Fenvironment_variables.py%0ALine%3A%20172-174%0A%0AComment%3A%0A**Default%20is%20bypassed**%0A%0AThe%20new%20%60REDIS_MAX_CONNECTIONS%20%3D%20200%60%20class%20default%20is%20not%20used%20by%20the%20normal%20app%20startup%20path.%20%60GlobalDependencies%60%20calls%20%60EnvironmentVariables.refresh%28%29%60%2C%20and%20this%20constructor%20argument%20still%20falls%20back%20to%20%60%22100%22%60%20when%20the%20env%20var%20is%20absent.%20A%20deployment%20without%20an%20explicit%20%60REDIS_MAX_CONNECTIONS%60%20still%20initializes%20the%20Redis%20pool%20with%20100%20connections%2C%20so%20the%20intended%20pool-headroom%20mitigation%20does%20not%20apply.%0A%0AHow%20can%20I%20resolve%20this%3F%20If%20you%20propose%20a%20fix%2C%20please%20make%20it%20concise.&repo=scaleapi%2Fscale-agentex&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInClaudeDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInClaude.svg?v=3"><img alt="Fix in Claude Code" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixInClaude.svg?v=3" height="20"></picture></a> <a href="https://app.greptile.com/api/ide/codex?prompt=IMPORTANT%3A%20Work%20in%20the%20repository%20%22scaleapi%2Fscale-agentex%22%20on%20the%20existing%20branch%20%22dm%2Fsse-redis-pool-headroom-log-backoff%22.%20Checkout%20that%20branch%20%E2%80%94%20do%20NOT%20create%20a%20new%20branch%20or%20open%20a%20new%20PR.%20Push%20your%20changes%20to%20%22dm%2Fsse-redis-pool-headroom-log-backoff%22.%0A%0AThis%20is%20a%20comment%20left%20during%20a%20code%20review.%0APath%3A%20agentex%2Fsrc%2Fconfig%2Fenvironment_variables.py%0ALine%3A%20172-174%0A%0AComment%3A%0A**Default%20is%20bypassed**%0A%0AThe%20new%20%60REDIS_MAX_CONNECTIONS%20%3D%20200%60%20class%20default%20is%20not%20used%20by%20the%20normal%20app%20startup%20path.%20%60GlobalDependencies%60%20calls%20%60EnvironmentVariables.refresh%28%29%60%2C%20and%20this%20constructor%20argument%20still%20falls%20back%20to%20%60%22100%22%60%20when%20the%20env%20var%20is%20absent.%20A%20deployment%20without%20an%20explicit%20%60REDIS_MAX_CONNECTIONS%60%20still%20initializes%20the%20Redis%20pool%20with%20100%20connections%2C%20so%20the%20intended%20pool-headroom%20mitigation%20does%20not%20apply.%0A%0AHow%20can%20I%20resolve%20this%3F%20If%20you%20propose%20a%20fix%2C%20please%20make%20it%20concise.&repo=scaleapi%2Fscale-agentex&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCodexDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCodex.svg?v=3"><img alt="Fix in Codex" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixInCodex.svg?v=3" height="20"></picture></a> 2. General comment <a href="#"><img alt="P1" src="https://greptile-static-assets.s3.amazonaws.com/badges/p1.svg?v=9" align="top"></a> **Repeated SSE stream read failures still emit full tracebacks** - **Bug** - The PR contract says `stream_task_events` should emit a full traceback only on the first failure of a stream and log repeated failures as compact single-line records with repeat/failure count. Executed head evidence shows failure #1, #2, #3, and the post-reset failure all call `logger.error(..., exc_info=True)`, so repeated failures still include traceback data. Backoff and reset behavior worked in the harness (`1.0, 2.0, 4.0`, reset to `failure #1` after a healthy read), but traceback throttling did not. - **Cause** - In `agentex/src/domain/use_cases/streams_use_case.py`, the stream error handler always passes `exc_info=True` to `logger.error` for every consecutive failure rather than conditioning traceback logging on the first failure after a healthy read. - **Fix** - Change the error handler to pass `exc_info=True` only when `consecutive_errors == 1`; for subsequent consecutive failures, log a compact message with the failure/repeat count and `exc_info=False` or omit `exc_info`. Keep the existing reset of `consecutive_errors = 0` after a successful read cycle. <sub><a href="https://www.greptile.com/trex"><img alt="T-Rex" src="https://greptile-static-assets.s3.amazonaws.com/trex/trex_green.svg" height="14" align="absmiddle"></a> Ran code and verified through T-Rex</sub> </details> <!-- /greptile_failed_comments --> <a href="https://app.greptile.com/api/ide/cursor?prompt=Fix%20the%20following%201%20code%20review%20issue.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%201%0Aagentex%2Fsrc%2Futils%2Flogging.py%3A165-168%0A**Small%20bodies%20still%20log**%0A%0AThis%20only%20caps%20oversized%20structured%20fields%3B%20it%20does%20not%20stop%20JSON%20logging%20from%20serializing%20request%20%60extra%60%20fields.%20%60LoggedAPIRoute.log_request%60%20still%20passes%20%60body%60%2C%20%60headers%60%2C%20and%20%60query_params%60%2C%20and%20%60strip_sensitive_items%60%20only%20removes%20blacklisted%20keys.%20In%20non-development%20environments%2C%20any%20non-blacklisted%20request%20body%20under%204096%20characters%20is%20still%20emitted%20into%20production%20logs%2C%20which%20can%20expose%20request%20payload%20data%20and%20keeps%20per-request%20body%20logging%20enabled.%20Please%20drop%20or%20allowlist%20these%20structured%20request%20fields%20before%20JSON%20serialization%20rather%20than%20only%20truncating%20large%20values.%0A%0A&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursorDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursor.svg?v=3"><img alt="Fix All in Cursor" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCursor.svg?v=3" height="20"></picture></a> <a href="https://app.greptile.com/ide/claude-code?prompt=Fix%20the%20following%201%20code%20review%20issue.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%201%0Aagentex%2Fsrc%2Futils%2Flogging.py%3A165-168%0A**Small%20bodies%20still%20log**%0A%0AThis%20only%20caps%20oversized%20structured%20fields%3B%20it%20does%20not%20stop%20JSON%20logging%20from%20serializing%20request%20%60extra%60%20fields.%20%60LoggedAPIRoute.log_request%60%20still%20passes%20%60body%60%2C%20%60headers%60%2C%20and%20%60query_params%60%2C%20and%20%60strip_sensitive_items%60%20only%20removes%20blacklisted%20keys.%20In%20non-development%20environments%2C%20any%20non-blacklisted%20request%20body%20under%204096%20characters%20is%20still%20emitted%20into%20production%20logs%2C%20which%20can%20expose%20request%20payload%20data%20and%20keeps%20per-request%20body%20logging%20enabled.%20Please%20drop%20or%20allowlist%20these%20structured%20request%20fields%20before%20JSON%20serialization%20rather%20than%20only%20truncating%20large%20values.%0A%0A&repo=scaleapi%2Fscale-agentex&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaudeDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaude.svg?v=3"><img alt="Fix All in Claude Code" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInClaude.svg?v=3" height="20"></picture></a> <a href="https://app.greptile.com/api/ide/codex?prompt=IMPORTANT%3A%20Work%20in%20the%20repository%20%22scaleapi%2Fscale-agentex%22%20on%20the%20existing%20branch%20%22dm%2Fsse-redis-pool-headroom-log-backoff%22.%20Checkout%20that%20branch%20%E2%80%94%20do%20NOT%20create%20a%20new%20branch%20or%20open%20a%20new%20PR.%20Push%20your%20changes%20to%20%22dm%2Fsse-redis-pool-headroom-log-backoff%22.%0A%0AFix%20the%20following%201%20code%20review%20issue.%20Work%20through%20them%20one%20at%20a%20time%2C%20proposing%20concise%20fixes.%0A%0A---%0A%0A%23%23%23%20Issue%201%20of%201%0Aagentex%2Fsrc%2Futils%2Flogging.py%3A165-168%0A**Small%20bodies%20still%20log**%0A%0AThis%20only%20caps%20oversized%20structured%20fields%3B%20it%20does%20not%20stop%20JSON%20logging%20from%20serializing%20request%20%60extra%60%20fields.%20%60LoggedAPIRoute.log_request%60%20still%20passes%20%60body%60%2C%20%60headers%60%2C%20and%20%60query_params%60%2C%20and%20%60strip_sensitive_items%60%20only%20removes%20blacklisted%20keys.%20In%20non-development%20environments%2C%20any%20non-blacklisted%20request%20body%20under%204096%20characters%20is%20still%20emitted%20into%20production%20logs%2C%20which%20can%20expose%20request%20payload%20data%20and%20keeps%20per-request%20body%20logging%20enabled.%20Please%20drop%20or%20allowlist%20these%20structured%20request%20fields%20before%20JSON%20serialization%20rather%20than%20only%20truncating%20large%20values.%0A%0A&repo=scaleapi%2Fscale-agentex&pr=340&platform=github"><picture><source media="(prefers-color-scheme: dark)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodexDark.svg?v=3"><source media="(prefers-color-scheme: light)" srcset="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodex.svg?v=3"><img alt="Fix All in Codex" src="https://greptile-static-assets.s3.amazonaws.com/badges/FixAllInCodex.svg?v=3" height="20"></picture></a> <details><summary>Prompt To Fix All With AI</summary> `````markdown Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes. --- ### Issue 1 of 1 agentex/src/utils/logging.py:165-168 **Small bodies still log** This only caps oversized structured fields; it does not stop JSON logging from serializing request `extra` fields. `LoggedAPIRoute.log_request` still passes `body`, `headers`, and `query_params`, and `strip_sensitive_items` only removes blacklisted keys. In non-development environments, any non-blacklisted request body under 4096 characters is still emitted into production logs, which can expose request payload data and keeps per-request body logging enabled. Please drop or allowlist these structured request fields before JSON serialization rather than only truncating large values. ````` </details> <sub>Reviews (5): Last reviewed commit: ["fix(streams): cut SSE error-log volume a..."](e9e9e94) | [Re-trigger Greptile](https://app.greptile.com/api/retrigger?id=39843348)</sub> > Greptile also left **1 inline comment** on this PR. **Context used:** - Rule used - What: Never log full response bodies, request bodi... ([source](https://app.greptile.com/scale-ai/-/custom-context?memory=fa8d684f-4686-4f3e-b1ef-c27453f614ea)) <!-- /greptile_comment --> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 5ad81e8 commit 352eaaa

3 files changed

Lines changed: 76 additions & 7 deletions

File tree

agentex/src/config/environment_variables.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,13 @@ class EnvironmentVariables(BaseModel):
121121
MONGODB_DATABASE_NAME: str | None = "agentex"
122122
MONGODB_MAX_POOL_SIZE: int = 50
123123
MONGODB_MIN_POOL_SIZE: int = 5
124-
REDIS_MAX_CONNECTIONS: int = 50 # Increased for SSE streaming
124+
# SSE streaming currently holds one blocking XREAD connection per connected
125+
# client, so the pool needs headroom for peak concurrent streams per pod.
126+
# NOTE: this is only the in-code default — deployed environments override it
127+
# via the REDIS_MAX_CONNECTIONS env var, which is the real cap. Bumping this
128+
# buys headroom but does NOT change the 1-connection-per-client scaling; the
129+
# durable fix is a shared per-pod reader that fans out to in-process queues.
130+
REDIS_MAX_CONNECTIONS: int = 200
125131
REDIS_CONNECTION_TIMEOUT: int = 60 # Connection timeout in seconds
126132
REDIS_SOCKET_TIMEOUT: int = 30 # Socket timeout in seconds
127133
REDIS_STREAM_MAXLEN: int = (
@@ -193,7 +199,7 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
193199
os.environ.get(EnvVarKeys.MONGODB_MIN_POOL_SIZE, "5")
194200
),
195201
REDIS_MAX_CONNECTIONS=int(
196-
os.environ.get(EnvVarKeys.REDIS_MAX_CONNECTIONS, "100")
202+
os.environ.get(EnvVarKeys.REDIS_MAX_CONNECTIONS, "200")
197203
),
198204
REDIS_CONNECTION_TIMEOUT=int(
199205
os.environ.get(EnvVarKeys.REDIS_CONNECTION_TIMEOUT, "20")

agentex/src/domain/use_cases/streams_use_case.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ async def stream_task_events(
114114
ping_interval = float(
115115
self.environment_variables.SSE_KEEPALIVE_PING_INTERVAL
116116
) # Configurable keepalive ping interval
117+
# Track consecutive read failures so we can back off and avoid a
118+
# tight error loop. When the Redis pool is exhausted, every connected
119+
# client's read fails on each cycle; without backoff this turns into a
120+
# log-ingestion firehose (one failure per client per cycle, ~once/sec).
121+
consecutive_errors = 0
117122
try:
118123
# Application-level control loop
119124
while True:
@@ -133,6 +138,10 @@ async def stream_task_events(
133138
last_message_time = asyncio.get_running_loop().time()
134139
await asyncio.sleep(0.02)
135140

141+
# A read cycle completed without raising — the stream is
142+
# healthy again, so reset the backoff/error counter.
143+
consecutive_errors = 0
144+
136145
# If we didn't get any messages, add a small pause
137146
# to prevent tight loops and send keepalive ping if needed
138147
if message_count == 0:
@@ -151,13 +160,25 @@ async def stream_task_events(
151160
)
152161
raise
153162
except Exception as e:
163+
consecutive_errors += 1
164+
# Always log the full traceback — nothing is swallowed.
165+
# Volume is controlled two ways instead of by dropping
166+
# diagnostics: structured JSON logging keeps each traceback
167+
# to a single log entry (see utils.logging), and the
168+
# exponential backoff below caps how often a sustained
169+
# failure can repeat. The failure counter gives context on
170+
# how long a stream has been erroring.
154171
logger.error(
155-
f"Error processing events for task {task_id}: {e}",
172+
f"Error processing events for task {task_id} "
173+
f"(failure #{consecutive_errors}): {e}",
156174
exc_info=True,
157175
)
158176
yield f"data: {TaskStreamErrorEventEntity(type='error', message=str(e)).model_dump_json()}\n\n"
159-
# Add a small delay before continuing
160-
await asyncio.sleep(1)
177+
# Exponential backoff (capped) so a sustained failure (e.g.
178+
# Redis pool exhaustion) doesn't spin a tight per-client
179+
# loop hammering Redis and flooding logs.
180+
backoff = min(2.0 ** min(consecutive_errors - 1, 5), 30.0)
181+
await asyncio.sleep(backoff)
161182

162183
except asyncio.CancelledError:
163184
# Just exit the generator on cancellation

agentex/src/utils/logging.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import contextvars
2+
import json
23
import logging
34
import os
45
import re
56
import sys
67
from collections.abc import Sequence
8+
from typing import Any
79

810
import ddtrace
911
import json_log_formatter
@@ -14,6 +16,17 @@
1416
# Check if Datadog is configured
1517
_is_datadog_configured = bool(os.environ.get("DD_AGENT_HOST"))
1618

19+
# Emit structured JSON logs in all deployed environments. JSON keeps a
20+
# multi-line traceback (exc_info=True) as a single log entry — the newlines
21+
# live inside the quoted `exc_info` field — instead of fanning out into one
22+
# cluster-log entry per traceback line. Splitting tracebacks per line was a
23+
# primary multiplier behind a log-ingestion spike on a plain-text-logging
24+
# cluster. Local development keeps plain text for readable console output;
25+
# JSON is the default everywhere else (including when ENVIRONMENT is unset)
26+
# so a deployed cluster can never silently fall back to per-line tracebacks.
27+
_is_local_dev = os.environ.get("ENVIRONMENT", "").lower() == "development"
28+
_use_json_logs = not _is_local_dev
29+
1730
# Include Datadog trace IDs only when Datadog is configured
1831
if _is_datadog_configured:
1932
LOG_FORMAT: str = (
@@ -89,6 +102,30 @@ def filter(self, record: logging.LogRecord) -> bool:
89102
_sensitive_data_filter = SensitiveDataFilter()
90103

91104

105+
# Cap the size of individual structured fields in JSON logs. Request logging
106+
# (LoggedAPIRoute.log_request) attaches the decoded body, headers, and
107+
# query_params as ``extra``, and the JSON formatter serializes ``extra`` —
108+
# unlike the plain-text formatter, which drops it. Without a cap, a single
109+
# large request payload would create a very large per-request log entry on
110+
# every request, reintroducing the log-volume problem this mode exists to
111+
# avoid. ``exc_info`` is exempt: a traceback is bounded per error (not per
112+
# request) and the full stack is worth keeping.
113+
_MAX_JSON_FIELD_CHARS = 4096
114+
_UNCAPPED_JSON_FIELDS = frozenset({"exc_info"})
115+
116+
117+
def _truncate_log_value(value: Any) -> Any:
118+
"""Return ``value`` unchanged if small, else a truncated string marker."""
119+
try:
120+
rendered = value if isinstance(value, str) else json.dumps(value, default=str)
121+
except (TypeError, ValueError):
122+
rendered = str(value)
123+
if len(rendered) <= _MAX_JSON_FIELD_CHARS:
124+
return value
125+
dropped = len(rendered) - _MAX_JSON_FIELD_CHARS
126+
return rendered[:_MAX_JSON_FIELD_CHARS] + f"...[truncated {dropped} chars]"
127+
128+
92129
class CustomJSONFormatter(json_log_formatter.JSONFormatter):
93130
def json_record(self, message: str, extra: dict, record: logging.LogRecord) -> dict:
94131
extra = super().json_record(message, extra, record)
@@ -123,7 +160,12 @@ def json_record(self, message: str, extra: dict, record: logging.LogRecord) -> d
123160
if version_override:
124161
extra["dd.version"] = version_override
125162

126-
return extra
163+
# Bound per-field size so large request bodies/headers/query_params
164+
# logged via `extra` can't create oversized entries on every request.
165+
return {
166+
k: v if k in _UNCAPPED_JSON_FIELDS else _truncate_log_value(v)
167+
for k, v in extra.items()
168+
}
127169

128170

129171
def make_logger(name: str) -> logging.Logger:
@@ -134,7 +176,7 @@ def make_logger(name: str) -> logging.Logger:
134176

135177
logger = logging.getLogger(name)
136178
stream_handler = logging.StreamHandler()
137-
if _is_datadog_configured:
179+
if _use_json_logs:
138180
stream_handler.setFormatter(CustomJSONFormatter())
139181
else:
140182
stream_handler.setFormatter(logging.Formatter(LOG_FORMAT))

0 commit comments

Comments
 (0)