-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathmain.py
More file actions
452 lines (369 loc) · 15.7 KB
/
main.py
File metadata and controls
452 lines (369 loc) · 15.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
"""Main FastAPI application for the Code Interpreter API."""
# Standard library imports
import sys
from contextlib import asynccontextmanager
# Third-party imports
import structlog
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import ValidationError
# Local application imports
from .api import files, exec, health, admin, dashboard_metrics, programmatic
from .config import settings
from .middleware.security import SecurityMiddleware, RequestLoggingMiddleware
from .middleware.metrics import MetricsMiddleware
from .models.errors import CodeInterpreterException
from .services.health import health_service
from .services.metrics import metrics_service
from .utils.config_validator import validate_configuration, get_configuration_summary
from .utils.error_handlers import (
code_interpreter_exception_handler,
http_exception_handler,
validation_exception_handler,
general_exception_handler,
)
from .utils.logging import setup_logging
from .utils.shutdown import setup_graceful_shutdown, shutdown_handler
# Configure logging once at import time, before any module-level code below
# (e.g. the CORS setup) emits log lines, then grab the structlog logger used
# throughout this module.
setup_logging()
logger = structlog.get_logger()
async def _startup_monitoring(app: FastAPI) -> None:
    """Bring up the metrics service and register its event handlers.

    Any failure is logged and swallowed so that a broken metrics backend does
    not keep the API from starting. ``app`` is unused here; it is accepted so
    this helper has the same shape as the other ``_startup_*`` helpers.
    """
    try:
        await metrics_service.start()
        metrics_service.register_event_handlers()
        logger.info("Metrics service started")
    except Exception as exc:
        logger.error("Failed to start metrics service", error=str(exc))
async def _startup_cleanup_tasks() -> None:
    """Launch background cleanup: the session cleanup task and the cleanup scheduler.

    The two are started independently. A failure in the first is logged and
    the second is still attempted; neither failure is raised to the caller.
    """
    # 1) Session cleanup task.
    try:
        from .dependencies.services import get_session_service

        await get_session_service().start_cleanup_task()
        logger.info("Session cleanup task started")
    except Exception as exc:
        logger.error("Failed to start session cleanup task", error=str(exc))

    # 2) Cleanup scheduler, wired to the services it cleans up after. The
    #    state-archival service is only passed in when archiving is enabled.
    try:
        from .services.cleanup import cleanup_scheduler
        from .dependencies.services import (
            get_execution_service,
            get_file_service,
            get_state_archival_service,
        )

        execution_svc = get_execution_service()
        file_svc = get_file_service()
        if settings.state_archive_enabled:
            archival_svc = get_state_archival_service()
        else:
            archival_svc = None
        cleanup_scheduler.set_services(
            execution_service=execution_svc,
            file_service=file_svc,
            state_archival_service=archival_svc,
        )
        cleanup_scheduler.start()
        logger.info("Cleanup scheduler started")
    except Exception as exc:
        logger.error("Failed to start cleanup scheduler", error=str(exc))
async def _startup_sandbox_pool(app: FastAPI) -> None:
    """Start the sandbox pool when ``settings.sandbox_pool_enabled`` is set.

    The started pool is registered with the cleanup scheduler, with the
    dependency-injection layer (which then injects it into the execution
    service), and with the health service for monitoring. Failures are logged,
    not raised, so the API can still start without a pool.

    The pool is stored on ``app.state.sandbox_pool`` as soon as it is running.
    ``_shutdown_services`` stops whatever is stored there, so a failure in a
    later registration step can no longer leave a running pool behind.
    """
    if not settings.sandbox_pool_enabled:
        logger.info("Sandbox pool disabled")
        return

    try:
        from .services.sandbox.pool import SandboxPool
        from .services.sandbox.manager import SandboxManager
        from .services.cleanup import cleanup_scheduler
        from .dependencies.services import (
            set_sandbox_pool,
            inject_sandbox_pool_to_execution_service,
        )

        sandbox_manager = SandboxManager()
        sandbox_pool = SandboxPool(sandbox_manager)
        await sandbox_pool.start()
        # Record the running pool before wiring it anywhere else, so shutdown
        # can always find and stop it even if a step below raises.
        app.state.sandbox_pool = sandbox_pool

        # Connect pool to cleanup scheduler
        cleanup_scheduler.set_sandbox_pool(sandbox_pool)
        # Register pool with dependency injection system
        set_sandbox_pool(sandbox_pool)
        inject_sandbox_pool_to_execution_service()
        # Register pool with health service for monitoring
        health_service.set_sandbox_pool(sandbox_pool)
        logger.info("Sandbox pool started")
    except Exception as e:
        logger.error("Failed to start sandbox pool", error=str(e))
