@@ -34,14 +34,22 @@ export const SEGMENT_DESTINATION_KEY = 'Segment.io';
3434type BatchResult = {
3535 batch : SegmentEvent [ ] ;
3636 messageIds : string [ ] ;
37- status : 'success' | '429' | 'transient' | 'permanent' | 'network_error' ;
37+ // 'retry_after' = a retryable response that carried a Retry-After header
38+ // (429 or any other retryable code), so we wait the server-directed time
39+ // instead of computing exponential backoff.
40+ status :
41+ | 'success'
42+ | 'retry_after'
43+ | 'transient'
44+ | 'permanent'
45+ | 'network_error' ;
3846 statusCode ?: number ;
3947 retryAfterSeconds ?: number ;
4048} ;
4149
4250type ErrorAggregation = {
4351 successfulMessageIds : string [ ] ;
44- rateLimitResults : BatchResult [ ] ;
52+ serverDirectedResults : BatchResult [ ] ;
4553 hasTransientError : boolean ;
4654 permanentErrorMessageIds : string [ ] ;
4755 retryableMessageIds : string [ ] ;
@@ -92,21 +100,36 @@ export class SegmentDestination extends DestinationPlugin {
92100
93101 switch ( classification . errorType ) {
94102 case 'rate_limit' :
103+ // 429: always a server-directed wait. Default to 60s when the header
104+ // is missing/invalid, preserving prior behavior.
95105 return {
96106 batch,
97107 messageIds,
98- status : '429 ' ,
108+ status : 'retry_after ' ,
99109 statusCode : res . status ,
100110 retryAfterSeconds : retryAfterSeconds ?? 60 ,
101111 } ;
102112 case 'transient' :
113+ // Any other retryable code (529, 503, 408, …): if the server sent a
114+ // valid Retry-After, honor it as a server-directed wait. Otherwise use
115+ // exponential backoff (unchanged behavior).
116+ if ( retryAfterSeconds !== undefined ) {
117+ return {
118+ batch,
119+ messageIds,
120+ status : 'retry_after' ,
121+ statusCode : res . status ,
122+ retryAfterSeconds,
123+ } ;
124+ }
103125 return {
104126 batch,
105127 messageIds,
106128 status : 'transient' ,
107129 statusCode : res . status ,
108130 } ;
109131 default :
132+ // Permanent: drop. Retry-After is ignored on non-retryable codes.
110133 return {
111134 batch,
112135 messageIds,
@@ -136,13 +159,16 @@ export class SegmentDestination extends DestinationPlugin {
136159 retryCount,
137160 } ) ;
138161
139- const retryAfterSeconds =
140- res . status === 429
141- ? parseRetryAfter (
142- res . headers . get ( 'Retry-After' ) ,
143- this . getRateLimitConfig ( ) ?. maxRetryInterval
144- )
145- : undefined ;
162+ // Parse Retry-After on any error response (not just 429). The header —
163+ // regardless of which retryable status code carries it — is the
164+ // authoritative signal for how long to wait. classifyBatchResult decides
165+ // whether to honor it (retryable codes) or ignore it (permanent codes).
166+ const retryAfterSeconds = res . ok
167+ ? undefined
168+ : parseRetryAfter (
169+ res . headers . get ( 'Retry-After' ) ,
170+ this . getRateLimitConfig ( ) ?. maxRetryInterval
171+ ) ;
146172
147173 return this . classifyBatchResult (
148174 res ,
@@ -173,7 +199,7 @@ export class SegmentDestination extends DestinationPlugin {
173199 private aggregateErrors ( results : BatchResult [ ] ) : ErrorAggregation {
174200 const aggregation : ErrorAggregation = {
175201 successfulMessageIds : [ ] ,
176- rateLimitResults : [ ] ,
202+ serverDirectedResults : [ ] ,
177203 hasTransientError : false ,
178204 permanentErrorMessageIds : [ ] ,
179205 retryableMessageIds : [ ] ,
@@ -184,8 +210,8 @@ export class SegmentDestination extends DestinationPlugin {
184210 case 'success' :
185211 aggregation . successfulMessageIds . push ( ...result . messageIds ) ;
186212 break ;
187- case '429 ' :
188- aggregation . rateLimitResults . push ( result ) ;
213+ case 'retry_after ' :
214+ aggregation . serverDirectedResults . push ( result ) ;
189215 aggregation . retryableMessageIds . push ( ...result . messageIds ) ;
190216 break ;
191217 case 'transient' :
@@ -256,12 +282,14 @@ export class SegmentDestination extends DestinationPlugin {
256282 return false ;
257283 }
258284
259- const has429 = aggregation . rateLimitResults . length > 0 ;
285+ const hasServerDirectedWait = aggregation . serverDirectedResults . length > 0 ;
260286 let result : RetryResult | undefined ;
261287
262- if ( has429 ) {
263- for ( const r of aggregation . rateLimitResults ) {
264- result = await this . retryManager . handle429 ( r . retryAfterSeconds ?? 60 ) ;
288+ if ( hasServerDirectedWait ) {
289+ for ( const r of aggregation . serverDirectedResults ) {
290+ result = await this . retryManager . handleRetryAfter (
291+ r . retryAfterSeconds ?? 60
292+ ) ;
265293 }
266294 } else if ( aggregation . hasTransientError ) {
267295 result = await this . retryManager . handleTransientError ( ) ;
@@ -316,9 +344,10 @@ export class SegmentDestination extends DestinationPlugin {
316344 aggregation . successfulMessageIds . length -
317345 aggregation . permanentErrorMessageIds . length ;
318346 if ( failedCount > 0 ) {
319- const has429 = aggregation . rateLimitResults . length > 0 ;
347+ const hasServerDirectedWait =
348+ aggregation . serverDirectedResults . length > 0 ;
320349 this . analytics ?. logger . warn (
321- `${ failedCount } events will retry (429 : ${ has429 } , transient: ${ aggregation . hasTransientError } )`
350+ `${ failedCount } events will retry (retry-after : ${ hasServerDirectedWait } , transient: ${ aggregation . hasTransientError } )`
322351 ) ;
323352 }
324353 }
0 commit comments