Skip to content

Commit 832902c

Browse files
committed
fix edge case
1 parent 0368cb6 commit 832902c

2 files changed

Lines changed: 20 additions & 12 deletions

File tree

packages/protocol/src/utils/binary-subdivision.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ export function toIso(unixSeconds: number): string {
4343
return new Date(unixSeconds * 1000).toISOString()
4444
}
4545

46+
function cloneRange(range: Range): Range {
47+
return { ...range }
48+
}
49+
4650
// MARK: - Subdivision
4751

4852
/** Default number of segments to split the older remainder into. */
@@ -84,7 +88,7 @@ export function subdivideRanges(
8488
const secondsLeft = newEnd - start
8589
const segments = Math.min(n, secondsLeft)
8690
const segmentDuration = Math.floor(secondsLeft / segments)
87-
if (segmentDuration < 30) {
91+
if (segmentDuration <= 1) {
8892
result.push(range)
8993
continue
9094
}
@@ -96,7 +100,7 @@ export function subdivideRanges(
96100
if (lastSegment) {
97101
// handle the edge case where there are multiple objects created in a same second
98102
// but our fetch didn't return all of them because of the limit of 100.
99-
result.push({ gte: toIso(segGte), lt: toIso(newEnd + 1), cursor: range.cursor })
103+
result.push({ gte: toIso(segGte), lt: toIso(newEnd + 1), cursor: null })
100104
} else {
101105
result.push({ gte: toIso(segGte), lt: toIso(segLt), cursor: null })
102106
}
@@ -179,7 +183,7 @@ export async function* streamingSubdivide<T>(opts: {
179183

180184
/** Snapshot of all ranges not yet fully fetched (queued + in flight). */
181185
function snapshotRemaining(): Range[] {
182-
return [...inflightRanges.values(), ...queue]
186+
return [...inflightRanges.values(), ...queue].map(cloneRange)
183187
}
184188

185189
// Fill up to concurrency
@@ -211,15 +215,18 @@ export async function* streamingSubdivide<T>(opts: {
211215
queue.push(range)
212216
}
213217

214-
// Launch new work BEFORE yielding so fetches run while consumer processes
218+
const eventRange = cloneRange(range)
219+
const remaining = snapshotRemaining()
220+
221+
// Launch new work after snapshotting so checkpoints cannot observe later cursor mutation.
215222
while (launchNext()) {}
216223

217224
yield {
218-
range,
225+
range: eventRange,
219226
data,
220227
hasMore,
221228
exhausted: !hasMore,
222-
remaining: snapshotRemaining(),
229+
remaining,
223230
}
224231
}
225232
} finally {

packages/source-stripe/src/src-list-api.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ async function fetchPageForRange(opts: {
286286
const data: Record<string, unknown>[] = []
287287
for (const item of response.data) {
288288
const record = item as Record<string, unknown>
289-
const created = record.created
289+
// Invoice item is the only object that uses .date as created timestamp
290+
const created = record.created ?? record.date
290291
if (typeof created === 'number') lastObserved = created
291292
data.push({
292293
...record,
@@ -531,7 +532,7 @@ async function* iterateStream(opts: {
531532
supportsForwardPagination,
532533
}),
533534
//concurrency: 100, // rate limiter is the real bottleneck
534-
concurrency: 1, // serialized for reliability; parallelism re-enabled if data gaps are due to parallelism
535+
concurrency: 100, // serialized for reliability; parallelism re-enabled if data gaps are due to parallelism
535536
subdivisionFactor,
536537
})
537538

@@ -565,11 +566,11 @@ async function* iterateStream(opts: {
565566
} else if (event.hasMore && event.data.length > 0) {
566567
// Range was subdivided — the fetched head (from oldest record to range.lt)
567568
// is already accounted for. Emit range_complete so the progress bar fills.
568-
const oldest = event.data.findLast((r) => typeof r.created === 'number') as
569-
| { created: number }
570-
| undefined
569+
const oldest = event.data.findLast(
570+
(r) => typeof r.created === 'number' || typeof r.date === 'number'
571+
) as { created?: number; date?: number } | undefined
571572
if (oldest) {
572-
const headGte = toIso(oldest.created + 1)
573+
const headGte = toIso((oldest.created ?? oldest.date)! + 1)
573574
if (headGte < event.range.lt) {
574575
yield msg.stream_status({
575576
stream: streamName,

0 commit comments

Comments
 (0)