1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
.swc

node_modules
.pnpm-store
dist

.env
Expand Down
24 changes: 11 additions & 13 deletions docs/modules/convex/convex-write-checkpoints.md
Expand Up @@ -25,18 +25,17 @@ The managed write checkpoint flow is:
sync can expose the write checkpoint id to the client.

For Convex, the source position is the Convex replication cursor. The current
implementation in `createReplicationHead`:
managed write-checkpoint flow:

1. calls `getHeadCursor()` to read the current global Convex head,
2. invokes the callback with the original head cursor so PowerSync stores the
managed write checkpoint mapping,
3. calls `createWriteCheckpointMarker()` to run the
`powersync_checkpoints:createCheckpoint` Convex mutation.
2. stores the managed write checkpoint mapping with the original head cursor (in
the `createReplicationHead()` callback),
3. calls `createWriteCheckpointMarker()` only if the callback reports that storage
advanced a managed checkpoint (`shouldAdvance: true`).

The callback stores the managed write checkpoint in bucket storage with the
original head as the replication head. The marker write is intentionally not the
write checkpoint position. It is a later Convex mutation whose job is to advance
the Convex delta stream beyond the stored head after the managed mapping exists.
The marker write is intentionally not the write checkpoint position. It is a
later Convex mutation whose job is to advance the Convex delta stream beyond the
stored head after the managed mapping exists.

The key invariant is that PowerSync must observe a checkpoint update at or past
the stored head after the managed write checkpoint mapping exists. Other source
Expand All @@ -63,10 +62,9 @@ can acknowledge the write checkpoint to the client.

## Case 1: no checkpoint collection

If there is no `powersync_checkpoints` collection, `createReplicationHead` can
still read the current Convex head and create a managed write checkpoint in
PowerSync storage. The problem is that nothing guarantees the Convex delta stream
will advance beyond that head.
If there is no `powersync_checkpoints` collection, PowerSync can still read the
current Convex head and create a managed write checkpoint in storage. The problem
is that nothing guarantees the Convex delta stream will advance beyond that head.

### Case 1.1: no replication lag

Expand Down
2 changes: 1 addition & 1 deletion docs/replication/01-core-concepts.md
Expand Up @@ -131,7 +131,7 @@ A write checkpoint lets a client wait until its own backend write has been obser

In managed mode, PowerSync stores a mapping from user/client id plus source replication head to an internal write checkpoint id. When a later storage checkpoint reaches or passes that source head, the sync stream can include the write checkpoint id for that client.

Source modules implement `RouteAPI.createReplicationHead()` to obtain a current source head and, where necessary, force a later observable source event so an idle source can still publish the acknowledgement.
Source modules implement `RouteAPI.createReplicationHead()` to obtain a current source head, let storage persist its write-checkpoint mapping via the callback, and then force a later observable source event when storage actually advances a managed write checkpoint (`shouldAdvance: true`).

## Replication Lock

Expand Down
10 changes: 6 additions & 4 deletions docs/replication/03-source-connector-overview.md
Expand Up @@ -61,14 +61,16 @@ The `RouteAPI` adapter bridges API routes to source-specific capabilities.
The most replication-sensitive method is:

```ts
createReplicationHead<T>(callback: (head: string) => Promise<T>): Promise<T>
createReplicationHead<T>(callback: (head: string) => Promise<{ response: T; shouldAdvance: boolean }>): Promise<T>
```

The adapter must:
For managed write checkpoints, the source adapter must:

1. Read the current source replication head.
2. Call the callback with that head.
3. Ensure that the replication stream will later observe the head or a greater source position.
2. Call the callback with that head so storage can persist its write-checkpoint mapping.
3. If the callback returns `shouldAdvance: true`, ensure that the replication stream will later observe the head or a greater source position. If it returns `shouldAdvance: false` (e.g. only stale client-supplied requests were processed), skip the source marker.

Reading the head and forcing the marker happen within a single source session/connection where applicable, so the marker is causally ordered after the head handed to the callback.

Step 3 is important for managed write checkpoints. If the source database is idle, the mapping can be stored correctly but never become visible to a connected client unless replication observes a later checkpoint update.
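
The three steps above can be sketched as follows. This is a minimal illustration, not the implementation of any real connector: `FakeSource`, `readHead()`, and `writeMarker()` are hypothetical stand-ins for a source connection, and only the callback shape (`response` plus `shouldAdvance`) is taken from the signature shown earlier.

```ts
// Callback shape from the signature above: storage reports whether it
// actually advanced a managed write checkpoint.
type ReplicationHeadCallback<T> = (head: string) => Promise<{ response: T; shouldAdvance: boolean }>;

// Hypothetical in-memory source, used only for illustration.
class FakeSource {
  private position = 0;
  markers = 0;

  // Returns the current source position as a string head.
  async readHead(): Promise<string> {
    return `head-${this.position}`;
  }

  // Emits a no-op event so the stream advances past the previous head.
  async writeMarker(): Promise<void> {
    this.position += 1;
    this.markers += 1;
  }
}

async function createReplicationHead<T>(source: FakeSource, callback: ReplicationHeadCallback<T>): Promise<T> {
  // Step 1: read the current source replication head.
  const head = await source.readHead();

  // Step 2: let storage persist its write-checkpoint mapping for that head.
  const { response, shouldAdvance } = await callback(head);

  // Step 3: only force a later source event if storage advanced a checkpoint.
  if (shouldAdvance) {
    await source.writeMarker();
  }

  return response;
}
```

In this sketch the marker is written strictly after the callback resolves, so the forced source event is ordered after the mapping exists. A callback that returns `shouldAdvance: false` leaves the source untouched.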

Expand Down
4 changes: 3 additions & 1 deletion docs/replication/06-checkpoints.md
Expand Up @@ -92,13 +92,15 @@ The flow is:

The source marker does not need to be replicated as user data. It only needs to make the source stream advance after the mapping exists.

`/sync/checkpoint-request` accepts a client-supplied `checkpoint_request_id`. Storage treats this id as monotonic for the full user/client id: it only updates the managed write checkpoint when the supplied value is greater than the stored value. The response `write_checkpoint` is always the checkpoint value storage is actually at after handling the request. If a previous request already advanced storage to a larger value, the stale request does not update storage and the response returns that larger stored value so the client can detect that its request id was stale.
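
The monotonic rule can be illustrated with a small in-memory sketch. `CheckpointRequestStore` and `handleRequest()` are hypothetical names, not the storage API. The sketch uses `number` for brevity and assumes that a first request for a user/client id is always stored.

```ts
// Hypothetical in-memory stand-in for storage, keyed by the full user/client id.
class CheckpointRequestStore {
  private readonly stored = new Map<string, number>();

  handleRequest(userClientId: string, checkpointRequestId: number): { writeCheckpoint: number; advanced: boolean } {
    const current = this.stored.get(userClientId);

    // Update only when the supplied id is strictly greater than the stored one
    // (assumption: a missing entry counts as "no stored value yet").
    if (current === undefined || checkpointRequestId > current) {
      this.stored.set(userClientId, checkpointRequestId);
      return { writeCheckpoint: checkpointRequestId, advanced: true };
    }

    // Stale or repeated request: storage is unchanged and the response reports
    // the value storage is actually at.
    return { writeCheckpoint: current, advanced: false };
  }
}

const store = new CheckpointRequestStore();
const first = store.handleRequest('user1/clientA', 5); // stored, advanced
const stale = store.handleRequest('user1/clientA', 3); // stale, storage stays at 5
```

Here `stale.writeCheckpoint` is `5` rather than `3`, which is how a client could detect that its request id was stale. The `advanced` flag in this sketch plays the role that `shouldAdvance` plays for the source adapter: a stale request gives no reason to write a source marker.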

Managed write checkpoints require a comparable source head. The source adapter reads a position that should include the client's write, stores it with the generated write checkpoint id, and then ensures replication observes that position or a later one. Postgres uses `pg_current_wal_lsn()` and emits a logical message; MongoDB uses cluster time and writes to `_powersync_checkpoints`; other sources use their equivalent source position and marker behavior.

### Source-Side Checkpoint Markers

A source-side checkpoint marker is a source-visible event whose job is to make the replication stream advance after PowerSync has stored a managed write checkpoint mapping. It may be a logical replication message, a write to a `_powersync_checkpoints` or `powersync_checkpoints` table or collection, or another source-specific no-op event.

Use a marker table or collection when the connector cannot otherwise guarantee that a later ordered event will appear in the replication stream after `createReplicationHead()` calls its callback. This is common when:
Use a marker table or collection when the connector cannot otherwise guarantee that a later ordered event will appear in the replication stream after storage persists a managed write checkpoint for a source head. This is common when:

- The source stream only advances when source data changes, so idle databases can stop producing new positions.
- The source stream is filtered and a valid database head may include writes that are not visible to the stream.
Expand Down
25 changes: 25 additions & 0 deletions libs/lib-services/src/codec/codecs.ts
Expand Up @@ -32,6 +32,31 @@ export const date = t.codec<Date, string | DateTimeValue>(
}
);

export const bigint = t.codec<bigint, string | number | bigint>(
'bigint',
(value) => {
if (typeof value != 'bigint') {
throw new t.TransformError([`Expected bigint but got ${typeof value}`]);
}
return value.toString();
},
(value) => {
if (typeof value == 'bigint') {
return value;
}
if (typeof value == 'number') {
if (!Number.isSafeInteger(value)) {
throw new t.TransformError([`Expected safe integer but got ${value}`]);
}
return BigInt(value);
}
if (typeof value != 'string' || !/^-?[0-9]+$/.test(value)) {
throw new t.TransformError([`Expected integer string but got ${typeof value}`]);
}
return BigInt(value);
}
);

const assertObjectId = (value: any) => {
if (!bson.ObjectId.isValid(value)) {
throw new t.TransformError([`Expected an ObjectId but got ${typeof value}`]);
Expand Down
22 changes: 21 additions & 1 deletion libs/lib-services/src/codec/parsers.ts
Expand Up @@ -46,6 +46,26 @@ export const DateParser = t.createParser<typeof codecs.date>(codecs.date._tag, (
}
});

export const BigIntParser = t.createParser<typeof codecs.bigint>(codecs.bigint._tag, (_, { target }) => {
switch (target) {
case t.TransformTarget.Encoded: {
return {
oneOf: [
{ type: 'string', pattern: '^-?[0-9]+$' },
{
type: 'integer',
minimum: Number.MIN_SAFE_INTEGER,
maximum: Number.MAX_SAFE_INTEGER
}
]
};
}
case t.TransformTarget.Decoded: {
return { nodeType: 'bigint' };
}
}
});

export const BufferParser = t.createParser<typeof codecs.buffer>(codecs.buffer._tag, (_, { target }) => {
switch (target) {
case t.TransformTarget.Encoded: {
Expand All @@ -57,4 +77,4 @@ export const BufferParser = t.createParser<typeof codecs.buffer>(codecs.buffer._
}
});

export const parsers = [ObjectIdParser, ResourceIdParser, DateParser, BufferParser];
export const parsers = [ObjectIdParser, ResourceIdParser, DateParser, BigIntParser, BufferParser];
8 changes: 4 additions & 4 deletions modules/module-convex/README.md
Expand Up @@ -148,11 +148,11 @@ The content below is written in an agents.md style describing the behavior of `m

## 8) Checkpointing and Consistency

- `createReplicationHead` must:
- Managed write checkpoint creation must:
1. resolve global head cursor,
2. pass that head to the callback so PowerSync stores the managed write checkpoint mapping,
3. then write a Convex checkpoint marker via `POST /api/mutation` (calls `powersync_checkpoints:createCheckpoint`).
- The marker must be written after the callback. If the marker is replicated before the managed write checkpoint mapping exists, an idle source can still leave the client waiting for a later observable checkpoint update.
2. store any managed write checkpoint mapping for that head,
3. then write a Convex checkpoint marker via `POST /api/mutation` (calls `powersync_checkpoints:createCheckpoint`) only if storage advanced a checkpoint.
- The marker must be written after the mapping is stored. If the marker is replicated before the managed write checkpoint mapping exists, an idle source can still leave the client waiting for a later observable checkpoint update.
- Source marker table: `powersync_checkpoints`
- Convex rejects table names starting with `_`, so no leading-underscore variant is used.
- The table has a single `last_updated` field; the mutation upserts one row (bounded to one row total).
Expand Down
14 changes: 9 additions & 5 deletions modules/module-convex/src/api/ConvexRouteAPIAdapter.ts
@@ -1,4 +1,4 @@
import { api, ParseSyncConfigOptions, ReplicationHeadCallback, ReplicationLagOptions } from '@powersync/service-core';
import { api, ParseSyncConfigOptions, ReplicationLagOptions } from '@powersync/service-core';
import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
import { isConvexCheckpointTable } from '../common/ConvexCheckpoints.js';
Expand Down Expand Up @@ -121,11 +121,15 @@ export class ConvexRouteAPIAdapter implements api.RouteAPI {
return undefined;
}

async createReplicationHead<T>(callback: ReplicationHeadCallback<T>): Promise<T> {
async createReplicationHead<T>(callback: api.ReplicationHeadCallback<T>): Promise<T> {
const head = await this.connectionManager.client.getHeadCursor();
const result = await callback(parseConvexLsn(head));
await this.connectionManager.client.createWriteCheckpointMarker();
return result;
const { response, shouldAdvance } = await callback(parseConvexLsn(head));

if (shouldAdvance) {
await this.connectionManager.client.createWriteCheckpointMarker();
}

return response;
}

async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
Expand Down
21 changes: 18 additions & 3 deletions modules/module-convex/test/src/ConvexRouteAPIAdapter.test.ts
Expand Up @@ -83,20 +83,35 @@ bucket_definitions:
await adapter.shutdown();
});

it('creates replication head from the global snapshot cursor', async () => {
it('gets and advances replication head from the global snapshot cursor', async () => {
const adapter = createAdapter();
const getHeadCursor = vi.fn(async (_options?: any) => HEAD_CURSOR);
const createWriteCheckpointMarker = vi.fn(async (_options?: any) => undefined);
(adapter as any).connectionManager.client.getHeadCursor = getHeadCursor;
(adapter as any).connectionManager.client.createWriteCheckpointMarker = createWriteCheckpointMarker;

const result = await adapter.createReplicationHead(async (head) => head);
expect(result).toBe(parseConvexLsn(HEAD_CURSOR));
const head = await adapter.createReplicationHead(async (head) => ({ response: head, shouldAdvance: true }));
expect(head).toBe(parseConvexLsn(HEAD_CURSOR));
expect(getHeadCursor).toHaveBeenCalledTimes(1);
expect(getHeadCursor).toHaveBeenCalledWith();
expect(createWriteCheckpointMarker).toHaveBeenCalledTimes(1);
expect(createWriteCheckpointMarker).toHaveBeenCalledWith();

await adapter.shutdown();
});

it('does not write a checkpoint marker when the callback reports no advance', async () => {
const adapter = createAdapter();
const getHeadCursor = vi.fn(async (_options?: any) => HEAD_CURSOR);
const createWriteCheckpointMarker = vi.fn(async (_options?: any) => undefined);
(adapter as any).connectionManager.client.getHeadCursor = getHeadCursor;
(adapter as any).connectionManager.client.createWriteCheckpointMarker = createWriteCheckpointMarker;

const head = await adapter.createReplicationHead(async (head) => ({ response: head, shouldAdvance: false }));
expect(head).toBe(parseConvexLsn(HEAD_CURSOR));
expect(getHeadCursor).toHaveBeenCalledTimes(1);
expect(createWriteCheckpointMarker).not.toHaveBeenCalled();

await adapter.shutdown();
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { migrations } from '@powersync/service-core';
import * as storage from '../../../storage/storage-index.js';
import { MongoStorageConfig } from '../../../types/types.js';

export const up: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;
const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
await db.write_checkpoints.createIndex(
{
checkpoint_requested_at: 1
},
{
name: 'checkpoint_requested_at',
// Only client-requested checkpoints have this field; generated
// checkpoints leave it unset. This keeps the index limited to the
// documents the compact job's retention delete scans.
partialFilterExpression: { checkpoint_requested_at: { $exists: true } }
}
);
} finally {
await db.client.close();
}
};

export const down: migrations.PowerSyncMigrationFunction = async (context) => {
const {
service_context: { configuration }
} = context;

const db = storage.createPowerSyncMongo(configuration.storage as MongoStorageConfig);

try {
if (await db.write_checkpoints.indexExists('checkpoint_requested_at')) {
await db.write_checkpoints.dropIndex('checkpoint_requested_at');
}
} finally {
await db.client.close();
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export abstract class MongoCompactor {
protected readonly minChangeRatio: number;
protected readonly maxOpId: bigint;
protected readonly buckets: string[] | undefined;
protected readonly deleteCheckpointRequestsBefore: Date | undefined;
protected readonly signal?: AbortSignal;
protected readonly group_id: number;

Expand All @@ -113,6 +114,7 @@ export abstract class MongoCompactor {
this.minChangeRatio = options.minChangeRatio ?? DEFAULT_MIN_CHANGE_RATIO;
this.maxOpId = options.maxOpId ?? 0n;
this.buckets = options.compactBuckets;
this.deleteCheckpointRequestsBefore = options.deleteCheckpointRequestsBefore;
this.signal = options.signal;
this.logger = options.logger ?? defaultLogger;
}
Expand All @@ -123,6 +125,8 @@ export abstract class MongoCompactor {
* See /docs/storage/compacting-operations.md for details.
*/
async compact() {
await this.deleteOldCheckpointRequests();

if (this.buckets) {
for (const bucket of this.buckets) {
// We can make this more efficient later on by iterating through the buckets in a single query.
Expand All @@ -134,6 +138,19 @@ export abstract class MongoCompactor {
}
}

private async deleteOldCheckpointRequests() {
if (this.deleteCheckpointRequestsBefore == null) {
return;
}

this.signal?.throwIfAborted();
// The explicit $exists guarantees the query is a subset of the
// checkpoint_requested_at partial index's filter, so the planner can use it.
await this.db.write_checkpoints.deleteMany({
checkpoint_requested_at: { $exists: true, $lt: this.deleteCheckpointRequestsBefore }
});
}

/**
* Subset of compact, only populating checksums where relevant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ export abstract class MongoSyncBucketStorage
this.writeCheckpointAPI.setWriteCheckpointMode(mode);
}

createManagedWriteCheckpoints(checkpoints: storage.ManagedWriteCheckpointOptions[]): Promise<Map<string, bigint>> {
createManagedWriteCheckpoints(
checkpoints: storage.ManagedWriteCheckpointOptions[]
): Promise<storage.CreateManagedWriteCheckpointsResult> {
return this.writeCheckpointAPI.createManagedWriteCheckpoints(checkpoints);
}

Expand Down