feat: experimental Cosmos DB MongoDB vCore support (#605)

Open
Sleepful wants to merge 20 commits into main from cosmos

Sleepful commented on Apr 14, 2026

Summary

Add experimental support for Azure Cosmos DB for MongoDB vCore (Azure DocumentDB) as a source database. Auto-detects Cosmos DB via hello.internal.cosmos_versions or hello.internal.documentdb_versions and applies workarounds for compatibility gaps in change stream behavior.

What this PR does

Cosmos DB detection

  • detectCosmosDb() method runs on both initReplication() and streamChangesInternal() — survives service restarts
  • Guards isdbgrid and setName checks (Cosmos DB returns msg: 'isdbgrid' and no setName)
  • Throws on post-images configuration (unsupported by Cosmos DB)
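As a minimal sketch of the detection check described above (the helper name and response shape are illustrative, not the actual PR code; the field names are the ones documented in this PR):

```typescript
// Hypothetical stand-in for the hello-response shape; only the fields
// relevant to detection are modeled here.
interface HelloResponse {
  msg?: string;
  setName?: string;
  internal?: Record<string, unknown>;
}

function looksLikeCosmosDb(hello: HelloResponse): boolean {
  // Either internal field indicates Cosmos DB / DocumentDB; both names
  // are checked for backwards compatibility after the server-side rename
  // (see "Cosmos DB upstream changes" below).
  return (
    hello.internal?.['cosmos_versions'] != null ||
    hello.internal?.['documentdb_versions'] != null
  );
}

// Cosmos DB reports msg: 'isdbgrid' with no setName, so replica-set
// checks must be guarded once detection succeeds.
console.log(looksLikeCosmosDb({ msg: 'isdbgrid', internal: { documentdb_versions: {} } })); // true
console.log(looksLikeCosmosDb({ setName: 'rs0' })); // false
```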

wallTime as clusterTime substitute

Standard MongoDB change events include clusterTime — a BSON Timestamp with seconds and an increment (a monotonic counter within each second). Two operations in the same second get different increments (e.g., Timestamp(1718457600, 1) and Timestamp(1718457600, 2)), providing a total order. Cosmos DB change events don't include clusterTime. Instead, they provide wallTime — a Date used at second precision with the increment hardcoded to 0: Timestamp.fromBits(0, Math.floor(wallTime.getTime() / 1000)). All events within the same wall-clock second produce identical LSNs.

This loss of ordering precision directly drives two other changes in this PR:

  • The .lte() dedup guard must be disabled on Cosmos DB. With clusterTime, events at the boundary have unique increments and .lte() correctly identifies duplicates. With second-precision wallTime, .lte() matches all events in the same second as the last checkpoint, silently dropping real data. (See .lte() dedup guard below.)
  • Sentinel checkpoint matching uses document content instead of LSN comparison. Multiple checkpoint events in the same second produce the same wallTime-derived LSN, making >= ambiguous for identifying a specific checkpoint. Content matching sidesteps the ordering problem entirely. (See Sentinel checkpoint resolution below.)
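The timestamp substitution above can be illustrated in miniature (the types and helper here are hypothetical stand-ins, not the actual getEventTimestamp() or BSON Timestamp implementation):

```typescript
// Simplified (seconds, increment) pair standing in for a BSON Timestamp.
type TsBits = { seconds: number; increment: number };

function eventTimestamp(event: { clusterTime?: TsBits; wallTime?: Date }): TsBits {
  if (event.clusterTime != null) {
    return event.clusterTime; // standard MongoDB: full precision
  }
  // Cosmos DB fallback, per the PR: Timestamp.fromBits(0, floor(wallTime / 1000))
  return { seconds: Math.floor(event.wallTime!.getTime() / 1000), increment: 0 };
}

// Two Cosmos DB events in the same wall-clock second collapse to the
// same timestamp, hence the same LSN:
const a = eventTimestamp({ wallTime: new Date(1718457600100) });
const b = eventTimestamp({ wallTime: new Date(1718457600900) });
console.log(a.seconds === b.seconds && a.increment === b.increment); // true
```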
Call sites and files changed

getEventTimestamp() is a private method on ChangeStream in modules/module-mongodb/src/replication/ChangeStream.ts. It returns clusterTime on standard MongoDB and falls back to wallTime on Cosmos DB. It replaces direct clusterTime! access at these call sites, all in the same file:

  • getSnapshotLsn() — LSN construction from the initial checkpoint event
  • streamChangesInternal() — the .lte(startAfter) dedup guard
  • streamChangesInternal() — LSN construction when a checkpoint event is committed
  • streamChangesInternal() — checkpoint-out-of-order safety check (error message)
  • streamChangesInternal() — periodic resume LSN persistence during catchup

MongoRelation.ts references the method in a comment explaining why sentinel mode uses $inc counters rather than relying on wallTime-derived LSNs for ordering.

Sentinel checkpoint resolution

The replication loop in streamChangesInternal groups change stream events into batches. A batch is committed when a checkpoint event arrives — a document that PowerSync writes to _powersync_checkpoints and then observes coming back through the change stream. The variable waitForCheckpointLsn gates this: it holds a value after a checkpoint is created, and clears to null when the matching event arrives, triggering a commit. This creates a natural batching rhythm where data accumulates between checkpoints.

On standard MongoDB, the checkpoint is resolved by comparing LSN strings. createCheckpoint captures session.operationTime (the server-assigned timestamp of the write) and serializes it via MongoLSN into a hex string. When the same write appears on the change stream, the event carries clusterTime (the same server timestamp) plus a resume token. Both are serialized into the MongoLSN comparable format — a hex timestamp optionally followed by |<base64 resume token>. Since the checkpoint's LSN has no resume token suffix, the event's LSN is lexicographically greater for the same timestamp, and >= resolves correctly. It also resolves for any later checkpoint event, making the comparison defensive.

Cosmos DB breaks this: session.operationTime is not returned on regular commands, and clusterTime is absent from change events. Without a timestamp to capture or compare, the LSN comparison can't work. Instead, createCheckpoint returns a sentinel string (sentinel:<id>:<i>) encoding the document's _id and its $inc counter value. The streaming loop matches by deserializing fullDocument from the checkpoint event and checking for an exact content match on _id and i. After the sentinel resolves, the commit path is identical — batch.commit(lsn) is called with a wallTime-derived LSN in both cases.

Implementation details
  • createCheckpoint() in MongoRelation.ts accepts mode: 'lsn' | 'sentinel'. In sentinel mode, it returns sentinel:<id>:<i> from the upserted document instead of reading session.operationTime.
  • Branching in streamChangesInternal is via waitForCheckpointLsn.startsWith('sentinel:') — the same variable holds either a hex LSN or a sentinel string.
  • MongoLSN.comparable serializes as <8-hex-high><8-hex-low> (timestamp only) or <8-hex-high><8-hex-low>|<base64-bson> (with resume token). The resume token is carried for stream resumption on restart, not for this comparison.
  • Standard MongoDB resolution: lsn >= waitForCheckpointLsn where both sides share the same timestamp but differ in resume token suffix.
  • Cosmos DB resolution: content match on changeDocument.documentKey._id + BSON.deserialize(changeDocument.fullDocument).i.
  • After resolution (waitForCheckpointLsn = null), both paths fall through identically to batch.commit(lsn, ...).
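The two resolution strategies can be sketched side by side. The serialization helper below mimics the described `<hex timestamp>|<base64 resume token>` shape; it is illustrative, not the actual MongoLSN implementation:

```typescript
// Illustrative comparable-LSN serialization (hex timestamp, optional
// resume-token suffix), per the format described above.
function comparableLsn(seconds: number, increment: number, resumeToken?: string): string {
  const hex = (n: number) => n.toString(16).padStart(8, '0');
  const base = hex(seconds) + hex(increment);
  return resumeToken ? `${base}|${resumeToken}` : base;
}

// Standard MongoDB: the checkpoint LSN has no token suffix, so the change
// stream event's LSN for the same timestamp compares >= lexicographically.
const checkpointLsn = comparableLsn(1718457600, 5);
const eventLsn = comparableLsn(1718457600, 5, 'gkYAAA==');
console.log(eventLsn >= checkpointLsn); // true

// Cosmos DB: resolve the sentinel by document content instead of by LSN.
function matchesSentinel(sentinel: string, eventId: string, eventI: number): boolean {
  const [, id, i] = sentinel.split(':'); // 'sentinel:<id>:<i>'
  return id === eventId && Number(i) === eventI;
}
console.log(matchesSentinel('sentinel:66f2a1:7', '66f2a1', 7)); // true
```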

Snapshot LSN acquisition

Before the initial snapshot, getSnapshotLsn() establishes a change stream position to resume from after the snapshot completes. It writes a checkpoint document and waits to observe it on the change stream — the event's LSN becomes the starting point for streaming.

On standard MongoDB, the checkpoint's operationTime is used with startAtOperationTime to open the stream from that exact point. On Cosmos DB, startAtOperationTime is not supported, so the stream opens from the current position (lsn: null). The checkpoint may have been written before the stream opened, in which case its event is missed. To handle this, the loop re-creates checkpoints every second for up to 60 seconds until one is observed. A later checkpoint producing a slightly later snapshot LSN is harmless — any changes between the missed and caught checkpoints are captured by the snapshot itself, which reads the full current state of all collections. Unlike the streaming loop's sentinel matching, getSnapshotLsn doesn't need content matching — it just looks for any checkpoint event from the right checkpointId and returns a wallTime-derived LSN.

Implementation details
  • getSnapshotLsn() in ChangeStream.ts calls detectCosmosDb() first, then branches on this.isCosmosDb for checkpoint creation and stream options.
  • On Cosmos DB: createCheckpoint(mode: 'sentinel') is called but the returned sentinel string is not used — streamLsn is set to null and the stream opens without resumeAfter or startAtOperationTime.
  • The retry loop creates new checkpoints every LSN_CREATE_INTERVAL_SECONDS (1s) with mode: this.isCosmosDb ? 'sentinel' : 'lsn'.
  • Resolution is by checkpointId match only (this.checkpointStreamId.equals(checkpointId)), not by sentinel content. The return value is a real MongoLSN comparable string, not a sentinel.
  • Timeout is LSN_TIMEOUT_SECONDS (60s) — if no checkpoint is observed, a PSYNC_S1301 error is thrown.
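The retry loop above can be sketched as follows. All names here are hypothetical stand-ins for the actual getSnapshotLsn() internals; the interval and timeout mirror the constants described above:

```typescript
// Illustrative snapshot-LSN acquisition: re-create a checkpoint every
// second until one is observed on the change stream, up to a timeout.
async function acquireSnapshotLsn(
  createCheckpoint: () => Promise<void>,
  observeCheckpointEvent: () => Promise<string | null>, // resolves an LSN, or null
  intervalMs = 1000,
  timeoutMs = 60_000
): Promise<string> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    // May be missed if the write lands before the stream opened, hence the loop.
    await createCheckpoint();
    const lsn = await observeCheckpointEvent();
    if (lsn != null) {
      return lsn; // a later checkpoint is fine: the snapshot covers the gap
    }
    await new Promise<void>((r) => setTimeout(r, intervalMs));
  }
  throw new Error('PSYNC_S1301: no checkpoint event observed within timeout');
}
```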

Write checkpoint HEAD capture

Write checkpoints provide read-your-own-writes consistency for sync clients. After a client writes to the source database, replication may not have delivered the change to PowerSync's storage yet. A write checkpoint ensures the sync response includes all writes up to a known replication position, so the client doesn't see stale data that's missing its own recent changes.

The flow works by capturing a replication HEAD — the current position of the change stream — then creating a checkpoint that the sync stream resolves once replication advances past that HEAD. On standard MongoDB, createReplicationHead issues a hello command on a session, reads session.clusterTime, and passes it to the callback as the HEAD. This works because any subsequent change stream event is guaranteed to be at or after that cluster time.

Cosmos DB doesn't return clusterTime on the hello command. Instead, createReplicationHead writes a sentinel to _powersync_checkpoints (triggering a change stream event) and passes null as the HEAD. The caller — createWriteCheckpoint in service-core — detects the null and reads the current storage checkpoint LSN directly. This is valid because the storage LSN was committed before the sentinel was written, and the sentinel guarantees the streaming loop will advance past it, so the write checkpoint will eventually resolve.

Implementation details
  • MongoRouteAPIAdapter.createReplicationHead() calls detectCosmosDb(), then either captures session.clusterTime (standard) or writes a sentinel via findOneAndUpdate with $inc and calls callback(null) (Cosmos DB).
  • createWriteCheckpoint() in checkpointing.ts checks if HEAD is null. If so, it calls syncBucketStorage.getCheckpoint() to read the current storage LSN and uses that as the HEAD.
  • ReplicationHeadCallback type was widened from (head: string) => Promise<T> to (head: string | null) => Promise<T> to support this.
  • The v1 write checkpoint endpoint in checkpointing.ts throws on null HEAD (v1 doesn't support the polling fallback).
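A sketch of the null-HEAD fallback described above (the storage interface and helper names are illustrative, not the actual service-core API):

```typescript
// Minimal stand-in for the storage interface used by the fallback.
interface BucketStorage {
  getCheckpoint(): Promise<{ lsn: string }>;
}

async function resolveWriteCheckpointHead(
  head: string | null,
  storage: BucketStorage
): Promise<string> {
  if (head != null) {
    return head; // standard MongoDB: session.clusterTime-derived HEAD
  }
  // Cosmos DB: a sentinel was already written to _powersync_checkpoints,
  // so the current storage LSN is a valid position the stream will pass.
  const checkpoint = await storage.getCheckpoint();
  return checkpoint.lsn;
}
```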

Change stream configuration

On Cosmos DB:

  • Uses cluster-level aggregate (admin db + allChangesForCluster) instead of database-level streams
  • Skips $changeStreamSplitLargeEvent (not supported)
  • Skips showExpandedEvents (not supported)
  • Forces fullDocument: 'updateLookup' (post-images not supported)
Implementation details

All guards are in rawChangeStreamBatches() in ChangeStream.ts, which builds the $changeStream aggregation pipeline:

  • Cluster-level aggregate: Cosmos DB does not support database-level change streams (db.watch()). The code uses client.db('admin') with allChangesForCluster: true, reusing the same path as multi-database replication on standard MongoDB. The $match filter on namespaces handles collection scoping.
  • $changeStreamSplitLargeEvent: Pushed onto the pipeline only when !this.isCosmosDb. Without it, documents exceeding 16MB on Cosmos DB would fail — but this is an existing MongoDB limitation too, and Cosmos DB simply doesn't support the split workaround.
  • showExpandedEvents: Set on streamOptions only when !this.isCosmosDb. This enables DDL events (create, rename, drop) on standard MongoDB 6.0+.
  • fullDocument: Forced to 'updateLookup' on Cosmos DB. Standard MongoDB can use 'required' (which reads from changeStreamPreAndPostImages), but Cosmos DB doesn't support that collection option — even setting enabled: false throws. The usePostImages configuration is validated at startup and throws if enabled on Cosmos DB.
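The guards above can be assembled into one configuration sketch. The option names follow the MongoDB change stream API as described in this PR; the surrounding function is illustrative, not the actual rawChangeStreamBatches() code:

```typescript
// Illustrative: build the pipeline and stream options with the Cosmos DB
// guards applied, per the list above.
function buildChangeStreamConfig(isCosmosDb: boolean) {
  const pipeline: object[] = [{ $changeStream: { allChangesForCluster: true } }];
  if (!isCosmosDb) {
    // Not supported on Cosmos DB; splits >16MB events on standard MongoDB.
    pipeline.push({ $changeStreamSplitLargeEvent: {} });
  }
  const streamOptions: Record<string, unknown> = {
    // Post-images are unsupported on Cosmos DB, so always re-query the doc:
    fullDocument: isCosmosDb ? 'updateLookup' : 'required'
  };
  if (!isCosmosDb) {
    streamOptions.showExpandedEvents = true; // DDL events on MongoDB 6.0+
  }
  return { pipeline, streamOptions };
}
```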

.lte() dedup guard

When the streaming loop resumes from a stored LSN, standard MongoDB may use startAtOperationTime — a legacy path where the server can redeliver events at the boundary timestamp. The .lte(startAfter) guard skips events at or before the resume point to prevent duplicate processing. When resumeAfter is used instead (which includes Cosmos DB), the server guarantees no duplicates and the guard is unnecessary.

On Cosmos DB, the guard is actively harmful: wallTime has second precision (increment 0), so every event within the same wall-clock second as the last checkpoint would satisfy .lte() and be silently dropped — causing data loss after restart. The guard is skipped when isCosmosDb is true.

Implementation details
  • The guard in streamChangesInternal is wrapped in if (!this.isCosmosDb && startAfter != null && ...).
  • A try/catch around the check handles events with neither clusterTime nor wallTime — the event is not skipped.
  • The resumeAfter path is the default for Cosmos DB because resume tokens are always persisted with checkpoint LSNs.
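The guard condition above can be sketched as a pure function (names and the timestamp shape are illustrative):

```typescript
type Ts = { seconds: number; increment: number };

// Illustrative dedup guard: skip events at or before the resume point,
// but only on the legacy startAtOperationTime path on standard MongoDB.
function shouldSkipEvent(isCosmosDb: boolean, startAfter: Ts | null, eventTs: Ts): boolean {
  if (isCosmosDb || startAfter == null) {
    return false; // resumeAfter path: the server guarantees no duplicates
  }
  // .lte(): at or before the resume point.
  return (
    eventTs.seconds < startAfter.seconds ||
    (eventTs.seconds === startAfter.seconds && eventTs.increment <= startAfter.increment)
  );
}

// On Cosmos DB, second-precision timestamps would make every same-second
// event "lte" the last checkpoint, so the guard is disabled entirely:
console.log(shouldSkipEvent(true, { seconds: 100, increment: 0 }, { seconds: 100, increment: 0 })); // false
console.log(shouldSkipEvent(false, { seconds: 100, increment: 1 }, { seconds: 100, increment: 1 })); // true
```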

Keepalive guard

MongoDB's change stream API returns a resume token with every cursor batch — including empty batches from getMore when no events are available. This token is an opaque marker representing the cursor's current position in the stream. A new change stream opened with resumeAfter: <token> picks up from exactly that position.

When the change stream cursor returns empty batches (no data events), the streaming loop periodically persists the latest resume token to PowerSync's internal storage. This keepalive prevents the stored resume position from going stale — on standard MongoDB, a token older than the oplog retention window becomes unusable on restart (forcing a full re-snapshot), and on any backend a stale position means re-processing events unnecessarily.

On standard MongoDB, the keepalive path calls parseResumeTokenTimestamp() to extract the timestamp from the resume token's _data hex string, then constructs a MongoLSN with both the timestamp and the token for persistence. Cosmos DB resume tokens are opaque base64 blobs that can't be parsed by parseResumeTokenTimestamp(), which expects a specific hex byte layout. Instead, the keepalive path constructs the timestamp from Date.now() at second precision and pairs it with the raw resume token. The token is still persisted and usable for resumeAfter on restart — only the timestamp derivation differs.

Implementation details
  • The keepalive branch in streamChangesInternal in ChangeStream.ts is gated by if (this.isCosmosDb).
  • Standard path: MongoLSN.fromResumeToken(resumeToken) — calls parseResumeTokenTimestamp() which reads bytes 1-8 of the hex _data field.
  • Cosmos DB path: new MongoLSN({ timestamp: Timestamp.fromBits(0, Math.floor(Date.now() / 1000)), resume_token: resumeToken }) — bypasses token parsing entirely.
  • The Date.now() timestamp is structurally required by MongoLSN to construct a comparable string, but functionally unused on Cosmos DB — the resume token is what matters for stream resumption, and the Cosmos DB code paths don't use the timestamp for ordering or dedup.
  • The keepalive is throttled by keepaliveIntervalMs (default 60s) and only fires when waitForCheckpointLsn is null (no pending checkpoint).
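The Cosmos DB keepalive LSN construction can be illustrated with the same comparable-string shape described earlier (this helper is a sketch, not the actual MongoLSN constructor):

```typescript
// Illustrative: pair a second-precision Date.now() timestamp (increment 0)
// with the raw resume token, per the Cosmos DB keepalive path above.
function keepaliveLsn(resumeToken: string, nowMs: number = Date.now()): string {
  const seconds = Math.floor(nowMs / 1000);
  const hex = (n: number) => n.toString(16).padStart(8, '0');
  // The timestamp is structurally required for a comparable string, but the
  // resume token is what actually matters for resumeAfter on restart.
  return `${hex(seconds)}${hex(0)}|${resumeToken}`;
}
console.log(keepaliveLsn('gkYAAA==', 1718457600500)); // '666d950000000000|gkYAAA=='
```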

Testing

Setup guide: See modules/module-mongodb/test/COSMOS_DB_TESTING.md for environment variables, connection URI format, and test commands.

When run against a MongoDB backend

59/59 existing tests pass. No regressions. One test is skipped; it is meant to run only against Cosmos DB (×3 storage versions).

When run against a Cosmos DB backend

29/29 Cosmos DB tests pass (verified before the merge with upstream main). Tests include:

  • Basic replication (insert, update, delete)
  • Sentinel checkpoint resolution
  • Write checkpoint flow
  • Keepalive with Cosmos DB resume tokens
  • Resume after restart
  • Data events not dropped after restart (.lte() guard)

10 existing tests are skipped on Cosmos DB via test.skipIf(isCosmosDb) because they exercise unsupported features: post-images, large record splitting, drop/rename events, and wildcard collections.

Post-merge status

After merging with upstream main (PR #591 raw change streams), standard MongoDB tests pass (59/59) and Cosmos DB tests pass (30/30), after adapting to two upstream Cosmos DB server-side changes — see Cosmos DB upstream changes below.

Files changed

14 files changed:
modules/module-mongodb/src/replication/ChangeStream.ts Core Cosmos DB workarounds
modules/module-mongodb/src/replication/MongoRelation.ts Sentinel mode for createCheckpoint
modules/module-mongodb/src/replication/replication-utils.ts isdbgrid guard
modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts Cosmos DB detection + sentinel for createReplicationHead
packages/service-core/src/util/checkpointing.ts Cosmos DB HEAD capture (direct storage read)
packages/service-core/src/api/RouteAPI.ts ReplicationHeadCallback accepts string | null
packages/service-core/src/routes/endpoints/checkpointing.ts v1 endpoint null HEAD guard
modules/module-postgres/test/src/util.ts Null guard for lsn after ReplicationHeadCallback type change
modules/module-mongodb/test/src/cosmosdb_mode.test.ts Cosmos DB integration tests
modules/module-mongodb/test/src/cosmosdb_helpers.test.ts Unit tests for helpers
modules/module-mongodb/test/src/change_stream.test.ts skipIf guards for Cosmos DB
modules/module-mongodb/test/src/change_stream_utils.ts keepaliveIntervalMs support
modules/module-mongodb/test/COSMOS_DB_TESTING.md Test setup and run instructions

Cross-module impact

The ReplicationHeadCallback type in packages/service-core/src/api/RouteAPI.ts was changed from (head: string) => Promise<T> to (head: string | null) => Promise<T> to support the Cosmos DB sentinel flow. This required a null guard in:

  • packages/service-core/src/routes/endpoints/checkpointing.ts (v1 endpoint)
  • packages/service-core/src/util/checkpointing.ts (v2 write checkpoint)
  • modules/module-postgres/test/src/util.ts (Postgres test utility)

The Postgres production code is unaffected — only the test utility needed a guard.

Behavioral gaps from unsupported features

10 existing tests are skipped on Cosmos DB (test.skipIf(isCosmosDb)) because they exercise features Cosmos DB doesn't support. These gaps have not been verified independently against a live cluster — the risks below are inferred from the feature absence.

Post-images (changeStreamPreAndPostImages) — 7 tests skipped

Cosmos DB doesn't support stored post-images, so fullDocument: 'updateLookup' is used instead. The difference: updateLookup re-queries the document at read time, while post-images store the document state at write time.

Risk: Occasional missing update data. If a document is updated and then quickly deleted (or updated again) before the updateLookup re-query executes, fullDocument is null. PowerSync skips the update — the client sees the previous state followed by the delete, missing the intermediate update. This would be a brief data staleness window, not permanent data loss — the data self-corrects once the delete (or next update) is processed.

Large record splitting ($changeStreamSplitLargeEvent) — 1 test skipped

Cosmos DB doesn't support the pipeline stage that splits change events exceeding 16MB into fragments.

Risk: Replication stall on oversized documents. If a change event exceeds 16MB, the change stream would error. Replication restarts and hits the same error — an infinite restart loop until the document is reduced in size or the change ages out. No data loss for other documents, but replication would be blocked.

DDL events (drop, rename, showExpandedEvents) — 2 tests skipped

Cosmos DB may not emit drop or rename events on the change stream.

Risk: Stale phantom data. If a collection is dropped, PowerSync would never learn about it. The dropped collection's data would persist in PowerSync's storage indefinitely, serving stale records to clients that no longer exist in the source. If a collection is renamed, PowerSync would continue tracking the old name — new writes under the new name are missed. Neither case would be self-correcting. Recovery would require restarting replication (which triggers a re-snapshot and reconciliation).

Known limitations

  • wallTime precision: See wallTime as clusterTime substitute above. Second-level precision means identical LSNs for events in the same second. Impact is negligible in practice.
  • maxAwaitTime not supported: Cosmos DB returns empty batches immediately instead of long-polling. The streaming loop polls with the configured maxAwaitTimeMS, which acts as a timeout on the raw getMore.
  • Cosmos DB test timeouts: Tests use 50s poll deadline and 120s test timeout for remote clusters with variable propagation delay (10-30s spikes). Co-located deployments would be much faster.

Dependencies

Cosmos DB upstream changes

The Cosmos DB vCore dev cluster received a server-side update that broke two assumptions in this PR. Both were fixed:

  1. Detection field renamed: The hello response changed from internal.cosmos_versions to internal.documentdb_versions. All three detection sites now check both field names for backwards compatibility.

  2. Resume token format changed: Individual change event _id fields use a _data key that resumeAfter now rejects. The batch-level postBatchResumeToken uses a _token key that is accepted. On Cosmos DB, LSN construction now uses the batch-level postBatchResumeToken instead of changeDocument._id. Standard MongoDB is unaffected — both formats work there.

Sleepful and others added 13 commits April 7, 2026 23:45
ConnectionURI's username/password getters return URL-encoded values
(e.g., %3D instead of =). When these are passed to MongoClient's auth
option, SCRAM authentication fails because the driver uses the encoded
string as the literal password.

This affects any MongoDB user whose password contains characters that
get URL-encoded (=, @, +, /, %, etc.).
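The encoding mismatch described in this commit, in miniature, using Node's WHATWG URL class (the mongodb:// URI here is a made-up example):

```typescript
// URL's username/password getters return the percent-encoded serialization,
// so the value must be decoded before being passed to the driver's auth option.
const uri = new URL('mongodb://user:p%3Dss@localhost:27017');
console.log(uri.password); // 'p%3Dss' (still encoded)
console.log(decodeURIComponent(uri.password)); // 'p=ss'
```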
- createCheckpoint without forceCosmosDb now returns a wall-clock LSN
  instead of a sentinel string, fixing lexicographic comparison in
  no_checkpoint_before storage boundaries
- Re-apply setName null guard (hello.setName == null && !isCosmosDb)
  that was lost during rebase

The lexicographic comparison bug: no_checkpoint_before is stored in bucket
storage and compared with `lsn >= no_checkpoint_before` to determine if
the stream has advanced past the snapshot boundary. Normal LSN strings
start with hex digits (0-9, a-f). The sentinel string started with "s"
(sentinel:_standalone_checkpoint:1). In ASCII ordering, all hex digits
are less than "s", so the comparison was always false — the stream could
never be considered caught up past the snapshot boundary. The fix returns
a real wall-clock LSN for storage boundaries and reserves the sentinel
format for the streaming loop, which matches by document content instead
of lexicographic comparison.
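The ASCII-ordering failure described in this commit, in miniature (the LSN value is a made-up example of the hex format):

```typescript
// Hex-digit LSNs vs the old "sentinel:" prefix, compared lexicographically:
// every hex digit (0-9, a-f) is less than 's' in ASCII, so the
// `lsn >= no_checkpoint_before` boundary check could never become true.
const noCheckpointBefore = 'sentinel:_standalone_checkpoint:1';
const hexLsn = '666d950000000000';
console.log(hexLsn >= noCheckpointBefore); // false: '6' < 's' in ASCII
```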
Replace the forceCosmosDb flag with a mode parameter ("lsn" | "sentinel")
that describes what the caller needs, not which database is in use.

- mode "lsn" (default): returns a real LSN string for storage boundaries
  (uses operationTime, falls back to wall clock on Cosmos DB)
- mode "sentinel": returns a sentinel marker for event-based matching
  in the streaming loop

The caller chooses based on how the result will be compared, not based
on the database type. isCosmosDb on ChangeStream controls which mode
is passed.
When isCosmosDb is true, all LSN production must use wall-clock timestamps
(second precision, increment 0) to be consistent with getEventTimestamp()
wallTime-derived LSNs. Previously, createCheckpoint still used
session.operationTime (which has real increments) in some paths, causing
LSN comparisons to fail when both timestamps fell in the same second.

Changes:
- createCheckpoint accepts isCosmosDb option — skips operationTime,
  always uses wall clock
- Removed wallclock-lsn mode in favor of isCosmosDb flag
- getClientCheckpoint (test util) passes isCosmosDb through
- Fixed test assertion (sentinel checkpoint: exact ID instead of matcher)
- Fixed resume test (set syncRulesContent on second context)
- Skipped write checkpoint test (needs real Cosmos DB, not cosmosDbMode)
Remove the cosmosDbMode / isRealCosmosDb split. isCosmosDb is now set
only by server detection (hello.internal.cosmos_versions), never by a
test flag.

The sentinel approach is a single code path that works on both:

The sentinel logic itself is identical on MongoDB and Cosmos DB:
write a checkpoint document ({$inc: {i: 1}}), wait for it in the
change stream, match by documentKey._id and fullDocument.i, use
the event timestamp as the LSN, commit. This passes on both.

The branching between MongoDB and Cosmos DB is minimal and
environmental, not architectural:
- Which stream to open (client.watch vs db.watch)
- Which timestamp to use (wallTime vs clusterTime)
- Whether to initialize the stream before checkpointing
- Whether startAtOperationTime is available

These are server-level differences, not sentinel-level. The sentinel
matching, the checkpoint resolution chain, and batch.commit are the
same code on both backends.

Why cosmosDbMode on standard MongoDB does not work:

The test flag tried to force the Cosmos DB code path on standard
MongoDB. The problems were all about mixing two environments, not
about the sentinel approach:

1. Lazy ChangeStream initialization. The MongoDB driver's ChangeStream
   is lazy — the aggregate command is not sent until the first
   tryNext()/hasNext() call. On standard MongoDB, startAtOperationTime
   sets the stream's start position explicitly, so events committed
   before the first tryNext() are included. On Cosmos DB (no
   startAtOperationTime), the stream starts from "now" — whenever the
   first tryNext() runs. This requires opening the stream and forcing
   initialization BEFORE creating checkpoint documents, reversing the
   order used on standard MongoDB.

2. Wall-clock LSN precision. Cosmos DB events use wallTime (second
   precision, increment 0) instead of clusterTime (sub-second, real
   increment). When cosmosDbMode forces wallTime on standard MongoDB,
   the createCheckpoint function still has access to operationTime
   (which has real increments). Mixing the two clock sources causes
   LSN comparisons to fail when timestamps fall in the same second:
   wallTime produces increment 0, operationTime produces increment N,
   and 0 < N means the checkpoint never resolves.

3. Sentinel checkpoint matching. The sentinel flow depends on the
   stream being initialized before the marker is written. Combined
   with issue 1, this creates a deadlock on standard MongoDB where
   tryNext() blocks waiting for events that have not been created yet.

These issues compound: fixing one exposes the next. A test flag that
partially simulates Cosmos DB creates more problems than it solves
because the underlying assumptions about stream initialization and
clock sources are fundamentally different between the two servers.

Changes:
- Remove cosmosDbMode from ChangeStreamOptions
- Remove isRealCosmosDb field — isCosmosDb is the only flag
- createCheckpoint auto-detects via session.operationTime == null
- Integration tests skip unless COSMOS_DB_TEST=true
- Unskip write checkpoint test (works against real Cosmos DB)
- Fix write checkpoint assertion (returns object, not bigint)
- Revert getClientCheckpoint to simple cp.lsn >= lsn comparison

Remaining test failures against Cosmos DB (3/26):

  resume after restart — PSYNC_S1346 Error reading MongoDB ChangeStream

  The resume test stops streaming, creates a new context, loads active
  sync rules, and starts streaming again from the stored resume token.
  On Cosmos DB this fails with a change stream error. Likely causes:
  - Cosmos DB resume tokens may expire faster than MongoDB (400MB
    change stream log limit vs time-based oplog retention)
  - The token format (base64 vs hex) may not round-trip correctly
    through MongoLSN serialization/deserialization
  - Cosmos DB may not support resumeAfter with tokens from a previous
    connection
  Needs separate investigation of the resume token persistence path
  in MongoLSN.fromSerialized().
isCosmosDb was only set during getSnapshotLsn() which runs on initial
sync. After restart (snapshot already done), the flag stayed false,
causing the stream to open with MongoDB-only features that Cosmos DB
rejects: $changeStreamSplitLargeEvent, showExpandedEvents, db.watch().

Extract detection into a reusable detectCosmosDb() method (idempotent,
runs hello once per ChangeStream instance). Call it at the top of both
initReplication() and streamChangesInternal() so detection happens
before any change stream operation, regardless of whether initial sync
runs.

This was a production-blocking bug: every service restart after initial
sync would crash the replication loop with PSYNC_S1346.

Result: 25/26 Cosmos DB tests pass. The one remaining failure is
"resume after restart" on storage v2 only — a storage-version-specific
issue unrelated to detection.
On Cosmos DB, wallTime has second precision (increment 0). After a
restart, the .lte() check at line 1064 compares the new event timestamp
against startAfter (from the last checkpoint). If events arrive within
the same wall-clock second as the last checkpoint, their timestamps
equal startAfter and .lte() returns true — silently dropping the events.

On standard MongoDB, clusterTime has monotonically increasing increments
within a second, so new events always have strictly greater timestamps.

The .lte() check is a client-side dedup guard only needed for the legacy
startAtOperationTime path. When resumeAfter is used (which includes
Cosmos DB), the server guarantees no duplicates.

The resume test still has an intermittent failure (~30% flake rate)
on Cosmos DB, which appears to be a separate timing issue in the test
utility getClientCheckpoint — not a production bug.
Three changes that together bring the Cosmos DB test suite to 29/29:

1. createWriteCheckpoint polling: use >= instead of > for LSN comparison

   The sentinel polling in checkpointing.ts compared cp.lsn > baselineLsn
   (strict greater). On Cosmos DB, wall-clock LSNs have second precision
   (increment 0). When the sentinel commit and the baseline fall in the
   same wall-clock second, they produce identical LSNs, so strict >
   never resolves — the poll hangs until a commit in the next second,
   or times out at 30s.

   Changed to >= which resolves immediately with the current LSN as HEAD.
   This is correct because the HEAD just needs to be a valid replication
   position — the sentinel write guarantees the streaming loop will
   eventually advance past it. The sync stream's watchCheckpointChanges
   resolves the write checkpoint when replication naturally advances.

   Note: with >= the polling loop effectively always resolves on the
   first iteration (baseline is already in storage). The loop structure
   is preserved for clarity but could be simplified in a follow-up.

   History of this comparison:
   - Original: cp.lsn > baselineLsn (strict) — hung on same-second LSNs
   - Attempted: cp.checkpoint > baselineId — broke tests entirely
     (checkpoint IDs did not compare as expected)
   - Current: cp.lsn >= baselineLsn — resolves reliably

2. .lte() dedup guard: updated warning comment

   The "data events not dropped after restart (lte guard)" test validates
   that data survives restart on Cosmos DB, but cannot reproduce the
   specific same-second timing condition (the getCheckpoint call needed
   to initialize the stream advances past the current second). Updated
   the code comment to reflect this — the isCosmosDb guard is verifiable
   by code inspection rather than a timing-dependent test.

3. Resume test: polling approach with retries

   The resume-after-restart test was flaky (~30% failure) because
   getBucketData -> getClientCheckpoint could resolve before data events
   were committed (same-second LSN race). Changed to a polling approach:
   call getBucketData repeatedly with short timeouts until the expected
   data appears. This mirrors production behavior where write checkpoints
   may take up to ~1s to resolve on a quiet Cosmos DB system.

   Also added a new "lte guard" test that specifically verifies data
   events survive restart by polling storage directly.

Test results against Cosmos DB: 29/29 pass (0 flakes across 3+ runs)
Test results against standard MongoDB: no regression
Replace the polling loop in createWriteCheckpoint with a direct read
of the current storage checkpoint LSN.

The polling loop (get baseline → write sentinel → poll until LSN
advances) was unnecessary: with >= comparison, the poll always resolved
on the first iteration because the baseline LSN was already in storage.
The loop was effectively just reading the current LSN.

The simplified version reads the current checkpoint LSN directly and
uses it as HEAD. This is correct because:
- createReplicationHead already wrote a sentinel to _powersync_checkpoints
- The sentinel guarantees the streaming loop will advance past this point
- The HEAD just needs to be a valid LSN at or before the sentinel
- The sync stream resolves the write checkpoint when replication advances

Removes ~15 lines of polling/timeout code.
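The simplified shape is roughly the following. Interface and method names are hypothetical stand-ins for the storage API, not the real signatures:

```typescript
// Hypothetical sketch of the simplified flow: read the current storage
// checkpoint LSN and use it directly as the write-checkpoint HEAD. The
// sentinel already written by createReplicationHead guarantees the
// streaming loop will advance past this point, so no polling is needed.
interface CheckpointStorage {
  getCurrentCheckpointLsn(): Promise<string>;
}

async function createWriteCheckpoint(storage: CheckpointStorage): Promise<string> {
  // Any LSN at or before the sentinel is a valid HEAD; the sync stream
  // resolves the write checkpoint once replication advances past it.
  return storage.getCurrentCheckpointLsn();
}
```
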
Bug: openChangeStream() checked streamOptions.startAtOperationTime
(always undefined — constructed without it) instead of startAfter
(the local variable with the parsed timestamp). This made the else-if
branch dead code — startAtOperationTime was never set when resumeAfter
was null.

This broke the legacy resume path where the stored LSN has no resume
token (only a hex timestamp from keepalive/snapshot). Without
startAtOperationTime, the stream opened from "now" instead of the
stored position, potentially missing the initial checkpoint event
and causing batch.commit() to never be called.

Introduced in fdf840c (feat: implement Cosmos DB workarounds).
Found by binary search: passes at bd3170c (auth fix), fails at
f6ba463 (scaffolding), root cause in the openChangeStream refactor.

Also reverted tsconfig.base.json target from ES2024 back to esnext
(the ES2024 change was a workaround for Node 22 not supporting native
await using — CI uses Node 24 which does).
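The corrected branching can be sketched as follows. This is an illustrative reduction (types and names invented), showing only the decision that was broken; the key point is that the condition reads the parsed local variable, not the options object that was constructed without it:

```typescript
// Hypothetical sketch of the fixed openChangeStream branching. The bug
// was testing streamOptions.startAtOperationTime (always undefined)
// instead of the local startAfter variable.
interface ParsedLsn {
  resumeToken?: object;
  timestamp: { t: number; i: number };
}

function buildStreamOptions(parsed: ParsedLsn): Record<string, unknown> {
  const startAfter = parsed.resumeToken ?? null;
  const options: Record<string, unknown> = {};
  if (startAfter != null) {
    options.startAfter = startAfter;
  } else {
    // Legacy LSNs carry only a hex timestamp (keepalive/snapshot):
    // resume from that position rather than from "now", or the initial
    // checkpoint event may be missed and commit never called.
    options.startAtOperationTime = parsed.timestamp;
  }
  return options;
}
```
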
Cosmos DB has a variable delay between accepting a write and making it
visible on the change stream cursor (internal propagation, not network
latency). Against the remote dev cluster this can take 10-30s during
spikes, though in a co-located deployment it would be much faster.

Previous timeouts (25s poll, 60s test) were insufficient — the lte guard
and resume tests flaked at a ~20-30% rate. Increased to 50s poll deadline
and 120s test timeout. 5 consecutive runs with 0 flakes (29/29 each).

maxAwaitTime (when supported) would reduce polling overhead but would
not help with propagation delay — the event is simply not available yet.
The generous timeouts are appropriate for prototype-quality tests run
manually against a remote cluster, not for CI.
@changeset-bot

changeset-bot bot commented Apr 14, 2026

🦋 Changeset detected

Latest commit: 72257d2

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 12 packages

| Name | Type |
| --- | --- |
| @powersync/service-module-mongodb | Minor |
| @powersync/service-schema | Patch |
| @powersync/service-image | Patch |
| @powersync/service-core | Patch |
| @powersync/service-core-tests | Patch |
| @powersync/service-module-core | Patch |
| @powersync/service-module-mongodb-storage | Patch |
| @powersync/service-module-mssql | Patch |
| @powersync/service-module-mysql | Patch |
| @powersync/service-module-postgres-storage | Patch |
| @powersync/service-module-postgres | Patch |
| test-client | Patch |


@Sleepful Sleepful force-pushed the cosmos branch 4 times, most recently from 9d2c49a to 1bf3b44 Compare April 14, 2026 04:54
Merges upstream main which includes PR #591 (raw change streams) and
PR #599 (direct BSON Buffer -> JSON conversion).

Auth fix conflicts (types.ts, config.test.ts) resolved — both sides
had the same fix, upstream also added database name decoding.

ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the
MongoDB driver ChangeStream with a custom RawChangeStream using raw
aggregate + getMore. Our Cosmos DB changes need to be re-applied to
the new code structure. Resolved in the next commit.

resolve: ChangeStream.ts merge conflicts for raw change streams

Re-applied all Cosmos DB changes to the new raw change stream code
structure from PR #591. The raw aggregate approach is better for
Cosmos DB: no lazy ChangeStream init, explicit cursor management,
$changeStream stage built directly in pipeline.

Changes applied to new structure:
- detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal
- getEventTimestamp() adapted to ProjectedChangeStreamDocument type
- Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer)
- Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents
- Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb
- startAtOperationTime fix (startAfter != null)
- Keepalive guard for Cosmos DB resume tokens
- .lte() dedup guard skip on Cosmos DB
- wallTime tracking for replication lag
- Added changeset for @powersync/service-module-mongodb (minor)

Verified: 59/59 standard MongoDB tests pass.
Cosmos DB cluster is currently down — tests blocked by TLS timeout.
Code audit of RawChangeStream.ts found no compatibility issues:
cursor ID type auto-fixed by BigInt, postBatchResumeToken needs
empirical verification when cluster is back.
Remove outdated "resume on storage v2" issue (resolved by increasing
poll timeouts to 50s). Replace with notes about propagation delay and
cluster availability.
@Sleepful Sleepful marked this pull request as draft April 14, 2026 08:53
The old comment claimed the change stream opens before the checkpoint is
created on Cosmos DB. In reality, the checkpoint is created first, then
the stream opens. The first checkpoint may be missed, but the retry loop
(which re-creates checkpoints every second) handles this safely.
@Sleepful Sleepful requested a review from rkistner April 15, 2026 18:45
…valent

The Cosmos DB "replicating basic values" test is a reduced-scope replacement
for the standard test — without post-images, replaceOne, or bigint. Add a
comment making this relationship explicit.
Cosmos DB vCore renamed the internal version field in the hello response
from cosmos_versions to documentdb_versions in a recent server-side update.
Without this fix, isCosmosDb is never set to true and the isdbgrid guard
throws PSYNC_S1341 on connection.

Updated all three detection sites (ChangeStream.ts, replication-utils.ts,
MongoRouteAPIAdapter.ts) to check both field names. Added a test case for
the new field name in cosmosdb_helpers.test.ts.
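The two-field check is simple to sketch. This is an illustrative reduction under assumed types (the real code also handles the three call sites listed above):

```typescript
// Hypothetical sketch of Cosmos DB detection across both field names.
// Cosmos DB vCore renamed hello.internal.cosmos_versions to
// documentdb_versions in a server-side update; checking either keeps
// detection working across server versions.
interface HelloResponse {
  internal?: Record<string, unknown>;
}

function isCosmosDbHello(hello: HelloResponse): boolean {
  const internal = hello.internal ?? {};
  return 'cosmos_versions' in internal || 'documentdb_versions' in internal;
}
```
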
Cosmos DB vCore changed the resume token format in a recent server-side
update: event._id now uses a _data field that is rejected by resumeAfter,
while postBatchResumeToken uses a _token field that is accepted. On
standard MongoDB both formats work, but Cosmos DB now only accepts the
batch-level token.

Use the batch resumeToken (from postBatchResumeToken) instead of
changeDocument._id when constructing MongoLSN on Cosmos DB. Three call
sites in ChangeStream.ts are affected: getSnapshotLsn, checkpoint commit,
and periodic resume LSN persistence during catchup.
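The token selection can be sketched as follows. Shapes and names here are hypothetical (the actual code works with the raw change stream batch structure from PR #591):

```typescript
// Hypothetical sketch of resume-token selection. Cosmos DB vCore now
// rejects the per-event _id (a _data field) in resumeAfter but accepts
// the batch-level postBatchResumeToken (a _token field); standard
// MongoDB accepts both, so it keeps using the event _id.
interface ChangeBatch {
  postBatchResumeToken?: { _token?: string };
  event?: { _id?: { _data?: string } };
}

function resumeTokenFor(batch: ChangeBatch, isCosmosDb: boolean): object | undefined {
  if (isCosmosDb) {
    return batch.postBatchResumeToken;
  }
  return batch.event?._id ?? batch.postBatchResumeToken;
}
```
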
…ge warning

Add the missing "data events not dropped" test to the coverage table, include
test counts (30 total: 15 integration across 3 storage versions + 15 unit),
and warn about Cosmos DB server-side field renames that can break detection
and resume token handling.
@Sleepful Sleepful marked this pull request as ready for review April 16, 2026 06:36
3 participants