Skip to content

Commit 504e837

Browse files
fix(rate-limiter): bound Redis connection acquisition with a 2s timeout
Without a time-bound on ``Client::get_multiplexed_tokio_connection().await``, the rate-limiter blocks indefinitely against any Redis endpoint where the TCP handshake completes but the application layer never speaks (e.g. plain ``redis://`` against a TLS-required server, network ACL drops, firewalled cluster, etc.). Every request through the plugin then pays the framework's outer 30-second plugin timeout, and ``fail_mode`` cannot engage because the connection-acquisition future never returns to surface an error. This commit wraps the connection acquisition in ``tokio::time::timeout(Duration::from_secs(2), …)`` and maps the elapsed error into a ``redis::ErrorKind::IoError``-shaped ``RedisError`` so the existing ``fail_mode`` path routes it the same way as any other connection-side failure. Test coverage added at both layers: * Rust unit test (``redis_backend::tests::connection_async_fails_fast_against_hanging_redis``) — binds a TCP listener that accepts but never reads/writes; asserts ``connection_async`` returns within ~3s with an ``IoError``-shaped error. * Python integration test (``TestRedisFailModeAndViolationContext::test_hanging_redis_fails_fast_via_connect_timeout``) — same setup pattern at the public-API layer; asserts ``tool_pre_invoke`` completes within ~5s and the default ``fail_mode=open`` allows the request through. Both tests fail by hanging against the prior implementation and pass against this commit; an outer ``tokio::time::timeout`` / ``asyncio.wait_for`` guards each so a regression doesn't hang the test run. The 2-second value is hardcoded as ``CONNECT_TIMEOUT`` for now to keep the change small. Promoting it to a ``redis_connect_timeout_ms`` config key (extending the existing config-key list and warning machinery) is a trivial follow-up if operators in slow-network deployments need a longer budget. Signed-off-by: Pratik Gandhi <gandhipratik203@gmail.com>
1 parent a028dae commit 504e837

2 files changed

Lines changed: 165 additions & 1 deletion

File tree

plugins/rust/python-package/rate_limiter/src/redis_backend.rs

Lines changed: 105 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,12 @@
1616
use std::cmp::max;
1717
use std::sync::OnceLock;
1818
use std::sync::atomic::{AtomicU64, Ordering};
19+
use std::time::Duration;
1920

2021
use parking_lot::Mutex;
2122
use redis::aio::MultiplexedConnection;
2223
use tokio::runtime::{Builder, Runtime};
24+
use tokio::time::timeout;
2325

2426
use crate::config::Algorithm;
2527
use crate::types::DimResult;
@@ -219,7 +221,31 @@ impl RedisRateLimiter {
219221
}
220222
}
221223

