Skip to content

Commit 3053f0e

Browse files
committed
Retry rate-limited responses and tighten the truncation-repair branch
1 parent 481388a commit 3053f0e

2 files changed

Lines changed: 211 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ All notable changes to Sofos are documented in this file.
66

77
### Fixed
88

9+
- **Rate-limited responses (HTTP 429) are now retried once.** Previously a 429 fell into the same "client error, fail fast" bucket as the other 4xx codes, so a transient burst limit aborted the call straight away. The retry now uses the server's `Retry-After` delay when present (capped so a misbehaving server can't ask for an hour-long pause), or the usual exponential backoff otherwise, and surrenders after a single retry rather than burning every retry slot on an ongoing limit.
10+
- **Truncated tool arguments with an internal trailing comma are now recovered.** A payload that needed both the `[1,2,]` → `[1,2]` strip and the "close the missing `}`" repair used to fall through to the raw-arguments fallback because the truncation branch threw away the comma-stripped intermediate; both repairs now apply on the same attempt.
911
- **Streaming responses from OpenAI no longer corrupt multi-byte characters that arrive split across HTTP chunks.** Same chunk-boundary corruption as Anthropic, fixed by buffering raw bytes and decoding only at SSE line boundaries.
1012
- **OpenAI streams that emit nested error envelopes now surface the server's message.** Previously the parser only inspected the flat `{message: "..."}` shape, so the more common `{error: {message: "..."}}` envelope landed as "Unknown streaming error" and the user lost the real reason (rate limit, context overflow, and so on). Both envelopes are now tolerated.
1113
- **Pressing ESC during a long OpenAI streamed response now stops on the very next line instead of finishing the burst.** The interrupt flag used to be checked only between HTTP chunks, so a terminal `response.completed` chunk carrying many SSE lines processed every line before noticing the press; the parser now re-checks between lines too.

src/api/utils.rs

