Skip to content

Commit 87a6831

Browse files
authored
feat(telemetry): add streaming JSON export writer (#960)
Add writeJsonArrayExport that streams telemetry events to disk as a JSON array via stream/promises pipeline, so memory stays flat regardless of maxTotalBytes (default 100 MB, no enforced ceiling). Extract src/util/fs.ts with tempFilePath, renameWithRetry, and a new writeAtomically(path, write, onCleanupError) helper covering the temp-file + rename + best-effort cleanup pattern previously duplicated in the telemetry writer and CliCredentialManager.atomicWriteFile. SSH config / support bundle keep their bespoke flows and just update imports. Refs #903
1 parent 76ccd6a commit 87a6831

15 files changed

Lines changed: 445 additions & 195 deletions

File tree

src/core/cliCredentialManager.ts

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import { isAbortError } from "../error/errorUtils";
99
import { featureSetForVersion } from "../featureSet";
1010
import { isKeyringEnabled } from "../settings/cli";
1111
import { getHeaderArgs } from "../settings/headers";
12-
import { renameWithRetry, tempFilePath, toSafeHost } from "../util";
12+
import { toSafeHost } from "../util";
13+
import { writeAtomically } from "../util/fs";
1314

1415
import { version } from "./cliExec";
1516

@@ -259,23 +260,17 @@ export class CliCredentialManager {
259260
}
260261
}
261262

262-
/**
263-
* Atomically write content to a file via temp-file + rename.
264-
*/
263+
/** Atomically write content to a file. */
265264
private async atomicWriteFile(
266265
filePath: string,
267266
content: string,
268267
): Promise<void> {
269268
await fs.mkdir(path.dirname(filePath), { recursive: true });
270-
const tempPath = tempFilePath(filePath, "temp");
271-
try {
272-
await fs.writeFile(tempPath, content, { mode: 0o600 });
273-
await renameWithRetry(fs.rename, tempPath, filePath);
274-
} catch (err) {
275-
await fs.rm(tempPath, { force: true }).catch((rmErr) => {
276-
this.logger.warn("Failed to delete temp file", tempPath, rmErr);
277-
});
278-
throw err;
279-
}
269+
await writeAtomically(
270+
filePath,
271+
(tempPath) => fs.writeFile(tempPath, content, { mode: 0o600 }),
272+
(rmErr, tempPath) =>
273+
this.logger.warn("Failed to delete temp file", tempPath, rmErr),
274+
);
280275
}
281276
}

src/core/cliManager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import { errToStr } from "../api/api-helper";
1313
import * as pgp from "../pgp";
1414
import { withCancellableProgress, withOptionalProgress } from "../progress";
1515
import { isKeyringEnabled } from "../settings/cli";
16-
import { tempFilePath, toSafeHost } from "../util";
16+
import { toSafeHost } from "../util";
17+
import { tempFilePath } from "../util/fs";
1718
import { vscodeProposed } from "../vscodeProposed";
1819

1920
import { BinaryLock } from "./binaryLock";

src/core/supportBundleLogs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import * as path from "node:path";
44
import { promisify } from "node:util";
55

66
import { type Logger } from "../logging/logger";
7-
import { renameWithRetry } from "../util";
7+
import { renameWithRetry } from "../util/fs";
88

