Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions forklet/core/concurrency_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ def get_stats(self) -> ConcurrencyStats:
def is_busy(self) -> bool:
    """Return True while at least one task is still active."""
    # Non-empty active-task collection means work is in flight.
    return bool(self._active_tasks)

def update_max_concurrent(self, new_max_concurrent: int) -> None:
    """
    Update the maximum number of concurrent operations.

    Must be called while no tasks are executing: replacing the semaphore
    while waiters exist would break its invariants, so the update is
    skipped (with a warning) when tasks are active.

    Args:
        new_max_concurrent: The new maximum number of concurrent
            operations; must be a positive integer.

    Raises:
        ValueError: If ``new_max_concurrent`` is not positive.
    """
    # Validate first so callers always learn about bad input, even when
    # the update would be skipped because tasks are active. (Previously
    # the busy-check ran first and silently swallowed invalid values.)
    if new_max_concurrent <= 0:
        raise ValueError("max_concurrent must be positive")

    if self.is_busy():
        logger.warning(
            f"Cannot update max_concurrent while {len(self._active_tasks)} tasks are active"
        )
        return

    self.max_concurrent = new_max_concurrent
    # Replace the semaphore with a new one having the updated value;
    # safe because no task currently holds or awaits the old semaphore.
    self._semaphore = asyncio.Semaphore(new_max_concurrent)
    logger.debug(f"Updated max_concurrent to {new_max_concurrent}")