async def _perform_health_checks() -> None:
    """Run one uncached health check of every service and log the results.

    Healthy services are logged at debug level with their response time; any
    other status is logged as a warning with its status and error. The
    aggregated overall status is logged at info level. Errors are logged,
    never raised.
    """
    try:
        results = await health_service.check_all_services(use_cache=False)
        for name, outcome in results.items():
            status = outcome.status.value
            if status != "healthy":
                logger.warning(
                    f"{name} health check failed",
                    status=status,
                    error=outcome.error,
                )
                continue
            logger.debug(
                f"{name} healthy",
                response_time_ms=outcome.response_time_ms,
            )
        overall = health_service.get_overall_status(results)
        logger.info("Health checks completed", overall_status=overall.value)
    except Exception as exc:
        logger.error("Initial health checks failed", error=str(exc))
async def _startup_egress_proxy(app: FastAPI) -> None:
    """Start the inline egress proxy and firewall if sandbox network access is enabled.

    Does nothing unless ``settings.enable_sandbox_network`` is set. Otherwise:

    1. Prepares the persistent skill-deps directory
       (``settings.skill_deps_path``). It is created and chmodded sticky +
       world-writable, so each language's sandbox uid can install packages
       without root while package files are preserved across containers.
       A failure here only logs a warning.
    2. Starts an ``EgressProxy`` on ``settings.sandbox_egress_port``. Its
       allowlist is the default allowlist plus any comma-separated hosts from
       ``settings.sandbox_egress_allowlist``. The proxy is stored on
       ``app.state.egress_proxy``.
    3. Installs iptables rules for the sandbox uid, so sandbox processes
       cannot open direct sockets around the proxy.

    Raises:
        RuntimeError: if the firewall could not be installed, either because
            ``install_sandbox_egress_rules`` returned a falsy value or because
            it raised (the original exception is chained as the cause).
            Before raising, the proxy is stopped and
            ``app.state.egress_proxy`` is reset to ``None``.
    """
    if not settings.enable_sandbox_network:
        return
    import os
    from pathlib import Path

    # Import the firewall helpers up front, before the proxy is started, so an
    # import error cannot leave a running proxy with no firewall behind it.
    from .services.sandbox.egress_proxy import DEFAULT_ALLOWLIST, EgressProxy
    from .config.languages import SANDBOX_USER_ID
    from .services.sandbox.egress_firewall import install_sandbox_egress_rules

    deps_root = Path(settings.skill_deps_path)
    try:
        deps_root.mkdir(parents=True, exist_ok=True)
        # Sticky + world-writable, like /tmp. The sandbox uid (e.g. 1001) needs
        # to write here; keeping it sticky means one sandbox can't unlink
        # another's files.
        os.chmod(str(deps_root), 0o1777)  # nosec B103
    except OSError as exc:
        logger.warning(
            "Could not prepare skill-deps directory; "
            "sandbox installs may fail with permission errors",
            path=str(deps_root),
            error=str(exc),
        )

    # Operator-supplied extra hosts: comma-separated, blank entries ignored.
    extra = (
        [h.strip() for h in settings.sandbox_egress_allowlist.split(",") if h.strip()]
        if settings.sandbox_egress_allowlist
        else []
    )
    proxy = EgressProxy(
        port=settings.sandbox_egress_port,
        allowlist=list(DEFAULT_ALLOWLIST) + extra,
        allow_public_https=settings.sandbox_egress_mode == "public_https",
    )
    await proxy.start()
    app.state.egress_proxy = proxy

    # Network-level enforcement so a malicious skill can't `socket.create_connection`
    # around the application-level proxy. Without these iptables rules, sandbox
    # processes — sharing the API container's net namespace — can directly reach
    # Redis/S3 and any internal docker network. Refuse to enable network if the
    # firewall can't be installed (better to fail loudly than to silently leak SSRF).
    sandbox_uid = SANDBOX_USER_ID
    firewall_error = None
    try:
        firewall_ok = install_sandbox_egress_rules(
            sandbox_uid=sandbox_uid,
            proxy_port=settings.sandbox_egress_port,
        )
    except Exception as exc:
        # An exception is treated exactly like a failed install: the proxy
        # must not stay up without the firewall in front of it.
        firewall_ok = False
        firewall_error = exc
    if not firewall_ok:
        await proxy.stop()
        app.state.egress_proxy = None
        raise RuntimeError(
            "ENABLE_SANDBOX_NETWORK=true but the iptables egress firewall could "
            "not be installed. The container needs CAP_NET_ADMIN (cap_add: NET_ADMIN "
            "in compose) and an iptables binary. Without these rules, sandboxes "
            "could SSRF Redis/S3 via direct sockets — refusing to enable network."
        ) from firewall_error
    logger.info(
        "Sandbox network access ENABLED via egress proxy + firewall",
        port=settings.sandbox_egress_port,
        egress_mode=settings.sandbox_egress_mode,
        skill_deps_path=str(deps_root),
        sandbox_uid=sandbox_uid,
        allowlist_extra=extra or None,
    )
