diff --git a/forklet/core/concurrency_manager.py b/forklet/core/concurrency_manager.py
index f6d2769..0d3e560 100644
--- a/forklet/core/concurrency_manager.py
+++ b/forklet/core/concurrency_manager.py
@@ -146,3 +146,32 @@ def get_stats(self) -> ConcurrencyStats:
     def is_busy(self) -> bool:
         """Check if there are active tasks."""
         return len(self._active_tasks) > 0
+
+    def update_max_concurrent(self, new_max_concurrent: int) -> None:
+        """
+        Update the maximum number of concurrent operations.
+
+        Must be called while no tasks are executing; otherwise the update is
+        skipped to avoid breaking the semaphore's invariants.
+
+        Args:
+            new_max_concurrent: The new maximum number of concurrent operations.
+
+        Raises:
+            ValueError: If new_max_concurrent is not positive.
+        """
+        # Validate first so invalid values always raise, busy or not.
+        if new_max_concurrent <= 0:
+            raise ValueError("max_concurrent must be positive")
+
+        if self.is_busy():
+            logger.warning(
+                f"Cannot update max_concurrent while "
+                f"{len(self._active_tasks)} tasks are active"
+            )
+            return
+
+        self.max_concurrent = new_max_concurrent
+        # Replace the semaphore: the old one still carries the previous count.
+        self._semaphore = asyncio.Semaphore(new_max_concurrent)
+        logger.debug(f"Updated max_concurrent to {new_max_concurrent}")
diff --git a/forklet/core/orchestrator.py b/forklet/core/orchestrator.py
index e4d4073..411c486 100644
--- a/forklet/core/orchestrator.py
+++ b/forklet/core/orchestrator.py
@@ -56,6 +56,12 @@ def __init__(
         self._current_result = None
         self._active_tasks = self.concurrency_manager._active_tasks
 
+        # Subscribe through the service-level hook rather than overwriting the
+        # limiter's single callback slot (GitHubAPIService registers its own).
+        self.github_service.set_external_rate_limit_callback(
+            self._on_rate_limit_update
+        )
+
     @property
     def is_cancelled(self) -> bool:
         """Check if the orchestrator is cancelled (backwards compatibility)."""
@@ -444,3 +450,37 @@ def reset_state(self) -> None:
         self._is_paused = self.state_controller.is_paused
         self._current_result = None
         self._active_tasks = self.concurrency_manager._active_tasks
+
+    def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
+        """
+        Adjust download concurrency based on GitHub API rate limit status.
+
+        Called whenever rate limit information is updated; dynamically scales
+        the concurrency level to stay within the remaining budget.
+        """
+        # Rough budget: assume ~10 API calls per download operation.
+        api_calls_per_download = 10
+
+        if rate_limit_info.remaining < api_calls_per_download * 2:
+            # Very low rate limit - reduce concurrency significantly
+            new_max_concurrent = max(
+                1, rate_limit_info.remaining // (api_calls_per_download * 2)
+            )
+        elif rate_limit_info.remaining < api_calls_per_download * 10:
+            # Moderate rate limit - reduce concurrency somewhat
+            new_max_concurrent = max(
+                1, rate_limit_info.remaining // api_calls_per_download
+            )
+        else:
+            # Plenty of rate limit - use configured maximum
+            new_max_concurrent = self._max_concurrent_downloads
+
+        # Apply the new concurrency level only if it actually changed
+        if new_max_concurrent != self.concurrency_manager.max_concurrent:
+            logger.debug(
+                f"Adjusting download concurrency from "
+                f"{self.concurrency_manager.max_concurrent} "
+                f"to {new_max_concurrent} based on rate limit "
+                f"({rate_limit_info.remaining}/{rate_limit_info.limit} remaining)"
+            )
+            self.concurrency_manager.update_max_concurrent(new_max_concurrent)
diff --git a/forklet/infrastructure/rate_limiter.py b/forklet/infrastructure/rate_limiter.py
index 21f437f..cecc2eb 100644
--- a/forklet/infrastructure/rate_limiter.py
+++ b/forklet/infrastructure/rate_limiter.py
@@ -3,32 +3,33 @@
 """
 
 import time
 import asyncio
-from typing import Optional, Dict
+from typing import Callable, Dict, Optional
 from dataclasses import dataclass
 from datetime import datetime
 import random
 
 from forklet.infrastructure.logger import logger
+
 
 ####
 ##      RATE LIMIT INFO
 #####
 @dataclass
 class RateLimitInfo:
     """Rate limit information from GitHub API headers."""
-    
+
     limit: int = 5000
     remaining: int = 5000
     reset_time: Optional[datetime] = None
     used: int = 0
-    
+
     @property
     def is_exhausted(self) -> bool:
         """Check if rate limit is exhausted."""
         return self.remaining <= 10  # Keep a small buffer
-    
+
     @property
     def reset_in_seconds(self) -> float:
         """Get seconds until rate limit resets."""
@@ -44,15 +45,12 @@ def reset_in_seconds(self) -> float:
 class RateLimiter:
     """
     Async rate limiter for GitHub API requests.
-    
+
     Handles both primary and secondary rate limits with exponential backoff.
     """
-    
+
     def __init__(
-        self,
-        default_delay: float = 1.0,
-        max_delay: float = 60.0,
-        adaptive: bool = True
+        self, default_delay: float = 1.0, max_delay: float = 60.0, adaptive: bool = True
     ):
         self.default_delay = default_delay
         self.max_delay = max_delay
@@ -61,40 +59,45 @@ def __init__(
         self._last_request = 0.0
         self._rate_limit_info = RateLimitInfo()
         self._consecutive_limits = 0
-
+        self._rate_limit_callback: Optional[Callable[[RateLimitInfo], None]] = None
+
+    def set_rate_limit_callback(
+        self, callback: Callable[[RateLimitInfo], None]
+    ) -> None:
+        """Set a callback to be invoked when rate limit information is updated."""
+        self._rate_limit_callback = callback
+
     async def acquire(self) -> None:
         """Acquire rate limit permission."""
         async with self._lock:
             current_time = time.time()
-
+
             # Check if we need to wait due to rate limiting
             if self._rate_limit_info.is_exhausted:
                 wait_time = self._rate_limit_info.reset_in_seconds
                 if wait_time > 0:
-                    logger.warning(
-                        f"Rate limit exhausted, waiting {wait_time:.1f}s"
-                    )
+                    logger.warning(f"Rate limit exhausted, waiting {wait_time:.1f}s")
                     await asyncio.sleep(wait_time)
-
+
             # Adaptive delay based on rate limit status
             delay = self._calculate_adaptive_delay(current_time)
-
+
             if delay > 0:
                 await asyncio.sleep(delay)
-
+
             self._last_request = time.time()
-
+
     def _calculate_adaptive_delay(self, current_time: float) -> float:
         """Calculate adaptive delay based on rate limit status."""
         if not self.adaptive:
             return self.default_delay
-
+
         # Base delay from last request
         elapsed = current_time - self._last_request
         base_delay = max(0, self.default_delay - elapsed)
-
+
         # Adjust based on remaining rate limit
         if self._rate_limit_info.remaining < 100:
             # Very low remaining calls - be more conservative
@@ -111,43 +114,45 @@ def _calculate_adaptive_delay(self, current_time: float) -> float:
         else:
             # Plenty of calls remaining
             multiplier = 1.0
-
+
         # Add jitter to prevent thundering herd
         jitter = random.uniform(0.8, 1.2)
-
+
         final_delay = min(base_delay * multiplier * jitter, self.max_delay)
         return final_delay
-
+
     async def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
         """Update rate limit information from API response headers."""
-
+
         async with self._lock:
             try:
                 self._rate_limit_info.limit = int(
-                    headers.get('x-ratelimit-limit', 5000)
+                    headers.get("x-ratelimit-limit", 5000)
                 )
                 self._rate_limit_info.remaining = int(
-                    headers.get('x-ratelimit-remaining', 5000)
+                    headers.get("x-ratelimit-remaining", 5000)
                 )
-                self._rate_limit_info.used = int(
-                    headers.get('x-ratelimit-used', 0)
-                )
-
-                reset_timestamp = headers.get('x-ratelimit-reset')
+                self._rate_limit_info.used = int(headers.get("x-ratelimit-used", 0))
+
+                reset_timestamp = headers.get("x-ratelimit-reset")
                 if reset_timestamp:
                     self._rate_limit_info.reset_time = datetime.fromtimestamp(
                         int(reset_timestamp)
                     )
-
+
                 # Track consecutive rate limit hits
                 if self._rate_limit_info.is_exhausted:
                     self._consecutive_limits += 1
                 else:
                     self._consecutive_limits = 0
-
+
+                # Invoke callback if set
+                if self._rate_limit_callback:
+                    self._rate_limit_callback(self._rate_limit_info)
+
             except (ValueError, KeyError) as e:
                 logger.warning(f"Failed to parse rate limit headers: {e}")
-
+
     @property
     def rate_limit_info(self) -> RateLimitInfo:
         """Get current rate limit information."""
diff --git a/forklet/services/github_api.py b/forklet/services/github_api.py
index 06a257e..711d577 100644
--- a/forklet/services/github_api.py
+++ b/forklet/services/github_api.py
@@ -8,0 +9,2 @@
+from typing import Callable, Optional
+from forklet.infrastructure.rate_limiter import RateLimitInfo
@@ -45,6 +47,14 @@ def __init__(
         self.retry_manager = retry_manager
         self.auth_token = auth_token
         self.timeout = timeout
+        # External hook invoked after internal rate limit handling; set via
+        # set_external_rate_limit_callback().
+        self._external_rate_limit_callback: Optional[
+            Callable[[RateLimitInfo], None]
+        ] = None
+        # Route limiter updates through our internal handler so external
+        # consumers can subscribe without overwriting it on the limiter.
+        self.rate_limiter.set_rate_limit_callback(self._on_rate_limit_update)
 
         # Configure HTTP client
         headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
@@ -75,0 +86,25 @@
+
+    def set_external_rate_limit_callback(
+        self, callback: Callable[[RateLimitInfo], None]
+    ) -> None:
+        """Register a callback invoked on every rate limit update."""
+        self._external_rate_limit_callback = callback
+
+    def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
+        """Log rate limit pressure, then forward the update externally."""
+        # Exhaustion first: is_exhausted implies remaining <= 10, which the
+        # `< 100` check would otherwise always shadow.
+        if rate_limit_info.is_exhausted:
+            logger.warning(
+                f"GitHub API rate limit exhausted: "
+                f"{rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
+            )
+        elif rate_limit_info.remaining < 100:
+            logger.warning(
+                f"GitHub API rate limit low: "
+                f"{rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
+            )
+
+        # Forward to the externally registered callback, if any.
+        if self._external_rate_limit_callback:
+            self._external_rate_limit_callback(rate_limit_info)