Skip to content

Commit 4d9c0df

Browse files
feat: Add smart retry system with rate limiting and exponential backoff (#426)
feat: Add smart retry system with rate limiting and exponential backoff Co-authored-by: Michael Grosse Huelsewiesche <mihuelsewiesche@twilio.com>
1 parent acfd5fc commit 4d9c0df

23 files changed

Lines changed: 2414 additions & 84 deletions

Sources/Segment/Configuration.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public class Configuration {
127127
var storageMode: StorageMode = .disk
128128
var anonymousIdGenerator: AnonymousIdGenerator = SegmentAnonymousId()
129129
var httpSession: (() -> any HTTPSession) = HTTPSessions.urlSession
130+
var httpConfig: HttpConfig? = nil
130131
}
131132

132133
internal var values: Values
@@ -361,6 +362,16 @@ extension Configuration {
361362
values.httpSession = httpSession
362363
return self
363364
}
365+
366+
/// Sets HTTP retry configuration for rate limiting and exponential backoff.
367+
/// When nil (default), retry system is disabled (legacy mode).
368+
/// - Parameter config: HttpConfig with rate limit and backoff settings
369+
/// - Returns: The current Configuration
370+
@discardableResult
371+
public func httpConfig(_ config: HttpConfig?) -> Configuration {
372+
values.httpConfig = config
373+
return self
374+
}
364375
}
365376

366377
extension Analytics {

Sources/Segment/Plugins/SegmentDestination.swift

Lines changed: 93 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -66,32 +66,35 @@ public class SegmentDestination: DestinationPlugin, Subscriber, FlushCompletion
6666
public func update(settings: Settings, type: UpdateType) {
6767
guard let analytics = analytics else { return }
6868
let segmentInfo = settings.integrationSettings(forKey: self.key)
69-
// if customer cycles out a writekey at app.segment.com, this is necessary.
70-
/*
71-
This actually works differently than anticipated. It was thought that when a writeKey was
72-
revoked, its old writekey would redirect to the new, but it doesn't work this way. As a result
73-
it doesn't appear the writekey can be changed remotely. Leaving this here in case that changes in the
74-
near future (written on 10/29/2022).
75-
*/
76-
/*
77-
if let key = segmentInfo?[Self.Constants.apiKey.rawValue] as? String, key.isEmpty == false {
78-
if key != analytics.configuration.values.writeKey {
79-
/*
80-
- would need to flush.
81-
- would need to change the writeKey across the system.
82-
- would need to re-init storage.
83-
- probably other things too ...
84-
*/
85-
}
86-
}
87-
*/
69+
var needsHTTPClientRebuild = false
70+
8871
// if customer specifies a different apiHost (ie: eu1.segmentapis.com) at app.segment.com ...
8972
if let host = segmentInfo?[Self.Constants.apiHost.rawValue] as? String, host.isEmpty == false {
9073
if host != analytics.configuration.values.apiHost {
9174
analytics.configuration.values.apiHost = host
92-
httpClient = HTTPClient(analytics: analytics)
75+
needsHTTPClientRebuild = true
9376
}
9477
}
78+
79+
// Read httpConfig from CDN settings at integrations["Segment.io"].httpConfig
80+
if let httpConfigDict = segmentInfo?["httpConfig"] as? [String: Any],
81+
let data = try? JSONSerialization.data(withJSONObject: httpConfigDict),
82+
var cdnConfig = try? JSONDecoder.default.decode(HttpConfig.self, from: data) {
83+
// CDN-sourced config defaults enabled to true (presence of httpConfig implies active).
84+
// Only honor explicit `enabled: false` from CDN.
85+
let rlDict = httpConfigDict["rateLimitConfig"] as? [String: Any]
86+
cdnConfig.rateLimitConfig.enabled = (rlDict?["enabled"] as? Bool) ?? true
87+
88+
let boDict = httpConfigDict["backoffConfig"] as? [String: Any]
89+
cdnConfig.backoffConfig.enabled = (boDict?["enabled"] as? Bool) ?? true
90+
91+
analytics.configuration.values.httpConfig = cdnConfig
92+
needsHTTPClientRebuild = true
93+
}
94+
95+
if needsHTTPClientRebuild {
96+
httpClient = HTTPClient(analytics: analytics)
97+
}
9598
}
9699

