Skip to content

Commit 6d5ee06

Browse files
authored
Merge pull request usnavy13#73 from usnavy13/dev
fix: resolve socket hang up errors for large file execution
2 parents 6cc765d + 90cf8ca commit 6d5ee06

5 files changed

Lines changed: 304 additions & 54 deletions

File tree

src/api/exec.py

Lines changed: 82 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,20 @@
22
33
This is a thin endpoint that delegates to ExecutionOrchestrator for
44
the actual execution workflow logic.
5+
6+
Uses a streaming response with keepalive whitespace to prevent client
7+
socket timeouts (Node.js 20 defaults to 5s) during long-running
8+
executions like large file operations or cold sandbox starts.
59
"""
610

11+
import asyncio
12+
713
import structlog
814
from fastapi import APIRouter, Request
15+
from fastapi.responses import StreamingResponse
916

1017
from ..models import ExecRequest, ExecResponse
18+
from ..models.errors import ErrorResponse, ValidationError, ServiceUnavailableError
1119
from ..services.orchestrator import ExecutionOrchestrator
1220
from ..dependencies.services import (
1321
SessionServiceDep,
@@ -21,8 +29,12 @@
2129
logger = structlog.get_logger(__name__)
2230
router = APIRouter()
2331

32+
# Keepalive interval: send a space every 3 seconds to prevent
33+
# Node.js 20's default 5-second socket timeout from firing.
34+
_KEEPALIVE_INTERVAL = 3
2435

25-
@router.post("/exec", response_model=ExecResponse)
36+
37+
@router.post("/exec", responses={200: {"model": ExecResponse}})
2638
async def execute_code(
2739
request: ExecRequest,
2840
http_request: Request,
@@ -42,17 +54,9 @@ async def execute_code(
4254
State is stored in Redis (2 hour TTL) with automatic archival to MinIO for
4355
long-term storage (7 day TTL).
4456
45-
Args:
46-
request: Execution request with code, language, and optional files
47-
http_request: HTTP request for accessing state (api_key_hash)
48-
session_service: Session management service
49-
file_service: File storage service
50-
execution_service: Code execution service
51-
state_service: Python state persistence service (Redis)
52-
state_archival_service: Python state archival service (MinIO)
53-
54-
Returns:
55-
ExecResponse with session_id, stdout, stderr, and generated files
57+
Returns a streaming response that sends keepalive whitespace before the
58+
JSON body to prevent client socket timeouts during long operations.
59+
JSON parsers ignore leading whitespace, so this is fully compatible.
5660
"""
5761
request_id = generate_request_id()[:8]
5862

@@ -79,15 +83,70 @@ async def execute_code(
7983
state_archival_service=state_archival_service,
8084
)
8185

