Skip to content

Commit 7f993e4

Browse files
authored
[MongoDB] Raw buffers (#598)
* New API for converting mongodb Document -> SqliteRow. * Raw change stream document parsing. * Add benchmark script. * Minor optimizations / refactoring. * Disable utf8 validation in some cases. * Fix test. * Use rawToSqliteRow everywhere. This uses now uses a custom _id parser for snapshots. * Simplify & clarify the _id parsing implementation.
1 parent cdb8993 commit 7f993e4

File tree

10 files changed

+682
-82
lines changed

10 files changed

+682
-82
lines changed
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
import * as bson from 'bson';
2+
import { performance } from 'node:perf_hooks';
3+
import { parseChangeDocument } from '../dist/index.js';
4+
5+
// This is a synthetic benchmark to test performance of parseChangeDocument
6+
// versus the normal bson.deserialize().
7+
// Primarily AI-generated.
8+
9+
const BSON_OPTIONS = { useBigInt64: true } as const;
10+
const OPERATION_TYPES = ['insert', 'update'] as const;
11+
const SIZE_TARGETS = [
12+
{ label: '1 KB', bytes: 1_024 },
13+
{ label: '10 KB', bytes: 10_240 },
14+
{ label: '100 KB', bytes: 102_400 }
15+
] as const;
16+
17+
type OperationType = (typeof OPERATION_TYPES)[number];
18+
19+
type FullDocument = {
20+
_id: bson.ObjectId;
21+
checksum: number;
22+
operationType: OperationType;
23+
tenantId: string;
24+
version: number;
25+
createdAt: Date;
26+
updatedAt: Date;
27+
flags: {
28+
active: boolean;
29+
archived: boolean;
30+
source: string;
31+
};
32+
metrics: {
33+
itemCount: number;
34+
ratio: number;
35+
};
36+
tags: string[];
37+
nested: {
38+
owner: {
39+
id: string;
40+
region: string;
41+
};
42+
counters: number[];
43+
changedFields: string[];
44+
};
45+
payload: string;
46+
};
47+
48+
type UpdateDescription = {
49+
updatedFields: {
50+
payload: string;
51+
metrics: FullDocument['metrics'];
52+
updatedAt: Date;
53+
version: number;
54+
};
55+
removedFields: string[];
56+
truncatedArrays: never[];
57+
};
58+
59+
type ChangeDocument = {
60+
_id: {
61+
_data: string;
62+
};
63+
operationType: OperationType;
64+
wallTime: Date;
65+
ns: {
66+
db: string;
67+
coll: string;
68+
};
69+
lsid: {
70+
id: bson.Binary;
71+
};
72+
txnNumber: bson.Long;
73+
documentKey: {
74+
_id: bson.ObjectId;
75+
};
76+
fullDocument: FullDocument;
77+
updateDescription?: UpdateDescription;
78+
};
79+
80+
type Benchmark = {
81+
label: string;
82+
run: (buffer: Buffer) => number;
83+
};
84+
85+
type Scenario = {
86+
label: string;
87+
operationType: OperationType;
88+
targetBytes: number;
89+
fullDocumentBytes: number;
90+
eventBytes: number;
91+
buffer: Buffer;
92+
};
93+
94+
type BenchmarkResult = {
95+
elapsedMs: number;
96+
opsPerSecond: number;
97+
mibPerSecond: number;
98+
};
99+
100+
const BENCHMARKS: readonly Benchmark[] = [
101+
{
102+
label: 'parseChangeDocument',
103+
run: (buffer: Buffer) => {
104+
const change = parseChangeDocument(buffer);
105+
if (!('fullDocument' in change)) {
106+
throw new Error('Unsupported change type: ' + change.operationType);
107+
}
108+
return change.fullDocument?.byteLength ?? 0;
109+
}
110+
},
111+
{
112+
label: 'parseChangeDocument + fullDocument',
113+
run: (buffer: Buffer) => {
114+
const change = parseChangeDocument(buffer);
115+
if (!('fullDocument' in change)) {
116+
throw new Error('Unsupported change type: ' + change.operationType);
117+
}
118+
if (change.fullDocument == null) {
119+
throw new Error('Expected fullDocument to be present');
120+
}
121+
const fullDocument = bson.deserialize(change.fullDocument, BSON_OPTIONS) as { checksum?: number };
122+
return Number(fullDocument.checksum ?? 0);
123+
}
124+
},
125+
{
126+
label: 'bson.deserialize',
127+
run: (buffer: Buffer) => {
128+
const change = bson.deserialize(buffer, BSON_OPTIONS) as { fullDocument?: { checksum?: number } };
129+
return Number(change.fullDocument?.checksum ?? 0);
130+
}
131+
}
132+
] as const;
133+
134+
function createFullDocument(operationType: OperationType, targetBytes: number): FullDocument {
135+
const seed = targetBytes + (operationType === 'insert' ? 11 : 29);
136+
const baseDocument: FullDocument = {
137+
_id: new bson.ObjectId(),
138+
checksum: seed,
139+
operationType,
140+
tenantId: 'tenant-benchmark',
141+
version: operationType === 'insert' ? 1 : 2,
142+
createdAt: new Date('2026-01-01T00:00:00.000Z'),
143+
updatedAt: new Date('2026-01-02T03:04:05.000Z'),
144+
flags: {
145+
active: true,
146+
archived: false,
147+
source: 'benchmark'
148+
},
149+
metrics: {
150+
itemCount: seed,
151+
ratio: Number((targetBytes / 1024).toFixed(3))
152+
},
153+
tags: ['alpha', 'beta', 'gamma', operationType],
154+
nested: {
155+
owner: {
156+
id: `owner-${seed}`,
157+
region: 'af-south-1'
158+
},
159+
counters: [1, 2, 3, 5, 8, 13],
160+
changedFields: operationType === 'update' ? ['payload', 'metrics.itemCount', 'updatedAt'] : []
161+
},
162+
payload: ''
163+
};
164+
165+
const payloadLength = findPayloadLength(baseDocument, targetBytes);
166+
const payload = repeatCharacter('x', payloadLength);
167+
return {
168+
...baseDocument,
169+
payload
170+
};
171+
}
172+
173+
function createChangeDocument(fullDocument: FullDocument, operationType: OperationType): ChangeDocument {
174+
const updateDescription: UpdateDescription | undefined =
175+
operationType === 'update'
176+
? {
177+
updatedFields: {
178+
payload: fullDocument.payload,
179+
metrics: fullDocument.metrics,
180+
updatedAt: fullDocument.updatedAt,
181+
version: fullDocument.version
182+
},
183+
removedFields: ['legacyField'],
184+
truncatedArrays: []
185+
}
186+
: undefined;
187+
188+
return {
189+
_id: {
190+
_data: `${operationType}-${fullDocument.checksum}-${new bson.ObjectId().toHexString()}`
191+
},
192+
operationType,
193+
wallTime: new Date('2026-01-03T09:10:11.000Z'),
194+
ns: {
195+
db: 'benchmark_db',
196+
coll: 'benchmark_coll'
197+
},
198+
lsid: {
199+
id: new bson.Binary(Buffer.alloc(16, operationType === 'insert' ? 0x11 : 0x22))
200+
},
201+
txnNumber: seedLong(fullDocument.checksum),
202+
documentKey: {
203+
_id: fullDocument._id
204+
},
205+
...(updateDescription == null ? {} : { updateDescription }),
206+
fullDocument
207+
};
208+
}
209+
210+
function seedLong(value: number): bson.Long {
211+
return bson.Long.fromNumber(value);
212+
}
213+
214+
function findPayloadLength(baseDocument: FullDocument, targetBytes: number): number {
215+
const baseSize = bson.calculateObjectSize(baseDocument);
216+
if (baseSize >= targetBytes) {
217+
return 0;
218+
}
219+
220+
let low = 0;
221+
let high = Math.max(16, targetBytes - baseSize);
222+
while (calculateSizedDocumentBytes(baseDocument, high) < targetBytes) {
223+
high *= 2;
224+
}
225+
226+
while (low < high) {
227+
const mid = Math.floor((low + high) / 2);
228+
if (calculateSizedDocumentBytes(baseDocument, mid) < targetBytes) {
229+
low = mid + 1;
230+
} else {
231+
high = mid;
232+
}
233+
}
234+
235+
return low;
236+
}
237+
238+
function calculateSizedDocumentBytes(baseDocument: FullDocument, payloadLength: number): number {
239+
return bson.calculateObjectSize({
240+
...baseDocument,
241+
payload: repeatCharacter('x', payloadLength)
242+
});
243+
}
244+
245+
function repeatCharacter(character: string, count: number): string {
246+
return character.repeat(Math.max(0, count));
247+
}
248+
249+
function chooseIterations(eventBytes: number): number {
250+
const targetBytes = 256 * 1024 * 1024;
251+
return clamp(Math.floor(targetBytes / eventBytes), 200, 50_000);
252+
}
253+
254+
function clamp(value: number, min: number, max: number): number {
255+
return Math.max(min, Math.min(max, value));
256+
}
257+
258+
function median(values: number[]): number {
259+
const sorted = [...values].sort((a, b) => a - b);
260+
const middle = Math.floor(sorted.length / 2);
261+
return sorted.length % 2 === 0 ? (sorted[middle - 1] + sorted[middle]) / 2 : sorted[middle];
262+
}
263+
264+
function buildScenario(operationType: OperationType, sizeLabel: string, targetBytes: number): Scenario {
265+
const fullDocument = createFullDocument(operationType, targetBytes);
266+
const changeDocument = createChangeDocument(fullDocument, operationType);
267+
const buffer = Buffer.from(bson.serialize(changeDocument));
268+
return {
269+
label: `${operationType} ${sizeLabel}`,
270+
operationType,
271+
targetBytes,
272+
fullDocumentBytes: bson.calculateObjectSize(fullDocument),
273+
eventBytes: buffer.byteLength,
274+
buffer
275+
};
276+
}
277+
278+
function runBenchmark(
279+
label: string,
280+
fn: (buffer: Buffer) => number,
281+
buffer: Buffer,
282+
iterations: number
283+
): BenchmarkResult {
284+
const warmupIterations = Math.min(2_000, Math.max(50, Math.floor(iterations / 10)));
285+
let sink = 0;
286+
for (let i = 0; i < warmupIterations; i += 1) {
287+
sink += fn(buffer);
288+
}
289+
290+
const samples: number[] = [];
291+
for (let round = 0; round < 5; round += 1) {
292+
const start = performance.now();
293+
for (let i = 0; i < iterations; i += 1) {
294+
sink += fn(buffer);
295+
}
296+
samples.push(performance.now() - start);
297+
}
298+
299+
if (sink === Number.MIN_SAFE_INTEGER) {
300+
console.error(label);
301+
}
302+
303+
const elapsedMs = median(samples);
304+
const opsPerSecond = (iterations * 1000) / elapsedMs;
305+
const mibPerSecond = (buffer.byteLength * iterations) / (1024 * 1024) / (elapsedMs / 1000);
306+
return {
307+
elapsedMs,
308+
opsPerSecond,
309+
mibPerSecond
310+
};
311+
}
312+
313+
function printHeader() {
314+
console.log('Comparing parseChangeDocument against plain bson.deserialize');
315+
console.log('The key comparison is "parseChangeDocument + fullDocument" vs "bson.deserialize".');
316+
console.log('');
317+
}
318+
319+
function printRow(values: string[]): void {
320+
const widths = [16, 12, 12, 34, 16, 16];
321+
const line = values
322+
.map((value, index) => value.padEnd(widths[index] ?? value.length))
323+
.join(' ')
324+
.trimEnd();
325+
console.log(line);
326+
}
327+
328+
printHeader();
329+
printRow(['Scenario', 'Full doc', 'Event', 'Benchmark', 'Ops/s', 'MiB/s']);
330+
printRow(['--------', '--------', '-----', '---------', '-----', '-----']);
331+
332+
for (const operationType of OPERATION_TYPES) {
333+
for (const size of SIZE_TARGETS) {
334+
const scenario = buildScenario(operationType, size.label, size.bytes);
335+
const iterations = chooseIterations(scenario.eventBytes);
336+
const results = BENCHMARKS.map((benchmark) => ({
337+
label: benchmark.label,
338+
...runBenchmark(benchmark.label, benchmark.run, scenario.buffer, iterations)
339+
}));
340+
341+
let isFirstRow = true;
342+
for (const result of results) {
343+
printRow([
344+
isFirstRow ? scenario.label : '',
345+
isFirstRow ? formatBytes(scenario.fullDocumentBytes) : '',
346+
isFirstRow ? formatBytes(scenario.eventBytes) : '',
347+
result.label,
348+
formatNumber(result.opsPerSecond),
349+
formatNumber(result.mibPerSecond)
350+
]);
351+
isFirstRow = false;
352+
}
353+
}
354+
}
355+
356+
function formatNumber(value: number): string {
357+
return new Intl.NumberFormat('en-US', {
358+
maximumFractionDigits: value >= 100 ? 0 : 1
359+
}).format(value);
360+
}
361+
362+
function formatBytes(bytes: number): string {
363+
if (bytes < 1024) {
364+
return `${bytes} B`;
365+
}
366+
if (bytes < 1024 * 1024) {
367+
return `${(bytes / 1024).toFixed(bytes >= 10 * 1024 ? 0 : 1)} KB`;
368+
}
369+
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
370+
}

0 commit comments

Comments
 (0)