feat(retry): migrate to Rust-majority#91
Conversation
Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
…t tests Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
bf4a968 to
d8fd06b
Compare
…bridge Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
| try: | ||
| from mcpgateway.plugins.framework import ( | ||
| Plugin, | ||
| PluginConfig, | ||
| PluginContext, | ||
| ResourcePostFetchPayload, | ||
| ResourcePostFetchResult, | ||
| ToolPostInvokePayload, | ||
| ToolPostInvokeResult, | ||
| ) | ||
| except ModuleNotFoundError: | ||
| # Fallback for testing without mcpgateway | ||
| class Plugin: # type: ignore[no-redef] | ||
| def __init__(self, config) -> None: | ||
| self.config = config | ||
|
|
||
| class PluginConfig: # type: ignore[no-redef] | ||
| def __init__(self, **kwargs) -> None: | ||
| for k, v in kwargs.items(): | ||
| setattr(self, k, v) | ||
|
|
||
| PluginContext = object # type: ignore[misc,assignment] | ||
| ToolPostInvokePayload = object # type: ignore[misc,assignment] | ||
| ToolPostInvokeResult = object # type: ignore[misc,assignment] | ||
| ResourcePostFetchPayload = object # type: ignore[misc,assignment] | ||
| ResourcePostFetchResult = object # type: ignore[misc,assignment] |
| pub fn from_py_dict(dict: &Bound<'_, PyDict>) -> PyResult<Self> { | ||
| // Extract fields manually to handle Python types | ||
| let max_retries = dict | ||
| .get_item("max_retries")? | ||
| .and_then(|v| v.extract::<u32>().ok()) | ||
| .unwrap_or_else(default_max_retries); | ||
|
|
||
| let backoff_base_ms = dict | ||
| .get_item("backoff_base_ms")? | ||
| .and_then(|v| v.extract::<u64>().ok()) | ||
| .unwrap_or_else(default_backoff_base_ms); | ||
|
|
||
| let max_backoff_ms = dict | ||
| .get_item("max_backoff_ms")? | ||
| .and_then(|v| v.extract::<u64>().ok()) | ||
| .unwrap_or_else(default_max_backoff_ms); | ||
|
|
||
| let retry_on_status = dict | ||
| .get_item("retry_on_status")? | ||
| .and_then(|v| v.extract::<Vec<i32>>().ok()) | ||
| .unwrap_or_else(default_retry_on_status); | ||
|
|
||
| let jitter = dict | ||
| .get_item("jitter")? | ||
| .and_then(|v| v.extract::<bool>().ok()) | ||
| .unwrap_or_else(default_jitter); | ||
|
|
||
| let check_text_content = dict | ||
| .get_item("check_text_content")? | ||
| .and_then(|v| v.extract::<bool>().ok()) | ||
| .unwrap_or(false); |
| /// Get configuration for a specific tool, applying overrides if present | ||
| pub fn get_tool_config(&self, tool_name: &str) -> Self { | ||
| if let Some(override_cfg) = self.tool_overrides.get(tool_name) { | ||
| Self { | ||
| max_retries: override_cfg.max_retries.unwrap_or(self.max_retries), | ||
| backoff_base_ms: override_cfg.backoff_base_ms.unwrap_or(self.backoff_base_ms), | ||
| max_backoff_ms: override_cfg.max_backoff_ms.unwrap_or(self.max_backoff_ms), | ||
| retry_on_status: override_cfg | ||
| .retry_on_status | ||
| .clone() | ||
| .unwrap_or_else(|| self.retry_on_status.clone()), | ||
| jitter: override_cfg.jitter.unwrap_or(self.jitter), | ||
| check_text_content: self.check_text_content, | ||
| tool_overrides: HashMap::new(), // Don't nest overrides | ||
| } |
| pub backoff_base_ms: Option<u64>, | ||
| pub max_backoff_ms: Option<u64>, | ||
| pub retry_on_status: Option<Vec<i32>>, | ||
| pub jitter: Option<bool>, |
| let retry_status_set = config.retry_on_status_set(); | ||
|
|
| description: "High-performance retry policy engine with exponential backoff, jitter, per-tool overrides, and retry metadata for transient tool and resource failures - Rust-majority architecture (95% Rust / 5% Python)" | ||
| author: "ContextForge Contributors" | ||
| version: "0.3.1" | ||
| version: "0.3.0" | ||
| kind: "cpex_retry_with_backoff.retry_with_backoff.RetryWithBackoffPlugin" |
| [package] | ||
| name = "retry_with_backoff" | ||
| version = "0.3.1" | ||
| version = "0.3.0" | ||
| edition.workspace = true |
| from cpex_retry_with_backoff.retry_with_backoff import RetryWithBackoffPlugin | ||
|
|
||
| def __getattr__(name: str): | ||
| if name in {"RetryConfig", "RetryWithBackoffPlugin"}: | ||
| from cpex_retry_with_backoff.retry_with_backoff import RetryConfig, RetryWithBackoffPlugin | ||
|
|
||
| exports = { | ||
| "RetryConfig": RetryConfig, | ||
| "RetryWithBackoffPlugin": RetryWithBackoffPlugin, | ||
| } | ||
| return exports[name] | ||
| raise AttributeError(f"module {__name__!r} has no attribute {name!r}") | ||
|
|
||
|
|
||
| __all__ = ["RetryConfig", "RetryWithBackoffPlugin"] | ||
| __all__ = ["RetryWithBackoffPlugin"] No newline at end of file |
| import logging | ||
| import time | ||
| import uuid | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
| from unittest.mock import MagicMock, patch | ||
|
|
| let tool_overrides = dict | ||
| .get_item("tool_overrides")? | ||
| .and_then(|v| { | ||
| v.cast::<PyDict>() | ||
| .ok() | ||
| .and_then(|d| parse_tool_overrides(d).ok()) | ||
| }) | ||
| .unwrap_or_default(); |
There was a problem hiding this comment.
@madhu-mohan-jaishankar mcpgateway.plugins.framework should not exists anywhere in your code we are on versions relying solely on cpex. version should be bumped up not down.
Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
|
@lucarlig addressed review comments. Framework import ( Version config.rs
plugin.rs
Cargo.toml init.py test_integration.py |
…ction Signed-off-by: Madhu Mohan Jaishankar <madhu.mohan.jaishankar@ibm.com>
| // Check structuredContent; track presence to gate text content check | ||
| let has_structured_content = result_dict.get_item("structuredContent")?.is_some(); | ||
| if let Some(structured) = result_dict.get_item("structuredContent")? |
|
|
||
| for (key, value) in dict.iter() { | ||
| let tool_name = key.extract::<String>()?; | ||
| let override_dict = value.cast::<PyDict>()?; |
| fn compute_delay_ms(attempt: u32, base_ms: u64, max_ms: u64, jitter: bool) -> u64 { | ||
| let ceiling = base_ms | ||
| .saturating_mul(2u64.saturating_pow(attempt)) | ||
| .min(max_ms); | ||
| if jitter { | ||
| rand::thread_rng().gen_range(0..=ceiling) | ||
| } else { | ||
| ceiling | ||
| } |
| mutants = "0" | ||
| pyo3.workspace = true | ||
| pyo3-log.workspace = true | ||
| pyo3-stub-gen.workspace = true | ||
| rand.workspace = true | ||
| serde = { workspace = true, features = ["derive"] } | ||
| serde_json.workspace = true |
| description: "High-performance retry policy engine with exponential backoff, jitter, per-tool overrides, and retry metadata for transient tool and resource failures - Rust-majority architecture (95% Rust / 5% Python)" | ||
| author: "ContextForge Contributors" | ||
| version: "0.3.1" | ||
| version: "0.3.2" |
| def __init__(self, config: PluginConfig) -> None: | ||
| super().__init__(config) | ||
| raw_cfg = RetryConfig(**(config.config or {})) | ||
|
|
||
| ceiling = getattr(get_settings(), "max_tool_retries", raw_cfg.max_retries) | ||
| if raw_cfg.max_retries > ceiling: | ||
| log.warning( | ||
| "retry_with_backoff: max_retries=%d exceeds gateway ceiling=%d, clamping", | ||
| raw_cfg.max_retries, | ||
| ceiling, | ||
| ) | ||
| raw_cfg = raw_cfg.model_copy(update={"max_retries": ceiling}) | ||
|
|
||
| for tool_name, overrides in raw_cfg.tool_overrides.items(): | ||
| if overrides.get("max_retries", 0) > ceiling: | ||
| log.warning( | ||
| "retry_with_backoff: tool_overrides[%s].max_retries=%d exceeds ceiling=%d, clamping", | ||
| tool_name, | ||
| overrides["max_retries"], | ||
| ceiling, | ||
| ) | ||
| overrides["max_retries"] = ceiling | ||
|
|
||
| self._cfg = raw_cfg | ||
| self._rust = RetryStateManager( | ||
| self._cfg.max_retries, | ||
| self._cfg.backoff_base_ms, | ||
| self._cfg.max_backoff_ms, | ||
| self._cfg.jitter, | ||
| self._cfg.retry_on_status, | ||
| ) | ||
| self._rust_overrides = { | ||
| tool_name: RetryStateManager( | ||
| overrides.get("max_retries", self._cfg.max_retries), | ||
| overrides.get("backoff_base_ms", self._cfg.backoff_base_ms), | ||
| overrides.get("max_backoff_ms", self._cfg.max_backoff_ms), | ||
| overrides.get("jitter", self._cfg.jitter), | ||
| overrides.get("retry_on_status", self._cfg.retry_on_status), | ||
| ) | ||
| for tool_name, overrides in self._cfg.tool_overrides.items() | ||
| } | ||
|
|
||
| def to_rust_native_policy(self, tool_name: str, ceiling: int) -> Optional[dict[str, Any]]: | ||
| raw_cfg = RetryConfig(**(self.config.config or {})) | ||
| cfg = _cfg_for(raw_cfg, tool_name) | ||
| if cfg.max_retries > ceiling: | ||
| cfg = cfg.model_copy(update={"max_retries": ceiling}) | ||
|
|
||
| if cfg.check_text_content: | ||
| return None | ||
|
|
||
| return { | ||
| "kind": "retry_with_backoff", | ||
| "maxRetries": int(cfg.max_retries), | ||
| "backoffBaseMs": int(cfg.backoff_base_ms), | ||
| "maxBackoffMs": int(cfg.max_backoff_ms), | ||
| "retryOnStatus": list(cfg.retry_on_status), | ||
| "jitter": bool(cfg.jitter), | ||
| } | ||
| self._core = RetryWithBackoffPluginCore(config.config or {}) | ||
| log.info("retry_with_backoff: Initialized with Rust core (v0.3.0)") |
| """Delegate to Rust core for resource post-fetch processing.""" | ||
| return self._core.resource_post_fetch(payload, context) | ||
|
|
||
|
|
||
| __all__ = [ | ||
| "RetryConfig", | ||
| "RetryWithBackoffPlugin", | ||
| "RetryStateManager", | ||
| "_STATE", | ||
| "_STATE_TTL_SECONDS", | ||
| "_ToolRetryState", | ||
| "_cfg_for", | ||
| "_compute_delay_ms", | ||
| "_del_state", | ||
| "_get_state", | ||
| "_is_failure", | ||
| ] | ||
| __all__ = ["RetryWithBackoffPlugin"] No newline at end of file |
There was a problem hiding this comment.
I think a few minor issues need fixing before merge:
-
Gateway retry ceiling is no longer enforced.
RetryWithBackoffPlugin.__init__now passes config straight into Rust, while Rust only capsmax_retries <= 10. Deployments with a lower gatewaymax_tool_retriescan now retry more than policy allows. Please restore ceiling clamping for both global config and tool overrides. -
structuredContent: Nonenow skips text-content retry detection.
The new Rust path treats key presence as structured content, socheck_text_content=Truewill not parse retryable JSON text whenstructuredContentis explicitlyNone. Previous behavior parsed text in that case. Please treat missing and PythonNonethe same, and only suppress text parsing for non-null structured content. -
Retry logic now exists in two Rust cores.
RetryStateManagerinsrc/lib.rsandRetryWithBackoffPluginCoreinsrc/plugin.rsnow both own state/TTL/delay/failure-classification logic. That creates likely divergence for future fixes. Please consolidate shared retry policy logic or make one path call the other. -
Stale eviction scans all retry state on every failed invocation.
Each failure callsevict_stale, which retains over the full state map. Under many failed in-flight requests this makes failure handling O(n) per invocation. Please switch to periodic/bounded eviction or another approach that does not scan the full map on every failure.
Migrate
retry_with_backoffto Rust-Majority Architecture (v0.3.1 → v0.3.2)Related Issues
Closes #50
Overview
This PR completes the migration of the
retry_with_backoffplugin from a hybrid Python/Rust architecture to a Rust-majority architecture (95% Rust / 5% Python), following the established patterns frompii_filterandsecrets_detectionplugins.Motivation
The previous hybrid architecture split retry logic across Python and Rust, creating maintenance overhead and limiting performance. This migration:
Changes
New Rust Components
src/config.rs(332 lines)PyDictmax_retries ≤ 10,backoff_base_ms > 0, etc.)src/plugin.rs(356 lines)RetryWithBackoffPluginCorestruct with PyO3 bindingstool_post_invoke()implementationresource_post_fetch()implementationUpdated Python Components
retry_with_backoff.py(276 lines → 35 lines)pii_filter)Removed
test_compat.pyRetryConfigPydantic model_ToolRetryStatedataclass_is_failure()function_compute_delay_ms()function_get_state()/_del_state()functions_cfg_for()functionTest Suite Cleanup
Removed Obsolete Tests (24 tests)
TestRustNativePolicyto_rust_native_policy()methodTestPluginInitget_settings()and gateway ceiling clampingTestRustFallbackTestGetState.test_ttl_eviction_ToolRetryStateclassTestCfgForTestIsFailuretestsRewrote Tests (10 tests)
TestComputeDelayMsRetryStateManager.compute_delay()TestIsFailureRetryStateManager.check_failure()Results
Architecture Comparison
Before (Hybrid)
After (Rust-Majority)
Breaking Changes
None — full backward compatibility maintained:
Dependencies
Added:
serde(withderivefeature)serde_jsoncpex_framework_bridgeRemoved:
pydantic(no longer needed)Performance
Expected improvements over previous implementations:
Checklist
clippywarningscargo fmt)