diff --git a/orga/changelog/fix-replication-sent-deletedfield-format.md b/orga/changelog/fix-replication-sent-deletedfield-format.md new file mode 100644 index 00000000000..dc45c59e31f --- /dev/null +++ b/orga/changelog/fix-replication-sent-deletedfield-format.md @@ -0,0 +1 @@ +- FIX replication `sent$` observable emitting documents in the master format (with the user-defined `deletedField`) instead of the typed `WithDeleted` format (with `_deleted: boolean`), because the `deletedField` swap mutated the same row object that was later forwarded to subscribers diff --git a/src/plugins/replication/index.ts b/src/plugins/replication/index.ts index 714b435fa96..9ab40a5fd66 100644 --- a/src/plugins/replication/index.ts +++ b/src/plugins/replication/index.ts @@ -321,11 +321,22 @@ export class RxReplicationState { if (row.assumedMasterState) { row.assumedMasterState = await pushModifier(row.assumedMasterState); } + /** + * The deletedField swap must not mutate the original row, + * because that row is later forwarded to processed.up + * which feeds sent$. sent$ is typed as + * Observable> and must always emit + * documents in the WithDeleted format with `_deleted: boolean`, + * never the master-format `deletedField`. + */ if (this.deletedField !== '_deleted') { - row.newDocumentState = swapDefaultDeletedTodeletedField(this.deletedField, row.newDocumentState) as any; - if (row.assumedMasterState) { - row.assumedMasterState = swapDefaultDeletedTodeletedField(this.deletedField, row.assumedMasterState) as any; - } + return { + ...row, + newDocumentState: swapDefaultDeletedTodeletedField(this.deletedField, row.newDocumentState) as any, + assumedMasterState: row.assumedMasterState + ? swapDefaultDeletedTodeletedField(this.deletedField, row.assumedMasterState) as any + : undefined + }; } return row; }) diff --git a/test/unit/replication.test.ts b/test/unit/replication.test.ts index 978e064e9f2..20d8a3127f0 100644 --- a/test/unit/replication.test.ts +++ b/test/unit/replication.test.ts @@ -334,6 +334,70 @@ describe('replication.test.ts', () => { localCollection.database.close(); remoteCollection.database.close(); }); + it('sent$ must emit documents in WithDeleted format with _deleted field when deletedField is custom', async () => { + const localCollection = await humansCollection.createHumanWithTimestamp(0, randomToken(10), false); + + // Prepare states before starting replication to avoid storage-specific + // intermediate emissions for insert-then-remove in a single run. + await localCollection.insert(schemaObjects.humanWithTimestampData({ id: 'alive' })); + const removeDoc = await localCollection.insert(schemaObjects.humanWithTimestampData({ id: 'to-remove' })); + await removeDoc.remove(); + + const pushedToMaster: any[] = []; + const replicationState = replicateRxCollection({ + collection: localCollection, + replicationIdentifier: REPLICATION_IDENTIFIER_TEST, + deletedField: 'is_deleted', + live: false, + push: { + handler: (rows) => { + rows.forEach(row => pushedToMaster.push(row.newDocumentState)); + return Promise.resolve([]); + } + } + }); + ensureReplicationHasNoErrors(replicationState); + + const sentDocs: WithDeleted[] = []; + replicationState.sent$.subscribe(d => sentDocs.push(d)); + + await replicationState.awaitInitialReplication(); + + // The push handler must receive the master format with `is_deleted` (not `_deleted`) + assert.ok(pushedToMaster.length >= 2, 'push handler must receive at least both docs'); + pushedToMaster.forEach((doc) => { + assert.strictEqual(typeof doc.is_deleted, 'boolean', 'push handler must see is_deleted, got ' + JSON.stringify(doc)); + assert.strictEqual((doc as any)._deleted, undefined, 'push handler must NOT see _deleted, got ' + JSON.stringify(doc)); + }); + + // sent$ is typed as Observable>. + // It must always emit documents with `_deleted: boolean`, + // never the master-format `is_deleted` field. + assert.ok(sentDocs.length >= 2, 'sent$ must emit at least both docs, got ' + sentDocs.length); + sentDocs.forEach((doc, idx) => { + assert.strictEqual( + typeof doc._deleted, + 'boolean', + 'sent$ doc at index ' + idx + ' must have _deleted: boolean, got ' + JSON.stringify(doc) + ); + assert.strictEqual( + (doc as any).is_deleted, + undefined, + 'sent$ doc at index ' + idx + ' must NOT have is_deleted (master format), got ' + JSON.stringify(doc) + ); + }); + + const aliveSent = ensureNotFalsy(sentDocs.find(d => (d as any).id === 'alive')); + const removedSent = sentDocs.filter(d => (d as any).id === 'to-remove'); + assert.strictEqual(aliveSent._deleted, false, 'alive doc must have _deleted=false on sent$'); + assert.ok(removedSent.length > 0, 'removed doc must be emitted on sent$'); + assert.ok( + removedSent.some(doc => doc._deleted === true), + 'removed doc must have _deleted=true on sent$' + ); + + await localCollection.database.close(); + }); it('should not save pulled documents that do not match the schema', async () => { const amount = 5; const { localCollection, remoteCollection } = await getTestCollections({ local: 0, remote: amount });