97100
// MARK: - Event Handling Methods
@@ -181,12 +184,18 @@ extension SegmentDestination {
181184
case .success(_):
182185
storage.remove(data: [url])
183186
cleanupUploads()
184-
185-
// we don't want to retry events in a given batch when a 400
186-
// response for malformed JSON is returned
187-
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
187+
188+
// Batch was dropped by the retry state machine (e.g. max retries exceeded)
189+
case .failure(Segment.HTTPClientErrors.badRequest):
188190
storage.remove(data: [url])
189191
cleanupUploads()
192+
193+
// Non-retryable status codes should also drop the batch
194+
case .failure(Segment.HTTPClientErrors.statusCode(let code)):
195+
if httpClient.shouldDropBatch(forStatusCode: code) {
196+
storage.remove(data: [url])
197+
}
198+
cleanupUploads()
190199
default:
191200
break
192201
}
@@ -216,59 +225,86 @@ extension SegmentDestination {
216225
guard let storage = self.storage else { return }
217226
guard let analytics = self.analytics else { return }
218227
guard let httpClient = self.httpClient else { return }
219-
228+
220229
let totalCount = storage.dataStore.count
221-
var currentCount = 0
222-
223230
guard totalCount > 0 else { return }
224-
225-
while currentCount < totalCount {
231+
232+
// Process events in flushAt-sized batches so each batch is independent,
233+
// matching file mode behavior where each file gets its own upload.
234+
// This prevents failed retry events from being merged with new events.
235+
let batchSize = max(analytics.configuration.values.flushAt, 1)
236+
var offset = 0
237+
238+
while offset < totalCount {
226239
// can't imagine why we wouldn't get data at this point, but if we don't, then split.
227-
guard let eventData = storage.dataStore.fetch() else { return }
228-
guard let data = eventData.data else { return }
229-
guard let removable = eventData.removable else { return }
230-
guard let dataCount = eventData.removable?.count else { return }
231-
232-
currentCount += dataCount
233-
240+
guard let eventData = storage.dataStore.fetch(count: batchSize, offset: offset) else { break }
241+
guard let data = eventData.data else { break }
242+
guard let removable = eventData.removable else { break }
243+
guard let dataCount = eventData.removable?.count else { break }
244+
245+
// Generate a stable batch identifier from the data content
246+
let batchId = "mem-\(data.hashValue)"
247+
248+
// Check retry state machine before uploading
249+
let decision = httpClient.checkBatchUpload(batchId: batchId)
250+
switch decision {
251+
case .skipAllBatches:
252+
// Rate limited globally — stop processing all batches
253+
return
254+
case .skipThisBatch:
255+
// Backoff in effect for this batch — skip it, try the next
256+
offset += dataCount
257+
continue
258+
case .dropBatch:
259+
// Max retries or duration exceeded — drop the data
260+
analytics.log(message: "Dropping batch \(batchId): retry limit exceeded")
261+
analytics.reportInternalError(AnalyticsError.batchUploadFail(AnalyticsError.networkServerRejected(nil, 0)))
262+
storage.remove(data: removable)
263+
// Don't advance offset — removal shifted the array
264+
continue
265+
case .proceed:
266+
break
267+
}
268+
234269
// enter for this data we're going to kick off
235270
group.enter()
236271
analytics.log(message: "Processing In-Memory Batch (size: \(data.count))")
237-
272+
238273
// we're already on a separate thread.
239274
// lets let this task complete so we can get all the values out.
240275
let semaphore = DispatchSemaphore(value: 0)
241-
276+
var didRemove = false
277+
242278
// set up the task
243-
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data) { [weak self] result in
279+
let uploadTask = httpClient.startBatchUpload(writeKey: analytics.configuration.values.writeKey, data: data, batchId: batchId) { [weak self] result in
244280
defer {
245281
// leave for the url we kicked off.
246282
group.leave()
247283
semaphore.signal()
248284
}
249-
285+
250286
guard let self else { return }
251287
switch result {
252288
case .success(_):
253289
storage.remove(data: removable)
290+
didRemove = true
254291
cleanupUploads()
255-
256-
// we don't want to retry events in a given batch when a 400
257-
// response for malformed JSON is returned
258-
case .failure(Segment.HTTPClientErrors.statusCode(code: 400)):
259-
storage.remove(data: removable)
292+
293+
// Non-retryable status codes should drop the batch
294+
case .failure(Segment.HTTPClientErrors.statusCode(let code)):
295+
if httpClient.shouldDropBatch(forStatusCode: code) {
296+
storage.remove(data: removable)
297+
didRemove = true
298+
}
260299
cleanupUploads()
261300
default:
262301
break
263302
}
264-
303+
265304
analytics.log(message: "Processed In-Memory Batch (size: \(data.count))")
266-
// the upload we have here has just finished.
267-
// make sure it gets removed and it's cleanup() called rather
268-
// than waiting on the next flush to come around.
269305
cleanupUploads()
270306
}
271-
307+
272308
// we have a legit upload in progress now, so add it to our list.
273309
if let upload = uploadTask {
274310
add(uploadTask: UploadTaskInfo(url: nil, data: data, task: upload))
@@ -277,8 +313,14 @@ extension SegmentDestination {
277313
group.leave()
278314
semaphore.signal()
279315
}
280-
316+
281317
_ = semaphore.wait(timeout: .distantFuture)
318+
319+
// If items were removed, the array shifted — offset stays.
320+
// If items stayed (retryable failure), advance offset past them.
321+
if !didRemove {
322+
offset += dataCount
323+
}
282324
}
283325
}
284326
}

Sources/Segment/Utilities/Networking/HTTPClient.swift

Lines changed: 93 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ public enum HTTPClientErrors: Error {
1515
case failedToOpenBatch
1616
case statusCode(code: Int)
1717
case unknown(error: Error)
18+
case rateLimited
19+
case badRequest
1820
}
1921

2022
public class HTTPClient {
@@ -28,14 +30,28 @@ public class HTTPClient {
2830

2931
private weak var analytics: Analytics?
3032

31-
init(analytics: Analytics) {
33+
private var retryStateMachine: RetryStateMachine?
34+
private var retryState: RetryState
35+
private let timeProvider: TimeProvider
36+
37+
init(analytics: Analytics, timeProvider: TimeProvider = SystemTimeProvider()) {
3238
self.analytics = analytics
3339

3440
self.apiKey = analytics.configuration.values.writeKey
3541
self.apiHost = analytics.configuration.values.apiHost
3642
self.cdnHost = analytics.configuration.values.cdnHost
37-
43+
3844
self.session = analytics.configuration.values.httpSession()
45+
self.timeProvider = timeProvider
46+
47+
// Initialize retry system if httpConfig provided
48+
if let httpConfig = analytics.configuration.values.httpConfig {
49+
self.retryStateMachine = RetryStateMachine(config: httpConfig, timeProvider: timeProvider)
50+
self.retryState = analytics.storage.loadRetryState()
51+
} else {
52+
self.retryStateMachine = nil
53+
self.retryState = RetryState() // Legacy mode
54+
}
3955
}
4056

4157
func segmentURL(for host: String, path: String) -> URL? {
@@ -59,11 +75,40 @@ public class HTTPClient {
5975
return nil
6076
}
6177

62-
let urlRequest = configuredRequest(for: uploadURL, method: "POST")
78+
// Check if we should upload this batch
79+
if let stateMachine = retryStateMachine {
80+
let (decision, updatedState) = stateMachine.shouldUploadBatch(state: retryState, batchFile: batch.lastPathComponent)
81+
retryState = updatedState
82+
analytics?.storage.saveRetryState(retryState)
83+
84+
switch decision {
85+
case .skipAllBatches, .skipThisBatch:
86+
completion(.failure(HTTPClientErrors.rateLimited))
87+
return nil
88+
case .dropBatch:
89+
analytics?.reportInternalError(AnalyticsError.batchUploadFail(AnalyticsError.networkServerRejected(nil, 0)))
90+
completion(.failure(HTTPClientErrors.badRequest))
91+
return nil
92+
case .proceed:
93+
break // Continue with upload
94+
}
95+
}
96+
97+
var urlRequest = configuredRequest(for: uploadURL, method: "POST")
98+
99+
let batchFileName = batch.lastPathComponent
100+
101+
// Add X-Retry-Count header
102+
if let stateMachine = retryStateMachine {
103+
let retryCount = stateMachine.getRetryCount(state: retryState, batchFile: batchFileName)
104+
if retryCount > 0 {
105+
urlRequest.addValue("\(retryCount)", forHTTPHeaderField: "X-Retry-Count")
106+
}
107+
}
63108

64109
let dataTask = session.uploadTask(with: urlRequest, fromFile: batch) { [weak self] (data, response, error) in
65110
guard let self else { return }
66-
handleResponse(data: data, response: response, error: error, url: uploadURL, completion: completion)
111+
handleResponse(data: data, response: response, error: error, url: uploadURL, batchFile: batchFileName, completion: completion)
67112
}
68113

69114
dataTask.resume()
@@ -77,30 +122,54 @@ public class HTTPClient {
77122
/// - batch: The array of the events, considered a batch of events.
78123
/// - completion: The closure executed when done. Passes if the task should be retried or not if failed.
79124
@discardableResult
80-
func startBatchUpload(writeKey: String, data: Data, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any UploadTask)? {
125+
func startBatchUpload(writeKey: String, data: Data, batchId: String, completion: @escaping (_ result: Result<Bool, Error>) -> Void) -> (any UploadTask)? {
81126
guard let uploadURL = segmentURL(for: apiHost, path: "/b") else {
82127
self.analytics?.reportInternalError(HTTPClientErrors.failedToOpenBatch)
83128
completion(.failure(HTTPClientErrors.failedToOpenBatch))
84129
return nil
85130
}
86-
87-
let urlRequest = configuredRequest(for: uploadURL, method: "POST")
131+
132+
var urlRequest = configuredRequest(for: uploadURL, method: "POST")
133+
134+
// Add X-Retry-Count header
135+
if let stateMachine = retryStateMachine {
136+
let retryCount = stateMachine.getRetryCount(state: retryState, batchFile: batchId)
137+
if retryCount > 0 {
138+
urlRequest.addValue("\(retryCount)", forHTTPHeaderField: "X-Retry-Count")
139+
}
140+
}
88141

89142
let dataTask = session.uploadTask(with: urlRequest, from: data) { [weak self] (data, response, error) in
90143
guard let self else { return }
91-
handleResponse(data: data, response: response, error: error, url: uploadURL, completion: completion)
144+
handleResponse(data: data, response: response, error: error, url: uploadURL, batchFile: batchId, completion: completion)
92145
}
93-
146+
94147
dataTask.resume()
95148
return dataTask
96149
}
97150

98-
private func handleResponse(data: Data?, response: URLResponse?, error: Error?, url: URL?, completion: @escaping (_ result: Result<Bool, Error>) -> Void) {
151+
private func extractRetryAfter(from response: HTTPURLResponse) -> Int? {
152+
return response.value(forHTTPHeaderField: "Retry-After").flatMap { Int($0) }
153+
}
154+
155+
private func handleResponse(data: Data?, response: URLResponse?, error: Error?, url: URL?, batchFile: String, completion: @escaping (_ result: Result<Bool, Error>) -> Void) {
99156
if let error = error {
100157
analytics?.log(message: "Error uploading request \(error.localizedDescription).")
101158
analytics?.reportInternalError(AnalyticsError.networkUnknown(url, error))
102159
completion(.failure(HTTPClientErrors.unknown(error: error)))
103160
} else if let httpResponse = response as? HTTPURLResponse {
161+
// Update retry state after response
162+
if let stateMachine = retryStateMachine {
163+
let responseInfo = ResponseInfo(
164+
statusCode: httpResponse.statusCode,
165+
retryAfterSeconds: extractRetryAfter(from: httpResponse),
166+
batchFile: batchFile,
167+
currentTime: timeProvider.now()
168+
)
169+
retryState = stateMachine.handleResponse(state: retryState, response: responseInfo)
170+
analytics?.storage.saveRetryState(retryState)
171+
}
172+
104173
switch (httpResponse.statusCode) {
105174
case 1..<300:
106175
completion(.success(true))
@@ -161,6 +230,20 @@ public class HTTPClient {
161230
dataTask.resume()
162231
}
163232

233+
/// Returns true if the given status code should cause the batch to be dropped (not retried).
234+
func shouldDropBatch(forStatusCode code: Int) -> Bool {
235+
return retryStateMachine?.shouldDropBatch(statusCode: code) ?? (code == 400)
236+
}
237+
238+
/// Check if a batch should be uploaded, and update retry state accordingly.
239+
func checkBatchUpload(batchId: String) -> UploadDecision {
240+
guard let stateMachine = retryStateMachine else { return .proceed }
241+
let (decision, updatedState) = stateMachine.shouldUploadBatch(state: retryState, batchFile: batchId)
242+
retryState = updatedState
243+
analytics?.storage.saveRetryState(retryState)
244+
return decision
245+
}
246+
164247
deinit {
165248
// finish any tasks that may be processing
166249
session.finishTasksAndInvalidate()

0 commit comments

Comments (0)