Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](https://semver.org/)

## [Unreleases]

### Added

- `File` entities: add `entry_path` attribute and `read_stream` method

## [4.10.2] 2025-05-15 - shipped with DI v25.1.1

### Fixed
Expand Down
91 changes: 82 additions & 9 deletions cmem_plugin_base/dataintegration/typed_entities/file.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
"""File entities"""

import zipfile
from abc import abstractmethod
from io import BytesIO
from pathlib import Path
from typing import IO

from cmem.cmempy.workspace.projects.resources.resource import get_resource_response

from cmem_plugin_base.dataintegration.entity import Entity, EntityPath
from cmem_plugin_base.dataintegration.typed_entities import instance_uri, path_uri, type_uri
from cmem_plugin_base.dataintegration.typed_entities.typed_entities import (
Expand All @@ -8,26 +16,83 @@


class File:
"""A file entity that can be held in a FileEntitySchema."""
"""A file entity that can be held in a FileEntitySchema.

:param path: The file path.
:param file_type: The type of the file (one of: "Local", "Project").
:param mime: The MIME type of the file, if known.
:param entry_path: If the file path points to a archive, the entry within the archive.
"""

def __init__(self, path: str, file_type: str, mime: str | None) -> None:
def __init__(self, path: str, file_type: str, mime: str | None, entry_path: str | None) -> None:
self.path = path
self.file_type = file_type
self.mime = mime
self.entry_path = entry_path

@abstractmethod
def read_stream(self, project_id: str) -> IO[bytes]:
"""Open the referenced file as a stream.

Returns a file-like object (stream) in binary mode.
Caller is responsible for closing the stream.
"""


class LocalFile(File):
"""A file that's located on the local file system."""

def __init__(self, path: str, mime: str | None = None) -> None:
super().__init__(path, "Local", mime)
def __init__(self, path: str, mime: str | None = None, entry_path: str | None = None) -> None:
super().__init__(path, "Local", mime, entry_path)

def read_stream(self, project_id: str) -> IO[bytes]:
"""Open the referenced file as a stream.

Returns a file-like object (stream) in binary mode.
Caller is responsible for closing the stream.
"""
if self.entry_path:
archive = zipfile.ZipFile(self.path, "r")
try:
return archive.open(self.entry_path, "r")
except KeyError as err:
archive.close()
raise FileNotFoundError(
f"Entry '{self.entry_path}' not found in archive '{self.path}'."
) from err
else:
if not Path(self.path).is_file():
raise FileNotFoundError(f"File '{self.path}' does not exist.")
return Path(self.path).open("rb")


class ProjectFile(File):
"""A project file"""

def __init__(self, path: str, mime: str | None = None) -> None:
super().__init__(path, "Project", mime)
def __init__(self, path: str, mime: str | None = None, entry_path: str | None = None) -> None:
super().__init__(path, "Project", mime, entry_path)

def read_stream(self, project_id: str) -> IO[bytes]:
"""Open the referenced file as a stream.

Returns a file-like object (stream) in binary mode.
Caller is responsible for closing the stream.
"""
response = get_resource_response(project_id, self.path)
if response.status_code != 200: # noqa: PLR2004
raise FileNotFoundError(f"Project file '{self.path}' not found.")
response_bytes = BytesIO(response.raw.read())
if self.entry_path:
archive = zipfile.ZipFile(response_bytes, "r")
try:
return archive.open(self.entry_path, "r")
except KeyError as err:
archive.close()
raise FileNotFoundError(
f"Entry '{self.entry_path}' not found in project file '{self.path}'."
) from err
else:
return response_bytes


class FileEntitySchema(TypedEntitySchema[File]):
Expand All @@ -40,25 +105,33 @@ def __init__(self):
EntityPath(path_uri("filePath"), is_single_value=True),
EntityPath(path_uri("fileType"), is_single_value=True),
EntityPath(path_uri("mimeType"), is_single_value=True),
EntityPath(path_uri("entryPath"), is_single_value=True),
],
)

def to_entity(self, value: File) -> Entity:
"""Create a generic entity from a file"""
return Entity(
uri=instance_uri(value.path),
values=[[value.path], [value.file_type], [value.mime] if value.mime else []],
values=[
[value.path],
[value.file_type],
[value.mime] if value.mime else [],
[value.entry_path] if value.entry_path else [],
],
)

def from_entity(self, entity: Entity) -> File:
"""Create a file entity from a generic entity."""
path = entity.values[0][0]
file_type = entity.values[1][0]
mime = entity.values[2][0] if entity.values[2] and entity.values[2][0] else None
entry_path = entity.values[3][0] if entity.values[3] and entity.values[3][0] else None

match file_type:
case "Local":
return LocalFile(path, mime)
return LocalFile(path, mime, entry_path)
case "Project":
return ProjectFile(path, mime)
return ProjectFile(path, mime, entry_path)
case _:
raise ValueError(f"File '{path}' has unexpected type '{file_type}'.")
16 changes: 9 additions & 7 deletions tests/typed_entities/test_typed_entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
from cmem_plugin_base.dataintegration.ports import FixedNumberOfInputs, FixedSchemaPort
from cmem_plugin_base.dataintegration.typed_entities.file import FileEntitySchema, LocalFile
from cmem_plugin_base.testing import TestTaskContext


class ConcatFilesOperator(WorkflowPlugin):
Expand All @@ -29,9 +30,8 @@ def execute(self, inputs: Sequence[Entities], context: ExecutionContext) -> Enti
output_name = o_file.name
for file in input_files.values:
if isinstance(file, LocalFile):
with Path(file.path).open("rb") as f:
contents = f.read()
o_file.write(contents)
with file.read_stream(context.task.project_id()) as in_stream:
o_file.write(in_stream.read())

return FileEntitySchema().to_entities(iter([LocalFile(output_name)]))

Expand All @@ -51,7 +51,9 @@ def test_files(self) -> None:
input_entities = FileEntitySchema().to_entities(
iter([LocalFile(temp1.name), LocalFile(temp2.name)])
)
output = ConcatFilesOperator().execute([input_entities], ExecutionContext())
context = ExecutionContext()
context.task = TestTaskContext(project_id="TestProject", task_id="TestTask")
output = ConcatFilesOperator().execute([input_entities], context)

# Check output
assert output is not None
Expand All @@ -63,15 +65,15 @@ def test_files(self) -> None:

def test_file_entity_conversion(self) -> None:
"""Test conversion from entity to file"""
file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Project"], []])
file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Local"], [], []])
assert FileEntitySchema().from_entity(file_entity)

file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Project"], [""]])
file_entity = Entity(uri="test.uri", values=[["test.txt"], ["Local"], [""], [""]])
assert FileEntitySchema().from_entity(file_entity)

with pytest.raises(ValueError, match="File 'test.txt' has unexpected type 'Wrong Type'"):
FileEntitySchema().from_entity(
Entity(uri="test.uri", values=[["test.txt"], ["Wrong Type"], []])
Entity(uri="test.uri", values=[["test.txt"], ["Wrong Type"], [], []])
)


Expand Down