Lines changed: 209 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,12 @@ pub fn parse_tool_arguments(name: &str, args: &str) -> serde_json::Value {
175175

176176
// Truncated mid-string: the response terminated without closing the
177177
// open string literal and the enclosing object. Close the string,
178-
// trim the trailing comma if we cut mid-key, and tack on `}`.
179-
if trimmed.starts_with('{') {
180-
let mut candidate = escape_control_chars_in_json_strings(trimmed);
178+
// trim the trailing comma if we cut mid-key, and tack on `}`. Build
179+
// on top of `escaped` rather than re-running the escape pass on
180+
// `trimmed`, so the intra-JSON trailing-comma stripping done above
181+
// isn't discarded for this attempt.
182+
if escaped.starts_with('{') {
183+
let mut candidate = escaped.clone();
181184
if string_is_open(&candidate) {
182185
candidate.push('"');
183186
}
@@ -343,8 +346,19 @@ pub fn truncate_at_char_boundary(s: &str, max_bytes: usize) -> usize {
343346
i
344347
}
345348

346-
/// Only `ServerError` triggers a retry — transport failures and 4xx
347-
/// statuses fail fast.
349+
/// Upper bound applied to the `Retry-After` value advertised by a 429
350+
/// response. 60 seconds is comfortably above the burst-limit windows
351+
/// the APIs we integrate with use in practice, and short enough that
352+
/// an extreme value (a misbehaving or malicious server asking for
353+
/// hours) can't lock sofos for an unreasonable wait.
354+
const MAX_RATE_LIMIT_RETRY_AFTER: Duration = Duration::from_secs(60);
355+
356+
/// `ServerError` and `RateLimited` trigger a retry — transport failures
357+
/// and other 4xx statuses fail fast. `RateLimited` carries the
358+
/// `Retry-After` value the server asked for, capped at
359+
/// [`MAX_RATE_LIMIT_RETRY_AFTER`]; the retry loop is also capped at one
360+
/// extra attempt for this variant so an ongoing limit doesn't burn
361+
/// through every retry slot waiting.
348362
#[derive(Debug)]
349363
pub enum ApiCallError {
350364
Transport(reqwest::Error),
@@ -353,6 +367,11 @@ pub enum ApiCallError {
353367
status: reqwest::StatusCode,
354368
body: String,
355369
},
370+
/// HTTP 429. Body already drained for error reporting.
371+
RateLimited {
372+
retry_after: Option<Duration>,
373+
body: String,
374+
},
356375
/// Body already drained for error reporting.
357376
ClientError {
358377
status: reqwest::StatusCode,
@@ -362,18 +381,36 @@ pub enum ApiCallError {
362381

363382
impl ApiCallError {
364383
fn is_retryable(&self) -> bool {
365-
matches!(self, Self::ServerError { .. })
384+
matches!(self, Self::ServerError { .. } | Self::RateLimited { .. })
366385
}
367386

368387
fn describe(&self) -> String {
369388
match self {
370389
Self::Transport(e) => format!("Request failed: {}", e),
371390
Self::ServerError { status, .. } => format!("Server error {}", status),
391+
Self::RateLimited { retry_after, .. } => match retry_after {
392+
Some(d) => format!("Rate limited (retry after {:?})", d),
393+
None => "Rate limited".to_string(),
394+
},
372395
Self::ClientError { status, .. } => format!("Client error {}", status),
373396
}
374397
}
375398
}
376399

400+
/// Read the `Retry-After` header in its seconds-since-now form and clamp
401+
/// the result to [`MAX_RATE_LIMIT_RETRY_AFTER`]. RFC 7231 also allows an
402+
/// HTTP-date form, but every API we integrate with uses the seconds
403+
/// form for 429s, so the date form falls back to `None` and the retry
404+
/// loop uses its default exponential delay.
405+
fn parse_retry_after_header(headers: &HeaderMap) -> Option<Duration> {
406+
headers
407+
.get(reqwest::header::RETRY_AFTER)
408+
.and_then(|v| v.to_str().ok())
409+
.and_then(|s| s.trim().parse::<u64>().ok())
410+
.map(Duration::from_secs)
411+
.map(|d| d.min(MAX_RATE_LIMIT_RETRY_AFTER))
412+
}
413+
377414
/// Drains the body on non-2xx so the caller can report it; 2xx responses
378415
/// are returned untouched (important for streaming callers that consume
379416
/// the body later).
@@ -384,9 +421,18 @@ pub async fn classify_response(
384421
if status.is_success() {
385422
return Ok(response);
386423
}
424+
// Grab `Retry-After` before draining the body — `text().await`
425+
// consumes the response and the headers go with it.
426+
let retry_after = if status.as_u16() == 429 {
427+
parse_retry_after_header(response.headers())
428+
} else {
429+
None
430+
};
387431
let body = response.text().await.unwrap_or_default();
388432
if status.is_server_error() {
389433
Err(ApiCallError::ServerError { status, body })
434+
} else if status.as_u16() == 429 {
435+
Err(ApiCallError::RateLimited { retry_after, body })
390436
} else {
391437
Err(ApiCallError::ClientError { status, body })
392438
}
@@ -425,33 +471,55 @@ fn api_call_error_to_sofos(service_name: &str, attempts: u32, e: ApiCallError) -
425471
service_name, status, attempts, body
426472
))
427473
}
474+
ApiCallError::RateLimited { retry_after, body } => SofosError::Api(format!(
475+
"{} rate-limited (HTTP 429{}) after {} attempt(s): {}",
476+
service_name,
477+
match retry_after {
478+
Some(d) => format!(", server asked for {:?}", d),
479+
None => String::new(),
480+
},
481+
attempts,
482+
body
483+
)),
428484
}
429485
}
430486

431-
/// Only retries 5xx responses — timeouts, connection errors, and 4xx
432-
/// all fail fast, since retrying those either re-burns expensive work
433-
/// or re-hits a deterministic client error.
487+
/// Retries 5xx responses and 429 rate-limit responses; transport
488+
/// failures and other 4xx statuses fail fast, since retrying those
489+
/// either re-burns expensive work or re-hits a deterministic client
490+
/// error. A 429 is retried at most once, using the server-supplied
491+
/// `Retry-After` delay (capped at [`MAX_RATE_LIMIT_RETRY_AFTER`]) when
492+
/// present and the exponential-backoff delay otherwise.
434493
pub async fn with_retries<F, Fut, T>(service_name: &str, operation: F) -> Result<T>
435494
where
436495
F: Fn() -> Fut,
437496
Fut: Future<Output = std::result::Result<T, ApiCallError>>,
438497
{
439498
let mut retry_delay = Duration::from_millis(INITIAL_RETRY_DELAY_MS);
499+
let mut next_delay_override: Option<Duration> = None;
500+
let mut rate_limit_attempts: u32 = 0;
501+
const MAX_RATE_LIMIT_RETRIES: u32 = 1;
440502

441503
for attempt in 0..=MAX_RETRIES {
442504
if attempt > 0 {
505+
// Server-supplied `Retry-After` wins over the
506+
// exponential-backoff schedule for one iteration. Jitter is
507+
// applied either way so a synchronised retry storm from
508+
// many clients on the same shared limit doesn't all wake
509+
// up at the same instant.
510+
let base_delay = next_delay_override.take().unwrap_or(retry_delay);
443511
let jitter = rand::rng().random_range(0.0..JITTER_FACTOR);
444-
let jittered_delay = retry_delay.mul_f64(1.0 + jitter);
512+
let jittered_delay = base_delay.mul_f64(1.0 + jitter);
445513

446514
tracing::warn!(
447515
service = service_name,
448516
attempt = attempt,
449517
max_retries = MAX_RETRIES,
450518
delay_ms = jittered_delay.as_millis() as u64,
451-
"Retrying API request after server error"
519+
"Retrying API request after retryable error"
452520
);
453521
eprintln!(
454-
" {} server error, retrying in {:?}... (attempt {}/{})",
522+
" {} retrying in {:?}... (attempt {}/{})",
455523
format!("{}:", service_name).bright_yellow(),
456524
jittered_delay,
457525
attempt,
@@ -465,7 +533,20 @@ where
465533
Ok(result) => return Ok(result),
466534
Err(e) => {
467535
let retryable = e.is_retryable();
468-
if attempt < MAX_RETRIES && retryable {
536+
let is_rate_limited = matches!(e, ApiCallError::RateLimited { .. });
537+
if is_rate_limited {
538+
rate_limit_attempts += 1;
539+
if let ApiCallError::RateLimited {
540+
retry_after: Some(d),
541+
..
542+
} = &e
543+
{
544+
next_delay_override = Some(*d);
545+
}
546+
}
547+
let rate_limit_cap_reached =
548+
is_rate_limited && rate_limit_attempts > MAX_RATE_LIMIT_RETRIES;
549+
if attempt < MAX_RETRIES && retryable && !rate_limit_cap_reached {
469550
continue;
470551
}
471552
let attempts = attempt + 1;
@@ -524,19 +605,114 @@ mod tests {
524605
use super::*;
525606

526607
#[test]
527-
fn api_call_error_is_retryable_only_for_server_error() {
608+
fn api_call_error_is_retryable_for_server_error_and_rate_limited() {
528609
let server = ApiCallError::ServerError {
529610
status: reqwest::StatusCode::INTERNAL_SERVER_ERROR,
530611
body: String::new(),
531612
};
613+
let rate_limited = ApiCallError::RateLimited {
614+
retry_after: Some(Duration::from_secs(2)),
615+
body: String::new(),
616+
};
532617
let client = ApiCallError::ClientError {
533618
status: reqwest::StatusCode::BAD_REQUEST,
534619
body: String::new(),
535620
};
536621
assert!(server.is_retryable());
622+
assert!(rate_limited.is_retryable());
537623
assert!(!client.is_retryable());
538624
}
539625

626+
#[tokio::test]
627+
async fn with_retries_retries_rate_limited_once_then_surrenders() {
628+
// 429 used to fall into `ClientError` and fail on the first
629+
// attempt; now it retries exactly once, honouring the
630+
// server-supplied delay but capped so a long limit doesn't
631+
// burn through every retry slot.
632+
use std::sync::atomic::{AtomicU32, Ordering};
633+
let attempts = AtomicU32::new(0);
634+
let result: Result<&'static str> = with_retries("Test", || {
635+
attempts.fetch_add(1, Ordering::SeqCst);
636+
async move {
637+
Err(ApiCallError::RateLimited {
638+
retry_after: Some(Duration::from_millis(1)),
639+
body: "slow down".into(),
640+
})
641+
}
642+
})
643+
.await;
644+
assert!(result.is_err());
645+
assert_eq!(
646+
attempts.load(Ordering::SeqCst),
647+
2,
648+
"rate-limited responses retry exactly once (one initial attempt plus one retry)"
649+
);
650+
}
651+
652+
#[tokio::test]
653+
async fn with_retries_rate_limited_then_success_returns_value() {
654+
use std::sync::atomic::{AtomicU32, Ordering};
655+
let attempts = AtomicU32::new(0);
656+
let result: Result<&'static str> = with_retries("Test", || {
657+
let n = attempts.fetch_add(1, Ordering::SeqCst);
658+
async move {
659+
if n == 0 {
660+
Err(ApiCallError::RateLimited {
661+
retry_after: Some(Duration::from_millis(1)),
662+
body: "slow down".into(),
663+
})
664+
} else {
665+
Ok("done")
666+
}
667+
}
668+
})
669+
.await;
670+
assert_eq!(result.unwrap(), "done");
671+
assert_eq!(attempts.load(Ordering::SeqCst), 2);
672+
}
673+
674+
#[test]
675+
fn parse_retry_after_reads_seconds_form() {
676+
let mut headers = HeaderMap::new();
677+
headers.insert(reqwest::header::RETRY_AFTER, HeaderValue::from_static("7"));
678+
assert_eq!(
679+
parse_retry_after_header(&headers),
680+
Some(Duration::from_secs(7))
681+
);
682+
}
683+
684+
#[test]
685+
fn parse_retry_after_clamps_oversized_values() {
686+
let mut headers = HeaderMap::new();
687+
headers.insert(
688+
reqwest::header::RETRY_AFTER,
689+
HeaderValue::from_static("9999999"),
690+
);
691+
assert_eq!(
692+
parse_retry_after_header(&headers),
693+
Some(MAX_RATE_LIMIT_RETRY_AFTER)
694+
);
695+
}
696+
697+
#[test]
698+
fn parse_retry_after_returns_none_for_http_date_form() {
699+
// The HTTP-date form is valid per RFC 7231 but no API we
700+
// integrate with uses it for 429. Falling back to `None`
701+
// lets the retry loop pick its default exponential delay
702+
// rather than hard-failing on the parse.
703+
let mut headers = HeaderMap::new();
704+
headers.insert(
705+
reqwest::header::RETRY_AFTER,
706+
HeaderValue::from_static("Wed, 21 Oct 2026 07:28:00 GMT"),
707+
);
708+
assert!(parse_retry_after_header(&headers).is_none());
709+
}
710+
711+
#[test]
712+
fn parse_retry_after_returns_none_when_header_absent() {
713+
assert!(parse_retry_after_header(&HeaderMap::new()).is_none());
714+
}
715+
540716
#[tokio::test]
541717
async fn with_retries_retries_server_error_then_succeeds() {
542718
use std::sync::atomic::{AtomicU32, Ordering};
@@ -813,6 +989,25 @@ mod tests {
813989
assert!(v.is_array());
814990
}
815991

992+
#[test]
993+
fn parse_args_truncated_payload_keeps_intra_json_trailing_comma_fix() {
994+
// Regression: the truncation-repair branch used to re-build
995+
// its candidate from `trimmed` and re-run only the control-char
996+
// escape. The intra-JSON `,]` / `,}` strip done earlier was
997+
// discarded for this branch, so a payload that needed BOTH
998+
// repairs (an internal trailing comma AND the missing closing
999+
// brace) fell through to the raw-arguments fallback.
1000+
let v = parse_tool_arguments("write_file", r#"{"path":"a","items":[1,2,]"#);
1001+
assert_eq!(
1002+
v["path"], "a",
1003+
"the truncation branch must consume the comma-stripped intermediate"
1004+
);
1005+
let items = v["items"].as_array().expect("items array recovered");
1006+
assert_eq!(items.len(), 2);
1007+
assert_eq!(items[0], 1);
1008+
assert_eq!(items[1], 2);
1009+
}
1010+
8161011
#[test]
8171012
fn string_is_open_detects_unterminated_literal() {
8181013
assert!(string_is_open(r#"{"a":"b"#));

0 commit comments

Comments
 (0)