222-
let conn = self.client.get_multiplexed_tokio_connection().await?;
224+
// Bound the connection-acquisition. Without this, a Redis endpoint
225+
// that accepts TCP but never responds at the application layer
226+
// (plain ``redis://`` against a TLS-required server, a network ACL
227+
// dropping post-handshake bytes, etc.) hangs the call indefinitely;
228+
// the existing fail_mode path cannot engage because the call never
229+
// returns to surface an error. Mapping the timeout into a
230+
// RedisError lets the caller's fail_mode logic route this exactly
231+
// like any other connection-side failure.
232+
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
233+
let conn = timeout(
234+
CONNECT_TIMEOUT,
235+
self.client.get_multiplexed_tokio_connection(),
236+
)
237+
.await
238+
.map_err(|_elapsed| {
239+
redis::RedisError::from((
240+
redis::ErrorKind::IoError,
241+
"connection timeout",
242+
format!(
243+
"redis connection acquisition exceeded {:?}",
244+
CONNECT_TIMEOUT,
245+
),
246+
))
247+
})??;
248+
223249
let mut conn_guard = self.conn.lock();
224250
if let Some(existing) = conn_guard.as_ref() {
225251
return Ok(existing.clone());
@@ -542,6 +568,84 @@ impl RedisRateLimiter {
542568
#[cfg(test)]
543569
mod tests {
544570
use super::RedisRateLimiter;
571+
use crate::config::Algorithm;
572+
use std::time::{Duration, Instant};
573+
574+
/// `connection_async` must time out within a bounded window when the
575+
/// Redis endpoint accepts TCP but never speaks at the application layer.
576+
///
577+
/// Test setup: bind a TCP listener but never call `accept()` to read or
578+
/// write any bytes. The kernel completes the TCP three-way handshake
579+
/// into its accept queue; the redis crate's
580+
/// `get_multiplexed_tokio_connection` sends its initial handshake bytes
581+
/// and waits for a response that never comes.
582+
///
583+
/// The outer `tokio::time::timeout(5s)` is the test's runaway-guard so
584+
/// a regression doesn't hang the test run. Asserts:
585+
/// * `connection_async` returns within ~3 seconds (well under the
586+
/// 5s guard).
587+
/// * The returned error is `IoError`-shaped, so the existing
588+
/// `fail_mode` path can route it the same way as any other
589+
/// connection-side failure.
590+
#[test]
591+
fn connection_async_fails_fast_against_hanging_redis() {
592+
use std::net::TcpListener;
593+
594+
let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
595+
let hang_addr = listener.local_addr().expect("local_addr").to_string();
596+
let url = format!("redis://{}/0", hang_addr);
597+
598+
let limiter = RedisRateLimiter::new(&url, Algorithm::FixedWindow, "rl".to_string())
599+
.expect("client should construct (lazy connection)");
600+
601+
let runtime = tokio::runtime::Builder::new_current_thread()
602+
.enable_all()
603+
.build()
604+
.expect("tokio runtime");
605+
606+
let started = Instant::now();
607+
let result: Result<Result<_, redis::RedisError>, tokio::time::error::Elapsed> = runtime
608+
.block_on(async {
609+
tokio::time::timeout(Duration::from_secs(5), limiter.connection_async()).await
610+
});
611+
let elapsed = started.elapsed();
612+
613+
// The outer 5s tokio::time::timeout is the test's runaway-guard.
614+
// It firing is the bug shape (hang). We want the inner Result
615+
// to be available — i.e., connection_async must have returned of
616+
// its own accord well before 5s.
617+
let inner = result.expect(
618+
"connection_async hung against a TCP-accepted-but-app-hangs Redis — \
619+
expected an explicit connection timeout error from the redis client \
620+
well before the 5s test bound; instead the call never returned.",
621+
);
622+
623+
assert!(
624+
elapsed < Duration::from_secs(3),
625+
"connection_async must fail fast on a hanging Redis (≤3s) — took {:?}. \
626+
Without a connection time-bound, the existing fail_mode path can't \
627+
trigger because the call never returns at all.",
628+
elapsed,
629+
);
630+
631+
let err = inner.expect_err(
632+
"connection_async should error against a hanging Redis (server never \
633+
completes the redis handshake), not return Ok",
634+
);
635+
// The error category should be IO/timeout shaped, not a config or
636+
// protocol-level error — pins the contract that the timeout maps
637+
// into the existing fail_mode path.
638+
let kind = err.kind();
639+
assert!(
640+
matches!(
641+
kind,
642+
redis::ErrorKind::IoError | redis::ErrorKind::ResponseError
643+
),
644+
"expected IoError-shaped timeout error from connection_async; got {:?}: {}",
645+
kind,
646+
err,
647+
);
648+
}
545649

546650
#[test]
547651
fn token_bucket_success_reset_uses_time_to_full() {

plugins/tests/rate_limiter/test_redis_integration.py

Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1540,6 +1540,66 @@ async def test_allowed_request_metadata_does_not_carry_identity(self, redis_url_
15401540
f"allowed response must not carry tenant_id in metadata; got metadata={metadata!r}"
15411541
)
15421542

1543+
@pytest.mark.asyncio
1544+
async def test_hanging_redis_fails_fast_via_connect_timeout(self):
1545+
"""``tool_pre_invoke`` must complete within a bounded window when the
1546+
configured Redis endpoint accepts TCP but never responds at the
1547+
application layer — and the default ``fail_mode=open`` should allow
1548+
the request once the connection-attempt times out.
1549+
1550+
Test setup: bind a socket on an ephemeral port, put it in
1551+
``listen()``, but never ``accept()`` to read or write any bytes.
1552+
The kernel completes the TCP handshake into its accept queue; the
1553+
plugin's connection attempt sees TCP open but never gets a server
1554+
response on top.
1555+
1556+
The other tests in this class cover the *unreachable* case (TCP
1557+
connection refused), which errors out cleanly in milliseconds.
1558+
This test covers the *hanging* case, which is qualitatively
1559+
different: the connection attempt never returns without an
1560+
explicit time-bound on the redis-side call.
1561+
1562+
``asyncio.wait_for`` with a 10s ceiling is the test's runaway-guard
1563+
so a regression doesn't hang the suite. Asserts:
1564+
* The hook completes within ~5 seconds (well under the 10s guard).
1565+
* ``result.continue_processing is True`` — the default
1566+
``fail_mode=open`` allows the request once the connection-attempt
1567+
times out.
1568+
"""
1569+
# Standard
1570+
import asyncio # noqa: PLC0415
1571+
import socket # noqa: PLC0415
1572+
import time # noqa: PLC0415
1573+
1574+
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1575+
listener.bind(("127.0.0.1", 0))
1576+
listener.listen(1)
1577+
hang_port = listener.getsockname()[1]
1578+
1579+
try:
1580+
plugin = _make_redis_plugin(f"redis://127.0.0.1:{hang_port}/0")
1581+
ctx = PluginContext(global_context=GlobalContext(request_id="r1", user="alice"))
1582+
payload = ToolPreInvokePayload(name="t", arguments={})
1583+
1584+
t0 = time.monotonic()
1585+
result = await asyncio.wait_for(
1586+
plugin.tool_pre_invoke(payload, ctx),
1587+
timeout=10.0,
1588+
)
1589+
elapsed = time.monotonic() - t0
1590+
1591+
assert elapsed < 5.0, (
1592+
f"plugin must fail fast on a hanging Redis (≤5s) — without a "
1593+
f"connection timeout in the redis client this would hang until "
1594+
f"the framework's outer 30-second timeout fires. Took {elapsed:.2f}s."
1595+
)
1596+
assert result.continue_processing is True, (
1597+
f"fail_mode=open default must allow the request when Redis hangs "
1598+
f"during connection acquisition; got {result!r}"
1599+
)
1600+
finally:
1601+
listener.close()
1602+
15431603

15441604
class TestConfigHardening:
15451605
"""Config hardening (G13, G15).

0 commit comments

Comments (0)