Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,31 @@ AUTH_SECRET=CHANGE-ME-IN-PRODUCTION
# Target repository in format "owner/repo"
# Example: frankbria/codeframe
# GITHUB_REPO=owner/repo

# ============================================================================
# Rate Limiting Configuration
# ============================================================================

# Enable/disable rate limiting (default: true)
# Set to false to disable rate limiting entirely
RATE_LIMIT_ENABLED=true

# Rate limit for authentication endpoints (default: 10/minute)
# Format: "N/period" where period is second, minute, hour, or day
RATE_LIMIT_AUTH=10/minute

# Rate limit for standard API endpoints (default: 100/minute)
RATE_LIMIT_STANDARD=100/minute

# Rate limit for AI/expensive operations like chat (default: 20/minute)
RATE_LIMIT_AI=20/minute

# Rate limit for WebSocket connections (default: 30/minute)
RATE_LIMIT_WEBSOCKET=30/minute

# Storage backend for rate limiting (default: memory)
# Options: memory (single instance) or redis (distributed)
RATE_LIMIT_STORAGE=memory

# Redis URL for distributed rate limiting (required if RATE_LIMIT_STORAGE=redis)
# REDIS_URL=redis://localhost:6379/0
165 changes: 165 additions & 0 deletions codeframe/config/rate_limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Rate limiting configuration for CodeFRAME API.

This module provides configuration for API rate limiting using slowapi.
It delegates to GlobalConfig in core/config.py as the single source of truth
for environment variable handling.

Environment Variables (via GlobalConfig):
RATE_LIMIT_ENABLED: Enable/disable rate limiting (default: true)
RATE_LIMIT_AUTH: Rate limit for authentication endpoints (default: 10/minute)
RATE_LIMIT_STANDARD: Rate limit for standard API endpoints (default: 100/minute)
RATE_LIMIT_AI: Rate limit for AI/expensive operations (default: 20/minute)
RATE_LIMIT_WEBSOCKET: Rate limit for WebSocket connections (default: 30/minute)
RATE_LIMIT_STORAGE: Storage backend - memory or redis (default: memory)
RATE_LIMIT_TRUSTED_PROXIES: Comma-separated trusted proxy IPs/CIDRs
REDIS_URL: Redis connection URL for distributed rate limiting (optional)
"""

import ipaddress
import logging
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class RateLimitConfig:
"""Configuration for API rate limiting.

Attributes:
auth_limit: Rate limit for authentication endpoints
standard_limit: Rate limit for standard API endpoints
ai_limit: Rate limit for AI/expensive operations
websocket_limit: Rate limit for WebSocket connections
enabled: Whether rate limiting is enabled
storage: Storage backend ('memory' or 'redis')
redis_url: Redis connection URL for distributed rate limiting
trusted_proxies: List of trusted proxy IP addresses/networks
"""

auth_limit: str = "10/minute"
standard_limit: str = "100/minute"
ai_limit: str = "20/minute"
websocket_limit: str = "30/minute"
enabled: bool = True
storage: str = "memory"
redis_url: Optional[str] = None
trusted_proxies: list = field(default_factory=list)

def is_trusted_proxy(self, ip: str) -> bool:
"""Check if an IP address is from a trusted proxy.

Args:
ip: IP address to check

Returns:
True if IP is in trusted_proxies list or matches a trusted network
"""
if not self.trusted_proxies:
return False

try:
client_ip = ipaddress.ip_address(ip)
for proxy in self.trusted_proxies:
try:
# Check if it's a network (CIDR notation)
if "/" in proxy:
network = ipaddress.ip_network(proxy, strict=False)
if client_ip in network:
return True
else:
# Check exact IP match
if client_ip == ipaddress.ip_address(proxy):
return True
except ValueError:
# Invalid proxy entry, skip it
continue
return False
except ValueError:
# Invalid IP address
return False

@classmethod
def from_global_config(cls) -> "RateLimitConfig":
"""Create RateLimitConfig from GlobalConfig.

Uses core/config.py as the single source of truth for
environment variable handling.

Returns:
RateLimitConfig instance with values from GlobalConfig
"""
# Import here to avoid circular imports
from codeframe.core.config import get_global_config

global_config = get_global_config()

enabled = global_config.rate_limit_enabled
storage = global_config.rate_limit_storage
redis_url = global_config.redis_url

# Parse trusted proxies from comma-separated string
trusted_proxies_str = global_config.rate_limit_trusted_proxies.strip()
trusted_proxies = []
if trusted_proxies_str:
trusted_proxies = [
p.strip() for p in trusted_proxies_str.split(",") if p.strip()
]

# Validate storage type (already validated by Pydantic, but double-check)
if storage not in ("memory", "redis"):
logger.warning(
f"Invalid RATE_LIMIT_STORAGE: {storage}. "
f"Must be 'memory' or 'redis'. Defaulting to 'memory'."
)
storage = "memory"

# Warn if redis storage is requested but no URL provided
if storage == "redis" and not redis_url:
logger.warning(
"RATE_LIMIT_STORAGE is 'redis' but REDIS_URL is not set. "
"Falling back to in-memory storage."
)
storage = "memory"

return cls(
auth_limit=global_config.rate_limit_auth,
standard_limit=global_config.rate_limit_standard,
ai_limit=global_config.rate_limit_ai,
websocket_limit=global_config.rate_limit_websocket,
enabled=enabled,
storage=storage,
redis_url=redis_url,
trusted_proxies=trusted_proxies,
)


@lru_cache(maxsize=1)
def get_rate_limit_config() -> RateLimitConfig:
"""Get the global rate limit configuration.