99
export interface LogSources {
1010
remoteSshLogPath?: string;

src/remote/sshConfig.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import {
88
} from "node:fs/promises";
99
import path from "node:path";
1010

11-
import { countSubstring, renameWithRetry, tempFilePath } from "../util";
11+
import { countSubstring } from "../util";
12+
import { renameWithRetry, tempFilePath } from "../util/fs";
1213

1314
import type { Logger } from "../logging/logger";
1415

src/telemetry/export/range.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,21 @@ export function validateUtcDateInput(value: string): string | undefined {
102102
: "Enter a valid calendar date.";
103103
}
104104

105+
/** Parses a telemetry ISO timestamp to epoch ms, throwing on unparseable input. */
106+
export function parseTelemetryTimestampMs(timestamp: string): number {
107+
const ms = Date.parse(timestamp);
108+
if (!Number.isFinite(ms)) {
109+
throw new Error(`Invalid telemetry timestamp '${timestamp}'.`);
110+
}
111+
return ms;
112+
}
113+
105114
/** True if the ISO `timestamp` falls inside the range. */
106115
export function isTimestampInRange(
107116
timestamp: string,
108117
range: TelemetryDateRange,
109118
): boolean {
110-
const ms = Date.parse(timestamp);
111-
if (!Number.isFinite(ms)) {
112-
throw new Error(`Invalid telemetry timestamp '${timestamp}'.`);
113-
}
119+
const ms = parseTelemetryTimestampMs(timestamp);
114120
return (
115121
(range.startMs === undefined || ms >= range.startMs) &&
116122
(range.endMs === undefined || ms < range.endMs)
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { createWriteStream } from "node:fs";
2+
import { Readable } from "node:stream";
3+
import { pipeline } from "node:stream/promises";
4+
5+
import { writeAtomically } from "../../../util/fs";
6+
import { serializeTelemetryEvent } from "../../wireFormat";
7+
8+
import type { TelemetryEvent } from "../../event";
9+
10+
/**
11+
* Streams `events` as a JSON array to `outputPath` via a temp file and
12+
* atomic rename. Returns the number of events written. `onCleanupError`
13+
* is invoked if removing the temp file after a failed write itself fails
14+
* (typically a Windows lock); callers are expected to log it.
15+
*/
16+
export async function writeJsonArrayExport(
17+
outputPath: string,
18+
events: AsyncIterable<TelemetryEvent>,
19+
onCleanupError: (err: unknown, tempPath: string) => void,
20+
): Promise<number> {
21+
let count = 0;
22+
async function* chunks(): AsyncGenerator<string> {
23+
yield "[";
24+
for await (const event of events) {
25+
yield (count === 0 ? "\n" : ",\n") +
26+
JSON.stringify(serializeTelemetryEvent(event));
27+
count += 1;
28+
}
29+
yield count === 0 ? "]\n" : "\n]\n";
30+
}
31+
await writeAtomically(
32+
outputPath,
33+
async (tempPath) => {
34+
await pipeline(
35+
Readable.from(chunks()),
36+
createWriteStream(tempPath, { encoding: "utf8" }),
37+
);
38+
},
39+
onCleanupError,
40+
);
41+
return count;
42+
}

src/util.ts

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -164,51 +164,6 @@ export function countSubstring(needle: string, haystack: string): number {
164164
return count;
165165
}
166166

167-
const transientRenameCodes: ReadonlySet<string> = new Set([
168-
"EPERM",
169-
"EACCES",
170-
"EBUSY",
171-
]);
172-
173-
/**
174-
* Rename with retry for transient Windows filesystem errors (EPERM, EACCES,
175-
* EBUSY). On Windows, antivirus, Search Indexer, cloud sync, or concurrent
176-
* processes can briefly lock files causing renames to fail.
177-
*
178-
* On non-Windows platforms, calls renameFn directly with no retry.
179-
*
180-
* Matches the strategy used by VS Code (pfs.ts) and graceful-fs: 60s
181-
* wall-clock timeout with linear backoff (10ms increments) capped at 100ms.
182-
*/
183-
export async function renameWithRetry(
184-
renameFn: (src: string, dest: string) => Promise<void>,
185-
source: string,
186-
destination: string,
187-
timeoutMs = 60_000,
188-
delayCapMs = 100,
189-
): Promise<void> {
190-
if (process.platform !== "win32") {
191-
return renameFn(source, destination);
192-
}
193-
const startTime = Date.now();
194-
for (let attempt = 1; ; attempt++) {
195-
try {
196-
return await renameFn(source, destination);
197-
} catch (err) {
198-
const code = (err as NodeJS.ErrnoException).code;
199-
if (
200-
!code ||
201-
!transientRenameCodes.has(code) ||
202-
Date.now() - startTime >= timeoutMs
203-
) {
204-
throw err;
205-
}
206-
const delay = Math.min(delayCapMs, attempt * 10);
207-
await new Promise((resolve) => setTimeout(resolve, delay));
208-
}
209-
}
210-
}
211-
212167
/**
213168
* Wraps `arg` in `"..."` unless every character is in the shell-safe
214169
* whitelist (matching Python `shlex.quote`'s set: alphanumerics plus
@@ -240,12 +195,3 @@ export function escapeShellArg(arg: string): string {
240195
}
241196
return `'${arg.replace(/'/g, "'\\''")}'`;
242197
}
243-
244-
/**
245-
* Generate a temporary file path by appending a suffix with a random component.
246-
* The suffix describes the purpose of the temp file (e.g. "temp", "old").
247-
* Example: tempFilePath("/a/b", "temp") → "/a/b.temp-k7x3f9qw"
248-
*/
249-
export function tempFilePath(basePath: string, suffix: string): string {
250-
return `${basePath}.${suffix}-${crypto.randomUUID().substring(0, 8)}`;
251-
}

src/util/fs.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import * as fs from "node:fs/promises";
2+
3+
const transientRenameCodes: ReadonlySet<string> = new Set([
4+
"EPERM",
5+
"EACCES",
6+
"EBUSY",
7+
]);
8+
9+
/**
10+
* Rename with retry for transient Windows filesystem errors (EPERM, EACCES,
11+
* EBUSY). On Windows, antivirus, Search Indexer, cloud sync, or concurrent
12+
* processes can briefly lock files causing renames to fail.
13+
*
14+
* On non-Windows platforms, calls renameFn directly with no retry.
15+
*
16+
* Matches the strategy used by VS Code (pfs.ts) and graceful-fs: 60s
17+
* wall-clock timeout with linear backoff (10ms increments) capped at 100ms.
18+
*/
19+
export async function renameWithRetry(
20+
renameFn: (src: string, dest: string) => Promise<void>,
21+
source: string,
22+
destination: string,
23+
timeoutMs = 60_000,
24+
delayCapMs = 100,
25+
): Promise<void> {
26+
if (process.platform !== "win32") {
27+
return renameFn(source, destination);
28+
}
29+
const startTime = Date.now();
30+
for (let attempt = 1; ; attempt++) {
31+
try {
32+
return await renameFn(source, destination);
33+
} catch (err) {
34+
const code = (err as NodeJS.ErrnoException).code;
35+
if (
36+
!code ||
37+
!transientRenameCodes.has(code) ||
38+
Date.now() - startTime >= timeoutMs
39+
) {
40+
throw err;
41+
}
42+
const delay = Math.min(delayCapMs, attempt * 10);
43+
await new Promise((resolve) => setTimeout(resolve, delay));
44+
}
45+
}
46+
}
47+
48+
/**
49+
* Generate a temporary file path by appending a suffix with a random component.
50+
* The suffix describes the purpose of the temp file (e.g. "temp", "old").
51+
* Example: tempFilePath("/a/b", "temp") → "/a/b.temp-k7x3f9qw"
52+
*/
53+
export function tempFilePath(basePath: string, suffix: string): string {
54+
return `${basePath}.${suffix}-${crypto.randomUUID().substring(0, 8)}`;
55+
}
56+
57+
/**
58+
* Atomically writes to `outputPath` via a sibling temp file and rename.
59+
* The parent directory must already exist. On failure the destination is
60+
* left untouched, the temp file is best-effort removed, and the writer
61+
* error is always rethrown. `onCleanupError` receives any error from the
62+
* cleanup attempt; its own throws are swallowed.
63+
*/
64+
export async function writeAtomically<T>(
65+
outputPath: string,
66+
write: (tempPath: string) => Promise<T>,
67+
onCleanupError: (err: unknown, tempPath: string) => void,
68+
): Promise<T> {
69+
const tempPath = tempFilePath(outputPath, "temp");
70+
try {
71+
const result = await write(tempPath);
72+
await renameWithRetry(fs.rename, tempPath, outputPath);
73+
return result;
74+
} catch (err) {
75+
try {
76+
await fs.rm(tempPath, { force: true }).catch((rmErr) => {
77+
onCleanupError(rmErr, tempPath);
78+
});
79+
} catch {
80+
// onCleanupError threw; the writer error below takes precedence.
81+
}
82+
throw err;
83+
}
84+
}

test/mocks/asyncIterable.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/**
2+
* Wraps a sync array as an `AsyncIterable` that yields one item per microtask,
3+
* so consumers exercise the same async iteration path they would in production.
4+
*/
5+
export async function* asyncIterable<T>(
6+
values: readonly T[],
7+
): AsyncGenerator<T> {
8+
for (const value of values) {
9+
await Promise.resolve();
10+
yield value;
11+
}
12+
}

test/unit/core/supportBundleLogs.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import * as path from "node:path";
55
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
66

77
import { appendVsCodeLogs } from "@/core/supportBundleLogs";
8-
import { renameWithRetry } from "@/util";
8+
import { renameWithRetry } from "@/util/fs";
99

1010
import { createMockLogger } from "../../mocks/testHelpers";
1111

1212
// Wrap renameWithRetry so individual tests can override it via
1313
// mockRejectedValueOnce; by default it calls through to the real impl.
14-
vi.mock("@/util", async () => {
15-
const actual = await vi.importActual<typeof import("@/util")>("@/util");
14+
vi.mock("@/util/fs", async () => {
15+
const actual = await vi.importActual<typeof import("@/util/fs")>("@/util/fs");
1616
return { ...actual, renameWithRetry: vi.fn(actual.renameWithRetry) };
1717
});
1818

0 commit comments

Comments
 (0)