Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions orga/changelog/fix-replication-sent-deletedfield-format.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- FIX replication `sent$` observable emitting documents in the master format (with the user-defined `deletedField`) instead of the typed `WithDeleted<RxDocType>` format (with `_deleted: boolean`), because the `deletedField` swap mutated the same row object that was later forwarded to subscribers
19 changes: 15 additions & 4 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,22 @@ export class RxReplicationState<RxDocType, CheckpointType> {
if (row.assumedMasterState) {
row.assumedMasterState = await pushModifier(row.assumedMasterState);
}
/**
* The deletedField swap must not mutate the original row,
* because that row is later forwarded to processed.up
* which feeds sent$. sent$ is typed as
* Observable<WithDeleted<RxDocType>> and must always emit
* documents in the WithDeleted format with `_deleted: boolean`,
* never the master-format `deletedField`.
*/
if (this.deletedField !== '_deleted') {
row.newDocumentState = swapDefaultDeletedTodeletedField(this.deletedField, row.newDocumentState) as any;
if (row.assumedMasterState) {
row.assumedMasterState = swapDefaultDeletedTodeletedField(this.deletedField, row.assumedMasterState) as any;
}
return {
...row,
newDocumentState: swapDefaultDeletedTodeletedField(this.deletedField, row.newDocumentState) as any,
assumedMasterState: row.assumedMasterState
? swapDefaultDeletedTodeletedField(this.deletedField, row.assumedMasterState) as any
: undefined
};
}
return row;
})
Expand Down
64 changes: 64 additions & 0 deletions test/unit/replication.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,70 @@ describe('replication.test.ts', () => {
localCollection.database.close();
remoteCollection.database.close();
});
it('sent$ must emit documents in WithDeleted format with _deleted field when deletedField is custom', async () => {
const localCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false);

// Prepare states before starting replication to avoid storage-specific
// intermediate emissions for insert-then-remove in a single run.
await localCollection.insert(schemaObjects.humanWithTimestampData({ id: 'alive' }));
const removeDoc = await localCollection.insert(schemaObjects.humanWithTimestampData({ id: 'to-remove' }));
await removeDoc.remove();

const pushedToMaster: any[] = [];
const replicationState = replicateRxCollection<HumanWithTimestampDocumentType, any>({
collection: localCollection,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
deletedField: 'is_deleted',
live: false,
push: {
handler: (rows) => {
rows.forEach(row => pushedToMaster.push(row.newDocumentState));
return Promise.resolve([]);
}
}
});
ensureReplicationHasNoErrors(replicationState);

const sentDocs: WithDeleted<HumanWithTimestampDocumentType>[] = [];
replicationState.sent$.subscribe(d => sentDocs.push(d));

await replicationState.awaitInitialReplication();

// The push handler must receive the master format with `is_deleted` (not `_deleted`)
assert.ok(pushedToMaster.length >= 2, 'push handler must receive at least both docs');
pushedToMaster.forEach((doc) => {
assert.strictEqual(typeof doc.is_deleted, 'boolean', 'push handler must see is_deleted, got ' + JSON.stringify(doc));
assert.strictEqual((doc as any)._deleted, undefined, 'push handler must NOT see _deleted, got ' + JSON.stringify(doc));
});

// sent$ is typed as Observable<WithDeleted<RxDocType>>.
// It must always emit documents with `_deleted: boolean`,
// never the master-format `is_deleted` field.
assert.ok(sentDocs.length >= 2, 'sent$ must emit at least both docs, got ' + sentDocs.length);
sentDocs.forEach((doc, idx) => {
assert.strictEqual(
typeof doc._deleted,
'boolean',
'sent$ doc at index ' + idx + ' must have _deleted: boolean, got ' + JSON.stringify(doc)
);
assert.strictEqual(
(doc as any).is_deleted,
undefined,
'sent$ doc at index ' + idx + ' must NOT have is_deleted (master format), got ' + JSON.stringify(doc)
);
});

const aliveSent = ensureNotFalsy(sentDocs.find(d => (d as any).id === 'alive'));
const removedSent = sentDocs.filter(d => (d as any).id === 'to-remove');
assert.strictEqual(aliveSent._deleted, false, 'alive doc must have _deleted=false on sent$');
assert.ok(removedSent.length > 0, 'removed doc must be emitted on sent$');
assert.ok(
removedSent.some(doc => doc._deleted === true),
'removed doc must have _deleted=true on sent$'
);

await localCollection.database.close();
});
it('should not save pulled documents that do not match the schema', async () => {
const amount = 5;
const { localCollection, remoteCollection } = await getTestCollections({ local: 0, remote: amount });
Expand Down
Loading