Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/rust/python-package/rate_limiter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rate_limiter"
version = "0.0.6"
version = "0.0.7"
edition.workspace = true
authors.workspace = true
license.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: "Rate limiting by user/tenant/tool — memory (single-process) or Redis (shared across instances)"
author: "ContextForge Contributors"
version: "0.0.6"
version: "0.0.7"
kind: "cpex_rate_limiter.rate_limiter.RateLimiterPlugin"
available_hooks:
- "prompt_pre_fetch"
Expand Down
119 changes: 118 additions & 1 deletion plugins/rust/python-package/rate_limiter/src/redis_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
use std::cmp::max;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

use parking_lot::Mutex;
use redis::aio::MultiplexedConnection;
use tokio::runtime::{Builder, Runtime};
use tokio::time::timeout;

use crate::config::Algorithm;
use crate::types::DimResult;
Expand Down Expand Up @@ -219,7 +221,43 @@ impl RedisRateLimiter {
}
}

let conn = self.client.get_multiplexed_tokio_connection().await?;
// Bound the connection-acquisition. Without this, a Redis endpoint
// that accepts TCP but never responds at the application layer
// (plain ``redis://`` against a TLS-required server, a network ACL
// dropping post-handshake bytes, etc.) hangs the call indefinitely;
// the existing fail_mode path cannot engage because the call never
// returns to surface an error. Mapping the timeout into a
// RedisError lets the caller's fail_mode logic route this exactly
// like any other connection-side failure.
//
// Hardcoded rather than promoted to a config key to keep the
// plugin's config surface small — operators rarely tune this knob
// and adding it for the few who might need it expands the schema
// for everyone else. Two seconds is comfortable headroom for
// typical production paths (intra-VPC and cross-AZ Redis well
// under 100 ms; managed Redis with TLS handshake adds ~100-300 ms
// on top). If a deployment with deliberately slow networks
// surfaces and 2 s becomes too tight, promote this into the
// ``lib.rs`` defaults + the engine's KNOWN config-key list — the
// existing config-validation machinery (defaults, unknown-key
// warning) handles the rest cleanly.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
let conn = timeout(
CONNECT_TIMEOUT,
self.client.get_multiplexed_tokio_connection(),
)
.await
.map_err(|_elapsed| {
redis::RedisError::from((
redis::ErrorKind::IoError,
"connection timeout",
format!(
"redis connection acquisition exceeded {:?}",
CONNECT_TIMEOUT,
),
))
})??;

