Skip to content

Commit af37b02

Browse files
jbingenmsfstef
andauthored
fix(pglite-sync): don't filter move-in messages by LSN (#883)
* fix(pglite-sync): don't filter move-in messages by LSN Move-in messages from Electric's tagged_subqueries feature don't have an LSN header because they come from direct DB queries, not replication. Previously these messages were incorrectly skipped as "already seen" because the missing LSN defaulted to 0. This checks for the is_move_in header and bypasses LSN filtering for move-in messages, ensuring rows moving into a shape due to subquery condition changes are properly synced. Fixes electric-sql/electric#3769 * fix(pglite-sync): handle move-in duplicate keys with ON CONFLICT Move-in data from tagged_subqueries can overlap with initial sync data, causing duplicate key errors. This adds ON CONFLICT DO UPDATE handling specifically for move-in inserts. - Add primaryKey param to applyInsertsToTable - Use ON CONFLICT DO UPDATE for move-in inserts - Update changeset description * Add regression tests for move-in messages * Fix formatting --------- Co-authored-by: msfstef <msfstef@gmail.com>
1 parent fd8ebcf commit af37b02

4 files changed

Lines changed: 199 additions & 1 deletion

File tree

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
"@electric-sql/pglite-sync": patch
3+
---
4+
5+
Fix move-in messages from tagged_subqueries not being synced
6+
7+
This fixes two issues with move-in messages from Electric's `tagged_subqueries` feature:
8+
9+
1. **LSN filtering bypass**: Move-in messages don't include an LSN header because they originate from direct database queries rather than the PostgreSQL replication stream. Previously, these messages were being filtered out as "already seen" because the missing LSN defaulted to 0. This fix checks for the `is_move_in` header and bypasses LSN filtering for these messages.
10+
11+
2. **Duplicate key handling**: Move-in data can overlap with data from the initial sync (e.g., when a row "moves in" to match a subquery that it already matched during initial sync). This fix uses `ON CONFLICT DO UPDATE` for move-in inserts to handle these duplicates gracefully, updating the row with the latest data instead of erroring.

