@@ -120,7 +120,7 @@ impl Flusher {
120120 match resp {
121121 Ok ( resp) => {
122122 let status = resp. status ( ) ;
123- _ = resp. text ( ) . await ;
123+ let body = resp. text ( ) . await . unwrap_or_default ( ) ;
124124 if status == StatusCode :: FORBIDDEN {
125125 // Access denied. Stop retrying.
126126 error ! (
@@ -131,6 +131,22 @@ impl Flusher {
131131 if status. is_success ( ) {
132132 return Ok ( ( ) ) ;
133133 }
134+ if attempts >= FLUSH_RETRY_COUNT {
135+ // Non-success HTTP response (e.g. 4xx/5xx from an OPW or
136+ // intake under load) — surface the request for later retry
137+ // instead of looping unbounded against a degraded endpoint.
138+ error ! (
139+ "LOGS | Failed to send request after {} ms and {} attempts: status {status}, body: {body}" ,
140+ elapsed. as_millis( ) ,
141+ attempts,
142+ ) ;
143+ return Err ( Box :: new ( FailedRequestError {
144+ request : req,
145+ message : format ! (
146+ "LOGS | Failed after {attempts} attempts: status {status}"
147+ ) ,
148+ } ) ) ;
149+ }
134150 }
135151 Err ( e) => {
136152 if attempts >= FLUSH_RETRY_COUNT {
@@ -309,3 +325,225 @@ impl LogsFlusher {
309325 encoder. finish ( ) . map_err ( |e| Box :: new ( e) as Box < dyn Error > )
310326 }
311327}
328+
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;
    use std::sync::atomic::Ordering;
    use std::time::Duration;

    /// Path of the logs endpoint that every test request is sent to.
    const LOGS_PATH: &str = "/api/v2/logs";

    /// Wall-clock bound placed around `Flusher::send` in the retry tests so an
    /// implementation that never stops retrying fails the test instead of
    /// hanging the suite.
    const SEND_DEADLINE: Duration = Duration::from_secs(3);

    /// Builds a POST request with body `"test"` aimed at the mock server's
    /// logs endpoint, using `timeout` as the per-request client timeout.
    fn build_request(server: &MockServer, timeout: Duration) -> reqwest::RequestBuilder {
        let client = reqwest::Client::new();
        client
            .post(server.url(LOGS_PATH))
            .timeout(timeout)
            .body("test")
    }

    /// Registers a mock on `server` that answers every POST to the logs
    /// endpoint with the given HTTP `status`, and returns its handle so the
    /// caller can assert on the hit count.
    fn respond_with_status(server: &MockServer, status: u16) -> httpmock::Mock<'_> {
        server.mock(move |when, then| {
            when.method(POST).path(LOGS_PATH);
            then.status(status);
        })
    }

    /// Runs `work` under [`SEND_DEADLINE`]. The timeout result is returned
    /// unexamined so each test can attach its own `expect` message.
    async fn within_deadline<F>(work: F) -> Result<F::Output, tokio::time::error::Elapsed>
    where
        F: std::future::Future,
    {
        tokio::time::timeout(SEND_DEADLINE, work).await
    }

    /// A 200 response yields `Ok` after exactly one request.
    #[tokio::test]
    async fn send_returns_ok_on_success() {
        let server = MockServer::start();
        let intake = respond_with_status(&server, 200);

        let request = build_request(&server, Duration::from_secs(2));
        let outcome = Flusher::send(request).await;

        assert!(outcome.is_ok(), "2xx response should return Ok immediately");
        intake.assert_hits(1);
    }

    /// A 403 response yields `Ok` after exactly one request: the payload is
    /// dropped rather than retried.
    #[tokio::test]
    async fn send_returns_ok_on_forbidden_without_retry() {
        let server = MockServer::start();
        let intake = respond_with_status(&server, 403);

        let request = build_request(&server, Duration::from_secs(2));
        let outcome = Flusher::send(request).await;

        assert!(
            outcome.is_ok(),
            "403 is permanent (bad API key) — drop, do not retry"
        );
        intake.assert_hits(1);
    }

    /// Regression test for SLES-2843: an endpoint that keeps answering with a
    /// non-success, non-403 status (here 500) must be tried exactly
    /// `FLUSH_RETRY_COUNT` times and then reported as an error, rather than
    /// being retried without bound.
    #[tokio::test]
    async fn send_bounds_retries_on_non_success_status() {
        let server = MockServer::start();
        let intake = respond_with_status(&server, 500);

        // The deadline makes an unbounded retry loop fail fast here.
        let request = build_request(&server, Duration::from_secs(2));
        let outcome = within_deadline(Flusher::send(request))
            .await
            .expect("send must respect FLUSH_RETRY_COUNT and not loop forever on non-success");

        assert!(
            outcome.is_err(),
            "send should return Err after exhausting retries on persistent 5xx"
        );
        intake.assert_hits(FLUSH_RETRY_COUNT);
    }

    /// Same bound as the 5xx case, exercised with a persistent 404.
    #[tokio::test]
    async fn send_bounds_retries_on_4xx_status() {
        let server = MockServer::start();
        let intake = respond_with_status(&server, 404);

        let request = build_request(&server, Duration::from_secs(2));
        let outcome = within_deadline(Flusher::send(request))
            .await
            .expect("send must respect FLUSH_RETRY_COUNT on 4xx (e.g. misrouted OPW URL)");

        assert!(
            outcome.is_err(),
            "persistent 4xx should bound at FLUSH_RETRY_COUNT"
        );
        intake.assert_hits(FLUSH_RETRY_COUNT);
    }

    /// Transport-level failures are bounded by `FLUSH_RETRY_COUNT` as well.
    #[tokio::test]
    async fn send_bounds_retries_on_transport_error() {
        let server = MockServer::start();
        // The mock delays its response for 5 s while the client gives up after
        // 50 ms, so every attempt is meant to end in a client-side timeout
        // (the `Err` arm of `send`) instead of an HTTP status.
        let stalled = server.mock(|when, then| {
            when.method(POST).path(LOGS_PATH);
            then.status(200).delay(Duration::from_secs(5));
        });

        let request = build_request(&server, Duration::from_millis(50));
        let outcome = within_deadline(Flusher::send(request))
            .await
            .expect("send must respect FLUSH_RETRY_COUNT on transport errors");

        assert!(
            outcome.is_err(),
            "send should return Err after exhausting retries on transport timeout"
        );
        stalled.assert_hits(FLUSH_RETRY_COUNT);
    }

    /// When a failure is transient, `send` must actually retry and succeed,
    /// not treat the first non-2xx as final. The bounded-retry tests alone
    /// would still pass if `send` returned an error on the first failed
    /// status, so this test pins the retry itself.
    ///
    /// The server is a small axum app with a shared counter because, per the
    /// original author's note, httpmock 0.7 matchers cannot capture state
    /// (their predicate type is `fn`, not `Fn`).
    #[tokio::test]
    async fn send_succeeds_on_retry_after_transient_failure() {
        use axum::{Router, http::StatusCode, routing::post};
        use std::sync::atomic::AtomicUsize;

        let attempts_seen = Arc::new(AtomicUsize::new(0));

        let handler_counter = Arc::clone(&attempts_seen);
        let handler = move || {
            let counter = Arc::clone(&handler_counter);
            async move {
                // Request number 0 gets a 500 (the transient failure); every
                // later request gets a 200.
                match counter.fetch_add(1, Ordering::SeqCst) {
                    0 => StatusCode::INTERNAL_SERVER_ERROR,
                    _ => StatusCode::OK,
                }
            }
        };
        let app = Router::new().route(LOGS_PATH, post(handler));

        // Port 0 lets the OS pick a free port; read it back for the URL.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("test should be able to bind a local port");
        let addr = listener
            .local_addr()
            .expect("bound listener should have a local addr");
        tokio::spawn(async move {
            axum::serve(listener, app)
                .await
                .expect("test server should run cleanly");
        });

        let url = format!("http://{addr}/api/v2/logs");
        let request = reqwest::Client::new()
            .post(url)
            .timeout(Duration::from_secs(2))
            .body("test");

        let outcome = Flusher::send(request).await;

        assert!(
            outcome.is_ok(),
            "send must retry past a transient failure and succeed on a later attempt"
        );
        assert_eq!(
            attempts_seen.load(Ordering::SeqCst),
            2,
            "expected exactly one failed attempt + one successful retry"
        );
    }

    /// After retries are exhausted, the returned error must downcast to
    /// `FailedRequestError` and carry a request that can still be cloned and
    /// sent. The assertion messages state the intent: `flush()` re-queues
    /// that request, so losing or corrupting it would turn a transient
    /// failure into lost data.
    #[tokio::test]
    async fn failed_request_error_carries_replayable_request() {
        let server = MockServer::start();
        let intake = respond_with_status(&server, 500);

        let request = build_request(&server, Duration::from_secs(2));
        let send_error = within_deadline(Flusher::send(request))
            .await
            .expect("send must terminate")
            .expect_err("expected Err after retry exhaustion");

        let failed = send_error
            .downcast_ref::<FailedRequestError>()
            .expect("error must be downcastable to FailedRequestError so flush() can re-queue it");

        let replay = failed
            .request
            .try_clone()
            .expect("FailedRequestError.request must be cloneable for redrive");

        // Sending the stashed request and seeing the server's 500 shows it
        // still reaches the endpoint.
        let response = replay
            .send()
            .await
            .expect("re-issued request should be sendable");
        assert_eq!(response.status().as_u16(), 500);

        // FLUSH_RETRY_COUNT hits from `send`, plus the one replay above.
        intake.assert_hits(FLUSH_RETRY_COUNT + 1);
    }
}
0 commit comments