Skip to content

Commit 670c83d

Browse files
committed
feat: Implement linked file management in FileService
- Added methods to create, register, and manage linked-input aliases for files across sessions.
- Introduced functionality to check for existing link references and to enforce read-only constraints during file updates.
- Enhanced the orchestrator to support linking files into sessions, ensuring that explicit and linked files are handled correctly during execution.
- Updated unit tests to cover the new linked-file behavior and verify correct functionality.
1 parent 9e94344 commit 670c83d

6 files changed

Lines changed: 572 additions & 41 deletions

File tree

src/services/file.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,62 @@ def _get_session_files_key(self, session_id: str) -> str:
7575
"""Generate Redis key for session file list."""
7676
return f"session_files:{session_id}"
7777

78+
def _get_file_links_key(self, session_id: str, file_id: str) -> str:
    """Build the Redis key of the set that tracks aliases of a source file.

    The key has the form ``file_links:<session_id>:<file_id>``.
    """
    return "file_links:{}:{}".format(session_id, file_id)
81+
82+
async def _register_link_reference(
    self,
    source_session_id: str,
    source_file_id: str,
    linked_session_id: str,
    linked_file_id: str,
) -> None:
    """Record that an alias in another session points at a source file.

    The alias is stored as ``<linked_session_id>:<linked_file_id>`` in the
    source file's link set, and the set's expiry is (re)set to the session
    TTL so the tracking key does not outlive its usefulness.
    """
    key = self._get_file_links_key(source_session_id, source_file_id)
    expiry_seconds = settings.get_session_ttl_minutes() * 60
    alias_member = f"{linked_session_id}:{linked_file_id}"

    redis = self.redis_client
    await redis.sadd(key, alias_member)
    await redis.expire(key, expiry_seconds)
94+
95+
async def _remove_link_reference(
    self,
    source_session_id: str,
    source_file_id: str,
    linked_session_id: str,
    linked_file_id: str,
) -> None:
    """Drop one alias entry from a source file's link set.

    The entry removed is ``<linked_session_id>:<linked_file_id>``, i.e. the
    same member format used when the alias was registered.
    """
    source_links_key = self._get_file_links_key(source_session_id, source_file_id)
    alias_member = f"{linked_session_id}:{linked_file_id}"
    await self.redis_client.srem(source_links_key, alias_member)
105+
106+
async def _has_link_references(self, session_id: str, file_id: str) -> bool:
    """Return True when at least one alias still references the file.

    Args:
        session_id: Session that owns the source file.
        file_id: Identifier of the source file within that session.

    Returns:
        True if the file's link set is non-empty, False otherwise
        (including when the key does not exist).
    """
    links_key = self._get_file_links_key(session_id, file_id)
    # SCARD answers "is the set non-empty?" without transferring every
    # member over the wire the way SMEMBERS does; a missing key counts as 0.
    return await self.redis_client.scard(links_key) > 0
110+
111+
async def _find_linked_file(
    self, target_session_id: str, source_session_id: str, source_file_id: str
) -> Optional[str]:
    """Look up an alias in the target session that already points at the source.

    Every file id tracked for the target session is inspected; the first one
    whose metadata is a ``linked_input`` with matching source session and
    source file ids is returned. Entries without metadata are ignored.

    Returns:
        The id of the matching alias, or None when there is none.
    """
    wanted = ("linked_input", source_session_id, source_file_id)
    tracked_ids = await self.redis_client.smembers(
        self._get_session_files_key(target_session_id)
    )

    for file_id in tracked_ids:
        meta = await self.get_file_metadata(target_session_id, file_id)
        if not meta:
            continue

        found = (
            meta.get("type"),
            meta.get("source_session_id"),
            meta.get("source_file_id"),
        )
        if found == wanted:
            return file_id

    return None
