Skip to content

Commit f90e12f

Browse files
charliegillet, claude, and stepmikhaylov
authored and committed
fix(nodes): add rate limiting to tool_http_request node (#560)
* fix(nodes): add configurable rate limiting to tool_http_request node Without rate limiting, the HTTP request node can be abused to flood external APIs or exhaust server resources. This adds a token-bucket rate limiter with three configurable limits: per-second, per-minute, and max concurrent requests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(nodes): address PR #560 review feedback for rate limiter - Fix critical bug: check semaphore BEFORE consuming tokens so rejected requests don't wastefully consume rate-limit tokens. Release semaphore if token check fails afterward. - Extract _config_int() utility with min_value/max_value clamping params from inline _int_or_default helper in IGlobal._build_rate_limiter. - Allow _build_rate_limiter to return None when all three rate-limit knobs are explicitly set to 0 (opt-out path). - Remove unused RateLimitError import with noqa:F401 from http_driver.py. - Add unit tests covering: normal acquire/release, per-second enforcement, per-minute enforcement, semaphore exhaustion, token restoration on semaphore rejection, and thread safety. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore(nodes): move tool_http_request tests --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: stepmik <stepmikhaylov@yandex.ru>
1 parent d242abc commit f90e12f

6 files changed

Lines changed: 407 additions & 13 deletions

File tree

nodes/src/nodes/tool_http_request/IGlobal.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
HTTP Request tool node - global (shared) state.
2626
2727
Reads config and stores security guardrails (allowed methods + URL whitelist)
28-
for IInstance tool methods.
28+
and rate limiter for IInstance tool methods.
2929
"""
3030

3131
from __future__ import annotations
@@ -34,6 +34,32 @@
3434
from ai.common.config import Config
3535
from rocketlib import IGlobalBase, OPEN_MODE, warning
3636

37+
from .rate_limiter import DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_PER_MINUTE, DEFAULT_MAX_PER_SECOND, RateLimiter
38+
39+
40+
def _config_int(cfg: dict, key: str, default: int, *, min_value: int | None = None, max_value: int | None = None) -> int:
41+
"""Read an integer from *cfg*, falling back to *default*.
42+
43+
Returns *default* when the key is missing, non-numeric, or <= 0.
44+
The result is clamped to [min_value, max_value] when those bounds are given.
45+
"""
46+
raw = cfg.get(key)
47+
if raw is None:
48+
val = default
49+
else:
50+
try:
51+
val = int(raw)
52+
if val <= 0:
53+
val = default
54+
except (TypeError, ValueError):
55+
val = default
56+
if min_value is not None:
57+
val = max(val, min_value)
58+
if max_value is not None:
59+
val = min(val, max_value)
60+
return val
61+
62+
3763
_METHOD_FLAGS = {
3864
'GET': 'allowGET',
3965
'POST': 'allowPOST',
@@ -50,13 +76,15 @@ class IGlobal(IGlobalBase):
5076

5177
enabled_methods: set[str] | None = None
5278
url_patterns: list[re.Pattern] | None = None
79+
rate_limiter: RateLimiter | None = None
5380

5481
    def beginGlobal(self) -> None:
        """Load the node config and build the shared guardrails + rate limiter.

        Does nothing when the endpoint is opened in CONFIG mode — no requests
        are executed in that mode, so no guardrail state is needed.
        """
        if self.IEndpoint.endpoint.openMode == OPEN_MODE.CONFIG:
            return

        cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
        # Allowed HTTP methods + URL whitelist patterns, from config.
        self.enabled_methods, self.url_patterns = self._build_guardrails(cfg)
        # May be None when rate limiting is explicitly disabled in config.
        self.rate_limiter = self._build_rate_limiter(cfg)
6088

6189
@staticmethod
6290
def _build_guardrails(cfg: dict) -> tuple[set[str], list[re.Pattern]]:
@@ -89,6 +117,35 @@ def _build_guardrails(cfg: dict) -> tuple[set[str], list[re.Pattern]]:
89117

90118
return enabled, patterns
91119

120+
@staticmethod
121+
def _build_rate_limiter(cfg: dict) -> RateLimiter | None:
122+
"""Create a ``RateLimiter`` from the node configuration.
123+
124+
Returns ``None`` when all three rate-limit knobs are explicitly set to
125+
``0`` (i.e. the user has opted out of rate limiting).
126+
"""
127+
raw_ps = cfg.get('rateLimitPerSecond')
128+
raw_pm = cfg.get('rateLimitPerMinute')
129+
raw_mc = cfg.get('maxConcurrentRequests')
130+
131+
# If all three are explicitly set to 0, disable rate limiting entirely.
132+
def _is_zero(raw: object) -> bool:
133+
if raw is None:
134+
return False
135+
try:
136+
return int(raw) == 0
137+
except (TypeError, ValueError):
138+
return False
139+
140+
if _is_zero(raw_ps) and _is_zero(raw_pm) and _is_zero(raw_mc):
141+
return None
142+
143+
return RateLimiter(
144+
max_per_second=_config_int(cfg, 'rateLimitPerSecond', DEFAULT_MAX_PER_SECOND, min_value=1),
145+
max_per_minute=_config_int(cfg, 'rateLimitPerMinute', DEFAULT_MAX_PER_MINUTE, min_value=1),
146+
max_concurrent=_config_int(cfg, 'maxConcurrentRequests', DEFAULT_MAX_CONCURRENT, min_value=1),
147+
)
148+
92149
def validateConfig(self) -> None:
93150
try:
94151
cfg = Config.getNodeConfig(self.glb.logicalType, self.glb.connConfig)
@@ -105,3 +162,4 @@ def validateConfig(self) -> None:
105162
    def endGlobal(self) -> None:
        """Reset the shared guardrail and rate-limiter state on teardown."""
        self.enabled_methods = set()
        self.url_patterns = []
        self.rate_limiter = None

nodes/src/nodes/tool_http_request/IInstance.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,25 @@ def http_request(self, args):
128128
# Validate guardrails from config
129129
self._validate_guardrails(args)
130130

131-
return execute_request(
132-
url=args.get('url', ''),
133-
method=args.get('method', 'GET'),
134-
query_params=args.get('query_params'),
135-
path_params=args.get('path_params'),
136-
headers=args.get('headers'),
137-
auth=args.get('auth'),
138-
body=args.get('body'),
139-
timeout=args.get('timeout'),
140-
)
131+
# Enforce rate limits before executing the request
132+
rate_limiter = self.IGlobal.rate_limiter
133+
if rate_limiter is not None:
134+
rate_limiter.acquire()
135+
136+
try:
137+
return execute_request(
138+
url=args.get('url', ''),
139+
method=args.get('method', 'GET'),
140+
query_params=args.get('query_params'),
141+
path_params=args.get('path_params'),
142+
headers=args.get('headers'),
143+
auth=args.get('auth'),
144+
body=args.get('body'),
145+
timeout=args.get('timeout'),
146+
)
147+
finally:
148+
if rate_limiter is not None:
149+
rate_limiter.release()
141150

142151
def _validate_guardrails(self, args):
143152
"""Enforce allowed methods + URL whitelist from config."""

nodes/src/nodes/tool_http_request/http_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ def execute_request(
5656
5757
Raises ``requests.RequestException`` on transport-level failures.
5858
"""
59-
6059
resolved_url = _resolve_path_params(url, path_params)
6160

6261
req_headers = dict(headers or {})
@@ -95,6 +94,7 @@ def execute_request(
9594
# Internal helpers
9695
# ---------------------------------------------------------------------------
9796

97+
9898
def _resolve_path_params(url: str, path_params: Optional[Dict[str, str]]) -> str:
9999
"""Replace ``:name`` placeholders in the URL with values from *path_params*."""
100100
if not path_params:
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# =============================================================================
2+
# MIT License
3+
# Copyright (c) 2024 RocketRide Inc.
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
# SOFTWARE.
22+
# =============================================================================
23+
24+
"""
25+
Token-bucket rate limiter with concurrency control for HTTP requests.
26+
27+
Enforces three independent limits:
28+
- requests per second (token bucket, refills every second)
29+
- requests per minute (token bucket, refills every minute)
30+
- max concurrent requests (semaphore)
31+
32+
All limits are configurable via services.json; sensible defaults are provided.
33+
Thread-safe: uses a single ``threading.Lock`` for the token buckets and a
34+
``threading.Semaphore`` for concurrency.
35+
"""
36+
37+
from __future__ import annotations
38+
39+
import threading
40+
import time
41+
42+
43+
class RateLimitError(Exception):
    """Raised when a request is rejected due to rate limiting."""


# Defaults used when services.json omits the fields.
DEFAULT_MAX_PER_SECOND = 10
DEFAULT_MAX_PER_MINUTE = 100
DEFAULT_MAX_CONCURRENT = 5


class RateLimiter:
    """Token-bucket rate limiter with concurrent-request semaphore.

    Three independent limits are enforced: requests per second and per
    minute (two continuously-refilled token buckets, shared lock) plus a
    cap on in-flight requests (non-blocking semaphore).
    """

    def __init__(
        self,
        *,
        max_per_second: int = DEFAULT_MAX_PER_SECOND,
        max_per_minute: int = DEFAULT_MAX_PER_MINUTE,
        max_concurrent: int = DEFAULT_MAX_CONCURRENT,
    ) -> None:
        """Set up both token buckets and the concurrency semaphore.

        Each limit is clamped to at least 1 so the limiter can never be
        constructed in a state that rejects everything.
        """
        # Per-second bucket: burst capacity == sustained rate (tokens/sec).
        self._sec_capacity = max(1, max_per_second)
        self._sec_tokens = float(self._sec_capacity)
        self._sec_rate = float(self._sec_capacity)

        # Per-minute bucket: refills at capacity/60 tokens per second.
        self._min_capacity = max(1, max_per_minute)
        self._min_tokens = float(self._min_capacity)
        self._min_rate = self._min_capacity / 60.0

        self._last_refill = time.monotonic()
        self._lock = threading.Lock()

        # In-flight request cap.
        self._max_concurrent = max(1, max_concurrent)
        self._semaphore = threading.Semaphore(self._max_concurrent)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def acquire(self) -> None:
        """Claim one request slot, raising ``RateLimitError`` if any limit is hit.

        The concurrency semaphore is checked first (non-blocking) so that a
        request rejected for concurrency never consumes bucket tokens.
        """
        if not self._semaphore.acquire(blocking=False):
            raise RateLimitError(f'Too many concurrent requests: max {self._max_concurrent} in-flight. Please wait for an ongoing request to complete.')

        try:
            self._take_tokens()
        except RateLimitError:
            # The request never started — hand the concurrency slot back.
            self._semaphore.release()
            raise

    def release(self) -> None:
        """Return the concurrency slot once a request has completed."""
        self._semaphore.release()

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    def _take_tokens(self) -> None:
        """Consume one token from each bucket, or raise ``RateLimitError``."""
        with self._lock:
            self._refill()
            if self._sec_tokens < 1.0:
                raise RateLimitError(f'Rate limit exceeded: max {self._sec_capacity} requests per second. Please retry after a short delay.')
            if self._min_tokens < 1.0:
                raise RateLimitError(f'Rate limit exceeded: max {self._min_capacity} requests per minute. Please retry after a short delay.')
            self._sec_tokens -= 1.0
            self._min_tokens -= 1.0

    def _refill(self) -> None:
        """Top up both buckets for the time elapsed since the last refill.

        Caller must hold ``_lock``.
        """
        now = time.monotonic()
        elapsed = now - self._last_refill
        self._last_refill = now

        self._sec_tokens = min(self._sec_capacity, self._sec_tokens + elapsed * self._sec_rate)
        self._min_tokens = min(self._min_capacity, self._min_tokens + elapsed * self._min_rate)

nodes/src/nodes/tool_http_request/services.json

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,13 +109,32 @@
109109
"type": "object",
110110
"properties": ["http_request.whitelistPattern"]
111111
}
112+
},
113+
114+
"http_request.rateLimitPerSecond": {
115+
"type": "number",
116+
"title": "Max requests per second",
117+
"description": "Maximum number of HTTP requests allowed per second. Uses a token-bucket algorithm for smooth enforcement.",
118+
"default": 10
119+
},
120+
"http_request.rateLimitPerMinute": {
121+
"type": "number",
122+
"title": "Max requests per minute",
123+
"description": "Maximum number of HTTP requests allowed per minute. Provides a broader throttle beyond the per-second limit.",
124+
"default": 100
125+
},
126+
"http_request.maxConcurrentRequests": {
127+
"type": "number",
128+
"title": "Max concurrent requests",
129+
"description": "Maximum number of HTTP requests that can be in-flight simultaneously.",
130+
"default": 5
112131
}
113132
},
114133
"shape": [
115134
{
116135
"section": "Pipe",
117136
"title": "HTTP Request",
118-
"properties": ["type", "http_request.allowGET", "http_request.allowPOST", "http_request.allowPUT", "http_request.allowPATCH", "http_request.allowDELETE", "http_request.allowHEAD", "http_request.allowOPTIONS", "http_request.urlWhitelist"]
137+
"properties": ["type", "http_request.allowGET", "http_request.allowPOST", "http_request.allowPUT", "http_request.allowPATCH", "http_request.allowDELETE", "http_request.allowHEAD", "http_request.allowOPTIONS", "http_request.urlWhitelist", "http_request.rateLimitPerSecond", "http_request.rateLimitPerMinute", "http_request.maxConcurrentRequests"]
119138
}
120139
]
121140
}

0 commit comments

Comments
 (0)