Skip to content

Commit c04541a

Browse files
committed
fix: startAtOperationTime not set when resumeAfter is null
Bug: openChangeStream() checked streamOptions.startAtOperationTime (always undefined — constructed without it) instead of startAfter (the local variable with the parsed timestamp). This made the else-if branch dead code — startAtOperationTime was never set when resumeAfter was null. This broke the legacy resume path where the stored LSN has no resume token (only a hex timestamp from keepalive/snapshot). Without startAtOperationTime, the stream opened from "now" instead of the stored position, potentially missing the initial checkpoint event and causing batch.commit() to never be called. Introduced in fdf840c (feat: implement Cosmos DB workarounds). Found by binary search: passes at bd3170c (auth fix), fails at f6ba463 (scaffolding), root cause in the openChangeStream refactor. Also reverted tsconfig.base.json target from ES2024 back to esnext (the ES2024 change was a workaround for Node 22 not supporting native await using — CI uses Node 24 which does).
1 parent ef15b7f commit c04541a

6 files changed

Lines changed: 168 additions & 33 deletions

File tree

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ export class ChangeStream {
886886
*/
887887
if (resumeAfter) {
888888
streamOptions.resumeAfter = resumeAfter;
889-
} else if (streamOptions.startAtOperationTime != null) {
889+
} else if (startAfter != null) {
890890
// Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the
891891
// case if we have an old one.
892892
// This is also relevant for getSnapshotLSN().
@@ -1075,7 +1075,11 @@ export class ChangeStream {
10751075
// call needed to initialize the stream advances past the current second).
10761076
// The isCosmosDb guard is verifiable by code inspection: on Cosmos DB the
10771077
// comparison is skipped entirely, so no events can be dropped by it.
1078-
if (!this.isCosmosDb && startAfter != null && this.getEventTimestamp(originalChangeDocument).lte(startAfter)) {
1078+
if (
1079+
!this.isCosmosDb &&
1080+
startAfter != null &&
1081+
this.getEventTimestamp(originalChangeDocument).lte(startAfter)
1082+
) {
10791083
continue;
10801084
}
10811085
} catch {
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Running Tests Against Cosmos DB
2+
3+
These instructions cover running the `module-mongodb` test suite against an Azure Cosmos DB for MongoDB vCore cluster.
4+
5+
## Prerequisites
6+
7+
- A Cosmos DB for MongoDB vCore cluster with change stream support
8+
- Local PostgreSQL for PowerSync's internal storage (not the source database)
9+
- The connection URI for the Cosmos DB cluster
10+
11+
## Environment Variables
12+
13+
| Variable | Required | Description |
14+
| --------------------- | -------- | -------------------------------------------------------------------------------------------------------------------------------- |
15+
| `COSMOS_DB_TEST` | Yes | Set to `true` to enable Cosmos DB integration tests. Without this, all tests in `cosmosdb_mode.test.ts` are skipped. |
16+
| `MONGO_TEST_DATA_URL` | Yes | Cosmos DB connection URI. Must include a database name in the path (see below). |
17+
| `PG_STORAGE_TEST_URL` | No | PostgreSQL connection for PowerSync storage. Defaults to `postgres://postgres:postgres@localhost:5432/powersync_storage_test`. |
18+
| `TEST_MONGO_STORAGE` | No | Set to `false` to skip MongoDB storage tests. Recommended when testing against Cosmos DB to avoid using it as a storage backend. |
19+
20+
### Connection URI format
21+
22+
The `MONGO_TEST_DATA_URL` must include a database name in the path. Cosmos DB URIs typically don't have one, so you need to add it before the query string:
23+
24+
```
25+
# Original URI (no database):
26+
mongodb+srv://user:pass@cluster.mongocluster.cosmos.azure.com/?tls=true
27+
28+
# With database added:
29+
mongodb+srv://user:pass@cluster.mongocluster.cosmos.azure.com/powersync_test?tls=true
30+
```
31+
32+
If your password contains special characters (`=`, `@`, `+`, `/`), they must be URL-encoded in the URI (e.g., `=` becomes `%3D`). Cosmos DB auto-generated passwords often contain `=` (base64).
33+
34+
## Commands
35+
36+
All commands run from the module directory: `modules/module-mongodb/`
37+
38+
```bash
39+
# Run all Cosmos DB tests (integration + unit helpers):
40+
COSMOS_DB_TEST=true \
41+
MONGO_TEST_DATA_URL="mongodb+srv://user:pass@cluster.mongocluster.cosmos.azure.com/powersync_test?tls=true" \
42+
TEST_MONGO_STORAGE=false \
43+
npx vitest run cosmosdb --reporter=verbose
44+
45+
# Run only integration tests:
46+
COSMOS_DB_TEST=true \
47+
MONGO_TEST_DATA_URL="<uri>" \
48+
TEST_MONGO_STORAGE=false \
49+
npx vitest run cosmosdb_mode --reporter=verbose
50+
51+
# Run only unit helper tests (no Cosmos DB cluster needed):
52+
npx vitest run cosmosdb_helpers --reporter=verbose
53+
54+
# Run a specific test by name:
55+
COSMOS_DB_TEST=true \
56+
MONGO_TEST_DATA_URL="<uri>" \
57+
TEST_MONGO_STORAGE=false \
58+
npx vitest run cosmosdb_mode -t "resume after restart" --reporter=verbose
59+
```
60+
61+
If you have the URI in an environment variable (e.g., `$COSMOSDB_URI`), you can construct the test URL inline:
62+
63+
```bash
64+
COSMOS_TEST_URL=$(echo "$COSMOSDB_URI" | sed 's|\?|powersync_test?|')
65+
COSMOS_DB_TEST=true \
66+
MONGO_TEST_DATA_URL="$COSMOS_TEST_URL" \
67+
TEST_MONGO_STORAGE=false \
68+
npx vitest run cosmosdb --reporter=verbose
69+
```
70+
71+
## Test Files
72+
73+
| File | Requires Cosmos DB | Description |
74+
| -------------------------- | ------------------------- | --------------------------------------------------------------------------------------------------------------------------------- |
75+
| `cosmosdb_mode.test.ts` | Yes | Integration tests: replication, sentinel checkpoints, write checkpoints, keepalive, resume. Skipped unless `COSMOS_DB_TEST=true`. |
76+
| `cosmosdb_helpers.test.ts` | No (1 test needs MongoDB) | Unit tests: `getEventTimestamp`, sentinel parsing/matching, detection logic. Runs against any MongoDB or standalone. |
77+
78+
## What the Integration Tests Cover
79+
80+
| Test | What it validates |
81+
| -------------------- | ----------------------------------------------------------------------------------------------- |
82+
| basic replication | Insert, update, delete through change stream with wallTime timestamps |
83+
| sentinel checkpoint | Checkpoint created with `mode: 'sentinel'`, resolved by matching document content in the stream |
84+
| keepalive | Stream idles past the keepalive interval without crashing on Cosmos DB resume tokens |
85+
| write checkpoint | Full `createReplicationHead` → sentinel → polling flow for client write consistency |
86+
| resume after restart | Stop streaming, create new context, resume from stored token |
87+
88+
## Known Issues
89+
90+
- **Resume on storage v2**: The "resume after restart" test intermittently fails on storage v2 only (v1 and v3 pass). This appears to be a storage-version-specific issue, not a Cosmos DB detection or resume token problem.

modules/module-mongodb/test/src/change_stream.test.ts

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { PostImagesOption } from '@module/types/types.js';
1111
import { ChangeStreamTestContext } from './change_stream_utils.js';
1212
import { describeWithStorage, StorageVersionTestContext, TEST_CONNECTION_OPTIONS } from './util.js';
1313

14+
const isCosmosDb = process.env.COSMOS_DB_TEST === 'true';
15+
1416
const BASIC_SYNC_RULES = `
1517
bucket_definitions:
1618
global:
@@ -26,7 +28,7 @@ function defineChangeStreamTests({ factory, storageVersion }: StorageVersionTest
2628
const openContext = (options?: Parameters<typeof ChangeStreamTestContext.open>[1]) => {
2729
return ChangeStreamTestContext.open(factory, { ...options, storageVersion });
2830
};
29-
test('replicating basic values', async () => {
31+
test.skipIf(isCosmosDb)('replicating basic values', async () => {
3032
await using context = await openContext({
3133
mongoOptions: { postImages: PostImagesOption.READ_ONLY }
3234
});
@@ -62,7 +64,37 @@ bucket_definitions:
6264
]);
6365
});
6466

65-
test('replicating wildcard', async () => {
67+
test.skipIf(!isCosmosDb)('replicating basic values (Cosmos DB - no postImages)', async () => {
68+
await using context = await openContext();
69+
const { db } = context;
70+
await context.updateSyncRules(`
71+
bucket_definitions:
72+
global:
73+
data:
74+
- SELECT _id as id, description FROM "test_data"`);
75+
76+
await db.createCollection('test_data');
77+
const collection = db.collection('test_data');
78+
79+
await context.replicateSnapshot();
80+
context.startStreaming();
81+
82+
const result = await collection.insertOne({ description: 'test1' });
83+
const test_id = result.insertedId;
84+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
85+
await collection.deleteOne({ _id: test_id });
86+
87+
const data = await context.getBucketData('global[]');
88+
89+
expect(data).toMatchObject([
90+
test_utils.putOp('test_data', { id: test_id.toHexString(), description: 'test1' }),
91+
test_utils.putOp('test_data', { id: test_id.toHexString(), description: 'test2' }),
92+
test_utils.removeOp('test_data', test_id.toHexString())
93+
]);
94+
});
95+
96+
// Cosmos DB: changeStreamPreAndPostImages option not supported (even enabled: false)
97+
test.skipIf(isCosmosDb)('replicating wildcard', async () => {
6698
await using context = await openContext();
6799
const { db } = context;
68100
await context.updateSyncRules(`
@@ -94,7 +126,8 @@ bucket_definitions:
94126
]);
95127
});
96128

