Commit 70269c0

test(rate-limiter): add multi-instance toggle-journey test for wipe-on-disable
Extends the journey coverage to the production fan-out shape: multiple
plugin instances sharing one Redis URL, exercising the distributed lock
under realistic flow.

What this adds beyond the existing tests:

- test_wipe_is_idempotent_under_concurrent_shutdown (#5) already proves
  N parallel shutdowns converge cleanly, but with seeded counters and no
  re-enable phase.
- test_full_toggle_journey_enforce_disable_reenforce proves the full
  enforce → disable → re-enable transition composes, but with a single
  plugin instance, so the lock isn't exercised under real flow.

This test is the composition of both: three plugin instances each
saturate a different user (alice/bob/carol) in parallel via real
tool_pre_invoke calls; the parallel shutdown under mode=disabled must
converge to all rl:* keys deleted (exactly one wipes, two skip via the
lock without raising); and three fresh plugin instances must all
observe a fresh window for their respective users.

The unique contract this test pins: a regression that wiped only the
user owned by the lock-winning plugin would pass test #5 but fail
Phase 3 here, where every user must get a fresh window.

The window stays at 3/m for the same reason as the single-instance
journey: a 4th-request-blocked assertion in Phase 1 makes the "first
request passes" assertion in Phase 3 non-vacuous.

Signed-off-by: Pratik Gandhi <gandhipratik203@gmail.com>
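The "exactly one wipes, the rest skip via the lock" contract can be modeled in miniature without a Redis server. The sketch below is a minimal asyncio stand-in for Phase 2 of the test — all names (`store`, `mode`, `lock`, `shutdown`) are hypothetical; the real test coordinates through an actual Redis lock:

```python
import asyncio

# Hypothetical in-memory stand-ins for the Redis state the real test uses:
# per-user rl:* counters, the plugin mode key, and a one-shot lock flag
# (modeling SET NX semantics).
store = {"rl:alice": 3, "rl:bob": 3, "rl:carol": 3}
mode = {"RateLimiter": "disabled"}
lock = {"taken": False}

async def shutdown(instance: str) -> str:
    """Wipe-on-disable: the lock winner deletes every rl:* key; losers skip."""
    if mode["RateLimiter"] != "disabled":
        return "noop"
    if lock["taken"]:       # losers observe the lock and skip, never raise
        return "skipped"
    lock["taken"] = True    # winner acquires the lock (SET NX analogue)
    await asyncio.sleep(0)  # yield, as a real network round-trip would
    for key in [k for k in store if k.startswith("rl:")]:
        del store[key]
    return "wiped"

async def main():
    # Three instances shut down concurrently, as in Phase 2 of the test.
    return await asyncio.gather(shutdown("p0"), shutdown("p1"), shutdown("p2"))

results = asyncio.run(main())
print(sorted(results))                            # one wipe, two skips
print([k for k in store if k.startswith("rl:")])  # all rl:* keys gone
```

The model makes the pinned regression visible: if `shutdown` deleted only the winner's own key instead of every `rl:*` key, the converge-to-empty check would still pass for the winner's user but fail for the other two — exactly the gap Phase 3 of the real test closes.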
1 parent f5dffce commit 70269c0

1 file changed

Lines changed: 95 additions & 0 deletions

plugins/tests/rate_limiter/test_redis_integration.py
@@ -1903,3 +1903,98 @@ async def test_full_toggle_journey_enforce_disable_reenforce(self, redis_url_for
             "stale counter state — this is the user-visible promise of "
             "wipe-on-disable"
         )
+
+    @pytest.mark.asyncio
+    async def test_full_toggle_journey_across_multiple_instances(self, redis_url_for_integration):
+        """Multi-instance toggle journey: N plugins simulate the production fan-out.
+
+        At the cpex-plugins layer, "multiple gateway pods" and "multiple workers
+        per pod" collapse to a single knob — N independent plugin instances
+        sharing one Redis URL. This test runs N=3 concurrent instances, each
+        handling a different user, then drives the full enforce → disable →
+        re-enable cycle to assert:
+
+        * the parallel disable triggers exactly one wipe (the others skip via
+          the lock), and every ``rl:*`` key disappears;
+        * every user gets a fresh window after re-enable — not just the user
+          whose plugin happened to win the wipe race.
+
+        Test #5 (``test_wipe_is_idempotent_under_concurrent_shutdown``) already
+        proves N parallel shutdowns converge to a clean state, but with seeded
+        counters and no re-enable phase. The single-instance journey
+        (``test_full_toggle_journey_enforce_disable_reenforce``) proves the
+        transition composes, but with one plugin so the distributed lock isn't
+        exercised under realistic flow. This test is the composition of both —
+        real traffic-generated state across N instances, real distributed
+        coordination during the wipe, and the post-re-enable fresh-window
+        assertion across every user.
+
+        Window sized at 3/m for the same reason as the single-instance journey
+        — the test must never straddle a second boundary, so a passing Phase 3
+        request is unambiguous evidence of the wipe.
+        """
+        await _flush_redis(redis_url_for_integration)
+
+        users = ("alice", "bob", "carol")
+        payload = ToolPreInvokePayload(name="t", arguments={})
+
+        # ── Phase 1: enforce — all three instances saturate their users in parallel ──
+        plugins = [_make_redis_plugin(redis_url_for_integration, limit="3/m") for _ in users]
+        contexts = [
+            PluginContext(global_context=GlobalContext(request_id=f"r-{u}", user=u))
+            for u in users
+        ]
+
+        async def _saturate(plugin, ctx, user):
+            for i in range(3):
+                r = await plugin.tool_pre_invoke(payload, ctx)
+                assert r.continue_processing, f"{user}: request {i + 1}/3 must pass"
+            blocked = await plugin.tool_pre_invoke(payload, ctx)
+            assert blocked.continue_processing is False, (
+                f"{user}: 4th request must be blocked before disable"
+            )
+
+        await asyncio.gather(*(
+            _saturate(p, c, u) for p, c, u in zip(plugins, contexts, users)
+        ))
+
+        keys_pre_disable = await _keys_in_redis(redis_url_for_integration, "rl:*")
+        for u in users:
+            assert any(u in k for k in keys_pre_disable), (
+                f"{u}'s counter key should exist before disable; got {keys_pre_disable}"
+            )
+
+        # ── Phase 2: parallel disable — exactly one wipes, all keys gone ──
+        await _set_plugin_mode_key(redis_url_for_integration, "RateLimiter", "disabled")
+
+        results = await asyncio.gather(
+            *(p.shutdown() for p in plugins),
+            return_exceptions=True,
+        )
+        for i, r in enumerate(results):
+            assert not isinstance(r, BaseException), (
+                f"plugin[{i}].shutdown() raised under concurrent toggle — got {r!r}"
+            )
+
+        keys_post_wipe = await _keys_in_redis(redis_url_for_integration, "rl:*")
+        assert keys_post_wipe == [], (
+            "concurrent shutdown under mode=disabled must converge to all rl:* "
+            f"keys deleted — expected [], got {keys_post_wipe}"
+        )
+
+        # ── Phase 3: re-enforce — every user gets a fresh window ──
+        # Fresh plugin instances mirror what each worker's plugin manager does
+        # on re-enable.
+        await _set_plugin_mode_key(redis_url_for_integration, "RateLimiter", "enforce")
+        fresh_plugins = [_make_redis_plugin(redis_url_for_integration, limit="3/m") for _ in users]
+
+        async def _first_request_passes(plugin, ctx, user):
+            r = await plugin.tool_pre_invoke(payload, ctx)
+            assert r.continue_processing, (
+                f"{user} was saturated pre-disable; after the wipe + re-enable, "
+                f"{user}'s first request must NOT be blocked"
+            )
+
+        await asyncio.gather(*(
+            _first_request_passes(p, c, u) for p, c, u in zip(fresh_plugins, contexts, users)
+        ))
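The window-sizing argument ("a 4th-request-blocked assertion in Phase 1 makes the Phase 3 assertion non-vacuous") can be seen in a minimal in-memory model of a fixed-window "3/m" limiter. This is a hypothetical sketch — the real plugin keeps these counters in Redis under `rl:*` keys and enforces the limit in `tool_pre_invoke`:

```python
class FixedWindowLimiter:
    """Minimal in-memory model of a "3/m" fixed window (hypothetical sketch;
    the real plugin stores these counters in Redis under rl:* keys)."""

    def __init__(self, limit: int = 3, window_s: int = 60):
        self.limit = limit
        self.window_s = window_s
        self.counters: dict[str, int] = {}

    def allow(self, user: str, now: float) -> bool:
        window = int(now // self.window_s)
        key = f"rl:{user}:{window}"  # one counter per user per window
        self.counters[key] = self.counters.get(key, 0) + 1
        return self.counters[key] <= self.limit

    def wipe(self) -> None:
        """Wipe-on-disable: drop every rl:* counter."""
        self.counters.clear()

rl = FixedWindowLimiter()
phase1 = [rl.allow("alice", now=0.0) for _ in range(4)]
# Requests 1-3 pass, the 4th is blocked: alice is provably saturated.
rl.wipe()
phase3 = rl.allow("alice", now=0.0)
# Same instant, same window: only the wipe can explain this request passing.
print(phase1, phase3)
```

Without the blocked 4th request in Phase 1, a passing Phase 3 request would be consistent with the counter never having reached the limit at all; with it, a pass after re-enable is unambiguous evidence the wipe happened.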
