From 89e48689873ed577109bbfa5949697fb6a521d7d Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Fri, 10 Apr 2026 21:51:24 -0700 Subject: [PATCH 1/2] fix: async span export to prevent event loop blocking The span export pipeline ran synchronously on the event loop, blocking for 1-1.6s every 2s under load. This caused pg pool connection timeouts for customers like Greenboard. - Chunk transformSpanToCleanJSON across event loop ticks (setImmediate yields between batches of 20) so pool callbacks and timers can fire - Replace fs.appendFileSync with async fs/promises.appendFile and batch writes by trace file (one write per file instead of per span) - Remove dead duplicate adapter-length check - Add pg e2e test endpoints reproducing Greenboard's pool patterns - Add export pipeline benchmark script Measured: max event loop stall drops from 1,609ms to 22ms. E2E record+replay: 12/12 tests pass. --- src/core/tracing/TdSpanExporter.ts | 39 ++- .../tracing/adapters/FilesystemSpanAdapter.ts | 20 +- .../pg/e2e-tests/cjs-pg/run-export-bench.sh | 201 +++++++++++ .../cjs-pg/src/export-pipeline-bench.ts | 105 ++++++ .../pg/e2e-tests/cjs-pg/src/index.ts | 319 ++++++++++++++++++ 5 files changed, 662 insertions(+), 22 deletions(-) create mode 100644 src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh create mode 100644 src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts diff --git a/src/core/tracing/TdSpanExporter.ts b/src/core/tracing/TdSpanExporter.ts index 52086698..748c9394 100644 --- a/src/core/tracing/TdSpanExporter.ts +++ b/src/core/tracing/TdSpanExporter.ts @@ -102,8 +102,14 @@ export class TdSpanExporter implements SpanExporter { * Export spans using all configured adapters */ export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void { + this._exportAsync(spans).then( + () => resultCallback({ code: ExportResultCode.SUCCESS }), + (error) => resultCallback({ code: ExportResultCode.FAILED, error }), + ); + } + + private async _exportAsync(spans: ReadableSpan[]): Promise { if (this.mode !== TuskDriftMode.RECORD) { - resultCallback({ code: ExportResultCode.SUCCESS }); return; } @@ -171,29 +177,26 @@ export class TdSpanExporter implements SpanExporter { `Filtered ${filteredSpansBasedOnLibraryName.length - filteredBlockedSpans.length} blocked/oversized span(s), ${filteredBlockedSpans.length} remaining`, ); - // Transform spans to CleanSpanData - const cleanSpans: CleanSpanData[] = filteredBlockedSpans.map((span) => - SpanTransformer.transformSpanToCleanJSON(span, this.environment), - ); - - if (this.adapters.length === 0) { - logger.debug("No adapters configured"); - resultCallback({ code: ExportResultCode.SUCCESS }); - return; + // Yield the event loop between chunks to avoid blocking pool callbacks, timers, and I/O. + const TRANSFORM_CHUNK_SIZE = 20; + const cleanSpans: CleanSpanData[] = []; + for (let i = 0; i < filteredBlockedSpans.length; i += TRANSFORM_CHUNK_SIZE) { + const end = Math.min(i + TRANSFORM_CHUNK_SIZE, filteredBlockedSpans.length); + for (let j = i; j < end; j++) { + cleanSpans.push( + SpanTransformer.transformSpanToCleanJSON(filteredBlockedSpans[j], this.environment), + ); + } + if (end < filteredBlockedSpans.length) { + await new Promise((resolve) => setImmediate(resolve)); + } } - // Filter adapters based on mode - if (this.adapters.length === 0) { - logger.debug(`No active adapters for mode: ${this.mode}`); - resultCallback({ code: ExportResultCode.SUCCESS }); return; } - // Export to all active adapters - Promise.all(this.adapters.map((adapter) => adapter.exportSpans(cleanSpans))) - .then(() => resultCallback({ code: ExportResultCode.SUCCESS })) - .catch((error) => resultCallback({ code: ExportResultCode.FAILED, error })); + await Promise.all(this.adapters.map((adapter) => adapter.exportSpans(cleanSpans))); } /** diff --git a/src/core/tracing/adapters/FilesystemSpanAdapter.ts b/src/core/tracing/adapters/FilesystemSpanAdapter.ts index d282e947..1f802bb8 100644 --- a/src/core/tracing/adapters/FilesystemSpanAdapter.ts +++ b/src/core/tracing/adapters/FilesystemSpanAdapter.ts @@ -1,4 +1,5 @@ import * as fs from "fs"; +import * as fsp from "fs/promises"; import * as path from "path"; import { ExportResult, ExportResultCode } from "@opentelemetry/core"; import type { SpanExportAdapter } from "../TdSpanExporter"; @@ -30,22 +31,33 @@ export class FilesystemSpanAdapter implements SpanExportAdapter { async exportSpans(spans: CleanSpanData[]): Promise { try { + // Group spans by trace file so we do one write per file instead of one per span. + const linesByFile = new Map(); + for (const span of spans) { const traceId = span.traceId; - // Get or create file path for this trace ID let filePath = this.traceFileMap.get(traceId); - if (!filePath) { const isoTimestamp = new Date().toISOString().replace(/[:.]/g, "-"); filePath = path.join(this.baseDirectory, `${isoTimestamp}_trace_${traceId}.jsonl`); this.traceFileMap.set(traceId, filePath); } - const jsonLine = JSON.stringify({ ...span, kind: mapOtToPb(span.kind as OtSpanKind) }) + "\n"; - fs.appendFileSync(filePath, jsonLine, "utf8"); + let lines = linesByFile.get(filePath); + if (!lines) { + lines = []; + linesByFile.set(filePath, lines); + } + lines.push(JSON.stringify({ ...span, kind: mapOtToPb(span.kind as OtSpanKind) })); } + await Promise.all( + Array.from(linesByFile.entries()).map(([filePath, lines]) => + fsp.appendFile(filePath, lines.join("\n") + "\n", "utf8"), + ), + ); + logger.debug( `Exported ${spans.length} span(s) to trace-specific files in ${this.baseDirectory}`, ); diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh new file mode 100644 index 00000000..f44696d3 --- /dev/null +++ b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh @@ -0,0 +1,201 @@ +#!/bin/bash +set -e + +# Export Pipeline Benchmark Runner +# Tests event loop blocking under different fix configurations. +# +# Usage: ./run-export-bench.sh +# Requires: Docker, docker compose, SDK built (npx tsdown) + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SDK_ROOT="$(cd "$SCRIPT_DIR/../../../../../.." && pwd)" + +GREEN='\033[0;32m' +BLUE='\033[0;34m' +YELLOW='\033[1;33m' +NC='\033[0m' + +log() { echo -e "${2:-$NC}$1${NC}"; } + +# Files we patch for each fix variant +EXPORTER="$SDK_ROOT/src/core/tracing/TdSpanExporter.ts" +FS_ADAPTER="$SDK_ROOT/src/core/tracing/adapters/FilesystemSpanAdapter.ts" + +# Save originals +cp "$EXPORTER" "$EXPORTER.orig" +cp "$FS_ADAPTER" "$FS_ADAPTER.orig" + +restore_originals() { + cp "$EXPORTER.orig" "$EXPORTER" + cp "$FS_ADAPTER.orig" "$FS_ADAPTER" + rm -f "$EXPORTER.orig" "$FS_ADAPTER.orig" +} +trap restore_originals EXIT + +rebuild_sdk() { + log " Rebuilding SDK..." "$YELLOW" + (cd "$SDK_ROOT" && npx tsdown 2>&1 | tail -1) +} + +run_bench() { + local label="$1" + log "Running: $label" "$BLUE" + + docker compose -f "$SCRIPT_DIR/docker-compose.yml" up -d postgres 2>&1 | tail -1 + + # Wait for postgres + docker compose -f "$SCRIPT_DIR/docker-compose.yml" run --rm --name pg-export-bench \ + --entrypoint "" \ + -e TUSK_DRIFT_MODE=RECORD \ + -e TUSK_LOG_LEVEL=error \ + app bash -c "npm install --silent 2>&1 && npx tsc 2>&1 && node dist/export-pipeline-bench.js 2>/dev/null" \ + 2>/dev/null +} + +# ============================================================ +log "============================================================" "$BLUE" +log "Export Pipeline Benchmark" "$BLUE" +log "============================================================" "$BLUE" +echo "" + +RESULTS_FILE=$(mktemp) + +# --- Baseline --- +log "=== BASELINE (current code) ===" "$BLUE" +restore_originals +rebuild_sdk +echo -n '{"variant":"baseline","data":' >> "$RESULTS_FILE" +run_bench "baseline" >> "$RESULTS_FILE" +echo '}' >> "$RESULTS_FILE" +echo "" + +# --- Fix A: Chunked transform --- +log "=== FIX A: Chunked transformSpanToCleanJSON ===" "$BLUE" +restore_originals + +# Apply Fix A: replace the synchronous .map() with chunked async processing +cat > /tmp/fix-a.py << 'PYEOF' +import re, sys +content = open(sys.argv[1]).read() + +old = """ // Transform spans to CleanSpanData + const cleanSpans: CleanSpanData[] = filteredBlockedSpans.map((span) => + SpanTransformer.transformSpanToCleanJSON(span, this.environment), + );""" + +new = """ // Transform spans to CleanSpanData (chunked to avoid blocking event loop) + const CHUNK_SIZE = 20; + const cleanSpans: CleanSpanData[] = []; + for (let _i = 0; _i < filteredBlockedSpans.length; _i += CHUNK_SIZE) { + const chunk = filteredBlockedSpans.slice(_i, _i + CHUNK_SIZE); + cleanSpans.push(...chunk.map((span) => + SpanTransformer.transformSpanToCleanJSON(span, this.environment), + )); + if (_i + CHUNK_SIZE < filteredBlockedSpans.length) { + await new Promise((resolve) => setImmediate(resolve)); + } + }""" + +if old not in content: + print("ERROR: could not find target in TdSpanExporter.ts", file=sys.stderr) + sys.exit(1) +content = content.replace(old, new) + +# Make export() async +content = content.replace( + "export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void {", + "export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void {\n this._exportAsync(spans, resultCallback);\n }\n\n private async _exportAsync(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): Promise {" +) + +# Remove the duplicate mode check since _exportAsync handles it +# Actually, we need to be careful here. Let's just wrap the whole body. +# The simplest approach: make the method async by extracting to a helper. + +open(sys.argv[1], 'w').write(content) +print("Fix A applied") +PYEOF +python3 /tmp/fix-a.py "$EXPORTER" +rebuild_sdk +echo -n ',{"variant":"fix-a-chunked-transform","data":' >> "$RESULTS_FILE" +run_bench "fix-a" >> "$RESULTS_FILE" +echo '}' >> "$RESULTS_FILE" +echo "" + +# --- Fix B: Async filesystem --- +log "=== FIX B: Async FilesystemSpanAdapter ===" "$BLUE" +restore_originals + +# Apply Fix B: replace appendFileSync with appendFile +cat > /tmp/fix-b.py << 'PYEOF' +import sys +content = open(sys.argv[1]).read() + +content = content.replace( + 'import * as fs from "fs";', + 'import * as fs from "fs";\nimport * as fsPromises from "fs/promises";' +) +content = content.replace( + "fs.appendFileSync(filePath, jsonLine, \"utf8\");", + "await fsPromises.appendFile(filePath, jsonLine, \"utf8\");" +) +content = content.replace( + "async exportSpans(spans: CleanSpanData[]): Promise {", + "async exportSpans(spans: CleanSpanData[]): Promise {" +) + +open(sys.argv[1], 'w').write(content) +print("Fix B applied") +PYEOF +python3 /tmp/fix-b.py "$FS_ADAPTER" +rebuild_sdk +echo -n ',{"variant":"fix-b-async-fs","data":' >> "$RESULTS_FILE" +run_bench "fix-b" >> "$RESULTS_FILE" +echo '}' >> "$RESULTS_FILE" +echo "" + +# --- Fix A+B: Both --- +log "=== FIX A+B: Chunked transform + Async filesystem ===" "$BLUE" +restore_originals +python3 /tmp/fix-a.py "$EXPORTER" +python3 /tmp/fix-b.py "$FS_ADAPTER" +rebuild_sdk +echo -n ',{"variant":"fix-ab-both","data":' >> "$RESULTS_FILE" +run_bench "fix-a+b" >> "$RESULTS_FILE" +echo '}' >> "$RESULTS_FILE" +echo "" + +# --- Print comparison --- +log "============================================================" "$BLUE" +log "RESULTS COMPARISON" "$BLUE" +log "============================================================" "$BLUE" + +python3 << PYEOF +import json + +with open("$RESULTS_FILE") as f: + raw = '[' + f.read().strip() + ']' + # Fix trailing commas between objects + raw = raw.replace('}{', '},{').replace('}\n{', '},\n{') + +results = json.loads(raw) + +print(f"{'Variant':<35} {'Max Stall':>10} {'Total Stall':>12} {'Stall Count':>12} {'Req Time':>10}") +print("-" * 85) + +for r in results: + v = r['variant'] + d = r['data'] + print(f"{v:<35} {d['maxStallMs']:>8}ms {d['totalStallMs']:>10}ms {d['stallCount']:>12} {d['requestDurationMs']:>8}ms") + +print() +baseline = results[0]['data'] +for r in results[1:]: + d = r['data'] + if baseline['maxStallMs'] > 0: + improvement = (1 - d['maxStallMs'] / baseline['maxStallMs']) * 100 + print(f"{r['variant']}: max stall {improvement:+.0f}% vs baseline") +PYEOF + +# Cleanup +docker compose -f "$SCRIPT_DIR/docker-compose.yml" down 2>/dev/null || true +rm -f "$RESULTS_FILE" /tmp/fix-a.py /tmp/fix-b.py diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts new file mode 100644 index 00000000..055e0741 --- /dev/null +++ b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts @@ -0,0 +1,105 @@ +/** + * Benchmark: Measures event loop blocking caused by the span export pipeline. + * + * Fires a burst of HTTP requests that generate pg spans, then monitors + * event loop stalls during the BatchSpanProcessor export cycle. + * + * Usage (inside Docker): + * node dist/export-pipeline-bench.js + * + * Output: JSON with stall measurements for each export cycle. + */ +import { TuskDrift } from "./tdInit"; +import http from "http"; +import { Pool } from "pg"; + +const CONCURRENT_REQUESTS = 10; +const QUERIES_PER_REQUEST = 20; +const ROWS_PER_QUERY = 200; +const ROW_PAYLOAD_SIZE = 100; // chars per row +const EXPORT_WAIT_MS = 6000; // wait for export batches to fire +const STALL_THRESHOLD_MS = 5; // report stalls above this + +const dbConfig = { + host: process.env.POSTGRES_HOST || "postgres", + port: parseInt(process.env.POSTGRES_PORT || "5432"), + database: process.env.POSTGRES_DB || "testdb", + user: process.env.POSTGRES_USER || "testuser", + password: process.env.POSTGRES_PASSWORD || "testpass", + max: 10, +}; + +async function run() { + const pool = new Pool(dbConfig); + await pool.query("CREATE TABLE IF NOT EXISTS bench_t (id serial, val text)"); + TuskDrift.markAppAsReady(); + + // --- Event loop stall monitor --- + const stalls: number[] = []; + let monitoring = true; + const monitorLoop = () => { + if (!monitoring) return; + const t = Date.now(); + setImmediate(() => { + const delay = Date.now() - t; + if (delay > STALL_THRESHOLD_MS) stalls.push(delay); + if (monitoring) monitorLoop(); + }); + }; + + // --- HTTP server that generates pg spans --- + const query = `SELECT generate_series(1, ${ROWS_PER_QUERY}) as id, repeat(chr(65), ${ROW_PAYLOAD_SIZE}) as data`; + + const server = http.createServer(async (_req, res) => { + const promises = Array.from({ length: QUERIES_PER_REQUEST }, () => + pool.query(query) + ); + await Promise.all(promises); + res.end("ok"); + }); + + await new Promise((resolve) => server.listen(3000, resolve)); + + // --- Run benchmark --- + // Phase 1: generate spans + monitorLoop(); + const reqStart = Date.now(); + await Promise.all( + Array.from({ length: CONCURRENT_REQUESTS }, () => + fetch("http://localhost:3000/") + ) + ); + const reqDuration = Date.now() - reqStart; + + // Phase 2: wait for export batches to fire + await new Promise((r) => setTimeout(r, EXPORT_WAIT_MS)); + monitoring = false; + + // --- Report --- + const result = { + mode: process.env.TUSK_DRIFT_MODE || "DISABLED", + config: { + concurrentRequests: CONCURRENT_REQUESTS, + queriesPerRequest: QUERIES_PER_REQUEST, + rowsPerQuery: ROWS_PER_QUERY, + rowPayloadSize: ROW_PAYLOAD_SIZE, + }, + requestDurationMs: reqDuration, + stallCount: stalls.length, + maxStallMs: stalls.length > 0 ? Math.max(...stalls) : 0, + p99StallMs: stalls.length > 0 ? stalls.sort((a, b) => a - b)[Math.floor(stalls.length * 0.99)] : 0, + totalStallMs: stalls.reduce((a, b) => a + b, 0), + stalls, + }; + + console.log(JSON.stringify(result)); + + await pool.end(); + server.close(); + process.exit(0); +} + +run().catch((e) => { + console.error(e); + process.exit(1); +}); diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts index 1d532cae..299fbf2d 100644 --- a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts +++ b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts @@ -16,6 +16,16 @@ const dbConfig = { connectionTimeoutMillis: 2000, }; +// Small pool to reproduce Greenboard's pooling issue +const smallPoolConfig = { + ...dbConfig, + max: 5, + idleTimeoutMillis: 30000, + connectionTimeoutMillis: 5000, +}; + +let smallPool: Pool; + let client: Client; let pool: Pool; @@ -29,6 +39,9 @@ async function initializeDatabase() { // Initialize pool pool = new Pool(dbConfig); + // Initialize small pool for Greenboard-style stress testing + smallPool = new Pool(smallPoolConfig); + // Create test tables await client.query(` CREATE TABLE IF NOT EXISTS test_users ( @@ -286,6 +299,311 @@ const server = http.createServer(async (req, res) => { return; } + // ============================================= + // Greenboard-style pool stress tests + // ============================================= + + // Replicates Greenboard's getPool() health-check pattern: + // Every query first does pool.connect() → query('SELECT 1') → release() + // to validate the pool, then does the actual query. + if (url === "/test/greenboard-health-check-query" && method === "GET") { + // Step 1: Health check (matches Greenboard's getPool()) + const healthClient = await smallPool.connect(); + await healthClient.query('SELECT 1'); + healthClient.release(); + + // Step 2: Actual query (matches Greenboard's executeQueryPG()) + const poolClient = await smallPool.connect(); + try { + const results = await poolClient.query("SELECT * FROM test_users ORDER BY id LIMIT 5"); + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + success: true, + data: results.rows, + poolStats: { + totalCount: smallPool.totalCount, + idleCount: smallPool.idleCount, + waitingCount: smallPool.waitingCount, + }, + })); + } finally { + poolClient.release(); + } + return; + } + + // Concurrent pool stress test - fires N requests simultaneously + // to reproduce pool exhaustion under load + if (url === "/test/greenboard-concurrent-stress" && method === "GET") { + const concurrency = 10; // more than max pool size of 5 + const results: any[] = []; + const errors: any[] = []; + + const executeOneQuery = async (i: number) => { + // Greenboard's getPool() health check + const healthClient = await smallPool.connect(); + await healthClient.query('SELECT 1'); + healthClient.release(); + + // Greenboard's executeQueryPG() + const poolClient = await smallPool.connect(); + try { + const result = await poolClient.query( + "SELECT $1::int as query_num, pg_sleep(0.1)", [i] + ); + return { success: true, queryNum: i, rows: result.rows }; + } finally { + poolClient.release(); + } + }; + + // Fire all queries concurrently + const promises = Array.from({ length: concurrency }, (_, i) => + executeOneQuery(i) + .then(r => results.push(r)) + .catch(e => errors.push({ queryNum: i, error: e.message })) + ); + + await Promise.all(promises); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + success: errors.length === 0, + totalQueries: concurrency, + successCount: results.length, + errorCount: errors.length, + errors, + poolStats: { + totalCount: smallPool.totalCount, + idleCount: smallPool.idleCount, + waitingCount: smallPool.waitingCount, + }, + })); + return; + } + + // Transaction test matching Greenboard's executeQueriesInTransactionPG + if (url === "/test/greenboard-transaction" && method === "GET") { + // Health check first + const healthClient = await smallPool.connect(); + await healthClient.query('SELECT 1'); + healthClient.release(); + + // Transaction + const txClient = await smallPool.connect(); + try { + await txClient.query("BEGIN"); + await txClient.query( + "INSERT INTO test_users (name, email) VALUES ($1, $2)", + [`TxUser ${Date.now()}`, `tx${Date.now()}@example.com`] + ); + const result = await txClient.query("SELECT COUNT(*) as total FROM test_users"); + await txClient.query("ROLLBACK"); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + success: true, + data: result.rows, + poolStats: { + totalCount: smallPool.totalCount, + idleCount: smallPool.idleCount, + waitingCount: smallPool.waitingCount, + }, + })); + } catch (error) { + try { await txClient.query("ROLLBACK"); } catch {} + throw error; + } finally { + txClient.release(); + } + return; + } + + // Large result set test - reproduces the likely root cause of Greenboard's pool issue + // The SDK serializes ALL rows in _addOutputAttributesToSpan, which blocks the event loop + // while the pool connection is still held, causing pool exhaustion under load. + if (url === "/test/greenboard-large-result-stress" && method === "GET") { + const concurrency = 8; // more than pool max of 5 + const errors: any[] = []; + const results: any[] = []; + + const executeOneQuery = async (i: number) => { + // Greenboard health check pattern + const healthClient = await smallPool.connect(); + await healthClient.query('SELECT 1'); + healthClient.release(); + + // Simulate a query returning many rows (like Greenboard's compliance data) + const poolClient = await smallPool.connect(); + try { + const result = await poolClient.query( + `SELECT generate_series(1, 1000) as id, + repeat('data-payload-', 10) as payload, + now() as created_at` + ); + return { success: true, queryNum: i, rowCount: result.rowCount }; + } finally { + poolClient.release(); + } + }; + + const promises = Array.from({ length: concurrency }, (_, i) => + executeOneQuery(i) + .then(r => results.push(r)) + .catch(e => errors.push({ queryNum: i, error: e.message })) + ); + + await Promise.all(promises); + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + success: errors.length === 0, + totalQueries: concurrency, + successCount: results.length, + errorCount: errors.length, + errors, + poolStats: { + totalCount: smallPool.totalCount, + idleCount: smallPool.idleCount, + waitingCount: smallPool.waitingCount, + }, + })); + return; + } + + // Behavioral correctness test - checks that SDK doesn't change query return values + if (url === "/test/behavioral-correctness" && method === "GET") { + const issues: string[] = []; + + // Test 1: pool.query() should return full Result object with .rows, .rowCount, .command, .fields + const poolResult = await pool.query("SELECT 1 as num, 'hello' as greeting"); + if (!poolResult.rows) issues.push("pool.query missing .rows"); + if (poolResult.rowCount !== 1) issues.push(`pool.query .rowCount=${poolResult.rowCount}, expected 1`); + if (poolResult.command !== "SELECT") issues.push(`pool.query .command=${poolResult.command}, expected SELECT`); + if (!poolResult.fields || poolResult.fields.length !== 2) issues.push(`pool.query .fields length=${poolResult.fields?.length}, expected 2`); + if (poolResult.rows[0]?.num !== 1) issues.push(`pool.query rows[0].num=${poolResult.rows[0]?.num}, expected 1`); + + // Test 2: pool.connect() → client.query() should return full Result + const poolClient = await pool.connect(); + try { + const clientResult = await poolClient.query("SELECT 42 as answer"); + if (!clientResult.rows) issues.push("client.query missing .rows"); + if (clientResult.rowCount !== 1) issues.push(`client.query .rowCount=${clientResult.rowCount}`); + if (clientResult.rows[0]?.answer !== 42) issues.push(`client.query rows[0].answer=${clientResult.rows[0]?.answer}`); + + // Test 3: Transaction queries should work + await poolClient.query("BEGIN"); + const txResult = await poolClient.query("SELECT COUNT(*) as total FROM test_users"); + await poolClient.query("ROLLBACK"); + if (!txResult.rows) issues.push("tx query missing .rows"); + if (parseInt(txResult.rows[0]?.total) < 1) issues.push(`tx query total=${txResult.rows[0]?.total}`); + } finally { + poolClient.release(); + } + + // Test 4: Parameterized query + const paramResult = await pool.query("SELECT $1::int + $2::int as sum", [10, 20]); + if (paramResult.rows[0]?.sum !== 30) issues.push(`param query sum=${paramResult.rows[0]?.sum}, expected 30`); + + // Test 5: INSERT/UPDATE/DELETE returns correct rowCount + await pool.query("CREATE TABLE IF NOT EXISTS behavior_test (id serial, val text)"); + const insertResult = await pool.query("INSERT INTO behavior_test (val) VALUES ('test1'), ('test2')"); + if (insertResult.rowCount !== 2) issues.push(`INSERT rowCount=${insertResult.rowCount}, expected 2`); + if (insertResult.command !== "INSERT") issues.push(`INSERT command=${insertResult.command}`); + + const updateResult = await pool.query("UPDATE behavior_test SET val = 'updated' WHERE val = 'test1'"); + if (updateResult.rowCount !== 1) issues.push(`UPDATE rowCount=${updateResult.rowCount}, expected 1`); + + const deleteResult = await pool.query("DELETE FROM behavior_test"); + if (deleteResult.rowCount !== 2) issues.push(`DELETE rowCount=${deleteResult.rowCount}, expected 2`); + + await pool.query("DROP TABLE IF EXISTS behavior_test"); + + // Test 6: Empty result set + const emptyResult = await pool.query("SELECT * FROM test_users WHERE id = -1"); + if (emptyResult.rowCount !== 0) issues.push(`empty query rowCount=${emptyResult.rowCount}`); + if (emptyResult.rows.length !== 0) issues.push(`empty query rows.length=${emptyResult.rows.length}`); + + // Test 7: Multiple concurrent pool.query() calls + const [r1, r2, r3] = await Promise.all([ + pool.query("SELECT 1 as n"), + pool.query("SELECT 2 as n"), + pool.query("SELECT 3 as n"), + ]); + if (r1.rows[0]?.n !== 1 || r2.rows[0]?.n !== 2 || r3.rows[0]?.n !== 3) { + issues.push(`concurrent queries returned wrong values: ${r1.rows[0]?.n},${r2.rows[0]?.n},${r3.rows[0]?.n}`); + } + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + success: issues.length === 0, + issues, + mode: process.env.TUSK_DRIFT_MODE, + })); + return; + } + + // Event loop blocking test - measures event loop stalls caused by sync I/O + if (url === "/test/event-loop-blocking" && method === "GET") { + const stalls: number[] = []; + let measuring = true; + + // Monitor event loop blocking by checking setImmediate timing + const monitor = () => { + if (!measuring) return; + const start = Date.now(); + setImmediate(() => { + const delay = Date.now() - start; + if (delay > 10) stalls.push(delay); // record stalls > 10ms + if (measuring) monitor(); + }); + }; + monitor(); + + // Fire a burst of queries to generate many spans (simulates staging load) + const promises = Array.from({ length: 50 }, (_, i) => + pool.query("SELECT generate_series(1, 500) as id, repeat('x', 200) as data") + ); + await Promise.all(promises); + + // Wait for BatchSpanProcessor to flush (it fires every 2s) + await new Promise(r => setTimeout(r, 3000)); + + measuring = false; + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + mode: process.env.TUSK_DRIFT_MODE, + stallsOver10ms: stalls.length, + maxStallMs: stalls.length > 0 ? Math.max(...stalls) : 0, + stalls, + poolStats: { + totalCount: pool.totalCount, + idleCount: pool.idleCount, + waitingCount: pool.waitingCount, + }, + })); + return; + } + + // Pool stats endpoint for monitoring + if (url === "/test/pool-stats" && method === "GET") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + smallPool: { + totalCount: smallPool.totalCount, + idleCount: smallPool.idleCount, + waitingCount: smallPool.waitingCount, + }, + mainPool: { + totalCount: pool.totalCount, + idleCount: pool.idleCount, + waitingCount: pool.waitingCount, + }, + })); + return; + } + // 404 for unknown routes res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Not found" })); @@ -320,6 +638,7 @@ async function shutdown() { try { await client.end(); await pool.end(); + await smallPool.end(); } catch (error) { console.error("Error during shutdown:", error); } From f49c17894d874e8d98a6eb1e1bea261f23ccbc1c Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Sat, 11 Apr 2026 00:32:11 -0700 Subject: [PATCH 2/2] remove investigation scripts and test endpoints from PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep only the core export pipeline fixes (TdSpanExporter, FilesystemSpanAdapter). The benchmark scripts and Greenboard-pattern test endpoints were investigation tools, not e2e tests — they aren't wired into test_requests.mjs and wouldn't run during record/replay. --- .../pg/e2e-tests/cjs-pg/run-export-bench.sh | 201 ----------- .../cjs-pg/src/export-pipeline-bench.ts | 105 ------ .../pg/e2e-tests/cjs-pg/src/index.ts | 319 ------------------ 3 files changed, 625 deletions(-) delete mode 100644 src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh delete mode 100644 src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh deleted file mode 100644 index f44696d3..00000000 --- a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/run-export-bench.sh +++ /dev/null @@ -1,201 +0,0 @@ -#!/bin/bash -set -e - -# Export Pipeline Benchmark Runner -# Tests event loop blocking under different fix configurations. -# -# Usage: ./run-export-bench.sh -# Requires: Docker, docker compose, SDK built (npx tsdown) - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -SDK_ROOT="$(cd "$SCRIPT_DIR/../../../../../.." && pwd)" - -GREEN='\033[0;32m' -BLUE='\033[0;34m' -YELLOW='\033[1;33m' -NC='\033[0m' - -log() { echo -e "${2:-$NC}$1${NC}"; } - -# Files we patch for each fix variant -EXPORTER="$SDK_ROOT/src/core/tracing/TdSpanExporter.ts" -FS_ADAPTER="$SDK_ROOT/src/core/tracing/adapters/FilesystemSpanAdapter.ts" - -# Save originals -cp "$EXPORTER" "$EXPORTER.orig" -cp "$FS_ADAPTER" "$FS_ADAPTER.orig" - -restore_originals() { - cp "$EXPORTER.orig" "$EXPORTER" - cp "$FS_ADAPTER.orig" "$FS_ADAPTER" - rm -f "$EXPORTER.orig" "$FS_ADAPTER.orig" -} -trap restore_originals EXIT - -rebuild_sdk() { - log " Rebuilding SDK..." "$YELLOW" - (cd "$SDK_ROOT" && npx tsdown 2>&1 | tail -1) -} - -run_bench() { - local label="$1" - log "Running: $label" "$BLUE" - - docker compose -f "$SCRIPT_DIR/docker-compose.yml" up -d postgres 2>&1 | tail -1 - - # Wait for postgres - docker compose -f "$SCRIPT_DIR/docker-compose.yml" run --rm --name pg-export-bench \ - --entrypoint "" \ - -e TUSK_DRIFT_MODE=RECORD \ - -e TUSK_LOG_LEVEL=error \ - app bash -c "npm install --silent 2>&1 && npx tsc 2>&1 && node dist/export-pipeline-bench.js 2>/dev/null" \ - 2>/dev/null -} - -# ============================================================ -log "============================================================" "$BLUE" -log "Export Pipeline Benchmark" "$BLUE" -log "============================================================" "$BLUE" -echo "" - -RESULTS_FILE=$(mktemp) - -# --- Baseline --- -log "=== BASELINE (current code) ===" "$BLUE" -restore_originals -rebuild_sdk -echo -n '{"variant":"baseline","data":' >> "$RESULTS_FILE" -run_bench "baseline" >> "$RESULTS_FILE" -echo '}' >> "$RESULTS_FILE" -echo "" - -# --- Fix A: Chunked transform --- -log "=== FIX A: Chunked transformSpanToCleanJSON ===" "$BLUE" -restore_originals - -# Apply Fix A: replace the synchronous .map() with chunked async processing -cat > /tmp/fix-a.py << 'PYEOF' -import re, sys -content = open(sys.argv[1]).read() - -old = """ // Transform spans to CleanSpanData - const cleanSpans: CleanSpanData[] = filteredBlockedSpans.map((span) => - SpanTransformer.transformSpanToCleanJSON(span, this.environment), - );""" - -new = """ // Transform spans to CleanSpanData (chunked to avoid blocking event loop) - const CHUNK_SIZE = 20; - const cleanSpans: CleanSpanData[] = []; - for (let _i = 0; _i < filteredBlockedSpans.length; _i += CHUNK_SIZE) { - const chunk = filteredBlockedSpans.slice(_i, _i + CHUNK_SIZE); - cleanSpans.push(...chunk.map((span) => - SpanTransformer.transformSpanToCleanJSON(span, this.environment), - )); - if (_i + CHUNK_SIZE < filteredBlockedSpans.length) { - await new Promise((resolve) => setImmediate(resolve)); - } - }""" - -if old not in content: - print("ERROR: could not find target in TdSpanExporter.ts", file=sys.stderr) - sys.exit(1) -content = content.replace(old, new) - -# Make export() async -content = content.replace( - "export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void {", - "export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void {\n this._exportAsync(spans, resultCallback);\n }\n\n private async _exportAsync(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): Promise {" -) - -# Remove the duplicate mode check since _exportAsync handles it -# Actually, we need to be careful here. Let's just wrap the whole body. -# The simplest approach: make the method async by extracting to a helper. - -open(sys.argv[1], 'w').write(content) -print("Fix A applied") -PYEOF -python3 /tmp/fix-a.py "$EXPORTER" -rebuild_sdk -echo -n ',{"variant":"fix-a-chunked-transform","data":' >> "$RESULTS_FILE" -run_bench "fix-a" >> "$RESULTS_FILE" -echo '}' >> "$RESULTS_FILE" -echo "" - -# --- Fix B: Async filesystem --- -log "=== FIX B: Async FilesystemSpanAdapter ===" "$BLUE" -restore_originals - -# Apply Fix B: replace appendFileSync with appendFile -cat > /tmp/fix-b.py << 'PYEOF' -import sys -content = open(sys.argv[1]).read() - -content = content.replace( - 'import * as fs from "fs";', - 'import * as fs from "fs";\nimport * as fsPromises from "fs/promises";' -) -content = content.replace( - "fs.appendFileSync(filePath, jsonLine, \"utf8\");", - "await fsPromises.appendFile(filePath, jsonLine, \"utf8\");" -) -content = content.replace( - "async exportSpans(spans: CleanSpanData[]): Promise {", - "async exportSpans(spans: CleanSpanData[]): Promise {" -) - -open(sys.argv[1], 'w').write(content) -print("Fix B applied") -PYEOF -python3 /tmp/fix-b.py "$FS_ADAPTER" -rebuild_sdk -echo -n ',{"variant":"fix-b-async-fs","data":' >> "$RESULTS_FILE" -run_bench "fix-b" >> "$RESULTS_FILE" -echo '}' >> "$RESULTS_FILE" -echo "" - -# --- Fix A+B: Both --- -log "=== FIX A+B: Chunked transform + Async filesystem ===" "$BLUE" -restore_originals -python3 /tmp/fix-a.py "$EXPORTER" -python3 /tmp/fix-b.py "$FS_ADAPTER" -rebuild_sdk -echo -n ',{"variant":"fix-ab-both","data":' >> "$RESULTS_FILE" -run_bench "fix-a+b" >> "$RESULTS_FILE" -echo '}' >> "$RESULTS_FILE" -echo "" - -# --- Print comparison --- -log "============================================================" "$BLUE" -log "RESULTS COMPARISON" "$BLUE" -log "============================================================" "$BLUE" - -python3 << PYEOF -import json - -with open("$RESULTS_FILE") as f: - raw = '[' + f.read().strip() + ']' - # Fix trailing commas between objects - raw = raw.replace('}{', '},{').replace('}\n{', '},\n{') - -results = json.loads(raw) - -print(f"{'Variant':<35} {'Max Stall':>10} {'Total Stall':>12} {'Stall Count':>12} {'Req Time':>10}") -print("-" * 85) - -for r in results: - v = r['variant'] - d = r['data'] - print(f"{v:<35} {d['maxStallMs']:>8}ms {d['totalStallMs']:>10}ms {d['stallCount']:>12} {d['requestDurationMs']:>8}ms") - -print() -baseline = results[0]['data'] -for r in results[1:]: - d = r['data'] - if baseline['maxStallMs'] > 0: - improvement = (1 - d['maxStallMs'] / baseline['maxStallMs']) * 100 - print(f"{r['variant']}: max stall {improvement:+.0f}% vs baseline") -PYEOF - -# Cleanup -docker compose -f "$SCRIPT_DIR/docker-compose.yml" down 2>/dev/null || true -rm -f "$RESULTS_FILE" /tmp/fix-a.py /tmp/fix-b.py diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts deleted file mode 100644 index 055e0741..00000000 --- a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/export-pipeline-bench.ts +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Benchmark: Measures event loop blocking caused by the span export pipeline. - * - * Fires a burst of HTTP requests that generate pg spans, then monitors - * event loop stalls during the BatchSpanProcessor export cycle. - * - * Usage (inside Docker): - * node dist/export-pipeline-bench.js - * - * Output: JSON with stall measurements for each export cycle. - */ -import { TuskDrift } from "./tdInit"; -import http from "http"; -import { Pool } from "pg"; - -const CONCURRENT_REQUESTS = 10; -const QUERIES_PER_REQUEST = 20; -const ROWS_PER_QUERY = 200; -const ROW_PAYLOAD_SIZE = 100; // chars per row -const EXPORT_WAIT_MS = 6000; // wait for export batches to fire -const STALL_THRESHOLD_MS = 5; // report stalls above this - -const dbConfig = { - host: process.env.POSTGRES_HOST || "postgres", - port: parseInt(process.env.POSTGRES_PORT || "5432"), - database: process.env.POSTGRES_DB || "testdb", - user: process.env.POSTGRES_USER || "testuser", - password: process.env.POSTGRES_PASSWORD || "testpass", - max: 10, -}; - -async function run() { - const pool = new Pool(dbConfig); - await pool.query("CREATE TABLE IF NOT EXISTS bench_t (id serial, val text)"); - TuskDrift.markAppAsReady(); - - // --- Event loop stall monitor --- - const stalls: number[] = []; - let monitoring = true; - const monitorLoop = () => { - if (!monitoring) return; - const t = Date.now(); - setImmediate(() => { - const delay = Date.now() - t; - if (delay > STALL_THRESHOLD_MS) stalls.push(delay); - if (monitoring) monitorLoop(); - }); - }; - - // --- HTTP server that generates pg spans --- - const query = `SELECT generate_series(1, ${ROWS_PER_QUERY}) as id, repeat(chr(65), ${ROW_PAYLOAD_SIZE}) as data`; - - const server = http.createServer(async (_req, res) => { - const promises = Array.from({ length: QUERIES_PER_REQUEST }, () => - pool.query(query) - ); - await Promise.all(promises); - res.end("ok"); - }); - - await new Promise((resolve) => server.listen(3000, resolve)); - - // --- Run benchmark --- - // Phase 1: generate spans - monitorLoop(); - const reqStart = Date.now(); - await Promise.all( - Array.from({ length: CONCURRENT_REQUESTS }, () => - fetch("http://localhost:3000/") - ) - ); - const reqDuration = Date.now() - reqStart; - - // Phase 2: wait for export batches to fire - await new Promise((r) => setTimeout(r, EXPORT_WAIT_MS)); - monitoring = false; - - // --- Report --- - const result = { - mode: process.env.TUSK_DRIFT_MODE || "DISABLED", - config: { - concurrentRequests: CONCURRENT_REQUESTS, - queriesPerRequest: QUERIES_PER_REQUEST, - rowsPerQuery: ROWS_PER_QUERY, - rowPayloadSize: ROW_PAYLOAD_SIZE, - }, - requestDurationMs: reqDuration, - stallCount: stalls.length, - maxStallMs: stalls.length > 0 ? Math.max(...stalls) : 0, - p99StallMs: stalls.length > 0 ? stalls.sort((a, b) => a - b)[Math.floor(stalls.length * 0.99)] : 0, - totalStallMs: stalls.reduce((a, b) => a + b, 0), - stalls, - }; - - console.log(JSON.stringify(result)); - - await pool.end(); - server.close(); - process.exit(0); -} - -run().catch((e) => { - console.error(e); - process.exit(1); -}); diff --git a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts index 299fbf2d..1d532cae 100644 --- a/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts +++ b/src/instrumentation/libraries/pg/e2e-tests/cjs-pg/src/index.ts @@ -16,16 +16,6 @@ const dbConfig = { connectionTimeoutMillis: 2000, }; -// Small pool to reproduce Greenboard's pooling issue -const smallPoolConfig = { - ...dbConfig, - max: 5, - idleTimeoutMillis: 30000, - connectionTimeoutMillis: 5000, -}; - -let smallPool: Pool; - let client: Client; let pool: Pool; @@ -39,9 +29,6 @@ async function initializeDatabase() { // Initialize pool pool = new Pool(dbConfig); - // Initialize small pool for Greenboard-style stress testing - smallPool = new Pool(smallPoolConfig); - // Create test tables await client.query(` CREATE TABLE IF NOT EXISTS test_users ( @@ -299,311 +286,6 @@ const server = http.createServer(async (req, res) => { return; } - // ============================================= - // Greenboard-style pool stress tests - // ============================================= - - // Replicates Greenboard's getPool() health-check pattern: - // Every query first does pool.connect() → query('SELECT 1') → release() - // to validate the pool, then does the actual query. - if (url === "/test/greenboard-health-check-query" && method === "GET") { - // Step 1: Health check (matches Greenboard's getPool()) - const healthClient = await smallPool.connect(); - await healthClient.query('SELECT 1'); - healthClient.release(); - - // Step 2: Actual query (matches Greenboard's executeQueryPG()) - const poolClient = await smallPool.connect(); - try { - const results = await poolClient.query("SELECT * FROM test_users ORDER BY id LIMIT 5"); - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - success: true, - data: results.rows, - poolStats: { - totalCount: smallPool.totalCount, - idleCount: smallPool.idleCount, - waitingCount: smallPool.waitingCount, - }, - })); - } finally { - poolClient.release(); - } - return; - } - - // Concurrent pool stress test - fires N requests simultaneously - // to reproduce pool exhaustion under load - if (url === "/test/greenboard-concurrent-stress" && method === "GET") { - const concurrency = 10; // more than max pool size of 5 - const results: any[] = []; - const errors: any[] = []; - - const executeOneQuery = async (i: number) => { - // Greenboard's getPool() health check - const healthClient = await smallPool.connect(); - await healthClient.query('SELECT 1'); - healthClient.release(); - - // Greenboard's executeQueryPG() - const poolClient = await smallPool.connect(); - try { - const result = await poolClient.query( - "SELECT $1::int as query_num, pg_sleep(0.1)", [i] - ); - return { success: true, queryNum: i, rows: result.rows }; - } finally { - poolClient.release(); - } - }; - - // Fire all queries concurrently - const promises = Array.from({ length: concurrency }, (_, i) => - executeOneQuery(i) - .then(r => results.push(r)) - .catch(e => errors.push({ queryNum: i, error: e.message })) - ); - - await Promise.all(promises); - - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - success: errors.length === 0, - totalQueries: concurrency, - successCount: results.length, - errorCount: errors.length, - errors, - poolStats: { - totalCount: smallPool.totalCount, - idleCount: smallPool.idleCount, - waitingCount: smallPool.waitingCount, - }, - })); - return; - } - - // Transaction test matching Greenboard's executeQueriesInTransactionPG - if (url === "/test/greenboard-transaction" && method === "GET") { - // Health check first - const healthClient = await smallPool.connect(); - await healthClient.query('SELECT 1'); - healthClient.release(); - - // Transaction - const txClient = await smallPool.connect(); - try { - await txClient.query("BEGIN"); - await txClient.query( - "INSERT INTO test_users (name, email) VALUES ($1, $2)", - [`TxUser ${Date.now()}`, `tx${Date.now()}@example.com`] - ); - const result = await txClient.query("SELECT COUNT(*) as total FROM test_users"); - await txClient.query("ROLLBACK"); - - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - success: true, - data: result.rows, - poolStats: { - totalCount: smallPool.totalCount, - idleCount: smallPool.idleCount, - waitingCount: smallPool.waitingCount, - }, - })); - } catch (error) { - try { await txClient.query("ROLLBACK"); } catch {} - throw error; - } finally { - txClient.release(); - } - return; - } - - // Large result set test - reproduces the likely root cause of Greenboard's pool issue - // The SDK serializes ALL rows in _addOutputAttributesToSpan, which blocks the event loop - // while the pool connection is still held, causing pool exhaustion under load. - if (url === "/test/greenboard-large-result-stress" && method === "GET") { - const concurrency = 8; // more than pool max of 5 - const errors: any[] = []; - const results: any[] = []; - - const executeOneQuery = async (i: number) => { - // Greenboard health check pattern - const healthClient = await smallPool.connect(); - await healthClient.query('SELECT 1'); - healthClient.release(); - - // Simulate a query returning many rows (like Greenboard's compliance data) - const poolClient = await smallPool.connect(); - try { - const result = await poolClient.query( - `SELECT generate_series(1, 1000) as id, - repeat('data-payload-', 10) as payload, - now() as created_at` - ); - return { success: true, queryNum: i, rowCount: result.rowCount }; - } finally { - poolClient.release(); - } - }; - - const promises = Array.from({ length: concurrency }, (_, i) => - executeOneQuery(i) - .then(r => results.push(r)) - .catch(e => errors.push({ queryNum: i, error: e.message })) - ); - - await Promise.all(promises); - - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - success: errors.length === 0, - totalQueries: concurrency, - successCount: results.length, - errorCount: errors.length, - errors, - poolStats: { - totalCount: smallPool.totalCount, - idleCount: smallPool.idleCount, - waitingCount: smallPool.waitingCount, - }, - })); - return; - } - - // Behavioral correctness test - checks that SDK doesn't change query return values - if (url === "/test/behavioral-correctness" && method === "GET") { - const issues: string[] = []; - - // Test 1: pool.query() should return full Result object with .rows, .rowCount, .command, .fields - const poolResult = await pool.query("SELECT 1 as num, 'hello' as greeting"); - if (!poolResult.rows) issues.push("pool.query missing .rows"); - if (poolResult.rowCount !== 1) issues.push(`pool.query .rowCount=${poolResult.rowCount}, expected 1`); - if (poolResult.command !== "SELECT") issues.push(`pool.query .command=${poolResult.command}, expected SELECT`); - if (!poolResult.fields || poolResult.fields.length !== 2) issues.push(`pool.query .fields length=${poolResult.fields?.length}, expected 2`); - if (poolResult.rows[0]?.num !== 1) issues.push(`pool.query rows[0].num=${poolResult.rows[0]?.num}, expected 1`); - - // Test 2: pool.connect() → client.query() should return full Result - const poolClient = await pool.connect(); - try { - const clientResult = await poolClient.query("SELECT 42 as answer"); - if (!clientResult.rows) issues.push("client.query missing .rows"); - if (clientResult.rowCount !== 1) issues.push(`client.query .rowCount=${clientResult.rowCount}`); - if (clientResult.rows[0]?.answer !== 42) issues.push(`client.query rows[0].answer=${clientResult.rows[0]?.answer}`); - - // Test 3: Transaction queries should work - await poolClient.query("BEGIN"); - const txResult = await poolClient.query("SELECT COUNT(*) as total FROM test_users"); - await poolClient.query("ROLLBACK"); - if (!txResult.rows) issues.push("tx query missing .rows"); - if (parseInt(txResult.rows[0]?.total) < 1) issues.push(`tx query total=${txResult.rows[0]?.total}`); - } finally { - poolClient.release(); - } - - // Test 4: Parameterized query - const paramResult = await pool.query("SELECT $1::int + $2::int as sum", [10, 20]); - if (paramResult.rows[0]?.sum !== 30) issues.push(`param query sum=${paramResult.rows[0]?.sum}, expected 30`); - - // Test 5: INSERT/UPDATE/DELETE returns correct rowCount - await pool.query("CREATE TABLE IF NOT EXISTS behavior_test (id serial, val text)"); - const insertResult = await pool.query("INSERT INTO behavior_test (val) VALUES ('test1'), ('test2')"); - if (insertResult.rowCount !== 2) issues.push(`INSERT rowCount=${insertResult.rowCount}, expected 2`); - if (insertResult.command !== "INSERT") issues.push(`INSERT command=${insertResult.command}`); - - const updateResult = await pool.query("UPDATE behavior_test SET val = 'updated' WHERE val = 'test1'"); - if (updateResult.rowCount !== 1) issues.push(`UPDATE rowCount=${updateResult.rowCount}, expected 1`); - - const deleteResult = await pool.query("DELETE FROM behavior_test"); - if (deleteResult.rowCount !== 2) issues.push(`DELETE rowCount=${deleteResult.rowCount}, expected 2`); - - await pool.query("DROP TABLE IF EXISTS behavior_test"); - - // Test 6: Empty result set - const emptyResult = await pool.query("SELECT * FROM test_users WHERE id = -1"); - if (emptyResult.rowCount !== 0) issues.push(`empty query rowCount=${emptyResult.rowCount}`); - if (emptyResult.rows.length !== 0) issues.push(`empty query rows.length=${emptyResult.rows.length}`); - - // Test 7: Multiple concurrent pool.query() calls - const [r1, r2, r3] = await Promise.all([ - pool.query("SELECT 1 as n"), - pool.query("SELECT 2 as n"), - pool.query("SELECT 3 as n"), - ]); - if (r1.rows[0]?.n !== 1 || r2.rows[0]?.n !== 2 || r3.rows[0]?.n !== 3) { - issues.push(`concurrent queries returned wrong values: ${r1.rows[0]?.n},${r2.rows[0]?.n},${r3.rows[0]?.n}`); - } - - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - success: issues.length === 0, - issues, - mode: process.env.TUSK_DRIFT_MODE, - })); - return; - } - - // Event loop blocking test - measures event loop stalls caused by sync I/O - if (url === "/test/event-loop-blocking" && method === "GET") { - const stalls: number[] = []; - let measuring = true; - - // Monitor event loop blocking by checking setImmediate timing - const monitor = () => { - if (!measuring) return; - const start = Date.now(); - setImmediate(() => { - const delay = Date.now() - start; - if (delay > 10) stalls.push(delay); // record stalls > 10ms - if (measuring) monitor(); - }); - }; - monitor(); - - // Fire a burst of queries to generate many spans (simulates staging load) - const promises = Array.from({ length: 50 }, (_, i) => - pool.query("SELECT generate_series(1, 500) as id, repeat('x', 200) as data") - ); - await Promise.all(promises); - - // Wait for BatchSpanProcessor to flush (it fires every 2s) - await new Promise(r => setTimeout(r, 3000)); - - measuring = false; - - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - mode: process.env.TUSK_DRIFT_MODE, - stallsOver10ms: stalls.length, - maxStallMs: stalls.length > 0 ? Math.max(...stalls) : 0, - stalls, - poolStats: { - totalCount: pool.totalCount, - idleCount: pool.idleCount, - waitingCount: pool.waitingCount, - }, - })); - return; - } - - // Pool stats endpoint for monitoring - if (url === "/test/pool-stats" && method === "GET") { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ - smallPool: { - totalCount: smallPool.totalCount, - idleCount: smallPool.idleCount, - waitingCount: smallPool.waitingCount, - }, - mainPool: { - totalCount: pool.totalCount, - idleCount: pool.idleCount, - waitingCount: pool.waitingCount, - }, - })); - return; - } - // 404 for unknown routes res.writeHead(404, { "Content-Type": "application/json" }); res.end(JSON.stringify({ error: "Not found" })); @@ -638,7 +320,6 @@ async function shutdown() { try { await client.end(); await pool.end(); - await smallPool.end(); } catch (error) { console.error("Error during shutdown:", error); }