"""Configuration validation utilities."""
import logging
import shutil
from typing import List, Dict, Any
import redis
from botocore.exceptions import ClientError
from ..config import settings
# Module-level logger named after this module; ConfigValidator reports its findings through it.
logger = logging.getLogger(__name__)
class ConfigurationError(Exception):
    """Signal that configuration validation did not pass."""
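class ConfigValidator:
    """Validate application settings and connectivity to external services.

    Findings are collected in two lists:

    * ``errors``   - any entry makes ``validate_all`` return ``False``.
    * ``warnings`` - logged, but do not affect the result.
    """

    def __init__(self):
        self.errors: List[str] = []
        self.warnings: List[str] = []

    def validate_all(self) -> bool:
        """Run every check and log the findings.

        Returns:
            ``True`` when no errors were recorded, ``False`` otherwise.
            Warnings alone never cause a ``False`` result.
        """
        self.errors.clear()
        self.warnings.clear()

        # Static configuration checks.
        self._validate_api_config()
        self._validate_security_config()
        self._validate_resource_limits()
        self._validate_file_config()

        # Checks that contact external services or the local environment.
        self._validate_redis_connection()
        self._validate_s3_connection()
        self._validate_nsjail()

        for warning in self.warnings:
            logger.warning("Configuration warning: %s", warning)
        for error in self.errors:
            logger.error("Configuration error: %s", error)

        return not self.errors

    def _record_service_failure(self, message: str) -> None:
        """Record a service failure: a warning in debug mode, an error otherwise."""
        if settings.api_debug:
            self.warnings.append(message)
        else:
            self.errors.append(message)

    def _validate_api_config(self) -> None:
        """Check the length of the primary and additional API keys."""
        # An unset key is treated as an empty (too short) key instead of crashing.
        api_key = settings.api_key or ""
        if len(api_key) < 16:
            self.errors.append("API key must be at least 16 characters long")

        # The default key is 12 characters long, so it also fails the length check above.
        if api_key == "test-api-key":
            self.warnings.append("Using default API key - change this in production")

        # Every entry here ends up in the application log, so a rejected key is
        # identified by its position only. Echoing a prefix would disclose at least
        # half of any key shorter than 16 characters.
        for position, key in enumerate(settings.api_keys or (), start=1):
            if len(key) < 16:
                self.errors.append(
                    f"Additional API key #{position} is too short "
                    "(must be at least 16 characters long)"
                )

    def _validate_security_config(self) -> None:
        """Warn about an empty extension allow-list and disabled sandbox isolation."""
        if not settings.allowed_file_extensions:
            self.warnings.append("No allowed file extensions configured")

        # NOTE(review): disabled isolation is only a warning, so validate_all() still
        # returns True with the sandbox isolation switched off - confirm that this is
        # intended for non-debug deployments.
        if not settings.enable_network_isolation:
            self.warnings.append("Network isolation is disabled - security risk")

        if not settings.enable_filesystem_isolation:
            self.warnings.append("Filesystem isolation is disabled - security risk")

    def _validate_resource_limits(self) -> None:
        """Validate resource limit configuration (placeholder: nothing is checked yet)."""

    def _validate_file_config(self) -> None:
        """Require every allowed file extension to start with a dot."""
        # _validate_security_config() treats an empty/unset list as a warning, so
        # tolerate it here rather than failing on iteration.
        for ext in settings.allowed_file_extensions or ():
            if not ext.startswith("."):
                self.errors.append(f"File extension must start with dot: {ext}")

    def _validate_redis_connection(self) -> None:
        """Check that Redis accepts a connection and answers PING."""
        try:
            client = redis.from_url(
                settings.get_redis_url(),
                socket_timeout=settings.redis_socket_timeout,
                socket_connect_timeout=settings.redis_socket_connect_timeout,
                max_connections=settings.redis_max_connections,
            )
            client.ping()
        except redis.AuthenticationError as e:
            # redis-py derives AuthenticationError from ConnectionError, so this
            # handler has to come first. Listed after ConnectionError it is never
            # reached, and debug mode would downgrade a rejected credential to a
            # warning. A rejected credential is always an error.
            self.errors.append(f"Redis authentication failed: {e}")
        except redis.ConnectionError as e:
            # Debug mode may start without Redis.
            self._record_service_failure(f"Cannot connect to Redis: {e}")
        except Exception as e:
            self._record_service_failure(f"Redis validation error: {e}")

    def _validate_s3_connection(self) -> None:
        """Check that the configured S3 bucket can be reached."""
        try:
            client = settings.s3.make_client()
            try:
                client.head_bucket(Bucket=settings.s3_bucket)
            except ClientError as e:
                # A response without an error code is handled like any other S3 error.
                code = e.response.get("Error", {}).get("Code")
                if code in ("404", "NoSuchBucket"):
                    # A missing bucket is reported as a warning only.
                    self.warnings.append(
                        f"S3 bucket '{settings.s3_bucket}' does not exist"
                    )
                else:
                    raise
        except ClientError as e:
            self._record_service_failure(f"S3 error: {e}")
        except Exception as e:
            self._record_service_failure(f"S3 validation error: {e}")

    def _validate_nsjail(self) -> None:
        """Check that an ``nsjail`` binary can be found on PATH."""
        nsjail_path = shutil.which("nsjail")
        if nsjail_path:
            logger.info("nsjail found at: %s", nsjail_path)
            return
        # Only a warning: validate_all() still succeeds when the sandbox binary is missing.
        self.warnings.append(
            "nsjail binary not found in PATH - sandboxed execution will not work"
        )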
def validate_configuration() -> bool:
    """Run a fresh ConfigValidator and report whether all checks passed."""
    return ConfigValidator().validate_all()
def get_configuration_summary() -> Dict[str, Any]:
    """Return a small snapshot of selected settings for debugging.

    Only the debug flag, the number of supported languages and the execution
    time and memory limits are reported; no keys or connection settings are
    included.
    """
    summary: Dict[str, Any] = {}
    summary["debug"] = settings.api_debug
    summary["languages"] = len(settings.supported_languages)
    summary["max_execution_time"] = settings.max_execution_time
    summary["max_memory_mb"] = settings.max_memory_mb
    return summary