133+
78134
async def _store_file_metadata(
79135
self, session_id: str, file_id: str, metadata: Dict[str, Any]
80136
) -> None:
@@ -317,6 +373,69 @@ async def list_files(self, session_id: str) -> List[FileInfo]:
317373
logger.error("Failed to list files", error=str(e), session_id=session_id)
318374
return []
319375

376+
async def link_file_into_session(
    self, target_session_id: str, source_session_id: str, source_file_id: str
) -> Optional[FileInfo]:
    """Create or reuse a read-only linked alias in the target session.

    The alias is a metadata-only record of type ``linked_input`` that shares
    the source file's ``object_key``; no object data is copied.

    Args:
        target_session_id: Session that should gain access to the file.
        source_session_id: Session the referenced file currently belongs to.
        source_file_id: Identifier of the referenced file in that session.

    Returns:
        FileInfo for the alias (or for the native file when the resolved
        source already lives in the target session), or None when the source
        file has no metadata.
    """
    source_metadata = await self.get_file_metadata(
        source_session_id, source_file_id
    )
    if not source_metadata:
        logger.warning(
            "Cannot link missing source file",
            source_session_id=source_session_id,
            source_file_id=source_file_id,
            target_session_id=target_session_id,
        )
        return None

    # When the referenced file is itself an alias, link to the file that owns
    # the object instead. Otherwise the reference would be registered on the
    # intermediate alias, and removing that alias would leave the owner with
    # no recorded references even though this new alias still shares its
    # object. The alias metadata already carries the owner's object_key,
    # filename, size and path, so it can be used as the template below.
    if source_metadata.get("type") == "linked_input":
        root_session_id = source_metadata.get("source_session_id")
        root_file_id = source_metadata.get("source_file_id")
        if root_session_id and root_file_id:
            source_session_id, source_file_id = root_session_id, root_file_id

    # The resolved source already belongs to the target session: hand back
    # the native file rather than aliasing a session's file into itself.
    # NOTE(review): assumes get_file_info returns None for a missing file.
    if source_session_id == target_session_id:
        native_file = await self.get_file_info(target_session_id, source_file_id)
        if native_file is not None:
            return native_file

    existing_linked_file_id = await self._find_linked_file(
        target_session_id, source_session_id, source_file_id
    )
    if existing_linked_file_id:
        return await self.get_file_info(target_session_id, existing_linked_file_id)

    linked_file_id = generate_file_id()
    metadata = {
        "file_id": linked_file_id,
        "filename": source_metadata["filename"],
        "content_type": source_metadata["content_type"],
        "object_key": source_metadata["object_key"],
        "session_id": target_session_id,
        "created_at": datetime.utcnow().isoformat(),
        "size": source_metadata["size"],
        "path": source_metadata["path"],
        "type": "linked_input",
        "source_session_id": source_session_id,
        "source_file_id": source_file_id,
        # Stored as the string "1"; update paths compare against that value.
        "is_read_only": "1",
    }

    await self._store_file_metadata(target_session_id, linked_file_id, metadata)
    await self._register_link_reference(
        source_session_id,
        source_file_id,
        target_session_id,
        linked_file_id,
    )

    logger.debug(
        "Linked file into session",
        target_session_id=target_session_id,
        linked_file_id=linked_file_id,
        source_session_id=source_session_id,
        source_file_id=source_file_id,
    )

    return FileInfo(
        file_id=linked_file_id,
        filename=metadata["filename"],
        size=metadata["size"],
        content_type=metadata["content_type"],
        created_at=datetime.fromisoformat(metadata["created_at"]),
        path=metadata["path"],
    )
438+
320439
async def download_file(self, session_id: str, file_id: str) -> Optional[str]:
321440
"""Generate download URL for a file."""
322441
metadata = await self.get_file_metadata(session_id, file_id)
@@ -353,6 +472,30 @@ async def delete_file(self, session_id: str, file_id: str) -> bool:
353472
if not metadata:
354473
return False
355474

