Skip to content

Commit 7d8fa99

Browse files
committed
Add definition rollup, action suggestions, and tables to the bucket report
1 parent d4eb5cc commit 7d8fa99

9 files changed

Lines changed: 714 additions & 91 deletions

File tree

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 160 additions & 35 deletions
Large diffs are not rendered by default.

modules/module-mongodb-storage/src/storage/implementation/v1/MongoSyncBucketStorageV1.ts

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ import {
2929
MongoSyncBucketStorage,
3030
MongoSyncBucketStorageOptions,
3131
TopBucketCandidate,
32-
TopBucketSelection
32+
TopBucketSelection,
33+
TopDefinitionCandidate
3334
} from '../MongoSyncBucketStorage.js';
3435
import {
3536
BucketDataDocumentV1,
@@ -190,13 +191,15 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
190191

191192
// For storage v1/v2, bucket state and bucket data are shared collections scoped by group (replication stream).
192193
protected async collectTopBuckets(limit: number): Promise<TopBucketSelection> {
193-
const { buckets, totals } = await this.aggregateTopBuckets(
194+
const { buckets, definitions, definitionsTruncated, totals } = await this.aggregateTopBuckets(
194195
this.db.bucketStateV1,
195196
{ '_id.g': this.replicationStreamId },
196197
limit
197198
);
198199
return {
199200
buckets: buckets.map((b) => ({ bucket: b.id.b, operations: b.operations, operationBytes: b.operationBytes })),
201+
definitions,
202+
definitionsTruncated,
200203
totals
201204
};
202205
}
@@ -225,6 +228,36 @@ export class MongoSyncBucketStorageV1 extends MongoSyncBucketStorage {
225228
return this.estimateRowsFromOperationSample(this.db.bucketDataV1, buildPrefix, candidate.operations, sampled);
226229
}
227230

231+
protected estimateDefinitionRows(candidate: TopDefinitionCandidate): Promise<BucketRowEstimate> {
232+
const sampled = this.shouldSampleBucketRows(candidate.operations);
233+
const buildPrefix = (applySample: boolean): mongo.Document[] => {
234+
// All of a definition's bucket names start with `<definition>[`, so an `_id` range on that string
235+
// prefix selects exactly the definition's operations via the index. A backslash (0x5C, written `\\` in the template literal below) is the character
236+
// after `[` (0x5B), so the half-open range [`<definition>[`, `<definition>\\`) cannot include any other definition:
237+
// any other definition's bucket names differ from this prefix at or before the `[` (this assumes definition names never contain `[` — TODO confirm).
238+
const prefix: mongo.Document[] = [
239+
{
240+
$match: {
241+
_id: {
242+
$gte: { g: this.replicationStreamId, b: `${candidate.definition}[`, o: new bson.MinKey() },
243+
$lt: { g: this.replicationStreamId, b: `${candidate.definition}\\`, o: new bson.MinKey() }
244+
}
245+
}
246+
}
247+
];
248+
if (applySample) {
249+
prefix.push({ $match: { $sampleRate: this.bucketRowSampleRate(candidate.operations) } });
250+
}
251+
return prefix;
252+
};
253+
// Include the bucket name in the row key: at definition granularity, a row is counted once for every bucket that holds it.
254+
return this.estimateRowsFromOperationSample(this.db.bucketDataV1, buildPrefix, candidate.operations, sampled, {
255+
b: '$_id.b',
256+
table: '$table',
257+
row_id: '$row_id'
258+
});
259+
}
260+
228261
protected createMongoParameterCompactor(
229262
checkpoint: InternalOpId,
230263
options: storage.CompactOptions

modules/module-mongodb-storage/src/storage/implementation/v3/MongoSyncBucketStorageV3.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import {
2828
MongoSyncBucketStorage,
2929
MongoSyncBucketStorageOptions,
3030
TopBucketCandidate,
31-
TopBucketSelection
31+
TopBucketSelection,
32+
TopDefinitionCandidate
3233
} from '../MongoSyncBucketStorage.js';
3334
import { loadBucketDataDocument } from './bucket-format.js';
3435
import {
@@ -193,7 +194,7 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
193194
// sharing these collections. Scope to the active config's definition ids so the report excludes stale buckets
194195
// from old/stopped definitions. `this.storageIds` is derived from the active config only (see getActiveSyncConfig).
195196
protected async collectTopBuckets(limit: number): Promise<TopBucketSelection> {
196-
const { buckets, totals } = await this.aggregateTopBuckets(
197+
const { buckets, definitions, definitionsTruncated, totals } = await this.aggregateTopBuckets(
197198
this.db.bucketState(this.replicationStreamId),
198199
{ '_id.d': { $in: this.storageIds.bucketDefinitionIds } },
199200
limit
@@ -205,6 +206,8 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
205206
operationBytes: b.operationBytes,
206207
defId: b.id.d
207208
})),
209+
definitions,
210+
definitionsTruncated,
208211
totals
209212
};
210213
}
@@ -229,6 +232,30 @@ export class MongoSyncBucketStorageV3 extends MongoSyncBucketStorage {
229232
return this.estimateRowsFromOperationSample(collection, buildPrefix, candidate.operations, sampled);
230233
}
231234

235+
protected estimateDefinitionRows(candidate: TopDefinitionCandidate): Promise<BucketRowEstimate> {
236+
// A definition's operations are exactly the contents of its per-definition bucket_data collection, so no match stage is
237+
// needed. Keep the bucket name alongside each unwound operation: at definition grain a row counts once
238+
// per bucket holding it.
239+
const sampled = this.shouldSampleBucketRows(candidate.operations);
240+
const collection = this.db.bucketData(this.replicationStreamId, candidate.defId!);
241+
const buildPrefix = (applySample: boolean): mongo.Document[] => {
242+
const prefix: mongo.Document[] = [];
243+
if (applySample) {
244+
prefix.push({ $match: { $sampleRate: this.bucketRowSampleRate(candidate.operations) } });
245+
}
246+
prefix.push(
247+
{ $unwind: '$ops' },
248+
{ $project: { b: '$_id.b', op: '$ops.op', table: '$ops.table', row_id: '$ops.row_id' } }
249+
);
250+
return prefix;
251+
};
252+
return this.estimateRowsFromOperationSample(collection, buildPrefix, candidate.operations, sampled, {
253+
b: '$b',
254+
table: '$table',
255+
row_id: '$row_id'
256+
});
257+
}
258+
232259
protected createMongoParameterCompactor(
233260
checkpoint: InternalOpId,
234261
options: storage.CompactOptions

packages/service-core-tests/src/tests/register-bucket-report-tests.ts

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,33 @@ bucket_definitions:
6161
const report = await getReport(bucketStorage);
6262

6363
expect(report.totals.bucketCount).toEqual(1);
64-
expect(report.truncated).toEqual(false);
64+
expect(report.bucketsTruncated).toEqual(false);
65+
expect(report.definitionsTruncated).toEqual(false);
6566

6667
const stats = report.buckets.find((b) => b.bucket === bucket)!;
6768
// Three inserts of distinct ids: three operations, three live rows, fully compacted (ratio 1).
68-
expect(stats).toMatchObject({ operations: 3, rows: 3, fragmentation: 1, rowsEstimated: false });
69+
expect(stats).toMatchObject({
70+
operations: 3,
71+
rows: 3,
72+
fragmentation: 1,
73+
rowsEstimated: false,
74+
suggestedAction: 'none',
75+
tables: ['test']
76+
});
6977
expect(stats.operationBytes).toBeGreaterThan(0);
7078
expect(report.totals).toMatchObject({ operations: 3, estimated: false });
79+
80+
// The definition rollup aggregates the single bucket. The definition name is the bucket-name prefix.
81+
expect(report.definitions).toHaveLength(1);
82+
expect(report.definitions[0]).toMatchObject({
83+
definition: bucket.split('[')[0],
84+
bucketCount: 1,
85+
operations: 3,
86+
rows: 3,
87+
fragmentation: 1,
88+
suggestedAction: 'none',
89+
tables: ['test']
90+
});
7191
});
7292

7393
test('operations exceed live rows after updates, and compaction reduces fragmentation', async () => {
@@ -162,6 +182,15 @@ bucket_definitions:
162182
expect(report.buckets.find((b) => b.bucket === b1)).toMatchObject({ operations: 3, rows: 1 });
163183
expect(report.buckets.find((b) => b.bucket === b2)).toMatchObject({ operations: 2, rows: 2 });
164184

185+
// Both buckets belong to one definition; the rollup sums them, counting each bucket's rows separately.
186+
expect(report.definitions).toHaveLength(1);
187+
expect(report.definitions[0]).toMatchObject({
188+
definition: b1.split('[')[0],
189+
bucketCount: 2,
190+
operations: 5,
191+
rows: 3
192+
});
193+
165194
// operationBytes is an aggregated ($toDouble) sum; assert every bucket is non-zero and that the
166195
// per-bucket bytes add up to the instance total.
167196
expect(report.totals.operationBytes).toBeGreaterThan(0);
@@ -204,13 +233,46 @@ bucket_definitions:
204233
const b1 = test_utils.bucketRequest(content, 'grouped["b1"]').bucket;
205234

206235
const report = await getReport(bucketStorage, { limit: 1 });
207-
expect(report.truncated).toEqual(true);
236+
expect(report.bucketsTruncated).toEqual(true);
208237
expect(report.buckets.map((b) => b.bucket)).toEqual([b1]);
209238
// Totals still cover every bucket, not just the truncated list.
210239
expect(report.totals.bucketCount).toEqual(2);
211240
expect(report.totals).toMatchObject({ operations: 3, estimated: false });
212241
});
213242

243+
test('caps the definition rollup and flags the truncation', async () => {
244+
// Deploy two more definitions than the rollup cap allows; each one selects the whole table with no parameters, so a single row lands in every definition's global bucket.
245+
const definitionCount = storage.BUCKET_REPORT_DEFINITION_LIMIT + 2;
246+
const manyDefinitions =
247+
'bucket_definitions:\n' +
248+
Array.from({ length: definitionCount }, (_, i) => ` def${i}:\n data: [select * from test]\n`).join('');
249+
250+
await using factory = await generateStorageFactory();
251+
const { stream } = await test_utils.deploySyncRules(
252+
factory,
253+
updateSyncRulesFromYaml(manyDefinitions, { storageVersion })
254+
);
255+
const bucketStorage = factory.getInstance(stream);
256+
257+
await using writer = await bucketStorage.createWriter(test_utils.BATCH_OPTIONS);
258+
const testTable = await test_utils.resolveTestTable(writer, 'test', ['id'], config);
259+
await writer.markAllSnapshotDone('1/1');
260+
await writer.save({
261+
sourceTable: testTable,
262+
tag: storage.SaveOperationTag.INSERT,
263+
after: { id: 't1' },
264+
afterReplicaId: test_utils.rid('t1')
265+
});
266+
await writer.commit('1/1');
267+
await writer.flush();
268+
269+
const report = await getReport(bucketStorage);
270+
expect(report.totals.bucketCount).toEqual(definitionCount);
271+
expect(report.bucketsTruncated).toEqual(false);
272+
expect(report.definitions).toHaveLength(storage.BUCKET_REPORT_DEFINITION_LIMIT);
273+
expect(report.definitionsTruncated).toEqual(true);
274+
});
275+
214276
test('samples the row count for a bucket above the sampling threshold', async () => {
215277
await using factory = await generateStorageFactory();
216278
const { stream, content } = await test_utils.deploySyncRules(
@@ -265,5 +327,21 @@ bucket_definitions:
265327
expect(stats.rows).toBeLessThanOrEqual(55);
266328
// Fragmentation is operations / rows, so a heavily updated bucket reads well above 1.
267329
expect(stats.fragmentation).toBeGreaterThan(10);
330+
// The history consists of superseded operations that have not been compacted yet, which compaction reclaims.
331+
expect(stats.suggestedAction).toEqual('compact');
332+
// The sampled history names the tables a defragment would touch.
333+
expect(stats.tables).toEqual(['test']);
334+
335+
// The definition rollup samples the same history at definition grain.
336+
expect(report.definitions).toHaveLength(1);
337+
const defStats = report.definitions[0];
338+
expect(defStats).toMatchObject({
339+
bucketCount: 1,
340+
operations: stats.operations,
341+
suggestedAction: 'compact',
342+
tables: ['test']
343+
});
344+
expect(defStats.rows).toBeGreaterThanOrEqual(45);
345+
expect(defStats.rows).toBeLessThanOrEqual(55);
268346
});
269347
}

packages/service-core/src/routes/endpoints/admin.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,15 +313,29 @@ export const bucketReport = routeDefinition({
313313
rows: bucket.rows,
314314
operation_bytes: bucket.operationBytes,
315315
fragmentation: bucket.fragmentation,
316-
rows_estimated: bucket.rowsEstimated
316+
rows_estimated: bucket.rowsEstimated,
317+
suggested_action: bucket.suggestedAction,
318+
tables: bucket.tables
319+
})),
320+
definitions: report.definitions.map((definition) => ({
321+
definition: definition.definition,
322+
bucket_count: definition.bucketCount,
323+
operations: definition.operations,
324+
operation_bytes: definition.operationBytes,
325+
rows: definition.rows,
326+
fragmentation: definition.fragmentation,
327+
rows_estimated: definition.rowsEstimated,
328+
suggested_action: definition.suggestedAction,
329+
tables: definition.tables
317330
})),
318331
totals: {
319332
bucket_count: report.totals.bucketCount,
320333
operations: report.totals.operations,
321334
operation_bytes: report.totals.operationBytes,
322335
estimated: report.totals.estimated
323336
},
324-
truncated: report.truncated
337+
buckets_truncated: report.bucketsTruncated,
338+
definitions_truncated: report.definitionsTruncated
325339
});
326340
}
327341
});

0 commit comments

Comments
 (0)