Skip to content

Commit 5eb12a1

Browse files
authored
fix(node): prevent closeAndFlush from dropping in-flight events (#1365)
1 parent 0555f07 commit 5eb12a1

8 files changed

Lines changed: 193 additions & 37 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@segment/analytics-node': patch
3+
---
4+
5+
Fix closeAndFlush silently dropping in-flight events on timeout.
6+
7+
- Cancel pending retry sleeps via AbortController when closeAndFlush times out, instead of silently swallowing the timeout error.
8+
Raise the default closeAndFlush timeout floor to 75s (previously 12.5s, i.e. the default 10s flushInterval × the 1.25 multiplier) so a flush survives at least one `Retry-After: 60` retry cycle.
9+
- Add an `http_response` emitter event exposing each API response's status, status text, URL, headers, and body for observability.

packages/browser/e2e-cli/src/cli.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ let lastApiResponseTime = 0
6161
let inflightApiRequests = 0
6262
let lastApiStatus = 0
6363
let firstApiErrorStatus = 0
64+
let lastRetryAfterSeen = 0
6465
let apiHostPattern = ''
6566

6667
function installFetchMonitor(apiHost: string): void {
@@ -97,6 +98,12 @@ function installFetchMonitor(apiHost: string): void {
9798
if (response.status >= 400 && firstApiErrorStatus === 0) {
9899
firstApiErrorStatus = response.status
99100
}
101+
if (response.status === 429) {
102+
const ra = response.headers.get('Retry-After')
103+
const parsed = ra !== null ? parseInt(ra, 10) : NaN
104+
const retryAfterSeconds = Number.isFinite(parsed) ? parsed : 10
105+
lastRetryAfterSeen = Math.max(lastRetryAfterSeen, retryAfterSeconds)
106+
}
100107
return response
101108
} catch (err) {
102109
lastApiResponseTime = Date.now()
@@ -131,11 +138,15 @@ async function waitForDelivery(maxWaitMs = 60000): Promise<void> {
131138

132139
const elapsed = Date.now() - lastApiResponseTime
133140
// After success: brief settle for any remaining event dispatches.
134-
// After error: longer settle to allow for retry scheduling + backoff.
135-
// The fetch-dispatcher's core backoff uses minTimeout=500ms with
136-
// exponential growth: attempt 5 reaches ~500*2^4*random ≈ 8-16s.
137-
// Use 20s settle to accommodate higher retry attempts.
138-
const settleMs = lastApiStatus < 400 ? 1500 : 20000
141+
// If we've ever seen a 429 with Retry-After, use that + buffer as settle
142+
// time, since a 200 after a 429 doesn't mean all retries are done.
143+
// Without any 429s, a short settle is fine.
144+
const settleMs =
145+
lastRetryAfterSeen > 0
146+
? (lastRetryAfterSeen + 5) * 1000
147+
: lastApiStatus < 400
148+
? 1500
149+
: 20000
139150

140151
if (elapsed >= settleMs) {
141152
return
@@ -221,12 +232,8 @@ async function main(): Promise<void> {
221232
const protocol = input.apiHost.startsWith('https') ? 'https' : 'http'
222233
segmentConfig.protocol = protocol
223234

224-
if (useBatching) {
225-
segmentConfig.apiHost = input.apiHost
226-
} else {
227-
const apiHostStripped = input.apiHost.replace(/^https?:\/\//, '')
228-
segmentConfig.apiHost = apiHostStripped + '/v1'
229-
}
235+
const apiHostStripped = input.apiHost.replace(/^https?:\/\//, '')
236+
segmentConfig.apiHost = apiHostStripped + '/v1'
230237
}
231238

232239
// Wire maxRetries and backoff timing through httpConfig — this controls
@@ -292,7 +299,8 @@ async function main(): Promise<void> {
292299
}
293300

294301
// Wait for all delivery activity to settle
295-
await waitForDelivery()
302+
const maxWaitMs = (input.config?.timeout ?? 60) * 1000
303+
await waitForDelivery(maxWaitMs)
296304

297305
// Determine success/failure from delivery errors (emitted by the
298306
// Segment.io plugin) and observed fetch responses as fallback.

packages/node/e2e-cli/src/cli.ts

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,33 @@ interface CLIOutput {
4646
success: boolean
4747
error?: string
4848
sentBatches: number
49+
eventResults: EventResult[]
50+
httpLog: HttpLogEntry[]
51+
}
52+
53+
interface EventResult {
54+
event: string
55+
type: string
56+
delivered: boolean
57+
failureReason?: string
58+
}
59+
60+
interface HttpLogEntry {
61+
timestamp: string
62+
status: number
63+
url: string
64+
retryCount?: number
65+
bodyPreview: string
4966
}
5067

5168
function sleep(ms: number): Promise<void> {
5269
return new Promise((resolve) => setTimeout(resolve, ms))
5370
}
5471

72+
function elapsed(start: number): string {
73+
return `${((Date.now() - start) / 1000).toFixed(1)}s`
74+
}
75+
5576
function sendEvent(analytics: Analytics, event: AnalyticsEvent): void {
5677
// SDK requires either userId or anonymousId
5778
const identity = event.userId
@@ -117,7 +138,13 @@ function sendEvent(analytics: Analytics, event: AnalyticsEvent): void {
117138
}
118139

119140
async function main(): Promise<void> {
120-
const output: CLIOutput = { success: false, sentBatches: 0 }
141+
const startTime = Date.now()
142+
const output: CLIOutput = {
143+
success: false,
144+
sentBatches: 0,
145+
eventResults: [],
146+
httpLog: [],
147+
}
121148

122149
try {
123150
// Parse --input argument
@@ -147,36 +174,82 @@ async function main(): Promise<void> {
147174
const msg =
148175
reason instanceof Error ? reason.message : String(reason ?? err.code)
149176
deliveryErrors.push(msg)
177+
process.stderr.write(`[${elapsed(startTime)}] ERROR: ${msg}\n`)
178+
})
179+
180+
analytics.on('http_request', (req) => {
181+
const body =
182+
typeof req.body === 'string' ? JSON.parse(req.body) : req.body
183+
const events = body.batch?.map((e: any) => e.event ?? e.type) ?? []
184+
const retryHeader = req.headers?.['X-Retry-Count']
185+
process.stderr.write(
186+
`[${elapsed(startTime)}] >> ${req.method} events=[${events.join(',')}]${
187+
retryHeader ? ` retry=${retryHeader}` : ''
188+
}\n`
189+
)
190+
})
191+
192+
analytics.on('http_response', (res) => {
193+
const body =
194+
typeof res.body === 'string' ? JSON.parse(res.body) : res.body
195+
const events = body.batch?.map((e: any) => e.event ?? e.type) ?? []
196+
const retryAfter = res.headers?.['retry-after']
197+
process.stderr.write(
198+
`[${elapsed(startTime)}] << ${res.status} events=[${events.join(',')}]${
199+
retryAfter ? ` retry-after=${retryAfter}` : ''
200+
}\n`
201+
)
150202
})
151203

152204
// Process event sequences
205+
let totalEvents = 0
153206
for (const seq of sequences) {
154207
if (seq.delayMs > 0) {
208+
process.stderr.write(
209+
`[${elapsed(startTime)}] sleeping ${seq.delayMs}ms...\n`
210+
)
155211
await sleep(seq.delayMs)
156212
}
157213

158214
for (const event of seq.events) {
215+
totalEvents++
216+
const label = event.event ?? event.type
217+
process.stderr.write(
218+
`[${elapsed(startTime)}] enqueue #${totalEvents}: ${label}\n`
219+
)
159220
sendEvent(analytics, event)
160221
}
161222
}
162223

163-
// Flush and close — use a generous timeout so retries with exponential
164-
// backoff have time to complete (default is flushInterval * 1.25)
165-
const timeoutMs = (config.timeout ?? 60) * 1000
224+
process.stderr.write(
225+
`[${elapsed(
226+
startTime
227+
)}] all ${totalEvents} events enqueued, calling closeAndFlush...\n`
228+
)
229+
230+
// Flush and close
231+
const timeoutMs = (config.timeout ?? 75) * 1000
166232
await analytics.closeAndFlush({ timeout: timeoutMs })
167233

234+
process.stderr.write(
235+
`[${elapsed(startTime)}] closeAndFlush resolved. errors=${
236+
deliveryErrors.length
237+
}\n`
238+
)
239+
168240
if (deliveryErrors.length > 0) {
169241
output.success = false
170242
output.error = deliveryErrors[0]
171243
} else {
172244
output.success = true
173-
output.sentBatches = 1
245+
output.sentBatches = output.httpLog.length
174246
}
175247
} catch (err) {
176248
output.error = err instanceof Error ? err.message : String(err)
249+
process.stderr.write(`[${elapsed(startTime)}] FATAL: ${output.error}\n`)
177250
}
178251

179-
console.log(JSON.stringify(output))
252+
console.log(JSON.stringify(output, null, 2))
180253
process.exit(output.success ? 0 : 1)
181254
}
182255

packages/node/src/__tests__/graceful-shutdown-integration.test.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,23 @@ describe('Ability for users to exit without losing events', () => {
8484
})
8585

8686
describe('.closeAndFlush()', () => {
87-
test('default timeout should be related to flush interval', () => {
88-
const flushInterval = 500
87+
test('default timeout should be at least 75 seconds', () => {
8988
ajs = new Analytics({
9089
writeKey: 'abc123',
91-
flushInterval,
9290
httpClient: testClient,
9391
})
9492
const closeAndFlushTimeout = ajs['_closeAndFlushDefaultTimeout']
95-
expect(closeAndFlushTimeout).toBe(flushInterval * 1.25)
93+
expect(closeAndFlushTimeout).toBe(75000)
94+
})
95+
96+
test('default timeout should scale with large flushInterval', () => {
97+
ajs = new Analytics({
98+
writeKey: 'abc123',
99+
flushInterval: 120000,
100+
httpClient: testClient,
101+
})
102+
const closeAndFlushTimeout = ajs['_closeAndFlushDefaultTimeout']
103+
expect(closeAndFlushTimeout).toBe(120000 * 1.25)
96104
})
97105

98106
test('should force resolve if method call execution time exceeds specified timeout', async () => {

packages/node/src/__tests__/oauth.integration.test.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,16 @@ describe('OAuth Failure', () => {
246246
maxRetries: 3,
247247
})
248248

249+
analytics.track({
250+
event: 'Test Event',
251+
anonymousId: 'unknown',
252+
userId: 'known',
253+
timestamp: timestamp,
254+
})
255+
const ctxPromise = resolveCtx(analytics, 'track')
256+
await analytics.closeAndFlush({ timeout: 20000 })
249257
try {
250-
analytics.track({
251-
event: 'Test Event',
252-
anonymousId: 'unknown',
253-
userId: 'known',
254-
timestamp: timestamp,
255-
})
256-
await analytics.closeAndFlush()
257-
const ctx1 = await resolveCtx(analytics, 'track') // forces exception to be thrown
258-
expect(ctx1.event.type).toEqual('track')
258+
await ctxPromise
259259
throw new Error('fail')
260260
} catch (err: any) {
261261
expect(err.reason).toEqual(

packages/node/src/app/analytics-node.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export class Analytics extends NodeEmitter implements CoreAnalytics {
4444

4545
const flushInterval = settings.flushInterval ?? 10000
4646

47-
this._closeAndFlushDefaultTimeout = flushInterval * 1.25 // add arbitrary multiplier in case an event is in a plugin.
47+
this._closeAndFlushDefaultTimeout = Math.max(60000, flushInterval) * 1.25
4848

4949
const { plugin, publisher } = createConfiguredNodePlugin(
5050
{
@@ -123,7 +123,11 @@ export class Analytics extends NodeEmitter implements CoreAnalytics {
123123
}).finally(() => {
124124
this._isFlushing = false
125125
})
126-
return timeout ? pTimeout(promise, timeout).catch(() => undefined) : promise
126+
if (!timeout) return promise
127+
128+
return pTimeout(promise, timeout).catch(() => {
129+
this._publisher.abort()
130+
})
127131
}
128132

129133
private _dispatch(segmentEvent: SegmentEvent, callback?: Callback) {

packages/node/src/app/emitter.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ export type NodeEmitterEvents = CoreEmitterContract<Context> & {
1818
body: string
1919
}
2020
]
21+
http_response: [
22+
{
23+
status: number
24+
statusText: string
25+
url: string
26+
body: string
27+
headers: Record<string, string>
28+
}
29+
]
2130
drained: []
2231
}
2332

0 commit comments

Comments
 (0)