475+
if metadata.get("type") == "linked_input":
476+
await self._delete_file_metadata(session_id, file_id)
477+
await self._remove_link_reference(
478+
metadata["source_session_id"],
479+
metadata["source_file_id"],
480+
session_id,
481+
file_id,
482+
)
483+
logger.debug(
484+
"Deleted linked file alias",
485+
session_id=session_id,
486+
file_id=file_id,
487+
)
488+
return True
489+
490+
if await self._has_link_references(session_id, file_id):
491+
await self._delete_file_metadata(session_id, file_id)
492+
logger.debug(
493+
"Deleted file metadata but retained shared object",
494+
session_id=session_id,
495+
file_id=file_id,
496+
)
497+
return True
498+
356499
object_key = metadata["object_key"]
357500

358501
try:
@@ -753,6 +896,12 @@ async def cleanup_orphan_objects(self, batch_limit: int = 1000) -> int:
753896
if object_session_id in active_session_ids:
754897
continue
755898

899+
source_file_id = parts[3] if len(parts) >= 4 else None
900+
if source_file_id and await self._has_link_references(
901+
object_session_id, source_file_id
902+
):
903+
continue
904+
756905
# Double-check via Redis existence in case index is stale
757906
if object_session_id not in checked_missing_sessions:
758907
try:
@@ -829,6 +978,14 @@ async def update_file_content(
829978
)
830979
return False
831980

