diff --git a/CHANGELOG.md b/CHANGELOG.md index dfd89cb..c247f7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](https://semver.org/) +## [Unreleases] + +### Added + +- `File` entities: add `entry_path` attribute and `read_stream` method + ## [4.10.2] 2025-05-15 - shipped with DI v25.1.1 ### Fixed diff --git a/cmem_plugin_base/dataintegration/typed_entities/file.py b/cmem_plugin_base/dataintegration/typed_entities/file.py index e32aa10..ec2bf68 100644 --- a/cmem_plugin_base/dataintegration/typed_entities/file.py +++ b/cmem_plugin_base/dataintegration/typed_entities/file.py @@ -1,5 +1,13 @@ """File entities""" +import zipfile +from abc import abstractmethod +from io import BytesIO +from pathlib import Path +from typing import IO + +from cmem.cmempy.workspace.projects.resources.resource import get_resource_response + from cmem_plugin_base.dataintegration.entity import Entity, EntityPath from cmem_plugin_base.dataintegration.typed_entities import instance_uri, path_uri, type_uri from cmem_plugin_base.dataintegration.typed_entities.typed_entities import ( @@ -8,26 +16,83 @@ class File: - """A file entity that can be held in a FileEntitySchema.""" + """A file entity that can be held in a FileEntitySchema. + + :param path: The file path. + :param file_type: The type of the file (one of: "Local", "Project"). + :param mime: The MIME type of the file, if known. + :param entry_path: If the file path points to a archive, the entry within the archive. + """ - def __init__(self, path: str, file_type: str, mime: str | None) -> None: + def __init__(self, path: str, file_type: str, mime: str | None, entry_path: str | None) -> None: self.path = path self.file_type = file_type self.mime = mime + self.entry_path = entry_path + + @abstractmethod + def read_stream(self, project_id: str) -> IO[bytes]: + """Open the referenced file as a stream. + + Returns a file-like object (stream) in binary mode. + Caller is responsible for closing the stream. + """ class LocalFile(File): """A file that's located on the local file system.""" - def __init__(self, path: str, mime: str | None = None) -> None: - super().__init__(path, "Local", mime) + def __init__(self, path: str, mime: str | None = None, entry_path: str | None = None) -> None: + super().__init__(path, "Local", mime, entry_path) + + def read_stream(self, project_id: str) -> IO[bytes]: + """Open the referenced file as a stream. + + Returns a file-like object (stream) in binary mode. + Caller is responsible for closing the stream. + """ + if self.entry_path: + archive = zipfile.ZipFile(self.path, "r") + try: + return archive.open(self.entry_path, "r") + except KeyError as err: + archive.close() + raise FileNotFoundError( + f"Entry '{self.entry_path}' not found in archive '{self.path}'." + ) from err + else: + if not Path(self.path).is_file(): + raise FileNotFoundError(f"File '{self.path}' does not exist.") + return Path(self.path).open("rb") class ProjectFile(File): """A project file""" - def __init__(self, path: str, mime: str | None = None) -> None: - super().__init__(path, "Project", mime) + def __init__(self, path: str, mime: str | None = None, entry_path: str | None = None) -> None: + super().__init__(path, "Project", mime, entry_path) + + def read_stream(self, project_id: str) -> IO[bytes]: + """Open the referenced file as a stream. + + Returns a file-like object (stream) in binary mode. + Caller is responsible for closing the stream. + """ + response = get_resource_response(project_id, self.path) + if response.status_code != 200: # noqa: PLR2004 + raise FileNotFoundError(f"Project file '{self.path}' not found.") + response_bytes = BytesIO(response.raw.read()) + if self.entry_path: + archive = zipfile.ZipFile(response_bytes, "r") + try: + return archive.open(self.entry_path, "r") + except KeyError as err: + archive.close() + raise FileNotFoundError( + f"Entry '{self.entry_path}' not found in project file '{self.path}'." + ) from err + else: + return response_bytes class FileEntitySchema(TypedEntitySchema[File]): @@ -40,6 +105,7 @@ def __init__(self): EntityPath(path_uri("filePath"), is_single_value=True), EntityPath(path_uri("fileType"), is_single_value=True), EntityPath(path_uri("mimeType"), is_single_value=True), + EntityPath(path_uri("entryPath"), is_single_value=True), ], ) @@ -47,7 +113,12 @@ def to_entity(self, value: File) -> Entity: """Create a generic entity from a file""" return Entity( uri=instance_uri(value.path), - values=[[value.path], [value.file_type], [value.mime] if value.mime else []], + values=[ + [value.path], + [value.file_type], + [value.mime] if value.mime else [], + [value.entry_path] if value.entry_path else [], + ], ) def from_entity(self, entity: Entity) -> File: @@ -55,10 +126,12 @@ def from_entity(self, entity: Entity) -> File: path = entity.values[0][0] file_type = entity.values[1][0] mime = entity.values[2][0] if entity.values[2] and entity.values[2][0] else None + entry_path = entity.values[3][0] if entity.values[3] and entity.values[3][0] else None + match file_type: case "Local": - return LocalFile(path, mime) + return LocalFile(path, mime, entry_path) case "Project": - return ProjectFile(path, mime) + return ProjectFile(path, mime, entry_path) case _: raise ValueError(f"File '{path}' has unexpected type '{file_type}'.") diff --git a/tests/typed_entities/test_typed_entities.py b/tests/typed_entities/test_typed_entities.py index d8202b6..529fc8f 100644 --- a/tests/typed_entities/test_typed_entities.py +++ b/tests/typed_entities/test_typed_entities.py @@ -12,6 +12,7 @@ from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema, LocalFile +from cmem_plugin_base.testing import TestTaskContext class ConcatFilesOperator(WorkflowPlugin): @@ -29,9 +30,8 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti output_name = o_file.name for file in input_files.values: if isinstance(file, LocalFile): - with Path(file.path).open("rb") as f: - contents = f.read() - o_file.write(contents) + with file.read_stream(context.task.project_id()) as in_stream: + o_file.write(in_stream.read()) return FileEntitySchema().to_entities(iter([LocalFile(output_name)])) @@ -51,7 +51,9 @@ def test_files(self) -> None: input_entities = FileEntitySchema().to_entities( iter([LocalFile(temp1.name), LocalFile(temp2.name)]) ) - output = ConcatFilesOperator().execute([input_entities], ExecutionContext()) + context = ExecutionContext() + context.task = TestTaskContext(project_id="TestProject", task_id="TestTask") + output = ConcatFilesOperator().execute([input_entities], context) # Check output assert output is not None @@ -63,15 +65,15 @@ def test_files(self) -> None: def test_file_entity_conversion(self) -> None: """Test conversion from entity to file""" - file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Project"], []]) + file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Local"], [], []]) assert FileEntitySchema().from_entity(file_entity) - file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Project"], [""]]) + file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Local"], [""], [""]]) assert FileEntitySchema().from_entity(file_entity) with pytest.raises(ValueError, match="File 'test.txt' has unexpected type 'Wrong Type'"): FileEntitySchema().from_entity( - Entity(uri="test.uri", values=[["test.txt"], ["Wrong Type"], []]) + Entity(uri="test.uri", values=[["test.txt"], ["Wrong Type"], [], []]) )