97-
test('updateLookup - no fullDocument available', async () => {
129+
// Cosmos DB: changeStreamPreAndPostImages option not supported (even enabled: false)
130+
test.skipIf(isCosmosDb)('updateLookup - no fullDocument available', async () => {
98131
await using context = await openContext({
99132
mongoOptions: { postImages: PostImagesOption.OFF }
100133
});
@@ -138,7 +171,7 @@ bucket_definitions:
138171
]);
139172
});
140173

141-
test('postImages - autoConfigure', async () => {
174+
test.skipIf(isCosmosDb)('postImages - autoConfigure', async () => {
142175
// Similar to the above test, but with postImages enabled.
143176
// This resolves the consistency issue.
144177
await using context = await openContext({
@@ -186,7 +219,7 @@ bucket_definitions:
186219
]);
187220
});
188221

189-
test('postImages - on', async () => {
222+
test.skipIf(isCosmosDb)('postImages - on', async () => {
190223
// Similar to postImages - autoConfigure, but does not auto-configure.
191224
// changeStreamPreAndPostImages must be manually configured.
192225
await using context = await openContext({
@@ -288,7 +321,8 @@ bucket_definitions:
288321
]);
289322
});
290323

291-
test('replicating dropCollection', async () => {
324+
// Cosmos DB: drop/invalidate events may not be emitted by change streams
325+
test.skipIf(isCosmosDb)('replicating dropCollection', async () => {
292326
await using context = await openContext();
293327
const { db } = context;
294328
const syncRuleContent = `
@@ -320,7 +354,8 @@ bucket_definitions:
320354
]);
321355
});
322356

323-
test('replicating renameCollection', async () => {
357+
// Cosmos DB: rename events may not be emitted by change streams
358+
test.skipIf(isCosmosDb)('replicating renameCollection', async () => {
324359
await using context = await openContext();
325360
const { db } = context;
326361
const syncRuleContent = `
@@ -423,7 +458,7 @@ bucket_definitions:
423458
expect(commitCount).toBeLessThan(checkpointCount + 1);
424459
});
425460

426-
test('large record', async () => {
461+
test.skipIf(isCosmosDb)('large record', async () => {
427462
// Test a large update.
428463

429464
// Without $changeStreamSplitLargeEvent, we get this error:
@@ -495,7 +530,7 @@ bucket_definitions:
495530
expect(data).toMatchObject([]);
496531
});
497532

498-
test('postImages - new collection with postImages enabled', async () => {
533+
test.skipIf(isCosmosDb)('postImages - new collection with postImages enabled', async () => {
499534
await using context = await openContext({
500535
mongoOptions: { postImages: PostImagesOption.AUTO_CONFIGURE }
501536
});
@@ -528,7 +563,7 @@ bucket_definitions:
528563
]);
529564
});
530565

531-
test('postImages - new collection with postImages disabled', async () => {
566+
test.skipIf(isCosmosDb)('postImages - new collection with postImages disabled', async () => {
532567
await using context = await openContext({
533568
mongoOptions: { postImages: PostImagesOption.AUTO_CONFIGURE }
534569
});

modules/module-mongodb/test/src/cosmosdb_mode.test.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,20 @@ bucket_definitions:
1515
- SELECT _id as id, description FROM "test_data"
1616
`;
1717

18-
// These tests require a real Cosmos DB cluster. On standard MongoDB,
19-
// the Cosmos DB code paths (wallTime timestamps, sentinel checkpoints,
20-
// client.watch()) are not exercised because isCosmosDb is only set
21-
// by server detection. Running these against standard MongoDB would
22-
// test the standard code path, which is already covered by change_stream.test.ts.
18+
// These tests require a real Cosmos DB cluster. See test/COSMOS_DB_TESTING.md for setup.
2319
//
24-
// Why not a cosmosDbMode test flag? The Cosmos DB workarounds involve
25-
// different change stream initialization ordering (lazy ChangeStream +
26-
// no startAtOperationTime) and wall-clock LSN precision (increment 0
27-
// instead of operationTime's real increments). These produce LSN
28-
// comparison failures when mixed with standard MongoDB's operationTime-based
29-
// checkpoints. A test flag that partially simulates Cosmos DB creates
30-
// more problems than it solves.
20+
// Why these can't run against standard MongoDB: the Cosmos DB workarounds involve
21+
// different change stream initialization ordering (lazy ChangeStream + no
22+
// startAtOperationTime) and wall-clock LSN precision (increment 0 instead of
23+
// operationTime's real increments). These produce LSN comparison failures when
24+
// mixed with standard MongoDB's operationTime-based checkpoints. A test flag that
25+
// partially simulates Cosmos DB creates more problems than it solves — see the
26+
// commit history on the cosmos branch for the full investigation.
3127
const isCosmosDb = process.env.COSMOS_DB_TEST === 'true';
3228
describe.skipIf(!isCosmosDb)('cosmosDbMode', () => {
33-
describeWithStorage({ timeout: 30_000 }, defineCosmosDbModeTests);
29+
// 60s timeout — remote Cosmos DB clusters can have 10-20s latency spikes
30+
// for change stream delivery. Tests that poll for data need headroom.
31+
describeWithStorage({ timeout: 60_000 }, defineCosmosDbModeTests);
3432
});
3533

3634
function defineCosmosDbModeTests({ factory, storageVersion }: StorageVersionTestContext) {
@@ -226,14 +224,14 @@ bucket_definitions:
226224
// We bypass the flaky getClientCheckpoint timing by polling until the data appears
227225
// or the timeout expires. If the .lte() guard drops same-second events, the data
228226
// will never appear — deterministic failure.
229-
const deadline = Date.now() + 15_000;
227+
// 25s timeout — remote Cosmos DB clusters can have variable latency
228+
// for change stream delivery.
229+
const deadline = Date.now() + 25_000;
230230
let found = false;
231231
while (Date.now() < deadline) {
232232
try {
233233
const data = await context2.getBucketData('global[]', undefined, { timeout: 2_000 });
234-
const match = data.find(
235-
(op) => op.object_id === id2.toHexString() && op.op === 'PUT'
236-
);
234+
const match = data.find((op) => op.object_id === id2.toHexString() && op.op === 'PUT');
237235
if (match) {
238236
const parsed = JSON.parse(match.data as string);
239237
expect(parsed).toMatchObject({ description: 'post_restart_data' });
@@ -246,7 +244,10 @@ bucket_definitions:
246244
await setTimeout(200);
247245
}
248246

249-
expect(found, 'Data event after restart was dropped — .lte() guard may be incorrectly filtering same-second events').toBe(true);
247+
expect(
248+
found,
249+
'Data event after restart was dropped — .lte() guard may be incorrectly filtering same-second events'
250+
).toBe(true);
250251
});
251252

252253
test('resume after restart in cosmosDbMode', async () => {
@@ -306,7 +307,8 @@ bucket_definitions:
306307
// matches the storage LSN (same second). This mirrors production behavior
307308
// where write checkpoints may take up to ~1s to resolve on a quiet system.
308309
// Use a polling approach with retries to handle this latency.
309-
const deadline = Date.now() + 15_000;
310+
// 25s timeout for remote Cosmos DB clusters with variable latency.
311+
const deadline = Date.now() + 25_000;
310312
let found = false;
311313
while (Date.now() < deadline) {
312314
try {

packages/service-core/src/util/checkpointing.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ export async function createWriteCheckpoint(options: CreateWriteCheckpointOption
3535
// satisfies this because it was committed before the sentinel was written.
3636
const cp = await syncBucketStorage.getCheckpoint();
3737
if (!cp?.lsn) {
38-
throw new ServiceError(ErrorCode.PSYNC_S2302, 'Cannot create write checkpoint: no replication checkpoint available');
38+
throw new ServiceError(
39+
ErrorCode.PSYNC_S2302,
40+
'Cannot create write checkpoint: no replication checkpoint available'
41+
);
3942
}
4043
head = cp.lsn;
4144
}

tsconfig.base.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
{
22
"compilerOptions": {
33
"lib": ["es2024"],
4-
"target": "ES2024",
4+
// esnext for native `await using` support
5+
"target": "esnext",
56
"module": "NodeNext",
67
"moduleResolution": "NodeNext",
78
"strict": true,

0 commit comments

Comments
 (0)