40 changes: 40 additions & 0 deletions forklet/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def __init__(
self._current_result = None
self._active_tasks = self.concurrency_manager._active_tasks

# Set up rate limit callback to adjust concurrency based on API rate limit status
self.github_service.rate_limiter.set_rate_limit_callback(
self._on_rate_limit_update
)

@property
def is_cancelled(self) -> bool:
"""Check if the orchestrator is cancelled (backwards compatibility)."""
Expand Down Expand Up @@ -444,3 +449,38 @@ def reset_state(self) -> None:
self._is_paused = self.state_controller.is_paused
self._current_result = None
self._active_tasks = self.concurrency_manager._active_tasks

def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
    """
    Adjust download concurrency based on GitHub API rate limit status.

    This is called by the rate limiter when rate limit information is
    updated. It dynamically adjusts the concurrency level to stay within
    rate limits.
    """
    # Rough budget model: assume ~10 API calls per download operation and
    # keep headroom for other API calls (repo info, etc.).
    api_calls_per_download = 10

    if rate_limit_info.remaining < api_calls_per_download * 2:
        # Very low rate limit - reduce concurrency significantly
        # (divide by double the per-download cost).
        new_max_concurrent = max(
            1, rate_limit_info.remaining // (api_calls_per_download * 2)
        )
    elif rate_limit_info.remaining < api_calls_per_download * 10:
        # Moderate rate limit - reduce concurrency somewhat. BUGFIX: this
        # branch previously computed the exact same value as the very-low
        # branch (// (api_calls_per_download * 2)), making it dead code;
        # dividing by the plain per-download cost backs off less hard.
        new_max_concurrent = max(
            1, rate_limit_info.remaining // api_calls_per_download
        )
    else:
        # Plenty of rate limit - use configured maximum
        new_max_concurrent = self._max_concurrent_downloads

    # Apply only on change to avoid needlessly rebuilding the semaphore.
    if new_max_concurrent != self.concurrency_manager.max_concurrent:
        logger.debug(
            f"Adjusting download concurrency from {self.concurrency_manager.max_concurrent} "
            f"to {new_max_concurrent} based on rate limit "
            f"({rate_limit_info.remaining}/{rate_limit_info.limit} remaining)"
        )
        self.concurrency_manager.update_max_concurrent(new_max_concurrent)
78 changes: 43 additions & 35 deletions forklet/infrastructure/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,34 @@
"""

import time
import asyncio
import asyncio
from typing import Optional, Dict
from dataclasses import dataclass
from datetime import datetime
import random
from typing import Callable, Optional

from forklet.infrastructure.logger import logger


####
## RATE LIMIT INFO
#####
@dataclass
class RateLimitInfo:
"""Rate limit information from GitHub API headers."""

limit: int = 5000
remaining: int = 5000
reset_time: Optional[datetime] = None
used: int = 0

@property
def is_exhausted(self) -> bool:
    """Whether the remaining budget has dropped into the safety buffer."""
    # Anything at or below 10 remaining calls counts as exhausted so a
    # small buffer is kept for essential requests.
    return self.remaining <= 10

@property
def reset_in_seconds(self) -> float:
"""Get seconds until rate limit resets."""
Expand All @@ -44,15 +46,12 @@ def reset_in_seconds(self) -> float:
class RateLimiter:
"""
Async rate limiter for GitHub API requests.

Handles both primary and secondary rate limits with exponential backoff.
"""

def __init__(
self,
default_delay: float = 1.0,
max_delay: float = 60.0,
adaptive: bool = True
self, default_delay: float = 1.0, max_delay: float = 60.0, adaptive: bool = True
):
self.default_delay = default_delay
self.max_delay = max_delay
Expand All @@ -61,40 +60,47 @@ def __init__(
self._last_request = 0.0
self._rate_limit_info = RateLimitInfo()
self._consecutive_limits = 0

self._rate_limit_callback: Optional[Callable[[RateLimitInfo], None]] = None

def set_rate_limit_callback(
    self, callback: Callable[[RateLimitInfo], None]
) -> None:
    """Register the hook invoked each time rate limit info is refreshed."""
    # Stored for update_rate_limit_info to call after parsing headers.
    self._rate_limit_callback = callback
self._rate_limit_callback: Optional[Callable[[RateLimitInfo], None]] = None
self._rate_limit_callback: Optional[Callable[[RateLimitInfo], None]] = None

async def acquire(self) -> None:
"""Acquire rate limit permission."""

async with self._lock:
current_time = time.time()

# Check if we need to wait due to rate limiting
if self._rate_limit_info.is_exhausted:
wait_time = self._rate_limit_info.reset_in_seconds
if wait_time > 0:
logger.warning(
f"Rate limit exhausted, waiting {wait_time:.1f}s"
)
logger.warning(f"Rate limit exhausted, waiting {wait_time:.1f}s")
await asyncio.sleep(wait_time)

# Adaptive delay based on rate limit status
delay = self._calculate_adaptive_delay(current_time)

if delay > 0:
await asyncio.sleep(delay)

self._last_request = time.time()

def _calculate_adaptive_delay(self, current_time: float) -> float:
"""Calculate adaptive delay based on rate limit status."""

if not self.adaptive:
return self.default_delay

# Base delay from last request
elapsed = current_time - self._last_request
base_delay = max(0, self.default_delay - elapsed)

# Adjust based on remaining rate limit
if self._rate_limit_info.remaining < 100:
# Very low remaining calls - be more conservative
Expand All @@ -111,43 +117,45 @@ def _calculate_adaptive_delay(self, current_time: float) -> float:
else:
# Plenty of calls remaining
multiplier = 1.0

# Add jitter to prevent thundering herd
jitter = random.uniform(0.8, 1.2)

final_delay = min(base_delay * multiplier * jitter, self.max_delay)
return final_delay

async def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
"""Update rate limit information from API response headers."""

async with self._lock:
try:
self._rate_limit_info.limit = int(
headers.get('x-ratelimit-limit', 5000)
headers.get("x-ratelimit-limit", 5000)
)
self._rate_limit_info.remaining = int(
headers.get('x-ratelimit-remaining', 5000)
headers.get("x-ratelimit-remaining", 5000)
)
self._rate_limit_info.used = int(
headers.get('x-ratelimit-used', 0)
)

reset_timestamp = headers.get('x-ratelimit-reset')
self._rate_limit_info.used = int(headers.get("x-ratelimit-used", 0))

reset_timestamp = headers.get("x-ratelimit-reset")
if reset_timestamp:
self._rate_limit_info.reset_time = datetime.fromtimestamp(
int(reset_timestamp)
)

# Track consecutive rate limit hits
if self._rate_limit_info.is_exhausted:
self._consecutive_limits += 1
else:
self._consecutive_limits = 0


# Invoke callback if set
if self._rate_limit_callback:
self._rate_limit_callback(self._rate_limit_info)

except (ValueError, KeyError) as e:
logger.warning(f"Failed to parse rate limit headers: {e}")

@property
def rate_limit_info(self) -> RateLimitInfo:
"""Get current rate limit information."""
Expand Down
79 changes: 79 additions & 0 deletions forklet/services/github_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,85 @@ def __init__(
self.retry_manager = retry_manager
self.auth_token = auth_token
self.timeout = timeout
self._concurrency_adjustment_callback: Optional[
Callable[[RateLimitInfo], None]
] = None
# Set up rate limit callback to adjust concurrency
self.rate_limiter.set_rate_limit_callback(self._on_rate_limit_update)

def set_external_rate_limit_callback(
    self, callback: Callable[[RateLimitInfo], None]
) -> None:
    """Set an external callback to be invoked when rate limit information is updated."""
    # NOTE(review): __init__ initializes `_concurrency_adjustment_callback`
    # but never `_external_rate_limit_callback`; until this setter is called,
    # code reading `self._external_rate_limit_callback` would raise
    # AttributeError — confirm the intended attribute name.
    self._external_rate_limit_callback = callback

def _on_rate_limit_update(self, rate_limit_info: RateLimitInfo) -> None:
    """Internal callback for rate limit updates - warns when the GitHub API
    rate limit budget is exhausted or running low.

    This is a placeholder - in a full implementation, this would adjust the
    concurrency settings in the DownloadOrchestrator based on rate limit status.
    """
    # BUGFIX: check exhaustion first. is_exhausted means remaining <= 10,
    # which always satisfies remaining < 100, so with the original ordering
    # the "exhausted" branch was unreachable.
    if rate_limit_info.is_exhausted:
        logger.warning(
            f"GitHub API rate limit exhausted: {rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
        )
    elif rate_limit_info.remaining < 100:
        logger.warning(
            f"GitHub API rate limit low: {rate_limit_info.remaining}/{rate_limit_info.limit} remaining"
        )

# Configure HTTP client
headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}

if auth_token:
headers["Authorization"] = f"token {auth_token}"

self.http_client = httpx.AsyncClient(
headers=headers, timeout=httpx.Timeout(timeout)
)

# Sync client for PyGithub (used only for metadata)
self.github_client = (
Github(
auth_token, retry=self.retry_manager.max_retries, user_agent=USER_AGENT
)
if auth_token
else Github(retry=self.retry_manager.max_retries, user_agent=USER_AGENT)
)

async def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
    """Update rate limit information from API response headers.

    Parses the lowercase ``x-ratelimit-*`` headers, tracks consecutive
    exhaustion, and notifies the internal and external callbacks.
    """
    # NOTE(review): this method appears to duplicate
    # RateLimiter.update_rate_limit_info. It reads self._lock,
    # self._rate_limit_info, self._consecutive_limits and
    # self._rate_limit_callback, none of which are initialized in the
    # visible __init__ of this class — confirm this belongs here rather
    # than delegating to self.rate_limiter.

    async with self._lock:
        try:
            # Fall back to GitHub's default 5000/hour budget when a
            # header is absent.
            self._rate_limit_info.limit = int(
                headers.get("x-ratelimit-limit", 5000)
            )
            self._rate_limit_info.remaining = int(
                headers.get("x-ratelimit-remaining", 5000)
            )
            self._rate_limit_info.used = int(headers.get("x-ratelimit-used", 0))

            # Reset time arrives as a Unix epoch timestamp (seconds).
            reset_timestamp = headers.get("x-ratelimit-reset")
            if reset_timestamp:
                self._rate_limit_info.reset_time = datetime.fromtimestamp(
                    int(reset_timestamp)
                )

            # Track consecutive rate limit hits
            if self._rate_limit_info.is_exhausted:
                self._consecutive_limits += 1
            else:
                self._consecutive_limits = 0

            # Invoke internal callback if set
            if self._rate_limit_callback:
                self._rate_limit_callback(self._rate_limit_info)
            # Invoke external callback if set
            if self._external_rate_limit_callback:
                self._external_rate_limit_callback(self._rate_limit_info)

        except (ValueError, KeyError) as e:
            # Malformed headers are logged and ignored rather than
            # failing the request that carried them.
            logger.warning(f"Failed to parse rate limit headers: {e}")

# Configure HTTP client
headers = {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
Expand Down
Loading