Skip to content

Commit e5074f0

Browse files
authored
[MongoDB] Direct BSON Buffer -> JSON conversion (#599)
* Add experimental bufferToSqlite implementation. * Use shared writer to reduce allocations. * Add bson -> SqliteRow tests. * Fix issues picked up by tests. * Restructure tests to include the actual expected output. * Fix handling of UUID edge cases and non-finite numbers. * Further restructure and simplify tests. * Improve and test Regexp option handling. * Further regexp tests. * Add tests for invalid UTF-8. * Add some length checks for invalid BSON. * Add tests for degenerate arrays. * Initial restructuring and comments. * Use consts. * Further cleanup and comments. * Simplify date serialization. * Handle date options in compatibility context. * Update docs. * Rename converters, switch to the new one. * Return Uint8Array instead of Buffer. * Use raw queries for mongo_test. * Remove DBRef tests - those were not actually using DBPointer types. * Add more DBPointer and DBRef tests. * Fix benchmark script; tweak docs. * Fix DBPointer implementation. * Avoid parsing a string just to get the end position. * Custom parser for parseDocumentId. * Optimize UUID implementation. * Add surrogate pair tests.
1 parent 7f993e4 commit e5074f0

File tree

11 files changed

+2867
-100
lines changed

11 files changed

+2867
-100
lines changed
Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
import { CompatibilityContext } from '@powersync/service-sync-rules';
2+
import * as bson from 'bson';
3+
import { performance } from 'node:perf_hooks';
4+
import { DirectSourceRowConverter, LegacySourceRowConverter, parseChangeDocument } from '../dist/index.js';
5+
6+
// This is a synthetic benchmark to test performance of parseChangeDocument
7+
// versus the normal bson.deserialize().
8+
// Primarily AI-generated.
9+
10+
// Deserialize options: return BSON int64 values as JS bigint rather than Long.
const BSON_OPTIONS = { useBigInt64: true } as const;
// Change-stream operation types exercised by the benchmark scenarios.
const OPERATION_TYPES = ['insert', 'update'] as const;
// Approximate serialized full-document sizes each scenario targets.
const SIZE_TARGETS = [
  { label: '1 KB', bytes: 1_024 },
  { label: '10 KB', bytes: 10_240 },
  { label: '100 KB', bytes: 102_400 }
] as const;
17+
18+
// Discriminant for the two change-stream event variants built below.
type OperationType = (typeof OPERATION_TYPES)[number];

// Shape of the synthetic collection document embedded in each change event.
// `payload` is padded with 'x' characters (see createFullDocument) so the
// serialized document reaches the scenario's target BSON size; `checksum` is
// the value each benchmark extracts and returns to defeat dead-code elimination.
type FullDocument = {
  _id: bson.ObjectId;
  checksum: number;
  operationType: OperationType;
  tenantId: string;
  version: number;
  createdAt: Date;
  updatedAt: Date;
  flags: {
    active: boolean;
    archived: boolean;
    source: string;
  };
  metrics: {
    itemCount: number;
    ratio: number;
  };
  tags: string[];
  nested: {
    owner: {
      id: string;
      region: string;
    };
    counters: number[];
    changedFields: string[];
  };
  // Variable-length padding field used to hit the target document size.
  payload: string;
};
48+
49+
// Update-event description mirroring a subset of FullDocument's fields, as a
// MongoDB change stream would report for an `update` operation.
type UpdateDescription = {
  updatedFields: {
    payload: string;
    metrics: FullDocument['metrics'];
    updatedAt: Date;
    version: number;
  };
  removedFields: string[];
  truncatedArrays: never[];
};

// Synthetic MongoDB change-stream event envelope. `updateDescription` is only
// present for `update` events (see createChangeDocument).
type ChangeDocument = {
  _id: {
    // Resume token string; built from operationType, checksum, and an ObjectId.
    _data: string;
  };
  operationType: OperationType;
  wallTime: Date;
  ns: {
    db: string;
    coll: string;
  };
  lsid: {
    id: bson.Binary;
  };
  txnNumber: bson.Long;
  documentKey: {
    _id: bson.ObjectId;
  };
  fullDocument: FullDocument;
  updateDescription?: UpdateDescription;
};
80+
81+
// One parsing pipeline under test. `run` must return a number derived from the
// parsed document so the measurement loop can accumulate it (prevents the JIT
// from eliminating the work).
type Benchmark = {
  label: string;
  run: (buffer: Buffer) => number;
};

// One (operationType, size) combination: the serialized event plus its
// measured sizes for reporting.
type Scenario = {
  label: string;
  operationType: OperationType;
  targetBytes: number;
  fullDocumentBytes: number;
  eventBytes: number;
  buffer: Buffer;
};

// Aggregated timing for one benchmark on one scenario.
type BenchmarkResult = {
  // Median wall-clock time of the timed rounds, in milliseconds.
  elapsedMs: number;
  opsPerSecond: number;
  mibPerSecond: number;
};
100+
101+
// Converter under test: consumes the output of parseChangeDocument (raw BSON parsing).
const rawConverter = new DirectSourceRowConverter(CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
// Baseline converter: consumes documents produced by bson.deserialize().
const docConverter = new LegacySourceRowConverter(CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
103+
104+
const BENCHMARKS: readonly Benchmark[] = [
105+
{
106+
label: 'parseChangeDocument + rawToSqliteRow',
107+
run: (buffer: Buffer) => {
108+
const change = parseChangeDocument(buffer);
109+
if (!('fullDocument' in change)) {
110+
throw new Error('Unsupported change type: ' + change.operationType);
111+
}
112+
if (change.fullDocument == null) {
113+
throw new Error('Expected fullDocument to be present');
114+
}
115+
const fullDocument = rawConverter.rawToSqliteRow(change.fullDocument) as { checksum?: number };
116+
return Number(fullDocument.checksum ?? 0);
117+
}
118+
},
119+
{
120+
label: 'bson.deserialize + documentToSqliteRow',
121+
run: (buffer: Buffer) => {
122+
const change = bson.deserialize(buffer, BSON_OPTIONS) as { fullDocument?: { checksum?: number } };
123+
const fullDocument = docConverter.documentToSqliteRow(change.fullDocument!);
124+
return Number(change.fullDocument?.checksum ?? 0);
125+
}
126+
}
127+
] as const;
128+
129+
function createFullDocument(operationType: OperationType, targetBytes: number): FullDocument {
130+
const seed = targetBytes + (operationType === 'insert' ? 11 : 29);
131+
const baseDocument: FullDocument = {
132+
_id: new bson.ObjectId(),
133+
checksum: seed,
134+
operationType,
135+
tenantId: 'tenant-benchmark',
136+
version: operationType === 'insert' ? 1 : 2,
137+
createdAt: new Date('2026-01-01T00:00:00.000Z'),
138+
updatedAt: new Date('2026-01-02T03:04:05.000Z'),
139+
flags: {
140+
active: true,
141+
archived: false,
142+
source: 'benchmark'
143+
},
144+
metrics: {
145+
itemCount: seed,
146+
ratio: Number((targetBytes / 1024).toFixed(3))
147+
},
148+
tags: ['alpha', 'beta', 'gamma', operationType],
149+
nested: {
150+
owner: {
151+
id: `owner-${seed}`,
152+
region: 'af-south-1'
153+
},
154+
counters: [1, 2, 3, 5, 8, 13],
155+
changedFields: operationType === 'update' ? ['payload', 'metrics.itemCount', 'updatedAt'] : []
156+
},
157+
payload: ''
158+
};
159+
160+
const payloadLength = findPayloadLength(baseDocument, targetBytes);
161+
const payload = repeatCharacter('x', payloadLength);
162+
return {
163+
...baseDocument,
164+
payload
165+
};
166+
}
167+
168+
function createChangeDocument(fullDocument: FullDocument, operationType: OperationType): ChangeDocument {
169+
const updateDescription: UpdateDescription | undefined =
170+
operationType === 'update'
171+
? {
172+
updatedFields: {
173+
payload: fullDocument.payload,
174+
metrics: fullDocument.metrics,
175+
updatedAt: fullDocument.updatedAt,
176+
version: fullDocument.version
177+
},
178+
removedFields: ['legacyField'],
179+
truncatedArrays: []
180+
}
181+
: undefined;
182+
183+
return {
184+
_id: {
185+
_data: `${operationType}-${fullDocument.checksum}-${new bson.ObjectId().toHexString()}`
186+
},
187+
operationType,
188+
wallTime: new Date('2026-01-03T09:10:11.000Z'),
189+
ns: {
190+
db: 'benchmark_db',
191+
coll: 'benchmark_coll'
192+
},
193+
lsid: {
194+
id: new bson.Binary(Buffer.alloc(16, operationType === 'insert' ? 0x11 : 0x22))
195+
},
196+
txnNumber: seedLong(fullDocument.checksum),
197+
documentKey: {
198+
_id: fullDocument._id
199+
},
200+
...(updateDescription == null ? {} : { updateDescription }),
201+
fullDocument
202+
};
203+
}
204+
205+
function seedLong(value: number): bson.Long {
206+
return bson.Long.fromNumber(value);
207+
}
208+
209+
function findPayloadLength(baseDocument: FullDocument, targetBytes: number): number {
210+
const baseSize = bson.calculateObjectSize(baseDocument);
211+
if (baseSize >= targetBytes) {
212+
return 0;
213+
}
214+
215+
let low = 0;
216+
let high = Math.max(16, targetBytes - baseSize);
217+
while (calculateSizedDocumentBytes(baseDocument, high) < targetBytes) {
218+
high *= 2;
219+
}
220+
221+
while (low < high) {
222+
const mid = Math.floor((low + high) / 2);
223+
if (calculateSizedDocumentBytes(baseDocument, mid) < targetBytes) {
224+
low = mid + 1;
225+
} else {
226+
high = mid;
227+
}
228+
}
229+
230+
return low;
231+
}
232+
233+
function calculateSizedDocumentBytes(baseDocument: FullDocument, payloadLength: number): number {
234+
return bson.calculateObjectSize({
235+
...baseDocument,
236+
payload: repeatCharacter('x', payloadLength)
237+
});
238+
}
239+
240+
function repeatCharacter(character: string, count: number): string {
241+
return character.repeat(Math.max(0, count));
242+
}
243+
244+
function chooseIterations(eventBytes: number): number {
245+
const targetBytes = 256 * 1024 * 1024;
246+
return clamp(Math.floor(targetBytes / eventBytes), 200, 50_000);
247+
}
248+
249+
function clamp(value: number, min: number, max: number): number {
250+
return Math.max(min, Math.min(max, value));
251+
}
252+
253+
function median(values: number[]): number {
254+
const sorted = [...values].sort((a, b) => a - b);
255+
const middle = Math.floor(sorted.length / 2);
256+
return sorted.length % 2 === 0 ? (sorted[middle - 1] + sorted[middle]) / 2 : sorted[middle];
257+
}
258+
259+
function buildScenario(operationType: OperationType, sizeLabel: string, targetBytes: number): Scenario {
260+
const fullDocument = createFullDocument(operationType, targetBytes);
261+
const changeDocument = createChangeDocument(fullDocument, operationType);
262+
const buffer = Buffer.from(bson.serialize(changeDocument));
263+
return {
264+
label: `${operationType} ${sizeLabel}`,
265+
operationType,
266+
targetBytes,
267+
fullDocumentBytes: bson.calculateObjectSize(fullDocument),
268+
eventBytes: buffer.byteLength,
269+
buffer
270+
};
271+
}
272+
273+
function runBenchmark(
274+
label: string,
275+
fn: (buffer: Buffer) => number,
276+
buffer: Buffer,
277+
iterations: number
278+
): BenchmarkResult {
279+
const warmupIterations = Math.min(2_000, Math.max(50, Math.floor(iterations / 10)));
280+
let sink = 0;
281+
for (let i = 0; i < warmupIterations; i += 1) {
282+
sink += fn(buffer);
283+
}
284+
285+
const samples: number[] = [];
286+
for (let round = 0; round < 5; round += 1) {
287+
const start = performance.now();
288+
for (let i = 0; i < iterations; i += 1) {
289+
sink += fn(buffer);
290+
}
291+
samples.push(performance.now() - start);
292+
}
293+
294+
if (sink === Number.MIN_SAFE_INTEGER) {
295+
console.error(label);
296+
}
297+
298+
const elapsedMs = median(samples);
299+
const opsPerSecond = (iterations * 1000) / elapsedMs;
300+
const mibPerSecond = (buffer.byteLength * iterations) / (1024 * 1024) / (elapsedMs / 1000);
301+
return {
302+
elapsedMs,
303+
opsPerSecond,
304+
mibPerSecond
305+
};
306+
}
307+
308+
function printRow(values: string[]): void {
309+
const widths = [16, 10, 10, 38, 10, 10];
310+
const line = values
311+
.map((value, index) => value.padEnd(widths[index] ?? value.length))
312+
.join(' ')
313+
.trimEnd();
314+
console.log(line);
315+
}
316+
317+
printRow(['Scenario', 'Full doc', 'Event', 'Benchmark', 'Ops/s', 'MiB/s']);
318+
printRow(['--------', '--------', '-----', '---------', '-----', '-----']);
319+
320+
for (const operationType of OPERATION_TYPES) {
321+
for (const size of SIZE_TARGETS) {
322+
const scenario = buildScenario(operationType, size.label, size.bytes);
323+
const iterations = chooseIterations(scenario.eventBytes);
324+
const results = BENCHMARKS.map((benchmark) => ({
325+
label: benchmark.label,
326+
...runBenchmark(benchmark.label, benchmark.run, scenario.buffer, iterations)
327+
}));
328+
329+
let isFirstRow = true;
330+
for (const result of results) {
331+
printRow([
332+
isFirstRow ? scenario.label : '',
333+
isFirstRow ? formatBytes(scenario.fullDocumentBytes) : '',
334+
isFirstRow ? formatBytes(scenario.eventBytes) : '',
335+
result.label,
336+
formatNumber(result.opsPerSecond),
337+
formatNumber(result.mibPerSecond)
338+
]);
339+
isFirstRow = false;
340+
}
341+
}
342+
}
343+
344+
function formatNumber(value: number): string {
345+
return new Intl.NumberFormat('en-US', {
346+
maximumFractionDigits: value >= 100 ? 0 : 1
347+
}).format(value);
348+
}
349+
350+
function formatBytes(bytes: number): string {
351+
if (bytes < 1024) {
352+
return `${bytes} B`;
353+
}
354+
if (bytes < 1024 * 1024) {
355+
return `${(bytes / 1024).toFixed(bytes >= 10 * 1024 ? 0 : 1)} KB`;
356+
}
357+
return `${(bytes / (1024 * 1024)).toFixed(1)} MB`;
358+
}

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
rawChangeStream
3434
} from './RawChangeStream.js';
3535
import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js';
36-
import { DefaultSourceRowConverter, SourceRowConverter } from './SourceRowConverter.js';
36+
import { DirectSourceRowConverter, SourceRowConverter } from './SourceRowConverter.js';
3737

3838
export interface ChangeStreamOptions {
3939
connections: MongoManager;
@@ -118,7 +118,7 @@ export class ChangeStream {
118118
this.sync_rules = options.storage.getParsedSyncRules({
119119
defaultSchema: this.defaultDb.databaseName
120120
});
121-
this.sourceRowConverter = new DefaultSourceRowConverter(this.sync_rules.compatibility);
121+
this.sourceRowConverter = new DirectSourceRowConverter(this.sync_rules.compatibility);
122122

123123
// The change stream aggregation command should timeout before the socket times out,
124124
// so we use 90% of the socket timeout value.

0 commit comments

Comments
 (0)