Skip to content

Commit 511a4a2

Browse files
authored
refactor: stream batched items to db during backfill (#229)
Streams and upserts in chunks as items arrive, instead of collecting all first. - Chunks of 250 items - Upsert runs while fetch continues - Logging: removes mid-sync count log (total unknown until done)
1 parent 8fe9847 commit 511a4a2

1 file changed

Lines changed: 14 additions & 12 deletions

File tree

packages/sync-engine/src/stripeSync.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,25 +1009,27 @@ export class StripeSync {
10091009
fetch: () => Stripe.ApiListPromise<T>,
10101010
upsert: (items: T[]) => Promise<T[]>
10111011
): Promise<Sync> {
1012-
const items: T[] = []
1012+
const chunkSize = 250
1013+
let chunk: T[] = []
1014+
let synced = 0
10131015

10141016
this.config.logger?.info('Fetching items to sync from Stripe')
10151017
for await (const item of fetch()) {
1016-
items.push(item)
1017-
}
1018+
chunk.push(item)
1019+
synced++
1020+
if (synced % 1000 === 0) this.config.logger?.info(`Synced ${synced} items`)
10181021

1019-
if (!items.length) return { synced: 0 }
1022+
if (chunk.length >= chunkSize) {
1023+
await upsert(chunk)
1024+
chunk = []
1025+
}
1026+
}
10201027

1021-
this.config.logger?.info(`Upserting ${items.length} items`)
1022-
const chunkSize = 250
1023-
for (let i = 0; i < items.length; i += chunkSize) {
1024-
const chunk = items.slice(i, i + chunkSize)
1028+
if (chunk.length > 0) await upsert(chunk)
10251029

1026-
await upsert(chunk)
1027-
}
1028-
this.config.logger?.info('Upserted items')
1030+
this.config.logger?.info(`Upserted ${synced} items`)
10291031

1030-
return { synced: items.length }
1032+
return { synced }
10311033
}
10321034

10331035
private async upsertCharges(

0 commit comments

Comments
 (0)