Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/per-change-write-ids.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@treecrdt/interface': minor
'@treecrdt/wa-sqlite': minor
'@treecrdt/sync': patch
---

Move local materialization write ids from the event root to each materialized change's `source.writeIds`.
7 changes: 5 additions & 2 deletions packages/treecrdt-engine-conformance/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ export function treecrdtEngineConformanceScenarios(): TreecrdtEngineConformanceS
run: scenarioLocalInsertWithPayload,
},
{
name: 'local ops: materialization events include writeId',
name: 'local ops: materialization changes include writeId',
run: scenarioLocalOpsMaterializationWriteId,
},
{
Expand Down Expand Up @@ -340,6 +340,7 @@ function assertChangeSource(
const change = event.changes.find((change) => change.node === node);
assert(change, `${label} change for node`);
assert(change.source, `${label} source`);
assert(change.source.operation, `${label} source operation`);
assertEqual(change.source.operation.id.counter, op.meta.id.counter, `${label} source counter`);
assertEqual(change.source.operation.lamport, op.meta.lamport, `${label} source lamport`);
assertBytesEqual(
Expand Down Expand Up @@ -570,7 +571,9 @@ async function scenarioLocalOpsMaterializationWriteId(
label: string,
) => {
assertEqual(events.length, 1, `${label} should emit one materialization event`);
assertArrayEqual(events[0]!.writeIds ?? [], [writeId], `${label} writeIds`);
for (const change of events[0]!.changes) {
assertArrayEqual(change.source?.writeIds ?? [], [writeId], `${label} change writeIds`);
}
assertEventNodeRefsContain(
materializationEventNodeRefs(events[0]!),
expectedRefs,
Expand Down
2 changes: 1 addition & 1 deletion packages/treecrdt-postgres-napi/tests/conformance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ test('conformance registry includes materialization-event scenarios', () => {
expect(names).toContain('materialization events: structural batch');
expect(names).toContain('materialization events: payload coalescing');
expect(names).toContain('materialization events: defensive restore');
expect(names).toContain('local ops: materialization events include writeId');
expect(names).toContain('local ops: materialization changes include writeId');
});

maybeDescribe('engine conformance scenarios (postgres-napi engine)', () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/treecrdt-sqlite-node/tests/conformance.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ test('conformance registry includes materialization-event scenarios', () => {
expect(names).toContain('materialization events: structural batch');
expect(names).toContain('materialization events: payload coalescing');
expect(names).toContain('materialization events: defensive restore');
expect(names).toContain('local ops: materialization events include writeId');
expect(names).toContain('local ops: materialization changes include writeId');
});

test('sqlite auth-aware local write rolls back on auth failure', async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/treecrdt-sync/tests/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export function createInMemoryTestClient(

/**
* Mock engine: `append` / `appendMany` honor `WriteOptions.writeId` and emit
* `MaterializationEvent` (including `writeIds`) the same way as the dispatcher used in wa-sqlite.
* `MaterializationEvent` per-change write ids the same way as the dispatcher used in wa-sqlite.
*/
export function createInMemoryTestClientWithWriteId(
docId: string,
Expand Down
24 changes: 16 additions & 8 deletions packages/treecrdt-sync/tests/write-id.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function noopTransport() {
return wrapDuplexTransportWithCodec(wireA, treecrdtSyncV0ProtobufCodec);
}

test('onMaterialized receives writeIds from appendMany(…, { writeId })', async () => {
test('onMaterialized receives per-change writeIds from appendMany(…, { writeId })', async () => {
const docId = `write-id-${Math.random().toString(16).slice(2)}`;
const { client } = createInMemoryTestClientWithWriteId(docId, []);
const transport = noopTransport();
Expand All @@ -53,23 +53,30 @@ test('onMaterialized receives writeIds from appendMany(…, { writeId })', async
await sync.close();
}

const withWrite = events.find(
(e) =>
e.writeIds?.[0] === 'my-batch-42' &&
e.changes.some((c) => c.kind === 'insert' && c.node === n1),
const withWrite = events.find((e) =>
e.changes.some(
(c) => c.kind === 'insert' && c.node === n1 && c.source?.writeIds?.includes('my-batch-42'),
),
);
expect(withWrite).toBeDefined();
expect(withWrite!.writeIds).toEqual(['my-batch-42']);
expect('writeIds' in withWrite!).toBe(false);
expect(withWrite!.changes.every((change) => change.source?.writeIds?.[0] === 'my-batch-42')).toBe(
true,
);
});

test('onMaterialized receives writeIds from append(…, { writeId })', async () => {
test('onMaterialized receives per-change writeIds from append(…, { writeId })', async () => {
const docId = `write-id-s-${Math.random().toString(16).slice(2)}`;
const { client } = createInMemoryTestClientWithWriteId(docId, []);
const transport = noopTransport();
const sync = createTreecrdtWebSocketSyncFromTransport(client, transport, undefined);
const seen: string[] = [];
let sawRootWriteIds = false;
const u = client.onMaterialized((e) => {
if (e.writeIds?.[0]) seen.push(e.writeIds[0]!);
sawRootWriteIds ||= 'writeIds' in e;
for (const change of e.changes) {
if (change.source?.writeIds?.[0]) seen.push(change.source.writeIds[0]!);
}
});
const n1 = nodeIdFromInt(1);
const op: Operation = makeOp(r, 1, 1, {
Expand All @@ -84,5 +91,6 @@ test('onMaterialized receives writeIds from append(…, { writeId })', async ()
u();
await sync.close();
}
expect(sawRootWriteIds).toBe(false);
expect(seen).toContain('single-append');
});
40 changes: 27 additions & 13 deletions packages/treecrdt-ts/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ export type MaterializationSource = {
*
* `replica` is a low-level CRDT replica id, not an auth/user identity.
*/
operation: {
operation?: {
id: {
replica: Uint8Array;
counter: number;
};
/** Lamport timestamp assigned to the operation. */
lamport: number;
};
/** Local write ids supplied to the append/local write API that produced this visible change. */
writeIds?: string[];
/** Future auth metadata for the operation signer, when available. */
signer?: {
publicKey: Uint8Array;
Expand All @@ -23,10 +25,10 @@ export type MaterializationSource = {

type ChangeSource = {
/**
* Operation source metadata when the materializer can attribute the visible change to one exact op.
* Source metadata for the visible change.
*
* Conservative catch-up/rebuild paths may omit this when a visible change is derived from rebuilt
* state instead of a single operation.
* Conservative catch-up/rebuild paths may omit operation metadata when a visible change is derived
* from rebuilt state instead of a single operation.
*/
source?: MaterializationSource;
};
Expand Down Expand Up @@ -70,11 +72,9 @@ export function emptyMaterializationOutcome(headSeq = 0): MaterializationOutcome

/**
* Event emitted after write-path materialization, or after read-path recovery advances a pending
* materialization frontier. `writeIds` echoes optional ids supplied to append APIs.
* materialization frontier. Local write ids are echoed on each affected change's `source.writeIds`.
*/
export type MaterializationEvent = MaterializationOutcome & {
writeIds?: string[];
};
export type MaterializationEvent = MaterializationOutcome;

export type MaterializationListener = (event: MaterializationEvent) => void;

Expand All @@ -94,10 +94,7 @@ export function createMaterializationDispatcher(): MaterializationDispatcher {

const emitOutcome = (outcome: MaterializationOutcome, writeId?: string) => {
if (outcome.changes.length === 0) return;
emitEvent({
...outcome,
...(writeId ? { writeIds: [writeId] } : {}),
});
emitEvent(addMaterializationWriteId(outcome, writeId));
};

const onMaterialized = (listener: MaterializationListener) => {
Expand All @@ -110,6 +107,23 @@ export function createMaterializationDispatcher(): MaterializationDispatcher {
return { emitEvent, emitOutcome, onMaterialized };
}

export function addMaterializationWriteId(
outcome: MaterializationOutcome,
writeId?: string,
): MaterializationEvent {
if (!writeId) return { ...outcome };
return {
...outcome,
changes: outcome.changes.map((change) => ({
...change,
source: {
...change.source,
writeIds: [...(change.source?.writeIds ?? []), writeId],
},
})),
};
}

export type WriteOptions = {
writeId?: string;
};
Expand All @@ -119,7 +133,7 @@ export type LocalWriteAuthSession = {
};

export type LocalWriteOptions = {
/** Echoed on the materialization event emitted by this local write. */
/** Echoed on each materialized change emitted by this local write. */
writeId?: string;

/**
Expand Down
33 changes: 22 additions & 11 deletions packages/treecrdt-ts/src/sqlite.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { SerializeNodeId, SerializeReplica, TreecrdtAdapter } from './adapter.js';
import { emptyMaterializationOutcome } from './engine.js';
import { addMaterializationWriteId, emptyMaterializationOutcome } from './engine.js';
import type {
LocalWriteOptions,
MaterializationEvent,
Expand Down Expand Up @@ -81,8 +81,7 @@ function emitLocalOutcome(
emit?: (event: MaterializationEvent) => void,
writeId?: string,
): void {
if (outcome.changes.length > 0)
emit?.({ ...outcome, ...(writeId ? { writeIds: [writeId] } : {}) });
if (outcome.changes.length > 0) emit?.(addMaterializationWriteId(outcome, writeId));
}

const ROOT_NODE_BYTES = nodeIdToBytes16(ROOT_NODE_ID_HEX);
Expand Down Expand Up @@ -732,18 +731,30 @@ function decodeSqliteMaterializationSource(raw: unknown): MaterializationSource
const value = raw as any;
const operation = value.operation as any;
const operationId = operation?.id as any;
return {
operation: {
id: {
replica: decodeReplicaId(operationId.replica),
counter: Number(operationId.counter),
},
lamport: Number(operation.lamport),
},
const source: MaterializationSource = {
...(operation && operationId
? {
operation: {
id: {
replica: decodeReplicaId(operationId.replica),
counter: Number(operationId.counter),
},
lamport: Number(operation.lamport),
},
}
: {}),
...(Array.isArray(value.writeIds) ? { writeIds: value.writeIds.map(String) } : {}),
...(value.signer?.publicKey
? { signer: { publicKey: Uint8Array.from(value.signer.publicKey) } }
: {}),
};

// Avoid exposing `source: {}` when a backend emits an empty or unknown source object.
const hasSource =
source.operation !== undefined ||
(source.writeIds?.length ?? 0) > 0 ||
source.signer !== undefined;
return hasSource ? source : undefined;
}

export function decodeSqliteMaterializationOutcome(raw: unknown): MaterializationOutcome {
Expand Down
18 changes: 12 additions & 6 deletions packages/treecrdt-wa-sqlite/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type {
WriteOptions,
} from '@treecrdt/interface/engine';
import {
addMaterializationWriteId,
createMaterializationDispatcher,
createTreecrdtEngineLocal,
} from '@treecrdt/interface/engine';
Expand Down Expand Up @@ -190,8 +191,16 @@ function createClientMaterializationDispatcher(
};

const eventForPeers = (event: MaterializationEvent): MaterializationEvent => {
const { writeIds: _writeIds, ...nextEvent } = event;
return nextEvent;
return {
...event,
changes: event.changes.map((change) => {
if (!change.source?.writeIds) return change;
const { writeIds: _writeIds, ...source } = change.source;
if (Object.keys(source).length > 0) return { ...change, source };
const { source: _source, ...nextChange } = change;
return nextChange;
}),
};
};

const broadcast = (event: MaterializationEvent) => {
Expand All @@ -214,10 +223,7 @@ function createClientMaterializationDispatcher(

const emitOutcome: ClientMaterializationDispatcher['emitOutcome'] = (outcome, writeId) => {
if (outcome.changes.length === 0) return;
emitEvent({
...outcome,
...(writeId ? { writeIds: [writeId] } : {}),
});
emitEvent(addMaterializationWriteId(outcome, writeId));
};

const enableCrossTab = (nextScope: CrossTabMaterializationScope) => {
Expand Down
Loading