async def _shutdown_egress_proxy(app: FastAPI) -> None:
    """Stop the egress proxy and remove the sandbox egress firewall, best-effort.

    This is a no-op when ``app.state.egress_proxy`` is missing or ``None``,
    for example when network access is disabled or startup reset it. Each of
    the two steps logs a warning on failure instead of raising, so shutdown
    always continues.
    """
    egress_proxy = getattr(app.state, "egress_proxy", None)
    if egress_proxy is not None:
        try:
            await egress_proxy.stop()
        except Exception as err:
            logger.warning("Failed to stop egress proxy cleanly", error=str(err))

        # Remove the iptables rules too, so repeated restarts don't pile up
        # duplicate entries.
        try:
            from .services.sandbox.egress_firewall import remove_existing_rules

            remove_existing_rules()
        except Exception as err:
            logger.warning("Failed to remove sandbox egress firewall", error=str(err))
async def _shutdown_services(app: FastAPI) -> None:
    """Tear down background services in a fixed order, best-effort.

    The order is:
    1. the metrics service;
    2. the PTC service's paused contexts (when a PTC service exists);
    3. the sandbox pool stored on ``app.state`` (if any);
    4. the cleanup scheduler.

    Each step logs its own error, and the next step still runs.
    """
    try:
        await metrics_service.stop()
        logger.info("Metrics service stopped")
    except Exception as err:
        logger.error("Error stopping metrics service", error=str(err))

    # Release any paused programmatic-tool-call (PTC) contexts.
    try:
        from .api.programmatic import _ptc_service as ptc_service

        if ptc_service is not None:
            await ptc_service.cleanup_all()
            logger.info("PTC service cleaned up")
    except Exception as err:
        logger.error("Error cleaning up PTC service", error=str(err))

    pool = getattr(app.state, "sandbox_pool", None)
    if pool:
        try:
            await pool.stop()
            logger.info("Sandbox pool stopped")
        except Exception as err:
            logger.error("Error stopping sandbox pool", error=str(err))

    try:
        from .services.cleanup import cleanup_scheduler

        cleanup_scheduler.stop()
        logger.info("Cleanup scheduler stopped")
    except Exception as err:
        logger.error("Error stopping cleanup scheduler", error=str(err))
