Skip to content

Commit d1c0bd0

Browse files
committed
feat: Implement file content update functionality and associated tests
- Added `update_file_content` method to `FileService` for updating existing file content in MinIO and updating metadata in Redis. - Introduced `_update_mounted_files_content` method in `ExecutionOrchestrator` to handle in-place edits to mounted files after execution. - Created integration tests in `test_mounted_file_edits.py` to verify persistence of edits to mounted files. - Developed unit tests in `test_file_service.py` to ensure correct behavior of the `update_file_content` method, including success and error scenarios.
1 parent 0b38569 commit d1c0bd0

4 files changed

Lines changed: 852 additions & 1 deletion

File tree

src/services/file.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,98 @@ async def update_file_state_hash(
804804
)
805805
return False
806806

807+
async def update_file_content(
808+
self,
809+
session_id: str,
810+
file_id: str,
811+
content: bytes,
812+
state_hash: Optional[str] = None,
813+
execution_id: Optional[str] = None,
814+
) -> bool:
815+
"""Update the content of an existing file.
816+
817+
Overwrites the MinIO object and updates metadata. Used to persist
818+
in-place edits to mounted files after execution.
819+
820+
Args:
821+
session_id: Session identifier
822+
file_id: File identifier
823+
content: New file content as bytes
824+
state_hash: Optional SHA256 hash of the Python state
825+
execution_id: Optional ID of the execution that modified this file
826+
827+
Returns:
828+
True if update was successful
829+
"""
830+
try:
831+
# Get existing metadata to find object_key
832+
metadata = await self._get_file_metadata(session_id, file_id)
833+
if not metadata:
834+
logger.warning(
835+
"File not found for content update",
836+
session_id=session_id[:12],
837+
file_id=file_id,
838+
)
839+
return False
840+
841+
object_key = metadata.get("object_key")
842+
if not object_key:
843+
logger.warning(
844+
"No object_key in file metadata",
845+
session_id=session_id[:12],
846+
file_id=file_id,
847+
)
848+
return False
849+
850+
# Overwrite content in MinIO
851+
import io
852+
853+
loop = asyncio.get_event_loop()
854+
content_stream = io.BytesIO(content)
855+
content_type = metadata.get("content_type", "application/octet-stream")
856+
857+
await loop.run_in_executor(
858+
None,
859+
lambda: self.minio_client.put_object(
860+
self.bucket_name,
861+
object_key,
862+
content_stream,
863+
len(content),
864+
content_type,
865+
),
866+
)
867+
868+
# Update metadata
869+
now = datetime.utcnow().isoformat()
870+
updates = {
871+
"size": len(content),
872+
"last_used_at": now,
873+
}
874+
if state_hash:
875+
updates["state_hash"] = state_hash
876+
if execution_id:
877+
updates["execution_id"] = execution_id
878+
879+
metadata_key = self._get_file_metadata_key(session_id, file_id)
880+
await self.redis_client.hset(metadata_key, mapping=updates)
881+
882+
logger.debug(
883+
"Updated file content",
884+
session_id=session_id[:12],
885+
file_id=file_id,
886+
size=len(content),
887+
)
888+
return True
889+
890+
except Exception as e:
891+
logger.error(
892+
"Failed to update file content",
893+
error=str(e),
894+
session_id=session_id,
895+
file_id=file_id,
896+
)
897+
return False
898+
807899
async def close(self) -> None:
808900
"""Close service connections."""
809901
try:

src/services/orchestrator.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ async def execute(
155155
# This sets ctx.new_state_hash needed for file-state linking
156156
await self._save_state(ctx)
157157

158+
# Step 5.6: Update mounted files to capture in-place edits
159+
await self._update_mounted_files_content(ctx)
160+
158161
# Step 6: Handle generated files (with state_hash for linking)
159162
ctx.generated_files = await self._handle_generated_files(ctx)
160163

@@ -555,6 +558,63 @@ async def _update_mounted_files_state_hash(
555558
error=str(e),
556559
)
557560

561+
async def _update_mounted_files_content(self, ctx: ExecutionContext) -> None:
562+
"""Re-upload all mounted files to capture any modifications.
563+
564+
This ensures in-place edits to mounted files persist after execution.
565+
Called after execution completes, reads current content from container
566+
and updates the file in MinIO storage.
567+
"""
568+
if not ctx.mounted_files or not ctx.container:
569+
return
570+
571+
container_manager = self.execution_service.container_manager
572+
573+
for file_info in ctx.mounted_files:
574+
try:
575+
filename = file_info.get("filename")
576+
file_id = file_info.get("file_id")
577+
session_id = file_info.get("session_id")
578+
579+
if not all([filename, file_id, session_id]):
580+
continue
581+
582+
# Read current content from container
583+
file_path = f"/mnt/data/{filename}"
584+
content = await container_manager.get_file_content_from_container(
585+
ctx.container, file_path
586+
)
587+
588+
if content is None:
589+
# File may have been deleted - that's ok
590+
logger.debug(
591+
"Mounted file not found after execution",
592+
filename=filename,
593+
)
594+
continue
595+
596+
# Update file in storage
597+
await self.file_service.update_file_content(
598+
session_id=session_id,
599+
file_id=file_id,
600+
content=content,
601+
state_hash=ctx.new_state_hash,
602+
execution_id=ctx.request_id,
603+
)
604+
605+
logger.debug(
606+
"Updated mounted file content",
607+
filename=filename,
608+
size=len(content),
609+
)
610+
611+
except Exception as e:
612+
logger.warning(
613+
"Failed to update mounted file",
614+
filename=file_info.get("filename"),
615+
error=str(e),
616+
)
617+
558618
def _normalize_args(self, args: Any) -> Optional[List[str]]:
559619
"""Normalize args parameter to List[str] or None.
560620
@@ -591,7 +651,8 @@ async def _execute_code(self, ctx: ExecutionContext) -> Any:
591651
# Determine if we should use state persistence (Python only)
592652
use_state = settings.state_persistence_enabled and ctx.request.lang == "py"
593653

594-
# execute_code returns (execution, container, new_state, state_errors, container_source) tuple
654+
# execute_code returns tuple:
655+
# (execution, container, new_state, state_errors, container_source)
595656
(
596657
execution,
597658
ctx.container,

0 commit comments

Comments
 (0)