Skip to content

Commit 9c29676

Browse files
committed
fix(e2e-cli): wait for rate-limited retries before exiting
Node CLI: use http_request/http_response listeners for verbose logging. Browser CLI: track Retry-After from 429 responses and use it as the settle time, so the CLI doesn't exit while retries are still pending. Both CLIs now respect config.timeout for their wait durations.
1 parent 50c4784 commit 9c29676

2 files changed

Lines changed: 104 additions & 18 deletions

File tree

packages/browser/e2e-cli/src/cli.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ let lastApiResponseTime = 0
6161
let inflightApiRequests = 0
6262
let lastApiStatus = 0
6363
let firstApiErrorStatus = 0
64+
let lastRetryAfterSeen = 0
6465
let apiHostPattern = ''
6566

6667
function installFetchMonitor(apiHost: string): void {
@@ -97,6 +98,17 @@ function installFetchMonitor(apiHost: string): void {
9798
if (response.status >= 400 && firstApiErrorStatus === 0) {
9899
firstApiErrorStatus = response.status
99100
}
101+
if (response.status === 429) {
102+
const ra = response.headers.get('Retry-After')
103+
if (ra) {
104+
lastRetryAfterSeen = Math.max(
105+
lastRetryAfterSeen,
106+
parseInt(ra, 10) || 60
107+
)
108+
} else {
109+
lastRetryAfterSeen = Math.max(lastRetryAfterSeen, 60)
110+
}
111+
}
100112
return response
101113
} catch (err) {
102114
lastApiResponseTime = Date.now()
@@ -131,11 +143,15 @@ async function waitForDelivery(maxWaitMs = 60000): Promise<void> {
131143

132144
const elapsed = Date.now() - lastApiResponseTime
133145
// After success: brief settle for any remaining event dispatches.
134-
// After error: longer settle to allow for retry scheduling + backoff.
135-
// The fetch-dispatcher's core backoff uses minTimeout=500ms with
136-
// exponential growth: attempt 5 reaches ~500*2^4*random ≈ 8-16s.
137-
// Use 20s settle to accommodate higher retry attempts.
138-
const settleMs = lastApiStatus < 400 ? 1500 : 20000
146+
// If we've ever seen a 429 with Retry-After, use that + buffer as settle
147+
// time, since a 200 after a 429 doesn't mean all retries are done.
148+
// Without any 429s, a short settle is fine.
149+
const settleMs =
150+
lastRetryAfterSeen > 0
151+
? (lastRetryAfterSeen + 5) * 1000
152+
: lastApiStatus < 400
153+
? 1500
154+
: 20000
139155

140156
if (elapsed >= settleMs) {
141157
return
@@ -221,12 +237,8 @@ async function main(): Promise<void> {
221237
const protocol = input.apiHost.startsWith('https') ? 'https' : 'http'
222238
segmentConfig.protocol = protocol
223239

224-
if (useBatching) {
225-
segmentConfig.apiHost = input.apiHost
226-
} else {
227-
const apiHostStripped = input.apiHost.replace(/^https?:\/\//, '')
228-
segmentConfig.apiHost = apiHostStripped + '/v1'
229-
}
240+
const apiHostStripped = input.apiHost.replace(/^https?:\/\//, '')
241+
segmentConfig.apiHost = apiHostStripped + '/v1'
230242
}
231243

232244
// Wire maxRetries and backoff timing through httpConfig — this controls
@@ -292,7 +304,8 @@ async function main(): Promise<void> {
292304
}
293305

294306
// Wait for all delivery activity to settle
295-
await waitForDelivery()
307+
const maxWaitMs = (input.config?.timeout ?? 60) * 1000
308+
await waitForDelivery(maxWaitMs)
296309

297310
// Determine success/failure from delivery errors (emitted by the
298311
// Segment.io plugin) and observed fetch responses as fallback.

packages/node/e2e-cli/src/cli.ts

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,33 @@ interface CLIOutput {
4646
success: boolean
4747
error?: string
4848
sentBatches: number
49+
eventResults: EventResult[]
50+
httpLog: HttpLogEntry[]
51+
}
52+
53+
interface EventResult {
54+
event: string
55+
type: string
56+
delivered: boolean
57+
failureReason?: string
58+
}
59+
60+
interface HttpLogEntry {
61+
timestamp: string
62+
status: number
63+
url: string
64+
retryCount?: number
65+
bodyPreview: string
4966
}
5067

5168
function sleep(ms: number): Promise<void> {
5269
return new Promise((resolve) => setTimeout(resolve, ms))
5370
}
5471

72+
function elapsed(start: number): string {
73+
return `${((Date.now() - start) / 1000).toFixed(1)}s`
74+
}
75+
5576
function sendEvent(analytics: Analytics, event: AnalyticsEvent): void {
5677
// SDK requires either userId or anonymousId
5778
const identity = event.userId
@@ -117,7 +138,13 @@ function sendEvent(analytics: Analytics, event: AnalyticsEvent): void {
117138
}
118139

119140
async function main(): Promise<void> {
120-
const output: CLIOutput = { success: false, sentBatches: 0 }
141+
const startTime = Date.now()
142+
const output: CLIOutput = {
143+
success: false,
144+
sentBatches: 0,
145+
eventResults: [],
146+
httpLog: [],
147+
}
121148

122149
try {
123150
// Parse --input argument
@@ -147,36 +174,82 @@ async function main(): Promise<void> {
147174
const msg =
148175
reason instanceof Error ? reason.message : String(reason ?? err.code)
149176
deliveryErrors.push(msg)
177+
process.stderr.write(`[${elapsed(startTime)}] ERROR: ${msg}\n`)
178+
})
179+
180+
analytics.on('http_request', (req) => {
181+
const body =
182+
typeof req.body === 'string' ? JSON.parse(req.body) : req.body
183+
const events = body.batch?.map((e: any) => e.event ?? e.type) ?? []
184+
const retryHeader = req.headers?.['X-Retry-Count']
185+
process.stderr.write(
186+
`[${elapsed(startTime)}] >> ${req.method} events=[${events.join(',')}]${
187+
retryHeader ? ` retry=${retryHeader}` : ''
188+
}\n`
189+
)
190+
})
191+
192+
analytics.on('http_response', (res) => {
193+
const body =
194+
typeof res.body === 'string' ? JSON.parse(res.body) : res.body
195+
const events = body.batch?.map((e: any) => e.event ?? e.type) ?? []
196+
const retryAfter = res.headers?.['retry-after']
197+
process.stderr.write(
198+
`[${elapsed(startTime)}] << ${res.status} events=[${events.join(',')}]${
199+
retryAfter ? ` retry-after=${retryAfter}` : ''
200+
}\n`
201+
)
150202
})
151203

152204
// Process event sequences
205+
let totalEvents = 0
153206
for (const seq of sequences) {
154207
if (seq.delayMs > 0) {
208+
process.stderr.write(
209+
`[${elapsed(startTime)}] sleeping ${seq.delayMs}ms...\n`
210+
)
155211
await sleep(seq.delayMs)
156212
}
157213

158214
for (const event of seq.events) {
215+
totalEvents++
216+
const label = event.event ?? event.type
217+
process.stderr.write(
218+
`[${elapsed(startTime)}] enqueue #${totalEvents}: ${label}\n`
219+
)
159220
sendEvent(analytics, event)
160221
}
161222
}
162223

163-
// Flush and close — use a generous timeout so retries with exponential
164-
// backoff have time to complete (default is flushInterval * 1.25)
165-
const timeoutMs = (config.timeout ?? 60) * 1000
224+
process.stderr.write(
225+
`[${elapsed(
226+
startTime
227+
)}] all ${totalEvents} events enqueued, calling closeAndFlush...\n`
228+
)
229+
230+
// Flush and close
231+
const timeoutMs = (config.timeout ?? 75) * 1000
166232
await analytics.closeAndFlush({ timeout: timeoutMs })
167233

234+
process.stderr.write(
235+
`[${elapsed(startTime)}] closeAndFlush resolved. errors=${
236+
deliveryErrors.length
237+
}\n`
238+
)
239+
168240
if (deliveryErrors.length > 0) {
169241
output.success = false
170242
output.error = deliveryErrors[0]
171243
} else {
172244
output.success = true
173-
output.sentBatches = 1
245+
output.sentBatches = output.httpLog.length
174246
}
175247
} catch (err) {
176248
output.error = err instanceof Error ? err.message : String(err)
249+
process.stderr.write(`[${elapsed(startTime)}] FATAL: ${output.error}\n`)
177250
}
178251

179-
console.log(JSON.stringify(output))
252+
console.log(JSON.stringify(output, null, 2))
180253
process.exit(output.success ? 0 : 1)
181254
}
182255

0 commit comments

Comments
 (0)