async def _teardown_services(app: FastAPI) -> None:
    """Run every shutdown step in order; each step logs and absorbs its own errors."""
    await _shutdown_egress_proxy(app)
    await _shutdown_services(app)
    try:
        await shutdown_handler.shutdown()
    except Exception as e:
        logger.error("Error during graceful shutdown", error=str(e))


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager: startup before ``yield``, shutdown after.

    Startup does the following, in order:
    - installs graceful-shutdown handling;
    - validates the configuration, exiting the process with status 1 on failure;
    - logs security-relevant warnings;
    - starts monitoring, the cleanup tasks, the sandbox pool and the egress proxy;
    - runs the initial health checks.

    If any startup step raises, every service that may already have been
    started is torn down before the exception propagates. Shutdown runs in a
    ``finally`` block, so it also happens when the application exits with an
    error.
    """
    logger.info("Starting Code Interpreter API", version="1.2.0")
    setup_graceful_shutdown()
    if not validate_configuration():
        logger.error("Configuration validation failed - shutting down")
        sys.exit(1)
    if settings.api_key == "test-api-key":
        logger.warning("Using default API key - CHANGE THIS IN PRODUCTION!")
    if settings.api_debug:
        logger.warning("Debug mode is enabled - disable in production")
    if not settings.auth_enabled:
        logger.warning(
            "AUTHENTICATION DISABLED via AUTH_ENABLED=false; "
            "trusting network boundary for x-api-key endpoints "
            "(master-key admin endpoints still require MASTER_API_KEY)"
        )
    if settings.master_api_key:
        logger.info("API key management enabled")
    logger.debug("Rate limiting", enabled=settings.rate_limit_enabled)
    # Bash PTC requires `jq` inside the sandbox image. The Dockerfile installs
    # it, but warn if running outside Docker so bash PTC failures aren't a
    # surprise.
    import shutil

    if shutil.which("jq") is None:
        logger.warning(
            "jq not found on PATH; /exec/programmatic with lang='bash' will fail "
            "(bash PTC tool wrappers depend on jq for JSON marshalling)"
        )

    try:
        await _startup_monitoring(app)
        await _startup_cleanup_tasks()
        await _startup_sandbox_pool(app)
        await _startup_egress_proxy(app)
        await _perform_health_checks()
    except Exception:
        # _startup_egress_proxy deliberately raises when the firewall cannot
        # be installed. Without this cleanup, the metrics service, cleanup
        # scheduler and sandbox pool started above would keep running, since
        # the code after `yield` is never reached.
        logger.error("Startup failed - stopping services started so far")
        await _teardown_services(app)
        raise
    logger.info("Code Interpreter API startup completed")

    try:
        yield
    finally:
        logger.info("Shutting down Code Interpreter API")
        await _teardown_services(app)
        logger.info("Code Interpreter API shutdown completed")
# Create the FastAPI app. The interactive docs (/docs, /redoc) are only served
# when settings.enable_docs is true.
app = FastAPI(
    title="Code Interpreter API",
    description="A secure API for executing code in isolated environments",
    version="1.2.0",
    docs_url="/docs" if settings.enable_docs else None,
    redoc_url="/redoc" if settings.enable_docs else None,
    debug=settings.api_debug,
    lifespan=lifespan,
)

# Middleware order matters. Each add_middleware() call wraps the existing
# stack, so the LAST one added is the outermost. A request therefore passes
# through SecurityMiddleware, then RequestLoggingMiddleware, then
# MetricsMiddleware. CORSMiddleware, when enabled below, is added last and
# runs before all three.
app.add_middleware(MetricsMiddleware)
app.add_middleware(RequestLoggingMiddleware)
app.add_middleware(SecurityMiddleware)

# CORS is opt-in. With no configured origins, every origin is allowed.
if settings.enable_cors:
    origins = settings.cors_origins if settings.cors_origins else ["*"]
    # Credentialed CORS with a wildcard origin is unsafe. Starlette's
    # CORSMiddleware handles that combination by echoing back the caller's
    # Origin, which would let any website send credentialed requests.
    # Credentials are therefore only allowed with an explicit origin list.
    allow_credentials = "*" not in origins
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=allow_credentials,
        allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
        allow_headers=["*"],
        # Content-Length is deliberately not exposed (chunked encoding).
        expose_headers=["Content-Disposition"],
    )
    logger.info("CORS enabled", origins=origins, allow_credentials=allow_credentials)

# Register global error handlers; `Exception` is the catch-all for anything
# the more specific handlers do not match.
app.add_exception_handler(CodeInterpreterException, code_interpreter_exception_handler)
app.add_exception_handler(HTTPException, http_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(ValidationError, validation_exception_handler)
app.add_exception_handler(Exception, general_exception_handler)
@app.get("/config")
async def config_info():
    """Return the non-sensitive configuration summary, in debug mode only.

    Outside debug mode the endpoint answers 404, as if it did not exist.
    """
    if settings.api_debug:
        return get_configuration_summary()
    raise HTTPException(status_code=404, detail="Not found")
# Include routers. Authentication is handled by middleware, not per router.
# State persistence is exposed only through /exec session continuity;
# no public /state router is mounted.
# Files routes are mounted without a prefix, for LibreChat compatibility.
app.include_router(files.router, tags=["files"])
app.include_router(exec.router, tags=["exec"])
app.include_router(programmatic.router, tags=["exec", "programmatic"])
app.include_router(health.router, tags=["health", "monitoring"])
# Admin and admin-metrics APIs live under /api/v1.
app.include_router(admin.router, prefix="/api/v1", tags=["admin"])
app.include_router(dashboard_metrics.router, prefix="/api/v1", tags=["admin-metrics"])
# Admin dashboard frontend assets. "dashboard/static" is a relative path, so it
# resolves against the process working directory.
# NOTE(review): StaticFiles checks that the directory exists by default, so
# importing this module fails if it is missing — confirm the server is always
# started from the repository root.
app.mount(
    "/admin-dashboard/static",
    StaticFiles(directory="dashboard/static"),
    name="dashboard-static",
)
@app.get("/admin-dashboard", tags=["admin"])
async def get_admin_dashboard():
    """Serve the admin dashboard's entry page (dashboard/index.html)."""
    index_page = "dashboard/index.html"
    return FileResponse(index_page)