Loads from GlobalConfig on first call, cached thereafter.
Thread-safe via lru_cache.

Returns:
RateLimitConfig instance
"""
config = RateLimitConfig.from_global_config()
logger.info(
f"Rate limit config initialized: "
f"enabled={config.enabled}, "
f"storage={config.storage}, "
f"standard={config.standard_limit}, "
f"trusted_proxies={len(config.trusted_proxies)} configured"
)
return config


def _reset_rate_limit_config() -> None:
"""Reset the global rate limit configuration.

Useful for testing to ensure clean state between tests.
"""
get_rate_limit_config.cache_clear()
48 changes: 48 additions & 0 deletions codeframe/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,17 @@ class GlobalConfig(BaseSettings):
github_token: Optional[str] = Field(None, alias="GITHUB_TOKEN")
github_repo: Optional[str] = Field(None, alias="GITHUB_REPO") # Format: "owner/repo"

# Rate Limiting Configuration
rate_limit_enabled: bool = Field(True, alias="RATE_LIMIT_ENABLED")
rate_limit_storage: str = Field("memory", alias="RATE_LIMIT_STORAGE")
redis_url: Optional[str] = Field(None, alias="REDIS_URL")
rate_limit_auth: str = Field("10/minute", alias="RATE_LIMIT_AUTH")
rate_limit_standard: str = Field("100/minute", alias="RATE_LIMIT_STANDARD")
rate_limit_ai: str = Field("20/minute", alias="RATE_LIMIT_AI")
rate_limit_websocket: str = Field("30/minute", alias="RATE_LIMIT_WEBSOCKET")
# Comma-separated list of trusted proxy IPs/CIDRs (e.g., "10.0.0.0/8,172.16.0.0/12")
rate_limit_trusted_proxies: str = Field("", alias="RATE_LIMIT_TRUSTED_PROXIES")

model_config = SettingsConfigDict(
env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"
)
Expand All @@ -417,6 +428,15 @@ def validate_port(cls, v: int) -> int:
raise ValueError(f"API_PORT must be between 1 and 65535, got: {v}")
return v

@field_validator("rate_limit_storage")
@classmethod
def validate_rate_limit_storage(cls, v: str) -> str:
"""Validate rate limit storage is valid."""
allowed = ["memory", "redis"]
if v not in allowed:
raise ValueError(f"RATE_LIMIT_STORAGE must be one of {allowed}, got: {v}")
return v

def get_cors_origins_list(self) -> list[str]:
"""Parse CORS origins from comma-separated string."""
return [origin.strip() for origin in self.cors_origins.split(",") if origin.strip()]
Expand Down Expand Up @@ -580,3 +600,31 @@ def get(self, key: str) -> Any:
current = current[k]

return current


# Module-level singleton for GlobalConfig
_global_config: Optional[GlobalConfig] = None


def get_global_config() -> GlobalConfig:
"""Get the global configuration singleton.

Loads from environment variables on first call, cached thereafter.
This is the recommended way to access GlobalConfig for most use cases.

Returns:
GlobalConfig instance with values from environment
"""
global _global_config
if _global_config is None:
_global_config = GlobalConfig()
return _global_config


def reset_global_config() -> None:
"""Reset the global configuration singleton.

Useful for testing to ensure clean state between tests.
"""
global _global_config
_global_config = None
36 changes: 36 additions & 0 deletions codeframe/lib/audit_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class AuditEventType(Enum):
USER_DELETED = "user.deleted"
USER_ROLE_CHANGED = "user.role.changed"

# Rate limiting events
RATE_LIMIT_EXCEEDED = "rate_limit.exceeded"
RATE_LIMIT_WARNING = "rate_limit.warning"


class AuditLogger:
"""Centralized audit logger for security events.
Expand Down Expand Up @@ -182,6 +186,38 @@ def log_user_event(
metadata=metadata,
)

def log_rate_limit_event(
self,
event_type: AuditEventType,
user_id: Optional[int] = None,
ip_address: Optional[str] = None,
endpoint: Optional[str] = None,
limit_category: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Log rate limiting event.

Args:
event_type: Type of rate limit event (exceeded or warning)
user_id: User ID (if authenticated)
ip_address: Client IP address
endpoint: API endpoint path
limit_category: Rate limit category (auth, standard, ai, websocket)
metadata: Additional event metadata
"""
self._log_event(
event_type=event_type,
user_id=user_id,
resource_type="rate_limit",
resource_id=None,
ip_address=ip_address,
metadata={
**(metadata or {}),
"endpoint": endpoint,
"limit_category": limit_category,
},
)

def _log_event(
self,
event_type: AuditEventType,
Expand Down
Loading
Loading