packages/pglite-sync/src/apply.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,39 @@ export async function applyMessageToTable({
2323
}: ApplyMessageToTableOptions) {
2424
const data = mapColumns ? doMapColumns(mapColumns, message) : message.value
2525

26+
// Check if this is a move-in message (from subquery-based shapes)
27+
const isMoveIn =
28+
(message.headers as Record<string, unknown>).is_move_in === true
29+
2630
switch (message.headers.operation) {
2731
case 'insert': {
2832
if (debug) console.log('inserting', data)
2933
const columns = Object.keys(data)
34+
35+
// Build ON CONFLICT clause for move-in messages
36+
let onConflictClause = ''
37+
if (isMoveIn && primaryKey && primaryKey.length > 0) {
38+
const nonPkColumns = columns.filter((c) => !primaryKey.includes(c))
39+
if (nonPkColumns.length > 0) {
40+
onConflictClause = `
41+
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
42+
DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')}
43+
`
44+
} else {
45+
onConflictClause = `
46+
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
47+
DO NOTHING
48+
`
49+
}
50+
}
51+
3052
return await pg.query(
3153
`
3254
INSERT INTO "${schema}"."${table}"
3355
(${columns.map((s) => '"' + s + '"').join(', ')})
3456
VALUES
3557
(${columns.map((_v, i) => '$' + (i + 1)).join(', ')})
58+
${onConflictClause}
3659
`,
3760
columns.map((column) => data[column]),
3861
)
@@ -86,6 +109,7 @@ export interface BulkApplyMessagesToTableOptions {
86109
schema?: string
87110
messages: InsertChangeMessage[]
88111
mapColumns?: MapColumns
112+
primaryKey?: string[]
89113
debug: boolean
90114
}
91115

@@ -95,6 +119,7 @@ export async function applyInsertsToTable({
95119
schema = 'public',
96120
messages,
97121
mapColumns,
122+
primaryKey,
98123
debug,
99124
}: BulkApplyMessagesToTableOptions) {
100125
// Map the messages to the data to be inserted
@@ -104,6 +129,12 @@ export async function applyInsertsToTable({
104129

105130
if (debug) console.log('inserting', data)
106131

132+
// Check if any message is a move-in (from subquery-based shapes)
133+
// Move-in data can overlap with existing data, so we need ON CONFLICT handling
134+
const hasMoveIn = messages.some(
135+
(m) => (m.headers as Record<string, unknown>).is_move_in === true,
136+
)
137+
107138
// Get column names from the first message
108139
const columns = Object.keys(data[0])
109140

@@ -176,11 +207,31 @@ export async function applyInsertsToTable({
176207

177208
// Helper function to execute a batch insert
178209
const executeBatch = async (batch: Record<string, any>[]) => {
210+
// Build ON CONFLICT clause for move-in messages
211+
// Move-in data can contain rows that already exist from initial sync
212+
let onConflictClause = ''
213+
if (hasMoveIn && primaryKey && primaryKey.length > 0) {
214+
const nonPkColumns = columns.filter((c) => !primaryKey.includes(c))
215+
if (nonPkColumns.length > 0) {
216+
onConflictClause = `
217+
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
218+
DO UPDATE SET ${nonPkColumns.map((c) => `"${c}" = EXCLUDED."${c}"`).join(', ')}
219+
`
220+
} else {
221+
// All columns are primary key, just ignore duplicates
222+
onConflictClause = `
223+
ON CONFLICT (${primaryKey.map((c) => `"${c}"`).join(', ')})
224+
DO NOTHING
225+
`
226+
}
227+
}
228+
179229
const sql = `
180230
INSERT INTO "${schema}"."${table}"
181231
(${columns.map((s) => `"${s}"`).join(', ')})
182232
VALUES
183233
${batch.map((_, j) => `(${columns.map((_v, k) => '$' + (j * columns.length + k + 1)).join(', ')})`).join(', ')}
234+
${onConflictClause}
184235
`
185236
const values = batch.flatMap((message) =>
186237
columns.map((column) => message[column]),

packages/pglite-sync/src/index.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ async function createPlugin(
255255
schema: shape.schema,
256256
messages: initialInserts as InsertChangeMessage[],
257257
mapColumns: shape.mapColumns,
258+
primaryKey: shape.primaryKey,
258259
debug,
259260
})
260261

@@ -282,6 +283,7 @@ async function createPlugin(
282283
schema: shape.schema,
283284
messages: bulkInserts as InsertChangeMessage[],
284285
mapColumns: shape.mapColumns,
286+
primaryKey: shape.primaryKey,
285287
debug,
286288
})
287289
bulkInserts.length = 0
@@ -351,7 +353,14 @@ async function createPlugin(
351353
typeof message.headers.lsn === 'string'
352354
? BigInt(message.headers.lsn)
353355
: BigInt(0) // we default to 0 if there no lsn on the message
354-
if (lsn <= lastCommittedLsnForShape) {
356+
357+
// Move-in messages from subquery-based shapes don't have an LSN
358+
// because they come from direct DB queries, not from replication.
359+
// We should never skip these based on LSN filtering.
360+
const isMoveIn =
361+
(message.headers as Record<string, unknown>).is_move_in === true
362+
363+
if (!isMoveIn && lsn <= lastCommittedLsnForShape) {
355364
// We are replaying changes / have already seen this lsn
356365
// skip and move on to the next message
357366
return

packages/pglite-sync/test/sync.test.ts

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,133 @@ describe('pglite-sync', () => {
172172
shape.unsubscribe()
173173
})
174174

175+
it('syncs move-in messages that have no lsn', async () => {
176+
let feedMessage: (
177+
lsn: number,
178+
message: MultiShapeMessage,
179+
) => Promise<void> = async (_) => {}
180+
MockMultiShapeStream.mockImplementation(() => ({
181+
subscribe: vi.fn(
182+
(cb: (messages: MultiShapeMessage[]) => Promise<void>) => {
183+
feedMessage = (lsn, message) =>
184+
cb([
185+
message,
186+
{
187+
shape: 'shape',
188+
headers: {
189+
control: 'up-to-date',
190+
global_last_seen_lsn: lsn.toString(),
191+
},
192+
},
193+
])
194+
},
195+
),
196+
unsubscribeAll: vi.fn(),
197+
isUpToDate: true,
198+
shapes: {
199+
shape: {
200+
subscribe: vi.fn(),
201+
unsubscribeAll: vi.fn(),
202+
},
203+
},
204+
}))
205+
206+
const shape = await pg.electric.syncShapeToTable({
207+
shape: {
208+
url: 'http://localhost:3000/v1/shape',
209+
params: { table: 'todo' },
210+
},
211+
table: 'todo',
212+
primaryKey: ['id'],
213+
shapeKey: null,
214+
})
215+
216+
// initial sync, commits past lsn 0
217+
await feedMessage(5, {
218+
headers: { operation: 'insert', lsn: '5' },
219+
key: 'id1',
220+
value: { id: 1, task: 'task1', done: false },
221+
shape: 'shape',
222+
})
223+
224+
// move-in has no lsn (defaults to 0), must not be skipped as already seen
225+
await feedMessage(6, {
226+
headers: { operation: 'insert', is_move_in: true },
227+
key: 'id2',
228+
value: { id: 2, task: 'task2', done: false },
229+
shape: 'shape',
230+
})
231+
expect((await pg.sql`SELECT * FROM todo ORDER BY id;`).rows).toEqual([
232+
{ id: 1, task: 'task1', done: false },
233+
{ id: 2, task: 'task2', done: false },
234+
])
235+
236+
shape.unsubscribe()
237+
})
238+
239+
it('upserts move-in messages that overlap with existing rows', async () => {
240+
let feedMessage: (
241+
lsn: number,
242+
message: MultiShapeMessage,
243+
) => Promise<void> = async (_) => {}
244+
MockMultiShapeStream.mockImplementation(() => ({
245+
subscribe: vi.fn(
246+
(cb: (messages: MultiShapeMessage[]) => Promise<void>) => {
247+
feedMessage = (lsn, message) =>
248+
cb([
249+
message,
250+
{
251+
shape: 'shape',
252+
headers: {
253+
control: 'up-to-date',
254+
global_last_seen_lsn: lsn.toString(),
255+
},
256+
},
257+
])
258+
},
259+
),
260+
unsubscribeAll: vi.fn(),
261+
isUpToDate: true,
262+
shapes: {
263+
shape: {
264+
subscribe: vi.fn(),
265+
unsubscribeAll: vi.fn(),
266+
},
267+
},
268+
}))
269+
270+
const shape = await pg.electric.syncShapeToTable({
271+
shape: {
272+
url: 'http://localhost:3000/v1/shape',
273+
params: { table: 'todo' },
274+
},
275+
table: 'todo',
276+
primaryKey: ['id'],
277+
shapeKey: null,
278+
})
279+
280+
// initial sync
281+
await feedMessage(5, {
282+
headers: { operation: 'insert', lsn: '5' },
283+
key: 'id1',
284+
value: { id: 1, task: 'task1', done: false },
285+
shape: 'shape',
286+
})
287+
288+
// move-in with an existing key must upsert, not error on duplicate key
289+
await feedMessage(6, {
290+
headers: { operation: 'insert', is_move_in: true },
291+
key: 'id1',
292+
value: { id: 1, task: 'task1-updated', done: true },
293+
shape: 'shape',
294+
})
295+
expect((await pg.sql`SELECT * FROM todo;`).rows).toEqual([
296+
{ id: 1, task: 'task1-updated', done: true },
297+
])
298+
299+
shape.unsubscribe()
300+
})
301+
175302
it('performs operations within a transaction', async () => {
176303
let feedMessages: (
177304
lsn: number,

0 commit comments

Comments
 (0)