@app.get("/admin-dashboard/{rest_of_path:path}", tags=["admin"])
async def get_admin_dashboard_deep_link(rest_of_path: str):
    """Serve the dashboard's index.html for any deep-link sub-path.

    ``rest_of_path`` is captured by the route but not used. Every sub-path
    gets the same entry page.
    """
    index_page = "dashboard/index.html"
    return FileResponse(index_page)
def run_server():
    """Run the API under uvicorn, serving HTTPS when ``settings.https_enabled``.

    With HTTPS enabled, the certificate/key (and optional CA bundle) from
    settings are passed to uvicorn. If ``settings.validate_ssl_files()``
    reports them invalid, the process exits with status 1.

    Both modes share a single ``uvicorn.run`` call. Previously there were two
    copies of the options, which could drift apart.
    """
    ssl_config = {}
    if settings.https_enabled:
        # Validate SSL files exist
        if not settings.validate_ssl_files():
            logger.error("SSL configuration invalid - missing certificate files")
            sys.exit(1)
        ssl_config = {
            "ssl_certfile": settings.ssl_cert_file,
            "ssl_keyfile": settings.ssl_key_file,
        }
        if settings.ssl_ca_certs:
            ssl_config["ssl_ca_certs"] = settings.ssl_ca_certs
        scheme = "HTTPS"
    else:
        scheme = "HTTP"

    logger.info(f"Starting {scheme} server on {settings.api_host}:{settings.api_port}")
    # The app is passed as an import string (not the object) because uvicorn
    # requires that form when reload is enabled.
    uvicorn.run(
        "src.main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=settings.api_reload,
        log_level=settings.log_level.lower(),
        access_log=settings.enable_access_logs,
        timeout_keep_alive=120,
        **ssl_config,
    )
# Start the server when this module is executed as the main program.
if __name__ == "__main__":
    run_server()