Skip to content

Commit 6e14402

Browse files
didiergarcia and claude authored
feat: implement TAPI retry handling - Phase 1 (State Machine Core) (#293)
* feat: add retry system data structures
  - Add RetryState with pipeline state and batch metadata
  - Add RetryConfig with rate limit and backoff configuration
  - Add enums and sealed classes for decisions
  - All immutable data classes with kotlinx.serialization support
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: add TimeProvider abstraction for testing
  - TimeProvider interface for time access
  - SystemTimeProvider for production
  - FakeTimeProvider for deterministic tests
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: implement status code resolution with tests
  - Add status code behavior resolution (overrides -> defaults)
  - Handle retryable errors with exponential backoff
  - Drop non-retryable errors
  - Comprehensive test coverage for 4xx, 5xx, unknown codes
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: implement 429 rate limiting
  - Add handleRateLimitResponse for global pipeline state
  - Transition to RATE_LIMITED on 429
  - Increment globalRetryCount, reset on success
  - Clamp Retry-After to maxRetryInterval
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: implement shouldUploadBatch with upload gate logic
  - Check global rate limiting (skip all batches)
  - Check per-batch backoff time (skip this batch)
  - Check max retries and duration (drop batch)
  - Return explicit upload decisions
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: implement getRetryCount for X-Retry-Count header
  - Return max of per-batch failureCount and global retry count
  - Return 0 for new batches (first attempt)
  - Tests verify header value calculation
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* feat: implement legacy mode fallback
  - When both configs disabled, revert to legacy behavior
  - Try all batches on every flush
  - Keep on 429/5xx, drop on 4xx (not 429)
  - Emergency escape hatch if smart retry has bugs
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
* fix: address Copilot review feedback
  - Inject Random provider for deterministic backoff jitter
  - Clear stale rate limit state when expired in shouldUploadBatch
  - Rename retryAfterMs to waitUntilTimeMs for clarity
  - Update test comment to clarify positive-only jitter
  - Clamp backoff after jitter to enforce maxBackoffInterval hard ceiling
  Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
---------
Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 689beab commit 6e14402

7 files changed

Lines changed: 648 additions & 0 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
import kotlinx.serialization.Serializable
4+
5+
/**
 * Top-level retry configuration: the settings for rate-limit (HTTP 429) handling together
 * with the settings for per-batch exponential backoff.
 *
 * When both nested configs have `enabled = false`, [RetryStateMachine] runs in legacy mode.
 *
 * @property rateLimitConfig settings applied when a 429 response is received.
 * @property backoffConfig settings applied to retryable per-batch failures.
 */
@Serializable
data class RetryConfig(
    val rateLimitConfig: RateLimitConfig = RateLimitConfig(),
    val backoffConfig: BackoffConfig = BackoffConfig(),
)
10+
11+
/**
 * Settings for the global rate-limit handling triggered by HTTP 429 responses.
 *
 * @property enabled when true, a 429 response moves the pipeline into the rate-limited state.
 * @property maxRetryCount retry ceiling for rate-limited uploads.
 *   NOTE(review): not read by the state machine code visible alongside this class; confirm
 *   where it is enforced.
 * @property maxRetryInterval longest wait after a 429, in seconds. It caps the server-provided
 *   retry-after value and is also the wait used when the response carries none.
 * @property maxTotalBackoffDuration overall time budget for rate-limited retries.
 *   NOTE(review): presumably seconds, like the same-named field of [BackoffConfig]; not read
 *   by the visible state machine code, so confirm.
 */
@Serializable
data class RateLimitConfig(
    val enabled: Boolean = true,
    val maxRetryCount: Int = 100,
    val maxRetryInterval: Int = 300,
    val maxTotalBackoffDuration: Long = 43200,
)
18+
19+
/**
 * Settings for per-batch exponential backoff and for mapping HTTP status codes to a
 * [RetryBehavior].
 *
 * @property enabled when true, retryable failures are tracked per batch and gated by backoff.
 * @property maxRetryCount a batch is dropped once its failure count reaches this value.
 * @property baseBackoffInterval delay after the first failure, in seconds; it doubles with
 *   every further failure of the same batch.
 * @property maxBackoffInterval hard ceiling for a single backoff delay, in seconds (applied
 *   both before and after jitter is added).
 * @property maxTotalBackoffDuration a batch is dropped once more than this many seconds have
 *   passed since its first recorded failure.
 * @property jitterPercent upper bound of the random extra delay, as a percentage of the
 *   capped backoff.
 * @property default4xxBehavior behavior for 400..499 codes without an override.
 * @property default5xxBehavior behavior for 500..599 codes without an override.
 * @property unknownCodeBehavior behavior for every other code without an override.
 * @property statusCodeOverrides per-code behavior that takes precedence over the defaults.
 */
@Serializable
data class BackoffConfig(
    val enabled: Boolean = true,
    val maxRetryCount: Int = 100,
    val baseBackoffInterval: Double = 0.5,
    val maxBackoffInterval: Int = 300,
    val maxTotalBackoffDuration: Long = 43200,
    val jitterPercent: Int = 10,
    val default4xxBehavior: RetryBehavior = RetryBehavior.DROP,
    val default5xxBehavior: RetryBehavior = RetryBehavior.RETRY,
    val unknownCodeBehavior: RetryBehavior = RetryBehavior.DROP,
    val statusCodeOverrides: Map<Int, RetryBehavior> = mapOf(
        // 4xx codes that are retried despite the 4xx default
        408 to RetryBehavior.RETRY,
        410 to RetryBehavior.RETRY,
        429 to RetryBehavior.RETRY,
        460 to RetryBehavior.RETRY,
        // 5xx codes that are dropped despite the 5xx default
        501 to RetryBehavior.DROP,
        505 to RetryBehavior.DROP,
    ),
)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
import kotlinx.serialization.Serializable
4+
5+
/**
 * Immutable snapshot of the retry pipeline.
 *
 * @property pipelineState whether the pipeline as a whole is ready or rate limited.
 * @property waitUntilTime epoch-millisecond deadline of the current rate limit, or null.
 * @property globalRetryCount number of consecutive rate-limit responses; reset on success.
 * @property batchMetadata failure bookkeeping keyed by batch file.
 */
@Serializable
data class RetryState(
    val pipelineState: PipelineState = PipelineState.READY,
    val waitUntilTime: Long? = null,
    val globalRetryCount: Int = 0,
    val batchMetadata: Map<String, BatchMetadata> = emptyMap(),
) {
    /**
     * Returns true only while the pipeline is marked [PipelineState.RATE_LIMITED] and
     * [currentTime] is still before [waitUntilTime]. A missing deadline counts as not limited.
     */
    fun isRateLimited(currentTime: Long): Boolean {
        if (pipelineState != PipelineState.RATE_LIMITED) return false
        val deadline = waitUntilTime ?: return false
        return currentTime < deadline
    }
}
16+
17+
/**
 * Failure bookkeeping for a single batch.
 *
 * @property failureCount number of retryable failures recorded for the batch.
 * @property nextRetryTime earliest time the batch may be retried, or null if unrestricted.
 * @property firstFailureTime time of the first recorded failure, or null if none.
 */
@Serializable
data class BatchMetadata(
    val failureCount: Int = 0,
    val nextRetryTime: Long? = null,
    val firstFailureTime: Long? = null,
) {
    /** Returns true when no retry time is set or [currentTime] has reached it. */
    fun shouldRetry(currentTime: Long): Boolean {
        val retryAt = nextRetryTime ?: return true
        return currentTime >= retryAt
    }

    /**
     * Returns true when strictly more than [maxDuration] has elapsed since the first failure.
     * Without a recorded first failure the answer is always false.
     */
    fun exceedsMaxDuration(currentTime: Long, maxDuration: Long): Boolean {
        val startedAt = firstFailureTime ?: return false
        return currentTime - startedAt > maxDuration
    }
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
import kotlin.random.Random
4+
5+
/**
 * Decides whether a batch may be uploaded and folds upload responses into a new [RetryState].
 *
 * The state it is given is never mutated; every operation returns the state to use next.
 * When both [RateLimitConfig.enabled] and [BackoffConfig.enabled] are false the machine runs
 * in legacy mode, which bypasses rate limiting and backoff entirely.
 *
 * NOTE(review): `rateLimitConfig.maxRetryCount` and `rateLimitConfig.maxTotalBackoffDuration`
 * are not enforced anywhere in this class; confirm whether that is intended.
 *
 * @param config rate-limit and backoff settings.
 * @param timeProvider clock read by [shouldUploadBatch]; [handleResponse] uses
 *   [ResponseInfo.currentTime] instead.
 * @param random source of backoff jitter; pass a seeded instance for deterministic results.
 */
class RetryStateMachine(
    private val config: RetryConfig,
    private val timeProvider: TimeProvider = SystemTimeProvider(),
    private val random: Random = Random.Default
) {
    /** True when both rate limiting and backoff are disabled. */
    private val isLegacyMode: Boolean
        get() = !config.rateLimitConfig.enabled && !config.backoffConfig.enabled

    /**
     * Applies an upload [response] to [state] and returns the resulting state.
     *
     * Legacy mode: 2xx removes the batch's metadata, 429 and 5xx leave the state untouched,
     * and any other code removes the batch's metadata.
     *
     * Otherwise: 2xx resets the pipeline to READY, clears the rate-limit deadline and the
     * global retry count, and removes the batch's metadata. A 429 with rate limiting enabled
     * marks the pipeline RATE_LIMITED. A code that resolves to [RetryBehavior.RETRY] with
     * backoff enabled records a failure and schedules the next retry. Everything else removes
     * the batch's metadata.
     */
    fun handleResponse(
        state: RetryState,
        response: ResponseInfo
    ): RetryState {
        if (isLegacyMode) {
            return when {
                response.statusCode in 200..299 -> state.removeBatch(response.batchFile)
                response.statusCode == 429 || response.statusCode in 500..599 -> state // keep
                else -> state.removeBatch(response.batchFile) // drop (4xx and any other code)
            }
        }

        val currentTime = response.currentTime
        val behavior = resolveStatusCodeBehavior(response.statusCode)

        return when {
            response.statusCode in 200..299 -> {
                state.copy(
                    pipelineState = PipelineState.READY,
                    waitUntilTime = null,
                    globalRetryCount = 0,
                    batchMetadata = state.batchMetadata - response.batchFile
                )
            }

            response.statusCode == 429 && config.rateLimitConfig.enabled -> {
                handleRateLimitResponse(state, response, currentTime)
            }

            behavior == RetryBehavior.RETRY && config.backoffConfig.enabled -> {
                handleRetryableError(state, response, currentTime)
            }

            else -> {
                state.removeBatch(response.batchFile)
            }
        }
    }

    /** Marks the pipeline RATE_LIMITED until the computed deadline and bumps the global count. */
    private fun handleRateLimitResponse(
        state: RetryState,
        response: ResponseInfo,
        currentTime: Long
    ): RetryState {
        val waitUntilTimeMs = calculateWaitUntilTimeMs(response.retryAfterSeconds, currentTime)

        return state.copy(
            pipelineState = PipelineState.RATE_LIMITED,
            waitUntilTime = waitUntilTimeMs,
            globalRetryCount = state.globalRetryCount + 1
        )
    }

    /**
     * Converts a retry-after value in seconds into an absolute deadline in milliseconds.
     * Negative values are treated as 0, a missing value falls back to `maxRetryInterval`,
     * and the wait never exceeds `maxRetryInterval`.
     */
    private fun calculateWaitUntilTimeMs(retryAfterSeconds: Int?, currentTime: Long): Long {
        val seconds = retryAfterSeconds?.coerceAtLeast(0) ?: config.rateLimitConfig.maxRetryInterval
        val clampedSeconds = minOf(seconds, config.rateLimitConfig.maxRetryInterval)
        return currentTime + (clampedSeconds * 1000L)
    }

    /**
     * Records one more failure for the batch: increments its failure count, keeps the original
     * first-failure time (or starts it now), and schedules the next retry after a backoff.
     */
    private fun handleRetryableError(
        state: RetryState,
        response: ResponseInfo,
        currentTime: Long
    ): RetryState {
        val existingMetadata = state.batchMetadata[response.batchFile]
        val newFailureCount = (existingMetadata?.failureCount ?: 0) + 1
        val firstFailureTime = existingMetadata?.firstFailureTime ?: currentTime
        val nextRetryTime = currentTime + calculateBackoffMs(newFailureCount)

        val newMetadata = BatchMetadata(
            failureCount = newFailureCount,
            nextRetryTime = nextRetryTime,
            firstFailureTime = firstFailureTime
        )

        return state.copy(
            batchMetadata = state.batchMetadata + (response.batchFile to newMetadata)
        )
    }

    /**
     * Backoff in milliseconds for the given failure count: `base * 2^(failureCount - 1)`,
     * capped at `maxBackoffInterval`, plus a random jitter of up to `jitterPercent` of the
     * capped value, then capped again so the ceiling is never exceeded.
     */
    private fun calculateBackoffMs(failureCount: Int): Long {
        val base = config.backoffConfig.baseBackoffInterval * 1000
        val max = config.backoffConfig.maxBackoffInterval * 1000L

        val exponentialBackoff = base * Math.pow(2.0, (failureCount - 1).toDouble())
        val cappedBackoff = minOf(exponentialBackoff, max.toDouble())

        val jitterAmount = cappedBackoff * (config.backoffConfig.jitterPercent / 100.0)
        val jitter = (random.nextDouble() * jitterAmount).toLong()

        return minOf(cappedBackoff + jitter, max.toDouble()).toLong()
    }

    /**
     * Decides whether [batchFile] may be uploaded now and returns the decision together with
     * the state to carry forward.
     *
     * Order of checks: legacy mode (always proceed), global rate limit (skip all batches),
     * then, when backoff is enabled and the batch has metadata: retry-count limit (drop),
     * total-duration limit (drop), and backoff window (skip this batch).
     */
    fun shouldUploadBatch(
        state: RetryState,
        batchFile: String
    ): Pair<UploadDecision, RetryState> {
        // Legacy mode: skip all smart retry logic
        if (isLegacyMode) {
            return UploadDecision.Proceed to state
        }

        val currentTime = timeProvider.currentTimeMillis()
        val rateLimited = state.isRateLimited(currentTime)

        // Check 1: global rate limiting. Honored only while the feature is enabled, the same
        // way every per-batch check below is gated on backoffConfig.enabled; otherwise a
        // RATE_LIMITED state stored under an earlier config would keep blocking uploads.
        if (rateLimited && config.rateLimitConfig.enabled) {
            return UploadDecision.SkipAllBatches to state
        }

        // Clear a stale rate-limit marker: either its deadline has passed or it has none.
        val clearedState = if (state.pipelineState == PipelineState.RATE_LIMITED && !rateLimited) {
            state.copy(
                pipelineState = PipelineState.READY,
                waitUntilTime = null
            )
        } else {
            state
        }

        // Check 2: per-batch metadata (only meaningful while backoff is enabled)
        val metadata = clearedState.batchMetadata[batchFile]
        if (metadata != null && config.backoffConfig.enabled) {
            // Retry count limit (must be checked before duration per spec)
            if (metadata.failureCount >= config.backoffConfig.maxRetryCount) {
                return UploadDecision.DropBatch(DropReason.MAX_RETRIES_EXCEEDED) to
                    clearedState.removeBatch(batchFile)
            }

            // Duration limit; the config value is in seconds, timestamps are milliseconds
            val maxDurationMs = config.backoffConfig.maxTotalBackoffDuration * 1000
            if (metadata.exceedsMaxDuration(currentTime, maxDurationMs)) {
                return UploadDecision.DropBatch(DropReason.MAX_DURATION_EXCEEDED) to
                    clearedState.removeBatch(batchFile)
            }

            // Backoff window has not elapsed yet
            if (!metadata.shouldRetry(currentTime)) {
                return UploadDecision.SkipThisBatch to clearedState
            }
        }

        return UploadDecision.Proceed to clearedState
    }

    /**
     * Returns the larger of the batch's own failure count (0 when the batch has no metadata)
     * and the global retry count.
     */
    fun getRetryCount(state: RetryState, batchFile: String): Int {
        val batchRetryCount = state.batchMetadata[batchFile]?.failureCount ?: 0
        return maxOf(batchRetryCount, state.globalRetryCount)
    }

    /** Resolves a status code to a behavior: explicit override first, then range defaults. */
    private fun resolveStatusCodeBehavior(code: Int): RetryBehavior {
        config.backoffConfig.statusCodeOverrides[code]?.let { return it }

        return when (code) {
            in 400..499 -> config.backoffConfig.default4xxBehavior
            in 500..599 -> config.backoffConfig.default5xxBehavior
            else -> config.backoffConfig.unknownCodeBehavior
        }
    }
}
176+
177+
/** Returns a copy of this state without any metadata entry for [batchFile]. */
private fun RetryState.removeBatch(batchFile: String): RetryState =
    copy(batchMetadata = batchMetadata.minus(batchFile))
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
import kotlinx.serialization.Serializable
4+
5+
/** Global state of the upload pipeline. */
@Serializable
enum class PipelineState {
    /** Default state; no rate limit is recorded. */
    READY,

    /** A 429 response was recorded; uploads wait until the stored deadline. */
    RATE_LIMITED,
}
10+
11+
/** What to do with a batch after a non-success status code. */
@Serializable
enum class RetryBehavior {
    /** Keep the batch and try again later. */
    RETRY,

    /** Give up on the batch. */
    DROP,
}
16+
17+
/** Why a batch was dropped instead of being uploaded again. */
@Serializable
enum class DropReason {
    /** The batch's failure count reached the configured maximum. */
    MAX_RETRIES_EXCEEDED,

    /** Too much time has passed since the batch's first failure. */
    MAX_DURATION_EXCEEDED,

    /**
     * The failure is not retryable.
     * NOTE(review): not produced by the state machine code visible alongside this enum.
     */
    NON_RETRYABLE_ERROR,
}
23+
24+
/** Outcome of the upload gate for a single batch. */
sealed class UploadDecision {
    /** Upload the batch now. */
    object Proceed : UploadDecision()

    /** Leave this batch for a later attempt; other batches may still be tried. */
    object SkipThisBatch : UploadDecision()

    /** The pipeline is rate limited; no batch should be uploaded right now. */
    object SkipAllBatches : UploadDecision()

    /** Discard the batch for the given [reason]. */
    data class DropBatch(val reason: DropReason) : UploadDecision()
}
30+
31+
/**
 * Result of one upload attempt, as consumed by the retry state machine.
 *
 * @property statusCode HTTP status code of the response.
 * @property retryAfterSeconds server-requested wait in seconds, or null when none was given.
 * @property batchFile identifier of the batch the response belongs to; used as the key into
 *   the per-batch metadata map.
 * @property currentTime time at which the response was handled, in milliseconds; retry
 *   deadlines are computed relative to it.
 */
@Serializable
data class ResponseInfo(
    val statusCode: Int,
    val retryAfterSeconds: Int? = null,
    val batchFile: String,
    val currentTime: Long,
)
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
/**
 * Source of the current time, abstracted so that callers can substitute a controllable clock.
 *
 * Declared as a functional interface, so an instance can be written as a lambda
 * (`TimeProvider { 0L }`); existing class and object implementations are unaffected.
 */
fun interface TimeProvider {
    /** Returns the current time in milliseconds. */
    fun currentTimeMillis(): Long
}
6+
7+
/** [TimeProvider] backed by the system wall clock. */
class SystemTimeProvider : TimeProvider {
    /** Returns [System.currentTimeMillis]. */
    override fun currentTimeMillis(): Long {
        return System.currentTimeMillis()
    }
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.segment.analytics.kotlin.core.retry
2+
3+
/**
 * [TimeProvider] whose clock only moves when told to, for deterministic tests.
 *
 * @param currentTime initial clock value in milliseconds; defaults to 0.
 */
class FakeTimeProvider(private var currentTime: Long = 0L) : TimeProvider {
    /** Returns the value last set via the constructor, [setTime], or [advanceBy]. */
    override fun currentTimeMillis(): Long {
        return currentTime
    }

    /** Sets the clock to exactly [millis]. */
    fun setTime(millis: Long) {
        currentTime = millis
    }

    /** Moves the clock forward by [millis] (backward when negative). */
    fun advanceBy(millis: Long) {
        currentTime = currentTime + millis
    }
}

0 commit comments

Comments
 (0)