From eefb96c87d95ac48ea92a4012306549b1628e5e9 Mon Sep 17 00:00:00 2001
From: Matt Aitken
Date: Sat, 23 May 2026 20:20:09 +0100
Subject: [PATCH] fix(webapp): recover from ClickHouse JSON parse failures in runs replication (#3708)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary

On a ClickHouse `Cannot parse JSON object` rejection, `RunsReplicationService` now sanitizes lone UTF-16 surrogates across the failing batch via the existing `sanitizeRows` helper and retries once. If the sanitizer found nothing or the retry also fails, the batch is dropped loudly with a counter increment, so the surrounding `#insertWithRetry` layer doesn't spin three more times on a deterministic failure. Non-parse errors propagate unchanged.

Mirrors the pattern from #3659 (for `ClickhouseEventRepository`) — same root cause (lone UTF-16 surrogates in user-provided JSON), same recovery shape, **reusing the same shared helpers** (`sanitizeRows`, `isClickHouseJsonParseError`, `parseRowNumberFromError`).

Fixes the customer-facing symptom from [TRI-9755](https://linear.app/triggerdotdev/issue/TRI-9755): a single row's poisoned `output` JSON used to take down the `COMPLETED_SUCCESSFULLY` UPDATE events for its 50+ batch-mates, stranding them in `EXECUTING` in ClickHouse forever and inflating "Running" counts on the Tasks page. Confirmed in production this is ongoing — ~120k stale rows accumulated in a single 5-hour burst on 2026-05-18; smaller continuous leak before and after.

## What changed

`apps/webapp/app/services/runsReplicationService.server.ts`:

- Imports the three helpers from `~/v3/eventRepository/sanitizeRowsOnParseError.server` (no duplication; no move).
- New private `#insertWithJsonParseRecovery(rows, doInsert, contextLabel, attempt)` — generic over `TaskRunInsertArray[]` and `PayloadInsertArray[]`, structurally identical to `ClickhouseEventRepository.#insertWithJsonParseRecovery`. Try → on parse error sanitize the whole batch (the `at row N` hint is logged but not used to slice — semantics under `input_format_parallel_parsing` aren't stable) → retry once → drop with loud log if sanitizer found nothing OR retry still fails.
- `#insertTaskRunInserts` and `#insertPayloadInserts` extract a `doInsert` closure and hand it to the wrapper. Existing error logging, span recording, and `recordSpanError` are preserved inside the closure.
- New `private _permanentlyDroppedBatches = 0` counter with a public getter, for ops dashboards and tests (matches the events-repo convention). One shared counter for both insert sites — granularity comes from the `contextLabel` (`task_runs_v2` / `raw_task_runs_payload_v1`) on every log line.

`.server-changes/runs-replication-utf16-recovery.md` — release notes entry.

## Why no new tests

The shared helpers already have full unit + real-ClickHouse contract coverage from #3659 (`apps/webapp/test/sanitizeRowsOnParseError.test.ts`, `apps/webapp/test/otlpUtf16Sanitization.integration.test.ts`). The new wrapper is a line-for-line structural port. Adding a parallel integration test would require synthesizing bad data that *escapes* the preemptive `detectBadJsonStrings` check in `#prepareJson` but still trips ClickHouse — non-trivial without hand-crafted fixtures and wouldn't cover any new logic.

## What this does NOT do

- Doesn't touch the ~120k existing stale `EXECUTING` rows in production. That needs a reconciliation/backfill sweep (separate ticket — TRI-9755 fix #3).
- Doesn't sanitize the `error` column path (`runsReplicationService.server.ts:932 const errorData = { data: run.error };`). Reactive recovery will catch it if it ever poisons a batch, but feeding it through `#prepareJson` like `output` is a cheap follow-up.
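
## Illustrative sketch of the recovery flow

For reviewers who want the try, sanitize, retry-once, drop flow from "What changed" in isolation, here is a minimal self-contained sketch. It is not the code in this PR: `hasLoneSurrogate`, `sanitizeInPlace`, `insertWithRecovery`, and the `isParseError` callback are hypothetical names standing in for the shared helpers, and the real `sanitizeRows` may walk rows differently. The `"[invalid-utf16]"` placeholder is the one named in the wrapper's doc comment.

```typescript
// Hypothetical stand-ins for the shared helpers; names and shapes are assumptions.

/** True if `s` contains a UTF-16 surrogate code unit without its partner. */
function hasLoneSurrogate(s: string): boolean {
  for (let i = 0; i < s.length; i++) {
    const unit = s.charCodeAt(i);
    if (unit >= 0xd800 && unit <= 0xdbff) {
      // High surrogate: valid only when a low surrogate follows immediately.
      const next = i + 1 < s.length ? s.charCodeAt(i + 1) : 0;
      if (next >= 0xdc00 && next <= 0xdfff) {
        i++; // well-formed pair, skip the low half
      } else {
        return true;
      }
    } else if (unit >= 0xdc00 && unit <= 0xdfff) {
      return true; // low surrogate with no preceding high surrogate
    }
  }
  return false;
}

/** Recursively replaces offending strings in place; returns the number of fields changed. */
function sanitizeInPlace(value: unknown): number {
  if (typeof value !== "object" || value === null) return 0;
  const container = value as Record<string, unknown>; // also covers arrays
  let changed = 0;
  for (const key of Object.keys(container)) {
    const child = container[key];
    if (typeof child === "string") {
      if (hasLoneSurrogate(child)) {
        container[key] = "[invalid-utf16]";
        changed++;
      }
    } else {
      changed += sanitizeInPlace(child);
    }
  }
  return changed;
}

type Outcome = "inserted" | "sanitized" | "dropped";

/** Try once; on a parse error sanitize and retry once; otherwise drop without rethrowing. */
async function insertWithRecovery(
  rows: unknown[][],
  doInsert: () => Promise<void>,
  isParseError: (e: unknown) => boolean
): Promise<Outcome> {
  try {
    await doInsert();
    return "inserted";
  } catch (firstError) {
    if (!isParseError(firstError)) throw firstError; // transient errors keep their retry path
    if (sanitizeInPlace(rows) === 0) return "dropped"; // nothing to fix, retrying cannot help
    try {
      await doInsert();
      return "sanitized";
    } catch (retryError) {
      if (!isParseError(retryError)) throw retryError;
      return "dropped";
    }
  }
}
```

Returning `"dropped"` instead of rethrowing is the point of the design: an outer retry loop sees a resolved promise and does not repeat a failure that sanitizing cannot fix, while the caller can still tell from the outcome that the rows never landed.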
## Test plan

- [x] `pnpm run typecheck --filter webapp` — clean
- [ ] Post-deploy: confirm `permanentlyDroppedBatches` counter stays at zero (or near-zero) in `/stp/trigger-app-prod/ecs/replication/service-container/process-logs`, and watch for `Sanitizing batch after ClickHouse JSON parse error` warns to confirm recovery is firing on real traffic
- [ ] Post-deploy: confirm the rate of new "EXECUTING-but-actually-COMPLETED" zombies in ClickHouse flattens (current rate ≈ tens-to-hundreds per hour platform-wide)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7
---
 .../runs-replication-utf16-recovery.md        |  20 ++
 .../services/runsReplicationService.server.ts | 207 ++++++++++++++----
 2 files changed, 189 insertions(+), 38 deletions(-)
 create mode 100644 .server-changes/runs-replication-utf16-recovery.md

diff --git a/.server-changes/runs-replication-utf16-recovery.md b/.server-changes/runs-replication-utf16-recovery.md
new file mode 100644
index 00000000000..4a998f2a31e
--- /dev/null
+++ b/.server-changes/runs-replication-utf16-recovery.md
@@ -0,0 +1,20 @@
+---
+area: webapp
+type: fix
+---
+
+Recover from ClickHouse `JSONEachRow` parse failures in the runs
+replication path. `RunsReplicationService` now wraps its task-run and
+payload inserts with the same reactive-sanitisation pattern used by
+`ClickhouseEventRepository` since #3659: on `Cannot parse JSON object`,
+sanitize lone UTF-16 surrogates across the batch (via the shared
+`sanitizeRows` helper) and retry once. If the sanitiser found nothing
+or the retry also fails, the batch is dropped, `permanentlyDroppedBatches`
+increments, and a loud error log is emitted — preventing the surrounding
+`#insertWithRetry` layer from spinning on the same deterministic
+failure. Non-parse errors propagate unchanged.
+
+Stops the bleeding behind the customer-visible "Tasks page shows a huge
+Running count" symptom: one row with bad output JSON used to take down
+the COMPLETED updates for its 50+ batch-mates, leaving every one of
+them stranded in `EXECUTING` in ClickHouse forever (Postgres unaffected).
diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts
index 6b62cc76b27..39bfd379ecb 100644
--- a/apps/webapp/app/services/runsReplicationService.server.ts
+++ b/apps/webapp/app/services/runsReplicationService.server.ts
@@ -38,6 +38,11 @@ import EventEmitter from "node:events";
 import pLimit from "p-limit";
 import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
 import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
+import {
+  isClickHouseJsonParseError,
+  parseRowNumberFromError,
+  sanitizeRows,
+} from "~/v3/eventRepository/sanitizeRowsOnParseError.server";
 
 interface TransactionEvent {
   tag: "insert" | "update" | "delete";
@@ -129,6 +134,15 @@ export class RunsReplicationService {
   private _disablePayloadInsert: boolean;
   private _disableErrorFingerprinting: boolean;
 
+  /**
+   * Counts batches that hit a ClickHouse `Cannot parse JSON object` failure
+   * that survived one sanitize-retry. These batches are dropped on the floor
+   * (returning success-ish to the caller so the retry layer doesn't spin on
+   * the same deterministic failure), and we track the drop count for
+   * observability. Counter only — does not gate behaviour.
+   */
+  private _permanentlyDroppedBatches = 0;
+
   // Metrics
   private _replicationLagHistogram: Histogram;
   private _batchesFlushedCounter: Counter;
@@ -283,6 +297,11 @@ export class RunsReplicationService {
     this._insertMaxDelayMs = options.insertMaxDelayMs ?? 2000;
   }
 
+  /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
+  get permanentlyDroppedBatches() {
+    return this._permanentlyDroppedBatches;
+  }
+
   public async shutdown() {
     if (this._isShuttingDown) return;
 
@@ -658,7 +677,7 @@ export class RunsReplicationService {
         combinedTaskRunInserts.push(...group.taskRunInserts);
         combinedPayloadInserts.push(...group.payloadInserts);
 
-        const [trErr] = await this.#insertWithRetry(
+        const [trErr, trOutcome] = await this.#insertWithRetry(
           (attempt) => this.#insertTaskRunInserts(clickhouse, group.taskRunInserts, attempt),
           "task run inserts",
           flushId
@@ -667,7 +686,7 @@ export class RunsReplicationService {
           taskRunError = trErr;
         }
 
-        const [plErr] = await this.#insertWithRetry(
+        const [plErr, plOutcome] = await this.#insertWithRetry(
           (attempt) => this.#insertPayloadInserts(clickhouse, group.payloadInserts, attempt),
           "payload inserts",
           flushId
@@ -676,10 +695,14 @@ export class RunsReplicationService {
           payloadError = plErr;
         }
 
-        if (!trErr) {
+        // Only count rows that actually landed in ClickHouse. `kind: "dropped"`
+        // means the recovery wrapper bailed (sanitizer no-op or sanitize-retry
+        // still failed) — those rows never made it, so they must not show up
+        // as successful inserts in the per-batch counter.
+        if (!trErr && trOutcome?.kind !== "dropped") {
           this._taskRunsInsertedCounter.add(group.taskRunInserts.length);
         }
-        if (!plErr) {
+        if (!plErr && plOutcome?.kind !== "dropped") {
           this._payloadsInsertedCounter.add(group.payloadInserts.length);
         }
       }
@@ -837,24 +860,28 @@ export class RunsReplicationService {
       return;
     }
     return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
-      const [insertError, insertResult] =
-        await clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
-          params: {
-            clickhouse_settings: this.#getClickhouseInsertSettings(),
-          },
-        });
-
-      if (insertError) {
-        this.logger.error("Error inserting task run inserts attempt", {
-          error: insertError,
-          attempt,
-        });
-
-        recordSpanError(span, insertError);
-        throw insertError;
-      }
-
-      return insertResult;
+      const doInsert = async () => {
+        const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays(
+          taskRunInserts,
+          { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
+        );
+        if (insertError) {
+          this.logger.error("Error inserting task run inserts attempt", {
+            error: insertError,
+            attempt,
+          });
+          recordSpanError(span, insertError);
+          throw insertError;
+        }
+        return insertResult;
+      };
+
+      return await this.#insertWithJsonParseRecovery(
+        taskRunInserts,
+        doInsert,
+        "task_runs_v2",
+        attempt
+      );
     });
   }
 
@@ -867,25 +894,129 @@ export class RunsReplicationService {
       return;
     }
     return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
-      const [insertError, insertResult] =
-        await clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
-          params: {
-            clickhouse_settings: this.#getClickhouseInsertSettings(),
-          },
-        });
-
-      if (insertError) {
-        this.logger.error("Error inserting payload inserts attempt", {
-          error: insertError,
-          attempt,
-        });
+      const doInsert = async () => {
+        const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays(
+          payloadInserts,
+          { params: { clickhouse_settings: this.#getClickhouseInsertSettings() } }
+        );
+        if (insertError) {
+          this.logger.error("Error inserting payload inserts attempt", {
+            error: insertError,
+            attempt,
+          });
+          recordSpanError(span, insertError);
+          throw insertError;
+        }
+        return insertResult;
+      };
+
+      return await this.#insertWithJsonParseRecovery(
+        payloadInserts,
+        doInsert,
+        "raw_task_runs_payload_v1",
+        attempt
+      );
+    });
+  }
 
-        recordSpanError(span, insertError);
-        throw insertError;
+  /**
+   * Wraps a ClickHouse insert with reactive UTF-16 sanitization for
+   * `Cannot parse JSON object` rejections. Mirrors the pattern from
+   * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced
+   * in #3659 — same root cause (lone UTF-16 surrogates in user-provided
+   * JSON), same recovery shape:
+   *
+   * 1. Try the insert. Healthy batches pay zero scan cost.
+   * 2. On parse error, walk the whole batch via `sanitizeRows` and
+   *    replace any lone-surrogate string with `"[invalid-utf16]"`.
+   * 3. Retry once. If the sanitizer found nothing or the retry also
+   *    fails with the same error class, drop the batch loudly and
+   *    return — do NOT rethrow, otherwise the surrounding
+   *    `#insertWithRetry` layer would spin three more times on the
+   *    same deterministic failure.
+   * 4. Non-parse errors propagate unchanged so the existing
+   *    transient-retry path still handles them.
+   *
+   * The whole-batch scan (rather than slicing on the `at row N` hint) is
+   * deliberate: `at row N` semantics under `input_format_parallel_parsing`
+   * aren't stable enough to safely skip rows. The cost is bounded because
+   * `detectBadJsonStrings` exits in O(1) for clean strings.
+   */
+  async #insertWithJsonParseRecovery(
+    rows: T[],
+    doInsert: () => Promise,
+    contextLabel: string,
+    attempt: number
+  ): Promise<
+    | { kind: "inserted"; insertResult: unknown }
+    | { kind: "sanitized"; insertResult: unknown }
+    | { kind: "dropped" }
+  > {
+    try {
+      return { kind: "inserted", insertResult: await doInsert() };
+    } catch (firstError) {
+      if (!isClickHouseJsonParseError(firstError)) throw firstError;
+
+      const firstMessage =
+        typeof firstError === "object" && firstError !== null && "message" in firstError
+          ? String((firstError as { message?: unknown }).message ?? "")
+          : String(firstError);
+
+      const rowHint = parseRowNumberFromError(firstMessage);
+      const { rowsTouched, fieldsSanitized } = sanitizeRows(rows);
+
+      if (fieldsSanitized === 0) {
+        this._permanentlyDroppedBatches += 1;
+        this.logger.error(
+          "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix",
+          {
+            contextLabel,
+            attempt,
+            batchSize: rows.length,
+            clickhouseRowHint: rowHint,
+            permanentlyDroppedBatches: this._permanentlyDroppedBatches,
+            sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
+            clickhouseError: firstMessage.split("\n")[0],
+          }
+        );
+        return { kind: "dropped" };
       }
 
-      return insertResult;
-    });
+      this.logger.warn("Sanitizing batch after ClickHouse JSON parse error", {
+        contextLabel,
+        attempt,
+        batchSize: rows.length,
+        clickhouseRowHint: rowHint,
+        rowsTouched,
+        fieldsSanitized,
+        clickhouseError: firstMessage.split("\n")[0],
+      });
+
+      try {
+        return { kind: "sanitized", insertResult: await doInsert() };
+      } catch (retryError) {
+        if (!isClickHouseJsonParseError(retryError)) throw retryError;
+
+        this._permanentlyDroppedBatches += 1;
+        const retryMessage =
+          typeof retryError === "object" && retryError !== null && "message" in retryError
+            ? String((retryError as { message?: unknown }).message ?? "")
+            : String(retryError);
+        this.logger.error(
+          "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error",
+          {
+            contextLabel,
+            attempt,
+            batchSize: rows.length,
+            permanentlyDroppedBatches: this._permanentlyDroppedBatches,
+            sampleRow: JSON.stringify(rows[0] ?? null).slice(0, 1024),
+            firstError: firstMessage.split("\n")[0],
+            retryError: retryMessage.split("\n")[0],
+          }
+        );
+        return { kind: "dropped" };
+      }
+    }
   }
 
   async #prepareRunInserts(