981+
if metadata.get("is_read_only") == "1":
982+
logger.debug(
983+
"Skipping update for read-only file",
984+
session_id=session_id[:12],
985+
file_id=file_id,
986+
)
987+
return False
988+
832989
object_key = metadata.get("object_key")
833990
if not object_key:
834991
logger.warning(

src/services/interfaces.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ async def list_executions(
8383
class FileServiceInterface(ABC):
8484
"""Interface for file management service."""
8585

86+
@abstractmethod
async def get_file_metadata(
    self, session_id: str, file_id: str
) -> Optional[Dict[str, Any]]:
    """Fetch the raw metadata mapping stored for a file.

    Implementations return the metadata dictionary for ``file_id`` within
    ``session_id``; the Optional return type allows None when nothing is
    available.
    """
    return None
92+
8693
@abstractmethod
8794
async def upload_file(
8895
self, session_id: str, request: FileUploadRequest
@@ -122,6 +129,13 @@ async def stream_file_to_path(
122129
"""Stream file content directly to a local file path."""
123130
pass
124131

132+
@abstractmethod
async def link_file_into_session(
    self, target_session_id: str, source_session_id: str, source_file_id: str
) -> Optional[FileInfo]:
    """Expose a source file inside another session through a read-only alias.

    Implementations create the alias in ``target_session_id`` or reuse one
    that already exists for the same source file.
    """
    return None
138+
125139
@abstractmethod
126140
async def delete_file(self, session_id: str, file_id: str) -> bool:
127141
"""Delete a file from storage."""

src/services/orchestrator.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
ExecutionServiceInterface,
4141
FileServiceInterface,
4242
)
43+
from .execution.output import OutputProcessor
4344
from .state import StateService
4445
from .state_archival import StateArchivalService
4546

@@ -317,19 +318,55 @@ async def _mount_files(self, ctx: ExecutionContext) -> List[Dict[str, Any]]:
317318
"""Mount files for code execution.
318319
319320
Behavior:
320-
1. If request.files[] is provided, mount those files (explicit mounting)
321-
2. If no request.files[] but session_id exists, auto-mount ALL session files
322-
3. If neither, return empty list
321+
1. Mount explicit file references from request.files[]
322+
2. Also auto-mount files already tracked in the current session
323+
3. Deduplicate by mounted filename with precedence:
324+
explicit refs > native current-session files > linked-input aliases
323325
"""
324-
# If explicit files provided, mount those (existing behavior)
326+
explicit_files = []
325327
if ctx.request.files:
326-
return await self._mount_explicit_files(ctx)
328+
explicit_files = await self._mount_explicit_files(ctx)
327329

328-
# Auto-mount all session files when session_id exists but no explicit files
330+
session_files = []
329331
if ctx.session_id:
330-
return await self._auto_mount_session_files(ctx)
332+
session_files = await self._auto_mount_session_files(ctx)
333+
334+
native_session_files = [
335+
file_info
336+
for file_info in session_files
337+
if not file_info.get("is_linked_input")
338+
]
339+
linked_session_files = [
340+
file_info for file_info in session_files if file_info.get("is_linked_input")
341+
]
342+
343+
return self._merge_mounted_files(
344+
explicit_files,
345+
native_session_files,
346+
linked_session_files,
347+
)
348+
349+
def _mount_dedupe_key(self, file_info: Dict[str, Any]) -> str:
    """Compute the key under which a mounted file is deduplicated.

    The key is the file's ``filename`` entry (empty string when absent)
    passed through ``OutputProcessor.sanitize_filename``.
    """
    raw_filename = file_info.get("filename", "")
    return OutputProcessor.sanitize_filename(raw_filename)
352+
353+
def _merge_mounted_files(
    self, *groups: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Flatten file groups into one list, keeping the first file per name.

    Groups are processed in the order given, so earlier groups take
    precedence over later ones when two files share a dedupe key. Files
    whose dedupe key is empty are dropped.
    """
    seen_keys = set()
    result: List[Dict[str, Any]] = []

    for entry in (item for group in groups for item in group):
        key = self._mount_dedupe_key(entry)
        if key and key not in seen_keys:
            result.append(entry)
            seen_keys.add(key)

    return result
333370

334371
async def _mount_explicit_files(
335372
self, ctx: ExecutionContext
@@ -363,13 +400,21 @@ async def _mount_explicit_files(
363400
if key in mounted_ids:
364401
continue
365402

403+
if ctx.session_id and file_ref.session_id != ctx.session_id:
404+
await self.file_service.link_file_into_session(
405+
ctx.session_id,
406+
file_ref.session_id,
407+
file_info.file_id,
408+
)
409+
366410
mounted.append(
367411
{
368412
"file_id": file_info.file_id,
369413
"filename": file_info.filename,
370414
"path": file_info.path,
371415
"size": file_info.size,
372416
"session_id": file_ref.session_id,
417+
"is_linked_input": False,
373418
}
374419
)
375420
mounted_ids.add(key)
@@ -399,6 +444,13 @@ async def _auto_mount_session_files(
399444
session_files = await self.file_service.list_files(ctx.session_id)
400445

401446
for file_info in session_files:
447+
file_metadata = await self.file_service.get_file_metadata(
448+
ctx.session_id, file_info.file_id
449+
)
450+
is_linked_input = (
451+
file_metadata.get("type") == "linked_input" if file_metadata else False
452+
)
453+
402454
# Skip duplicates (shouldn't happen, but defensive)
403455
key = (ctx.session_id, file_info.file_id)
404456
if key in mounted_ids:
@@ -411,6 +463,7 @@ async def _auto_mount_session_files(
411463
"path": file_info.path,
412464
"size": file_info.size,
413465
"session_id": ctx.session_id,
466+
"is_linked_input": is_linked_input,
414467
}
415468
)
416469
mounted_ids.add(key)
@@ -600,6 +653,14 @@ async def _update_mounted_files_content(self, ctx: ExecutionContext) -> None:
600653
)
601654
continue
602655

656+
if file_metadata and file_metadata.get("is_read_only") == "1":
657+
logger.debug(
658+
"Skipping update for read-only linked file",
659+
filename=filename,
660+
file_id=file_id,
661+
)
662+
continue
663+
603664
# Read current content from container
604665
file_path = f"/mnt/data/{filename}"
605666
content = sandbox_manager.get_file_content_from_sandbox(

0 commit comments

Comments
 (0)