Skip to content

Commit 335b5c9

Browse files
authored
Merge branch 'main' into support/CM-2298
2 parents 7e0d269 + 3d55fb1 commit 335b5c9

4 files changed

Lines changed: 44 additions & 8 deletions

File tree

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,16 @@ export async function processNangoWebhook(
6161
integrationId: integration.id,
6262
})
6363

64-
const cursor = integration.settings.cursors ? integration.settings.cursors[args.model] : undefined
64+
const settings = integration.settings
65+
let cursor = args.nextPageCursor
66+
if (
67+
!cursor &&
68+
settings.cursors &&
69+
settings.cursors[args.connectionId] &&
70+
settings.cursors[args.connectionId][args.model]
71+
) {
72+
cursor = settings.cursors[args.connectionId][args.model]
73+
}
6574

6675
await initNangoCloudClient()
6776

@@ -96,6 +105,7 @@ export async function processNangoWebhook(
96105
await setNangoIntegrationCursor(
97106
dbStoreQx(svc.postgres.writer),
98107
integration.id,
108+
args.connectionId,
99109
args.model,
100110
records.nextCursor,
101111
)
@@ -105,6 +115,7 @@ export async function processNangoWebhook(
105115
await setNangoIntegrationCursor(
106116
dbStoreQx(svc.postgres.writer),
107117
integration.id,
118+
args.connectionId,
108119
args.model,
109120
records.records[records.records.length - 1].metadata.cursor,
110121
)

services/apps/nango_worker/src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export interface IProcessNangoWebhookArguments {
33
providerConfigKey: string
44
model: string
55
syncType: 'INITIAL' | 'INCREMENTAL'
6+
nextPageCursor?: string
67
}
78

89
export interface ISyncGithubIntegrationArguments {

services/apps/nango_worker/src/workflows/processNangoWebhook.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ export async function processNangoWebhook(args: IProcessNangoWebhookArguments):
1111
let nextCursor = await activity.processNangoWebhook(args)
1212

1313
while (nextCursor) {
14-
nextCursor = await activity.processNangoWebhook(args)
14+
nextCursor = await activity.processNangoWebhook({
15+
...args,
16+
nextPageCursor: nextCursor,
17+
})
1518
}
1619
}

services/libs/data-access-layer/src/integrations/index.ts

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -275,21 +275,42 @@ export async function findIntegrationDataForNangoWebhookProcessing(
275275
export async function setNangoIntegrationCursor(
276276
qx: QueryExecutor,
277277
integrationId: string,
278+
connectionId: string,
278279
model: string,
279280
cursor: string,
280281
): Promise<void> {
281282
await qx.result(
282283
`
283284
update integrations
284-
set settings = jsonb_set(
285-
settings,
286-
'{cursors}',
287-
coalesce(settings -> 'cursors', '{}') || jsonb_build_object($(model), $(cursor))
288-
)
289-
where id = $(integrationId)
285+
set settings = case
286+
-- when we don't have any cursors yet
287+
when settings -> 'cursors' is null then
288+
jsonb_set(
289+
settings,
290+
array['cursors'],
291+
jsonb_build_object($(connectionId), jsonb_build_object($(model), $(cursor)))
292+
)
293+
-- when we have cursors but not yet for this connectionId
294+
when settings -> 'cursors' -> $(connectionId) is null then
295+
jsonb_set(
296+
settings,
297+
array['cursors'],
298+
(settings -> 'cursors') ||
299+
jsonb_build_object($(connectionId), jsonb_build_object($(model), $(cursor)))
300+
)
301+
-- when we have cursors and entries for this connectionId
302+
else
303+
jsonb_set(
304+
settings,
305+
array['cursors', $(connectionId)],
306+
(settings -> 'cursors' -> $(connectionId)) || jsonb_build_object($(model), $(cursor))
307+
)
308+
end
309+
where id = $(integrationId);
290310
`,
291311
{
292312
integrationId,
313+
connectionId,
293314
model,
294315
cursor,
295316
},

0 commit comments

Comments
 (0)