let mut conn_guard = self.conn.lock();
if let Some(existing) = conn_guard.as_ref() {
return Ok(existing.clone());
Expand Down Expand Up @@ -542,6 +580,85 @@ impl RedisRateLimiter {
#[cfg(test)]
mod tests {
use super::RedisRateLimiter;
use crate::config::Algorithm;
use std::time::{Duration, Instant};

/// Regression guard: `connection_async` has to give up on its own when the
/// Redis endpoint accepts TCP but stays silent at the application layer.
///
/// How the hang is simulated: a `TcpListener` is bound to an ephemeral
/// loopback port and then left alone — `accept()` is never called and no
/// bytes are ever read or written. The kernel still completes the TCP
/// handshake into its accept queue, so the redis crate's
/// `get_multiplexed_tokio_connection` sends its initial handshake bytes and
/// waits for a reply that never arrives.
///
/// Checks, in the order they can fail:
///   1. The outer 5s `tokio::time::timeout` did not fire. It is only a
///      runaway-guard so a regression cannot hang the test run.
///   2. The call returned in under 3 seconds.
///   3. The call returned an error whose kind is `IoError`, so the existing
///      `fail_mode` path can route it like any other connection-side
///      failure.
#[test]
fn connection_async_fails_fast_against_hanging_redis() {
    use std::net::TcpListener;

    // Bound but never accepted. The binding is kept until the end of the
    // test so the port stays open for the whole connection attempt.
    let silent_listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
    let silent_addr = silent_listener.local_addr().expect("local_addr");
    let redis_url = format!("redis://{}/0", silent_addr);

    let rate_limiter =
        RedisRateLimiter::new(&redis_url, Algorithm::FixedWindow, "rl".to_string())
            .expect("client should construct (lazy connection)");

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("tokio runtime");

    let runaway_guard = Duration::from_secs(5);
    let fail_fast_budget = Duration::from_secs(3);

    let clock = Instant::now();
    let guarded = rt.block_on(async {
        tokio::time::timeout(runaway_guard, rate_limiter.connection_async()).await
    });
    let took = clock.elapsed();

    // The runaway-guard firing is exactly the bug shape (a hang). The inner
    // result must exist, i.e. `connection_async` returned by itself well
    // before the 5s bound.
    let attempt: Result<_, redis::RedisError> = guarded.expect(
        "connection_async hung against a TCP-accepted-but-app-hangs Redis — \
         expected an explicit connection timeout error from the redis client \
         well before the 5s test bound; instead the call never returned.",
    );

    assert!(
        took < fail_fast_budget,
        "connection_async must fail fast on a hanging Redis (≤3s) — took {:?}. \
         Without a connection time-bound, the existing fail_mode path can't \
         trigger because the call never returns at all.",
        took,
    );

    let failure = attempt.expect_err(
        "connection_async should error against a hanging Redis (server never \
         completes the redis handshake), not return Ok",
    );

    // Pin the exact contract: the connection-acquisition timeout must map
    // into `redis::ErrorKind::IoError`. Any other kind (ResponseError,
    // ClientError, ...) would mean the timeout surfaces through a different
    // code path than the rest of the fail-mode logic, silently breaking the
    // operator's fail-open / fail-closed policy.
    assert_eq!(
        failure.kind(),
        redis::ErrorKind::IoError,
        "expected IoError-shaped timeout error from connection_async; got {:?}: {}",
        failure.kind(),
        failure,
    );
}

#[test]
fn token_bucket_success_reset_uses_time_to_full() {
Expand Down
133 changes: 133 additions & 0 deletions plugins/tests/rate_limiter/test_redis_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,139 @@ async def test_allowed_request_metadata_does_not_carry_identity(self, redis_url_
f"allowed response must not carry tenant_id in metadata; got metadata={metadata!r}"
)

@pytest.mark.asyncio
async def test_hanging_redis_fails_fast_via_connect_timeout(self):
    """``tool_pre_invoke`` must complete within a bounded window when the
    configured Redis endpoint accepts TCP but never responds at the
    application layer — and the default ``fail_mode=open`` should allow
    the request once the connection attempt times out.

    Test setup: bind a socket on an ephemeral port and put it in
    ``listen()``, but never ``accept()``, so no bytes are ever read or
    written. The kernel completes the TCP handshake into its accept queue;
    the plugin's connection attempt sees TCP open but never gets a server
    response on top.

    The other tests in this class cover the *unreachable* case (TCP
    connection refused), which errors out cleanly in milliseconds.
    This test covers the *hanging* case, which is qualitatively
    different: the connection attempt never returns without an
    explicit time-bound on the redis-side call.

    ``asyncio.wait_for`` with a 10s ceiling is the test's runaway-guard
    so a regression doesn't hang the suite. Asserts:
      * The hook completes in under 5 seconds (well under the 10s guard).
      * ``result.continue_processing is True`` — the default
        ``fail_mode=open`` allows the request once the connection
        attempt times out.
    """
    # Standard
    import asyncio  # noqa: PLC0415
    import socket  # noqa: PLC0415
    import time  # noqa: PLC0415

    # The context manager closes the listener on every exit path, including
    # a failure in bind()/listen() that happens before the hook is invoked.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        hang_port = listener.getsockname()[1]

        plugin = _make_redis_plugin(f"redis://127.0.0.1:{hang_port}/0")
        ctx = PluginContext(global_context=GlobalContext(request_id="r1", user="alice"))
        payload = ToolPreInvokePayload(name="t", arguments={})

        t0 = time.monotonic()
        result = await asyncio.wait_for(
            plugin.tool_pre_invoke(payload, ctx),
            timeout=10.0,
        )
        elapsed = time.monotonic() - t0

        assert elapsed < 5.0, (
            f"plugin must fail fast on a hanging Redis (≤5s) — without a "
            f"connection timeout in the redis client this would hang until "
            f"the framework's outer 30-second timeout fires. Took {elapsed:.2f}s."
        )
        assert result.continue_processing is True, (
            f"fail_mode=open default must allow the request when Redis hangs "
            f"during connection acquisition; got {result!r}"
        )

@pytest.mark.asyncio
async def test_hanging_redis_with_fail_mode_closed_blocks_with_backend_unavailable(self):
    """``tool_pre_invoke`` with ``fail_mode=closed`` must complete within
    a bounded window when the configured Redis endpoint accepts TCP but
    never responds at the application layer — and must surface a
    ``BACKEND_UNAVAILABLE`` violation (HTTP 503 + ``Retry-After``)
    rather than hanging until the framework's outer 30-second timeout
    fires.

    Companion to ``test_hanging_redis_fails_fast_via_connect_timeout``,
    which covers the default ``fail_mode=open`` branch. Together the
    two pin both halves of the operator's policy contract under the
    hanging-Redis failure shape: open allows the request, closed blocks
    with the documented error envelope.

    Test setup matches the open-mode test: bind a TCP listener and put it
    in ``listen()``, but never ``accept()``, so no bytes are ever read or
    written. ``asyncio.wait_for`` with a 10s ceiling is the runaway-guard
    so a regression doesn't hang the suite.
    """
    # Standard
    import asyncio  # noqa: PLC0415
    import socket  # noqa: PLC0415
    import time  # noqa: PLC0415

    # The context manager closes the listener on every exit path, including
    # a failure in bind()/listen() that happens before the hook is invoked.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        listener.bind(("127.0.0.1", 0))
        listener.listen(1)
        hang_port = listener.getsockname()[1]

        plugin = _make_redis_plugin_with_config(
            f"redis://127.0.0.1:{hang_port}/0",
            {"fail_mode": "closed"},
        )
        ctx = PluginContext(global_context=GlobalContext(request_id="r1", user="alice"))
        payload = ToolPreInvokePayload(name="t", arguments={})

        t0 = time.monotonic()
        result = await asyncio.wait_for(
            plugin.tool_pre_invoke(payload, ctx),
            timeout=10.0,
        )
        elapsed = time.monotonic() - t0

        assert elapsed < 5.0, (
            f"plugin must fail fast on a hanging Redis (≤5s) regardless "
            f"of fail_mode; without the connection-acquisition timeout this "
            f"would hang until the framework's outer 30-second timeout fires. "
            f"Took {elapsed:.2f}s."
        )
        # Policy outcome: closed mode must block the request ...
        assert result.continue_processing is False, (
            f"fail_mode=closed must block the request when Redis hangs "
            f"during connection acquisition; got {result!r}"
        )
        # ... and attach the backend-unavailable error envelope.
        assert result.violation is not None, (
            f"fail_mode=closed must produce a violation when the backend "
            f"is unreachable; got {result!r}"
        )
        assert result.violation.code == "BACKEND_UNAVAILABLE", (
            f"expected violation.code='BACKEND_UNAVAILABLE', "
            f"got {result.violation.code!r}"
        )
        assert result.violation.http_status_code == 503, (
            f"expected http_status_code=503, "
            f"got {result.violation.http_status_code!r}"
        )
        assert "Retry-After" in result.violation.http_headers, (
            f"expected Retry-After header on the violation; "
            f"got headers={result.violation.http_headers!r}"
        )


class TestConfigHardening:
"""Config hardening (G13, G15).
Expand Down
Loading