Skip to content

Commit d6ebbc5

Browse files
authored
feat(rate-limiting): implement dynamic concurrency adjustment based on GitHub API rate limit status (#66)
feat(rate-limiting): implement dynamic concurrency adjustment based on GitHub API rate limit status (#66)
2 parents e90dbf0 + c579d43 commit d6ebbc5

4 files changed

Lines changed: 186 additions & 35 deletions

File tree

forklet/core/concurrency_manager.py

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -146,3 +146,27 @@ def get_stats(self) -> ConcurrencyStats:
146146
def is_busy(self) -> bool:
    """Report whether at least one task is currently tracked as active."""
    active_count = len(self._active_tasks)
    return active_count > 0
149+
150+
def update_max_concurrent(self, new_max_concurrent: int) -> None:
    """
    Update the maximum number of concurrent operations.

    The limit is only changed while no tasks are active, because the
    semaphore is replaced by a brand-new one. If tasks are active the
    request is logged and ignored, leaving the current limit in place.

    Args:
        new_max_concurrent: The new maximum number of concurrent operations

    Raises:
        ValueError: If new_max_concurrent is not positive.
    """
    # Validate before the busy check so an invalid value is always rejected,
    # rather than being silently dropped whenever tasks happen to be active.
    if new_max_concurrent <= 0:
        raise ValueError("max_concurrent must be positive")

    if self.is_busy():
        logger.warning(
            f"Cannot update max_concurrent while {len(self._active_tasks)} tasks are active"
        )
        return

    self.max_concurrent = new_max_concurrent
    # Replace the semaphore with a new one having the updated value
    self._semaphore = asyncio.Semaphore(new_max_concurrent)
    logger.debug(f"Updated max_concurrent to {new_max_concurrent}")

forklet/core/orchestrator.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ def __init__(
5656
self._current_result = None
5757
self._active_tasks = self.concurrency_manager._active_tasks
5858

59+
# Set up rate limit callback to adjust concurrency based on API rate limit status
60+
self.github_service.rate_limiter.set_rate_limit_callback(
61+
self._on_rate_limit_update
62+
)
63+
5964
@property
6065
def is_cancelled(self) -> bool:
6166
"""Check if the orchestrator is cancelled (backwards compatibility)."""
@@ -444,3 +449,38 @@ def reset_state(self) -> None:
444449
self._is_paused = self.state_controller.is_paused
445450
self._current_result = None
446451
self._active_tasks = self.concurrency_manager._active_tasks
452+
453+
def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
454+
"""
455+
Adjust download concurrency based on GitHub API rate limit status.
456+
457+
This is called by the rate limiter when rate limit information is updated.
458+
It dynamically adjusts the concurrency level to stay within rate limits.
459+
"""
460+
# Calculate a safe concurrency level based on remaining rate limit
461+
# We want to leave enough buffer for other API calls (repo info, etc.)
462+
# Assume we need ~10 API calls per download operation (simplified)
463+
api_calls_per_download = 10
464+
465+
if rate_limit_info.remaining < api_calls_per_download * 2:
466+
# Very low rate limit - reduce concurrency significantly
467+
new_max_concurrent = max(
468+
1, rate_limit_info.remaining // (api_calls_per_download * 2)
469+
)
470+
elif rate_limit_info.remaining < api_calls_per_download * 10:
471+
# Moderate rate limit - reduce concurrency somewhat
472+
new_max_concurrent = max(
473+
1, rate_limit_info.remaining // (api_calls_per_download * 2)
474+
)
475+
else:
476+
# Plenty of rate limit - use configured maximum
477+
new_max_concurrent = self._max_concurrent_downloads
478+
479+
# Apply the new concurrency level if it's different
480+
if new_max_concurrent != self.concurrency_manager.max_concurrent:
481+
logger.debug(
482+
f"Adjusting download concurrency from {self.concurrency_manager.max_concurrent} "
483+
f"to {new_max_concurrent} based on rate limit "
484+
f"({rate_limit_info.remaining}/{rate_limit_info.limit} remaining)"
485+
)
486+
self.concurrency_manager.update_max_concurrent(new_max_concurrent)

forklet/infrastructure/rate_limiter.py

Lines changed: 43 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,32 +3,34 @@
33
"""
44

55
import time
6-
import asyncio
6+
import asyncio
77
from typing import Optional, Dict
88
from dataclasses import dataclass
99
from datetime import datetime
1010
import random
11+
from typing import Callable, Optional
1112

1213
from forklet.infrastructure.logger import logger
1314

15+
1416
####
1517
## RATE LIMIT INFO
1618
#####
1719
@dataclass
1820
class RateLimitInfo:
1921
"""Rate limit information from GitHub API headers."""
20-
22+
2123
limit: int = 5000
2224
remaining: int = 5000
2325
reset_time: Optional[datetime] = None
2426
used: int = 0
25-
27+
2628
@property
2729
def is_exhausted(self) -> bool:
2830
"""Check if rate limit is exhausted."""
2931

3032
return self.remaining <= 10 # Keep a small buffer
31-
33+
3234
@property
3335
def reset_in_seconds(self) -> float:
3436
"""Get seconds until rate limit resets."""
@@ -44,15 +46,12 @@ def reset_in_seconds(self) -> float:
4446
class RateLimiter:
4547
"""
4648
Async rate limiter for GitHub API requests.
47-
49+
4850
Handles both primary and secondary rate limits with exponential backoff.
4951
"""
50-
52+
5153
def __init__(
52-
self,
53-
default_delay: float = 1.0,
54-
max_delay: float = 60.0,
55-
adaptive: bool = True
54+
self, default_delay: float = 1.0, max_delay: float = 60.0, adaptive: bool = True
5655
):
5756
self.default_delay = default_delay
5857
self.max_delay = max_delay
@@ -61,40 +60,47 @@ def __init__(
6160
self._last_request = 0.0
6261
self._rate_limit_info = RateLimitInfo()
6362
self._consecutive_limits = 0
64-
63+
self._rate_limit_callback: Optional[Callable[[RateLimitInfo], None]] = None
64+
65+
def set_rate_limit_callback(
    self, callback: Callable[["RateLimitInfo"], None]
) -> None:
    """
    Set a callback to be invoked when rate limit information is updated.

    Args:
        callback: Callable that receives the current RateLimitInfo. It
            replaces any previously registered callback.
    """
    # Store the callback and nothing else: resetting the attribute to None
    # after this assignment would discard the registration.
    self._rate_limit_callback = callback
72+
6573
async def acquire(self) -> None:
    """
    Acquire rate limit permission.

    Runs entirely under the limiter lock. If the stored rate limit is
    exhausted, sleeps until the reported reset. It then sleeps for the
    adaptive delay, if any, and records the time of this request.
    """
    async with self._lock:
        current_time = time.time()

        # Check if we need to wait due to rate limiting
        if self._rate_limit_info.is_exhausted:
            wait_time = self._rate_limit_info.reset_in_seconds
            if wait_time > 0:
                logger.warning(f"Rate limit exhausted, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
                # The sleep above may be long; refresh the timestamp so the
                # adaptive delay is not computed from a stale "now".
                current_time = time.time()

        # Adaptive delay based on rate limit status
        delay = self._calculate_adaptive_delay(current_time)

        if delay > 0:
            await asyncio.sleep(delay)

        self._last_request = time.time()
87-
93+
8894
def _calculate_adaptive_delay(self, current_time: float) -> float:
8995
"""Calculate adaptive delay based on rate limit status."""
9096

9197
if not self.adaptive:
9298
return self.default_delay
93-
99+
94100
# Base delay from last request
95101
elapsed = current_time - self._last_request
96102
base_delay = max(0, self.default_delay - elapsed)
97-
103+
98104
# Adjust based on remaining rate limit
99105
if self._rate_limit_info.remaining < 100:
100106
# Very low remaining calls - be more conservative
@@ -111,43 +117,45 @@ def _calculate_adaptive_delay(self, current_time: float) -> float:
111117
else:
112118
# Plenty of calls remaining
113119
multiplier = 1.0
114-
120+
115121
# Add jitter to prevent thundering herd
116122
jitter = random.uniform(0.8, 1.2)
117-
123+
118124
final_delay = min(base_delay * multiplier * jitter, self.max_delay)
119125
return final_delay
120-
126+
121127
async def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
    """
    Update rate limit information from API response headers.

    Reads ``x-ratelimit-limit``, ``x-ratelimit-remaining``,
    ``x-ratelimit-used`` and ``x-ratelimit-reset``. Missing limit and
    remaining headers default to 5000 and a missing used header to 0. When
    no reset header is present the stored reset time is left unchanged.
    If any value cannot be parsed, a warning is logged and the stored
    information is left untouched.

    Args:
        headers: Response headers keyed by lower-case header name.
    """
    async with self._lock:
        # Parse everything before mutating state so that one malformed
        # header cannot leave the stored info half-updated.
        try:
            limit = int(headers.get("x-ratelimit-limit", 5000))
            remaining = int(headers.get("x-ratelimit-remaining", 5000))
            used = int(headers.get("x-ratelimit-used", 0))

            reset_timestamp = headers.get("x-ratelimit-reset")
            reset_time = (
                datetime.fromtimestamp(int(reset_timestamp))
                if reset_timestamp
                else None
            )
        except (ValueError, KeyError) as e:
            logger.warning(f"Failed to parse rate limit headers: {e}")
            return

        info = self._rate_limit_info
        info.limit = limit
        info.remaining = remaining
        info.used = used
        if reset_time is not None:
            info.reset_time = reset_time

        # Track consecutive rate limit hits
        if info.is_exhausted:
            self._consecutive_limits += 1
        else:
            self._consecutive_limits = 0

        # Invoke callback if set. Its errors are reported separately so they
        # are not mislabelled as header parse failures.
        if self._rate_limit_callback:
            try:
                self._rate_limit_callback(info)
            except (ValueError, KeyError) as e:
                logger.warning(f"Rate limit callback failed: {e}")
150-
158+
151159
@property
152160
def rate_limit_info(self) -> RateLimitInfo:
153161
"""Get current rate limit information."""

forklet/services/github_api.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,85 @@ def __init__(
4545
self.retry_manager = retry_manager
4646
self.auth_token = auth_token
4747
self.timeout = timeout
48+
self._concurrency_adjustment_callback: Optional[
49+
Callable[[RateLimitInfo], None]
50+
] = None
51+
# Set up rate limit callback to adjust concurrency
52+
self.rate_limiter.set_rate_limit_callback(self._on_rate_limit_update)
53+
54+
def set_external_rate_limit_callback(
    self, callback: Callable[[RateLimitInfo], None]
) -> None:
    """Register the external callback that should receive rate limit updates."""
    external_listener = callback
    self._external_rate_limit_callback = external_listener
59+
60+
def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
61+
"""Internal callback for rate limit updates - adjusts download concurrency based on rate limit status."""
62+
# This is a placeholder - in a full implementation, this would adjust the
63+
# concurrency settings in the DownloadOrchestrator based on rate limit status
64+
# For now, we just log when we're getting low on rate limits
65+
if rate_limit_info.remaining < 100:
66+
logger.warning(
67+
f"GitHub API rate limit low: {rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
68+
)
69+
elif rate_limit_info.is_exhausted:
70+
logger.warning(
71+
f"GitHub API rate limit exhausted: {rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
72+
)
73+
74+
# Configure HTTP client
75+
headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
76+
77+
if auth_token:
78+
headers["Authorization"] = f"token {auth_token}"
79+
80+
self.http_client = httpx.AsyncClient(
81+
headers=headers, timeout=httpx.Timeout(timeout)
82+
)
83+
84+
# Sync client for PyGithub (used only for metadata)
85+
self.github_client = (
86+
Github(
87+
auth_token, retry=self.retry_manager.max_retries, user_agent=USER_AGENT
88+
)
89+
if auth_token
90+
else Github(retry=self.retry_manager.max_retries, user_agent=USER_AGENT)
91+
)
92+
93+
async def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
94+
"""Update rate limit information from API response headers."""
95+
96+
async with self._lock:
97+
try:
98+
self._rate_limit_info.limit = int(
99+
headers.get("x-ratelimit-limit", 5000)
100+
)
101+
self._rate_limit_info.remaining = int(
102+
headers.get("x-ratelimit-remaining", 5000)
103+
)
104+
self._rate_limit_info.used = int(headers.get("x-ratelimit-used", 0))
105+
106+
reset_timestamp = headers.get("x-ratelimit-reset")
107+
if reset_timestamp:
108+
self._rate_limit_info.reset_time = datetime.fromtimestamp(
109+
int(reset_timestamp)
110+
)
111+
112+
# Track consecutive rate limit hits
113+
if self._rate_limit_info.is_exhausted:
114+
self._consecutive_limits += 1
115+
else:
116+
self._consecutive_limits = 0
117+
118+
# Invoke internal callback if set
119+
if self._rate_limit_callback:
120+
self._rate_limit_callback(self._rate_limit_info)
121+
# Invoke external callback if set
122+
if self._external_rate_limit_callback:
123+
self._external_rate_limit_callback(self._rate_limit_info)
124+
125+
except (ValueError, KeyError) as e:
126+
logger.warning(f"Failed to parse rate limit headers: {e}")
48127

49128
# Configure HTTP client
50129
headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}

0 commit comments

Comments
 (0)