82-
# Execute via orchestrator (handles validation, session, files, execution, cleanup)
83-
response = await orchestrator.execute(
84-
request, request_id, api_key_hash=api_key_hash, is_env_key=is_env_key
85-
)
86-
87-
logger.info(
88-
"Code execution completed",
89-
request_id=request_id,
90-
session_id=response.session_id,
86+
async def _stream_response():
    """Run the execution and stream its JSON result behind keepalive bytes.

    The orchestrator call runs in a background task. While that task is
    pending, a single space is yielded every ``_KEEPALIVE_INTERVAL``
    seconds. Once it finishes, the serialized response (or a serialized
    ``ErrorResponse``) is yielded as the final chunk. Leading whitespace
    is ignored by JSON parsers, so the client still receives one valid
    JSON document.

    Yields:
        bytes: zero or more ``b" "`` keepalive chunks, then one JSON body.

    Raises:
        ValidationError: re-raised unchanged from the orchestrator.
        ServiceUnavailableError: re-raised unchanged from the orchestrator.
    """

    async def _run():
        # Capture failures inside the task and hand them back as a value,
        # so a task that outlives this generator never ends with an
        # unretrieved exception.
        try:
            outcome = await orchestrator.execute(
                request,
                request_id,
                api_key_hash=api_key_hash,
                is_env_key=is_env_key,
            )
        except Exception as exc:
            return None, exc
        return outcome, None

    task = asyncio.create_task(_run())

    # asyncio.wait() returns on timeout without raising and without
    # cancelling the task, so the execution keeps running between
    # keepalives exactly as it did with the shielded wait.
    while not task.done():
        finished, _ = await asyncio.wait({task}, timeout=_KEEPALIVE_INTERVAL)
        if not finished:
            # Execution still running: send a keepalive space
            yield b" "

    # If the task itself was cancelled, result() raises CancelledError,
    # which propagates out of the generator.
    response, err = task.result()

    if err is not None:
        # Re-raise validation/service errors for the application's
        # exception handlers.
        # NOTE(review): this generator presumably only runs after the
        # streaming response has started with status 200, so handlers may
        # be unable to change the status code. Confirm against Starlette's
        # StreamingResponse behaviour.
        if isinstance(err, (ValidationError, ServiceUnavailableError)):
            raise err

        # Record the failure before converting it to a payload; without
        # this the exception would only ever be visible to the client.
        logger.error(
            "Code execution failed",
            request_id=request_id,
            error=str(err),
            exception_type=type(err).__name__,
        )
        error_resp = ErrorResponse(
            error=str(err),
            error_type="execution",
        )
        yield error_resp.model_dump_json().encode()
        return

    # Send the JSON response
    logger.info(
        "Code execution completed",
        request_id=request_id,
        session_id=response.session_id,
    )
    yield response.model_dump_json().encode()
149+
return StreamingResponse(
150+
_stream_response(),
151+
media_type="application/json",
91152
)
92-
93-
return response

src/services/execution/runner.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -701,11 +701,23 @@ async def _mount_files_to_sandbox(
701701
files: List[Dict[str, Any]],
702702
language: str = "py",
703703
) -> None:
704-
"""Mount files to sandbox workspace."""
704+
"""Mount files to sandbox workspace.
705+
706+
Uses streaming (MinIO fget_object) to transfer files directly to the
707+
sandbox data directory without loading entire files into memory. This
708+
avoids blocking the asyncio event loop during large file transfers.
709+
"""
705710
try:
706711
from ..file import FileService
712+
from ...config.languages import get_user_id_for_language
707713

708714
file_service = FileService()
715+
user_id = get_user_id_for_language(language)
716+
717+
def _set_file_perms(path, uid):
    """Give the file at ``path`` to ``uid`` with mode 0644; return its size.

    The same numeric id is applied as both owner and group. The size is
    read from disk after the ownership and mode changes.
    """
    os.chown(path, uid, uid)
    os.chmod(path, 0o644)
    size_bytes = os.stat(path).st_size
    return size_bytes
709721

710722
for file_info in files:
711723
filename = file_info.get("filename", "unknown")
@@ -717,36 +729,37 @@ async def _mount_files_to_sandbox(
717729
continue
718730

719731
try:
720-
file_content = await file_service.get_file_content(
721-
session_id, file_id
732+
normalized_filename = OutputProcessor.sanitize_filename(filename)
733+
dest_path = str(sandbox_info.data_dir / normalized_filename)
734+
735+
file_size = file_info.get("size", 0)
736+
if file_size > 10 * 1024 * 1024:
737+
logger.info(
738+
"Mounting large file",
739+
filename=filename,
740+
size_mb=round(file_size / 1024 / 1024, 1),
741+
)
742+
743+
# Stream directly from MinIO to sandbox directory (non-blocking)
744+
success = await file_service.stream_file_to_path(
745+
session_id, file_id, dest_path
722746
)
723747

724-
if file_content is not None:
725-
# Direct memory-to-sandbox transfer (no tempfiles)
726-
normalized_filename = OutputProcessor.sanitize_filename(
727-
filename
748+
if success:
749+
actual_size = await asyncio.to_thread(
750+
_set_file_perms, dest_path, user_id
728751
)
729-
dest_path = f"/mnt/data/{normalized_filename}"
730-
731-
if self.sandbox_manager.copy_content_to_sandbox(
732-
sandbox_info, file_content, dest_path, language=language
733-
):
734-
logger.debug(
735-
"Mounted file",
736-
filename=filename,
737-
size=len(file_content),
738-
)
739-
else:
740-
logger.warning("Failed to mount file", filename=filename)
741-
await self._create_placeholder_file(sandbox_info, filename)
742-
else:
743-
logger.warning(
744-
f"Could not retrieve content for file {filename}"
752+
logger.debug(
753+
"Mounted file",
754+
filename=filename,
755+
size=actual_size,
745756
)
757+
else:
758+
logger.warning("Failed to mount file", filename=filename)
746759
await self._create_placeholder_file(sandbox_info, filename)
747760

748761
except Exception as file_error:
749-
logger.error(f"Error retrieving file {filename}: {file_error}")
762+
logger.error(f"Error mounting file {filename}: {file_error}")
750763
await self._create_placeholder_file(sandbox_info, filename)
751764

752765
except Exception as e:

src/services/file.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -546,16 +546,17 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
546546
object_key = metadata["object_key"]
547547

548548
try:
549-
# Get object content
550549
loop = asyncio.get_event_loop()
551-
response = await loop.run_in_executor(
552-
None, self.minio_client.get_object, self.bucket_name, object_key
553-
)
554550

555-
content = response.read()
556-
response.close()
557-
response.release_conn()
551+
def _download():
552+
response = self.minio_client.get_object(self.bucket_name, object_key)
553+
try:
554+
return response.read()
555+
finally:
556+
response.close()
557+
response.release_conn()
558558

559+
content = await loop.run_in_executor(None, _download)
559560
return content
560561

561562
except S3Error as e:
@@ -567,6 +568,49 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
567568
)
568569
return None
569570

571+
async def stream_file_to_path(
    self, session_id: str, file_id: str, dest_path: str
) -> bool:
    """Stream file content from MinIO directly to a local file path.

    Looks up the file's metadata, then calls the MinIO client's
    ``fget_object`` in the default thread-pool executor so the blocking
    transfer does not run on the event loop.

    Args:
        session_id: Session identifier
        file_id: File identifier
        dest_path: Local filesystem path to write the file to

    Returns:
        True if the object was written to ``dest_path``. False if no
        metadata exists for the file, if MinIO reports an ``S3Error``,
        or if an ``OSError`` is raised during the transfer (for example
        while writing to ``dest_path``).

    Raises:
        KeyError: if the metadata has no ``"object_key"`` entry.
    """
    metadata = await self.get_file_metadata(session_id, file_id)
    if not metadata:
        return False

    object_key = metadata["object_key"]
    # Inside a coroutine there is always a running loop.
    loop = asyncio.get_running_loop()

    try:
        await loop.run_in_executor(
            None,
            self.minio_client.fget_object,
            self.bucket_name,
            object_key,
            dest_path,
        )
    except OSError as e:
        # Honour the "False on failure" contract for local I/O errors
        # too, instead of letting them escape.
        logger.error(
            "Failed to write streamed file to path",
            error=str(e),
            session_id=session_id,
            file_id=file_id,
            dest_path=dest_path,
        )
        return False
    except S3Error as e:
        logger.error(
            "Failed to stream file to path",
            error=str(e),
            session_id=session_id,
            file_id=file_id,
            dest_path=dest_path,
        )
        return False

    return True
613+
570614
async def store_uploaded_file(
571615
self,
572616
session_id: str,

src/services/interfaces.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ async def get_file_content(self, session_id: str, file_id: str) -> Optional[byte
115115
"""Get actual file content."""
116116
pass
117117

118+
@abstractmethod
async def stream_file_to_path(
    self, session_id: str, file_id: str, dest_path: str
) -> bool:
    """Stream a stored file's content directly to a local file path.

    Args:
        session_id: Session identifier.
        file_id: File identifier.
        dest_path: Local filesystem path the content is written to.

    Returns:
        A bool; implementations are expected to return True on success
        and False otherwise.
    """
    ...
124+
118125
@abstractmethod
119126
async def delete_file(self, session_id: str, file_id: str) -> bool:
120127
"""Delete a file from storage."""

0 commit comments

Comments
 (0)