Skip to content

Commit 2f864ec

Browse files
refactor(materialization): move write ids to changes
1 parent 4bca56e commit 2f864ec

9 files changed

Lines changed: 92 additions & 43 deletions

File tree

.changeset/per-change-write-ids.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@treecrdt/interface': minor
3+
'@treecrdt/wa-sqlite': minor
4+
'@treecrdt/sync': patch
5+
---
6+
7+
Move local materialization write ids from the event root to each materialized change's `source.writeIds`.

packages/treecrdt-engine-conformance/src/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ export function treecrdtEngineConformanceScenarios(): TreecrdtEngineConformanceS
137137
run: scenarioLocalInsertWithPayload,
138138
},
139139
{
140-
name: 'local ops: materialization events include writeId',
140+
name: 'local ops: materialization changes include writeId',
141141
run: scenarioLocalOpsMaterializationWriteId,
142142
},
143143
{
@@ -340,6 +340,7 @@ function assertChangeSource(
340340
const change = event.changes.find((change) => change.node === node);
341341
assert(change, `${label} change for node`);
342342
assert(change.source, `${label} source`);
343+
assert(change.source.operation, `${label} source operation`);
343344
assertEqual(change.source.operation.id.counter, op.meta.id.counter, `${label} source counter`);
344345
assertEqual(change.source.operation.lamport, op.meta.lamport, `${label} source lamport`);
345346
assertBytesEqual(
@@ -570,7 +571,9 @@ async function scenarioLocalOpsMaterializationWriteId(
570571
label: string,
571572
) => {
572573
assertEqual(events.length, 1, `${label} should emit one materialization event`);
573-
assertArrayEqual(events[0]!.writeIds ?? [], [writeId], `${label} writeIds`);
574+
for (const change of events[0]!.changes) {
575+
assertArrayEqual(change.source?.writeIds ?? [], [writeId], `${label} change writeIds`);
576+
}
574577
assertEventNodeRefsContain(
575578
materializationEventNodeRefs(events[0]!),
576579
expectedRefs,

packages/treecrdt-postgres-napi/tests/conformance.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ test('conformance registry includes materialization-event scenarios', () => {
7676
expect(names).toContain('materialization events: structural batch');
7777
expect(names).toContain('materialization events: payload coalescing');
7878
expect(names).toContain('materialization events: defensive restore');
79-
expect(names).toContain('local ops: materialization events include writeId');
79+
expect(names).toContain('local ops: materialization changes include writeId');
8080
});
8181

8282
maybeDescribe('engine conformance scenarios (postgres-napi engine)', () => {

packages/treecrdt-sqlite-node/tests/conformance.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ test('conformance registry includes materialization-event scenarios', () => {
3737
expect(names).toContain('materialization events: structural batch');
3838
expect(names).toContain('materialization events: payload coalescing');
3939
expect(names).toContain('materialization events: defensive restore');
40-
expect(names).toContain('local ops: materialization events include writeId');
40+
expect(names).toContain('local ops: materialization changes include writeId');
4141
});
4242

4343
test('sqlite auth-aware local write rolls back on auth failure', async () => {

packages/treecrdt-sync/tests/test-helpers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ export function createInMemoryTestClient(
125125

126126
/**
127127
* Mock engine: `append` / `appendMany` honor `WriteOptions.writeId` and emit
128-
* `MaterializationEvent` (including `writeIds`) the same way as the dispatcher used in wa-sqlite.
128+
* `MaterializationEvent` per-change write ids the same way as the dispatcher used in wa-sqlite.
129129
*/
130130
export function createInMemoryTestClientWithWriteId(
131131
docId: string,

packages/treecrdt-sync/tests/write-id.test.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ function noopTransport() {
2727
return wrapDuplexTransportWithCodec(wireA, treecrdtSyncV0ProtobufCodec);
2828
}
2929

30-
test('onMaterialized receives writeIds from appendMany(…, { writeId })', async () => {
30+
test('onMaterialized receives per-change writeIds from appendMany(…, { writeId })', async () => {
3131
const docId = `write-id-${Math.random().toString(16).slice(2)}`;
3232
const { client } = createInMemoryTestClientWithWriteId(docId, []);
3333
const transport = noopTransport();
@@ -53,23 +53,30 @@ test('onMaterialized receives writeIds from appendMany(…, { writeId })', async
5353
await sync.close();
5454
}
5555

56-
const withWrite = events.find(
57-
(e) =>
58-
e.writeIds?.[0] === 'my-batch-42' &&
59-
e.changes.some((c) => c.kind === 'insert' && c.node === n1),
56+
const withWrite = events.find((e) =>
57+
e.changes.some(
58+
(c) => c.kind === 'insert' && c.node === n1 && c.source?.writeIds?.includes('my-batch-42'),
59+
),
6060
);
6161
expect(withWrite).toBeDefined();
62-
expect(withWrite!.writeIds).toEqual(['my-batch-42']);
62+
expect('writeIds' in withWrite!).toBe(false);
63+
expect(withWrite!.changes.every((change) => change.source?.writeIds?.[0] === 'my-batch-42')).toBe(
64+
true,
65+
);
6366
});
6467

65-
test('onMaterialized receives writeIds from append(…, { writeId })', async () => {
68+
test('onMaterialized receives per-change writeIds from append(…, { writeId })', async () => {
6669
const docId = `write-id-s-${Math.random().toString(16).slice(2)}`;
6770
const { client } = createInMemoryTestClientWithWriteId(docId, []);
6871
const transport = noopTransport();
6972
const sync = createTreecrdtWebSocketSyncFromTransport(client, transport, undefined);
7073
const seen: string[] = [];
74+
let sawRootWriteIds = false;
7175
const u = client.onMaterialized((e) => {
72-
if (e.writeIds?.[0]) seen.push(e.writeIds[0]!);
76+
sawRootWriteIds ||= 'writeIds' in e;
77+
for (const change of e.changes) {
78+
if (change.source?.writeIds?.[0]) seen.push(change.source.writeIds[0]!);
79+
}
7380
});
7481
const n1 = nodeIdFromInt(1);
7582
const op: Operation = makeOp(r, 1, 1, {
@@ -84,5 +91,6 @@ test('onMaterialized receives writeIds from append(…, { writeId })', async ()
8491
u();
8592
await sync.close();
8693
}
94+
expect(sawRootWriteIds).toBe(false);
8795
expect(seen).toContain('single-append');
8896
});

packages/treecrdt-ts/src/engine.ts

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,16 @@ export type MaterializationSource = {
77
*
88
* `replica` is a low-level CRDT replica id, not an auth/user identity.
99
*/
10-
operation: {
10+
operation?: {
1111
id: {
1212
replica: Uint8Array;
1313
counter: number;
1414
};
1515
/** Lamport timestamp assigned to the operation. */
1616
lamport: number;
1717
};
18+
/** Local write ids supplied to the append/local write API that produced this visible change. */
19+
writeIds?: string[];
1820
/** Future auth metadata for the operation signer, when available. */
1921
signer?: {
2022
publicKey: Uint8Array;
@@ -23,10 +25,10 @@ export type MaterializationSource = {
2325

2426
type ChangeSource = {
2527
/**
26-
* Operation source metadata when the materializer can attribute the visible change to one exact op.
28+
* Source metadata for the visible change.
2729
*
28-
* Conservative catch-up/rebuild paths may omit this when a visible change is derived from rebuilt
29-
* state instead of a single operation.
30+
* Conservative catch-up/rebuild paths may omit operation metadata when a visible change is derived
31+
* from rebuilt state instead of a single operation.
3032
*/
3133
source?: MaterializationSource;
3234
};
@@ -70,11 +72,9 @@ export function emptyMaterializationOutcome(headSeq = 0): MaterializationOutcome
7072

7173
/**
7274
* Event emitted after write-path materialization, or after read-path recovery advances a pending
73-
* materialization frontier. `writeIds` echoes optional ids supplied to append APIs.
75+
* materialization frontier. Local write ids are echoed on each affected change's `source.writeIds`.
7476
*/
75-
export type MaterializationEvent = MaterializationOutcome & {
76-
writeIds?: string[];
77-
};
77+
export type MaterializationEvent = MaterializationOutcome;
7878

7979
export type MaterializationListener = (event: MaterializationEvent) => void;
8080

@@ -94,10 +94,7 @@ export function createMaterializationDispatcher(): MaterializationDispatcher {
9494

9595
const emitOutcome = (outcome: MaterializationOutcome, writeId?: string) => {
9696
if (outcome.changes.length === 0) return;
97-
emitEvent({
98-
...outcome,
99-
...(writeId ? { writeIds: [writeId] } : {}),
100-
});
97+
emitEvent(addMaterializationWriteId(outcome, writeId));
10198
};
10299

103100
const onMaterialized = (listener: MaterializationListener) => {
@@ -110,6 +107,23 @@ export function createMaterializationDispatcher(): MaterializationDispatcher {
110107
return { emitEvent, emitOutcome, onMaterialized };
111108
}
112109

110+
export function addMaterializationWriteId(
111+
outcome: MaterializationOutcome,
112+
writeId?: string,
113+
): MaterializationEvent {
114+
if (!writeId) return { ...outcome };
115+
return {
116+
...outcome,
117+
changes: outcome.changes.map((change) => ({
118+
...change,
119+
source: {
120+
...change.source,
121+
writeIds: [...(change.source?.writeIds ?? []), writeId],
122+
},
123+
})),
124+
};
125+
}
126+
113127
export type WriteOptions = {
114128
writeId?: string;
115129
};
@@ -119,7 +133,7 @@ export type LocalWriteAuthSession = {
119133
};
120134

121135
export type LocalWriteOptions = {
122-
/** Echoed on the materialization event emitted by this local write. */
136+
/** Echoed on each materialized change emitted by this local write. */
123137
writeId?: string;
124138

125139
/**

packages/treecrdt-ts/src/sqlite.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { SerializeNodeId, SerializeReplica, TreecrdtAdapter } from './adapter.js';
2-
import { emptyMaterializationOutcome } from './engine.js';
2+
import { addMaterializationWriteId, emptyMaterializationOutcome } from './engine.js';
33
import type {
44
LocalWriteOptions,
55
MaterializationEvent,
@@ -81,8 +81,7 @@ function emitLocalOutcome(
8181
emit?: (event: MaterializationEvent) => void,
8282
writeId?: string,
8383
): void {
84-
if (outcome.changes.length > 0)
85-
emit?.({ ...outcome, ...(writeId ? { writeIds: [writeId] } : {}) });
84+
if (outcome.changes.length > 0) emit?.(addMaterializationWriteId(outcome, writeId));
8685
}
8786

8887
const ROOT_NODE_BYTES = nodeIdToBytes16(ROOT_NODE_ID_HEX);
@@ -732,18 +731,30 @@ function decodeSqliteMaterializationSource(raw: unknown): MaterializationSource
732731
const value = raw as any;
733732
const operation = value.operation as any;
734733
const operationId = operation?.id as any;
735-
return {
736-
operation: {
737-
id: {
738-
replica: decodeReplicaId(operationId.replica),
739-
counter: Number(operationId.counter),
740-
},
741-
lamport: Number(operation.lamport),
742-
},
734+
const source: MaterializationSource = {
735+
...(operation && operationId
736+
? {
737+
operation: {
738+
id: {
739+
replica: decodeReplicaId(operationId.replica),
740+
counter: Number(operationId.counter),
741+
},
742+
lamport: Number(operation.lamport),
743+
},
744+
}
745+
: {}),
746+
...(Array.isArray(value.writeIds) ? { writeIds: value.writeIds.map(String) } : {}),
743747
...(value.signer?.publicKey
744748
? { signer: { publicKey: Uint8Array.from(value.signer.publicKey) } }
745749
: {}),
746750
};
751+
752+
// Avoid exposing `source: {}` when a backend emits an empty or unknown source object.
753+
const hasSource =
754+
source.operation !== undefined ||
755+
(source.writeIds?.length ?? 0) > 0 ||
756+
source.signer !== undefined;
757+
return hasSource ? source : undefined;
747758
}
748759

749760
export function decodeSqliteMaterializationOutcome(raw: unknown): MaterializationOutcome {

packages/treecrdt-wa-sqlite/src/client.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import type {
2424
WriteOptions,
2525
} from '@treecrdt/interface/engine';
2626
import {
27+
addMaterializationWriteId,
2728
createMaterializationDispatcher,
2829
createTreecrdtEngineLocal,
2930
} from '@treecrdt/interface/engine';
@@ -190,8 +191,16 @@ function createClientMaterializationDispatcher(
190191
};
191192

192193
const eventForPeers = (event: MaterializationEvent): MaterializationEvent => {
193-
const { writeIds: _writeIds, ...nextEvent } = event;
194-
return nextEvent;
194+
return {
195+
...event,
196+
changes: event.changes.map((change) => {
197+
if (!change.source?.writeIds) return change;
198+
const { writeIds: _writeIds, ...source } = change.source;
199+
if (Object.keys(source).length > 0) return { ...change, source };
200+
const { source: _source, ...nextChange } = change;
201+
return nextChange;
202+
}),
203+
};
195204
};
196205

197206
const broadcast = (event: MaterializationEvent) => {
@@ -214,10 +223,7 @@ function createClientMaterializationDispatcher(
214223

215224
const emitOutcome: ClientMaterializationDispatcher['emitOutcome'] = (outcome, writeId) => {
216225
if (outcome.changes.length === 0) return;
217-
emitEvent({
218-
...outcome,
219-
...(writeId ? { writeIds: [writeId] } : {}),
220-
});
226+
emitEvent(addMaterializationWriteId(outcome, writeId));
221227
};
222228

223229
const enableCrossTab = (nextScope: CrossTabMaterializationScope) => {

0 commit comments

Comments
 (0)