From 3566b5d3e3be0d6866af60aaf7b645bb334345ea Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 11:28:49 +0200
Subject: [PATCH 01/11] feat: get pipeline files

---
 deepset_cloud_sdk/_api/pipelines.py         | 61 +++++++++++++++++++
 deepset_cloud_sdk/_service/files_service.py |  1 +
 .../_service/pipeline_service.py            | 44 +++++++++++++
 deepset_cloud_sdk/cli.py                    | 39 ++++++++++++
 .../workflows/async_client/files.py         | 55 +++++++++++++++++
 .../workflows/sync_client/files.py          | 40 ++++++++++++
 6 files changed, 240 insertions(+)
 create mode 100644 deepset_cloud_sdk/_api/pipelines.py
 create mode 100644 deepset_cloud_sdk/_service/pipeline_service.py

diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py
new file mode 100644
index 00000000..aaf7c9b4
--- /dev/null
+++ b/deepset_cloud_sdk/_api/pipelines.py
@@ -0,0 +1,61 @@
+"""
+Pipeline API for deepset Cloud.
+
+This module takes care of all pipeline-related API calls to deepset Cloud.
+"""
+
+from enum import Enum
+from typing import Dict, List
+from uuid import UUID
+
+import structlog
+from httpx import codes
+
+from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
+
+logger = structlog.get_logger(__name__)
+
+
+class FileIndexingStatus(str, Enum):
+    """File indexing status"""
+
+    FAILED = "FAILED"
+    INDEXED_NO_DOCUMENTS = "INDEXED_NO_DOCUMENTS"
+
+
+class PipelineFilesNotFound(Exception):
+    """Failed fetching pipeline files"""
+
+
+class PipelinesAPI:
+    """Pipeline API for deepset Cloud.
+
+    This class takes care of all pipeline-related API calls to deepset Cloud.
+
+    :param deepset_cloud_api: Instance of the DeepsetCloudAPI.
+    """
+
+    def __init__(self, deepset_cloud_api: DeepsetCloudAPI) -> None:
+        """
+        Create PipelinesAPI object.
+
+        :param deepset_cloud_api: Instance of the DeepsetCloudAPI.
+        """
+        self._deepset_cloud_api = deepset_cloud_api
+
+    async def get_pipeline_files(
+        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
+    ) -> List[UUID]:
+        """Get file ids that failed or did not create documents during indexing.
+
+        :param pipeline_name: Name of the pipeline that indexed files.
+        :param workspace_name: Name of the workspace.
+        :param status: Status that should be used for fetching files.
+        """
+
+        params: Dict[str, str] = {"status": status}
+        response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params)
+        if response.status_code != codes.OK:
+            raise PipelineFilesNotFound(response.text)
+        file_ids: List[UUID] = response.json()
+        return file_ids
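
For illustration, calling this new API directly might look roughly like the sketch below. This is not part of the patch: the key, URL, and names are placeholders, and the method is renamed to get_pipeline_file_ids later in this series.

    import asyncio

    from deepset_cloud_sdk._api.config import CommonConfig
    from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
    from deepset_cloud_sdk._api.pipelines import FileIndexingStatus, PipelinesAPI

    async def main() -> None:
        # Placeholder credentials; real values come from the environment or .ENV file.
        config = CommonConfig(api_key="<placeholder>", api_url="<placeholder>")
        # DeepsetCloudAPI.factory is the same entry point the service layer uses below.
        async with DeepsetCloudAPI.factory(config) as deepset_cloud_api:
            pipelines_api = PipelinesAPI(deepset_cloud_api)
            file_ids = await pipelines_api.get_pipeline_files(
                pipeline_name="my_pipeline",
                workspace_name="my_workspace",
                status=FileIndexingStatus.FAILED,
            )
            print(file_ids)

    asyncio.run(main())
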
diff --git a/deepset_cloud_sdk/_service/files_service.py b/deepset_cloud_sdk/_service/files_service.py
index 81020d38..486d2b08 100644
--- a/deepset_cloud_sdk/_service/files_service.py
+++ b/deepset_cloud_sdk/_service/files_service.py
@@ -1,4 +1,5 @@
 """Module for all file-related operations."""
+
 from __future__ import annotations
 
 import asyncio
diff --git a/deepset_cloud_sdk/_service/pipeline_service.py b/deepset_cloud_sdk/_service/pipeline_service.py
new file mode 100644
index 00000000..b764c776
--- /dev/null
+++ b/deepset_cloud_sdk/_service/pipeline_service.py
@@ -0,0 +1,44 @@
+"""Module for all pipeline-related operations."""
+
+from __future__ import annotations
+
+from contextlib import asynccontextmanager
+from typing import AsyncGenerator, List
+from uuid import UUID
+
+import structlog
+
+from deepset_cloud_sdk._api.config import CommonConfig
+from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
+from deepset_cloud_sdk._api.pipelines import FileIndexingStatus, PipelinesAPI
+
+logger = structlog.get_logger(__name__)
+
+
+class PipelinesService:
+    """Service for all pipeline-related operations."""
+
+    def __init__(self, pipelines: PipelinesAPI):
+        """Initialize the service.
+
+        :param pipelines: API for pipelines.
+        """
+        self._pipelines = pipelines
+
+    @classmethod
+    @asynccontextmanager
+    async def factory(cls, config: CommonConfig) -> AsyncGenerator[PipelinesService, None]:
+        """Create a new instance of the service.
+
+        :param config: CommonConfig object.
+        :return: New instance of the service.
+        """
+        async with DeepsetCloudAPI.factory(config) as deepset_cloud_api:
+            yield cls(PipelinesAPI(deepset_cloud_api))
+
+    async def get_pipeline_file_ids(
+        self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
+    ) -> List[UUID]:
+        return await self._pipelines.get_pipeline_files(
+            pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
+        )
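
The service wraps the API behind an async context-manager factory. A minimal usage sketch under the same placeholder assumptions as above (not part of the patch):

    import asyncio

    from deepset_cloud_sdk._api.config import CommonConfig
    from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
    from deepset_cloud_sdk._service.pipeline_service import PipelinesService

    async def fetch_ids() -> None:
        config = CommonConfig(api_key="<placeholder>", api_url="<placeholder>")
        async with PipelinesService.factory(config) as pipeline_service:
            # Fetch the ids of files that were indexed but produced no documents.
            file_ids = await pipeline_service.get_pipeline_file_ids(
                pipeline_name="my_pipeline",
                workspace_name="my_workspace",
                status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
            )
            print(file_ids)

    asyncio.run(fetch_ids())
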
diff --git a/deepset_cloud_sdk/cli.py b/deepset_cloud_sdk/cli.py
index 97d48f74..279901ae 100644
--- a/deepset_cloud_sdk/cli.py
+++ b/deepset_cloud_sdk/cli.py
@@ -1,4 +1,5 @@
 """The CLI for the deepset Cloud SDK."""
+
 import json
 import os
 from typing import List, Optional
@@ -10,6 +11,9 @@
 from deepset_cloud_sdk.__about__ import __version__
 from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME, ENV_FILE_PATH
 from deepset_cloud_sdk.workflows.sync_client.files import download as sync_download
+from deepset_cloud_sdk.workflows.sync_client.files import (
+    download_pipeline_files as sync_download_pipeline_files,
+)
 from deepset_cloud_sdk.workflows.sync_client.files import (
     get_upload_session as sync_get_upload_session,
 )
@@ -25,6 +29,41 @@
 cli_app.command()(upload)
 
 
+@cli_app.command()
+def download_pipeline_files(  # pylint: disable=too-many-arguments
+    pipeline_name: str,
+    workspace_name: str = DEFAULT_WORKSPACE_NAME,
+    file_dir: Optional[str] = None,
+    batch_size: int = 50,
+    api_key: Optional[str] = None,
+    api_url: Optional[str] = None,
+    show_progress: bool = True,
+    timeout_s: Optional[int] = None,
+) -> None:
+    """Download all files that failed or did not create documents during indexing.
+
+    :param pipeline_name: Name of the pipeline that indexed the files.
+    :param workspace_name: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default.
+    :param file_dir: Path to the folder where the downloaded files are stored. One subfolder is created
+    per indexing status. Defaults to the current working directory.
+    :param batch_size: Batch size for the file listing.
+    :param api_key: API key to use for authentication.
+    :param api_url: API URL to use for authentication.
+    :param show_progress: Shows the download progress.
+    :param timeout_s: Timeout in seconds for the download.
+    """
+    sync_download_pipeline_files(
+        workspace_name=workspace_name,
+        pipeline_name=pipeline_name,
+        file_dir=file_dir,
+        batch_size=batch_size,
+        api_key=api_key,
+        api_url=api_url,
+        show_progress=show_progress,
+        timeout_s=timeout_s,
+    )
+
+
 @cli_app.command()
 def download(  # pylint: disable=too-many-arguments
     workspace_name: str = DEFAULT_WORKSPACE_NAME,
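
Typer derives the command name from the function name, so this registers a download-pipeline-files subcommand. A quick sketch of exercising it with Typer's test runner, mirroring the unit test added later in this series (the pipeline and workspace names are placeholders):

    from typer.testing import CliRunner

    from deepset_cloud_sdk.cli import cli_app

    runner = CliRunner()
    result = runner.invoke(
        cli_app, ["download-pipeline-files", "my-pipeline", "--workspace-name", "my-workspace"]
    )
    assert result.exit_code == 0
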
diff --git a/deepset_cloud_sdk/workflows/async_client/files.py b/deepset_cloud_sdk/workflows/async_client/files.py
index 71c67c33..2983e332 100644
--- a/deepset_cloud_sdk/workflows/async_client/files.py
+++ b/deepset_cloud_sdk/workflows/async_client/files.py
@@ -13,6 +13,7 @@
     CommonConfig,
 )
 from deepset_cloud_sdk._api.files import File
+from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
 from deepset_cloud_sdk._api.upload_sessions import (
     UploadSessionDetail,
     UploadSessionStatus,
@@ -20,6 +21,7 @@
 )
 from deepset_cloud_sdk._s3.upload import S3UploadSummary
 from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService
+from deepset_cloud_sdk._service.pipeline_service import PipelinesService
 
 
 def _get_config(api_key: Optional[str] = None, api_url: Optional[str] = None) -> CommonConfig:
@@ -202,6 +204,59 @@ async def download(
     )
 
 
+async def download_pipeline_files(
+    pipeline_name: str,
+    status: FileIndexingStatus = FileIndexingStatus.FAILED,
+    workspace_name: str = DEFAULT_WORKSPACE_NAME,
+    file_dir: Optional[Union[Path, str]] = None,
+    batch_size: int = 50,
+    api_key: Optional[str] = None,
+    api_url: Optional[str] = None,
+    show_progress: bool = True,
+    timeout_s: Optional[int] = None,
+) -> None:
+    """Download all files that failed or did not create documents during indexing.
+
+    :param pipeline_name: Name of the pipeline that indexed the files.
+    :param workspace_name: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default.
+    :param file_dir: Path to the folder where the downloaded files are stored. One subfolder is created
+    per indexing status. Defaults to the current working directory.
+    :param batch_size: Batch size for the file listing.
+    :param api_key: API key to use for authentication.
+    :param api_url: API URL to use for authentication.
+    :param show_progress: Shows the download progress.
+    :param timeout_s: Timeout in seconds for the download.
+    """
+    file_ids: List[UUID]
+
+    if file_dir is None:
+        file_dir = Path.cwd()
+    if isinstance(file_dir, str):
+        file_dir = Path(file_dir).resolve()
+
+    for status in FileIndexingStatus:
+        status_file_dir = Path(str(file_dir) + f"/{status.value}")
+
+        async with PipelinesService.factory(_get_config(api_key=api_key, api_url=api_url)) as pipeline_service:
+            file_ids = await pipeline_service.get_pipeline_file_ids(
+                pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
+            )
+
+        async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
+            # WARNING: This filter can become very long if many files failed or did not
+            # create documents. We need to use the download method to get the file names here.
+            file_ids_filters = ",".join([str(_id) for _id in file_ids])
+            await file_service.download(
+                workspace_name=workspace_name,
+                file_dir=status_file_dir,
+                odata_filter=f"file_id in '{file_ids_filters}'",
+                include_meta=True,
+                batch_size=batch_size,
+                show_progress=show_progress,
+                timeout_s=timeout_s,
+            )
+
+
 async def upload_texts(
     files: List[DeepsetCloudFile],
     api_key: Optional[str] = None,
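
From user code, this async workflow function is awaited directly. A minimal sketch, assuming credentials come from the .ENV file so no explicit api_key or api_url is passed (names and path are placeholders):

    import asyncio

    from deepset_cloud_sdk.workflows.async_client.files import download_pipeline_files

    asyncio.run(
        download_pipeline_files(
            pipeline_name="my_pipeline",
            workspace_name="my_workspace",
            file_dir="./failed_files",  # one subfolder is created per indexing status
        )
    )
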
diff --git a/deepset_cloud_sdk/workflows/sync_client/files.py b/deepset_cloud_sdk/workflows/sync_client/files.py
index bca3a6a8..86bd1798 100644
--- a/deepset_cloud_sdk/workflows/sync_client/files.py
+++ b/deepset_cloud_sdk/workflows/sync_client/files.py
@@ -1,4 +1,5 @@
 """Sync client for files workflow."""
+
 import asyncio
 from pathlib import Path
 from typing import Generator, List, Optional, Union
@@ -16,6 +17,9 @@
 from deepset_cloud_sdk._s3.upload import S3UploadSummary
 from deepset_cloud_sdk._service.files_service import DeepsetCloudFile
 from deepset_cloud_sdk.workflows.async_client.files import download as async_download
+from deepset_cloud_sdk.workflows.async_client.files import (
+    download_pipeline_files as async_download_pipeline_files,
+)
 from deepset_cloud_sdk.workflows.async_client.files import (
     get_upload_session as async_get_upload_session,
 )
@@ -124,6 +128,42 @@ def download(  # pylint: disable=too-many-arguments
     )
 
 
+def download_pipeline_files(
+    pipeline_name: str,
+    workspace_name: str = DEFAULT_WORKSPACE_NAME,
+    file_dir: Optional[Union[Path, str]] = None,
+    batch_size: int = 50,
+    api_key: Optional[str] = None,
+    api_url: Optional[str] = None,
+    show_progress: bool = True,
+    timeout_s: Optional[int] = None,
+) -> None:
+    """Download all files that failed or did not create documents during indexing.
+
+    :param pipeline_name: Name of the pipeline that indexed the files.
+    :param workspace_name: Name of the workspace to download the files from. It uses the workspace from the .ENV file by default.
+    :param file_dir: Path to the folder where the downloaded files are stored. One subfolder is created
+    per indexing status. Defaults to the current working directory.
+    :param batch_size: Batch size for the file listing.
+    :param api_key: API key to use for authentication.
+    :param api_url: API URL to use for authentication.
+    :param show_progress: Shows the download progress.
+    :param timeout_s: Timeout in seconds for the download.
+    """
+    asyncio.run(
+        async_download_pipeline_files(
+            api_key=api_key,
+            api_url=api_url,
+            workspace_name=workspace_name,
+            pipeline_name=pipeline_name,
+            file_dir=file_dir,
+            batch_size=batch_size,
+            show_progress=show_progress,
+            timeout_s=timeout_s,
+        )
+    )
+
+
 def upload_texts(
     files: List[DeepsetCloudFile],
     api_key: Optional[str] = None,

From 70de75fb33a7338f5dbefe6f61b092ba245c99ce Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 11:39:53 +0200
Subject: [PATCH 02/11] tests: pipeline service

---
 deepset_cloud_sdk/_api/pipelines.py          |  2 +-
 .../_service/pipeline_service.py             |  2 +-
 tests/conftest.py                            |  6 +++
 tests/unit/service/test_pipelines_service.py | 42 +++++++++++++++++++
 4 files changed, 50 insertions(+), 2 deletions(-)
 create mode 100644 tests/unit/service/test_pipelines_service.py

diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py
index aaf7c9b4..fe984b9b 100644
--- a/deepset_cloud_sdk/_api/pipelines.py
+++ b/deepset_cloud_sdk/_api/pipelines.py
@@ -43,7 +43,7 @@ def __init__(self, deepset_cloud_api: DeepsetCloudAPI) -> None:
         """
         self._deepset_cloud_api = deepset_cloud_api
 
-    async def get_pipeline_files(
+    async def get_pipeline_file_ids(
         self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
     ) -> List[UUID]:
         """Get file ids that failed or did not create documents during indexing.
diff --git a/deepset_cloud_sdk/_service/pipeline_service.py b/deepset_cloud_sdk/_service/pipeline_service.py
index b764c776..278b2601 100644
--- a/deepset_cloud_sdk/_service/pipeline_service.py
+++ b/deepset_cloud_sdk/_service/pipeline_service.py
@@ -39,6 +39,6 @@ async def factory(cls, config: CommonConfig) -> AsyncGenerator[PipelinesService,
     async def get_pipeline_file_ids(
         self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
     ) -> List[UUID]:
-        return await self._pipelines.get_pipeline_files(
+        return await self._pipelines.get_pipeline_file_ids(
             pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
         )
diff --git a/tests/conftest.py b/tests/conftest.py
index 6c317afc..4832b47a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,6 +7,7 @@
 from uuid import uuid4
 
 import httpx
+from deepset_cloud_sdk._api.pipelines import PipelinesAPI
 import pytest
 import structlog
 from dotenv import load_dotenv
@@ -71,6 +72,11 @@ def mocked_upload_sessions_api() -> Mock:
     return Mock(spec=UploadSessionsAPI)
 
 
+@pytest.fixture
+def mocked_pipelines() -> Mock:
+    return Mock(spec=PipelinesAPI)
+
+
 @pytest.fixture
 def mocked_files_api() -> Mock:
     return Mock(spec=FilesAPI)
diff --git a/tests/unit/service/test_pipelines_service.py b/tests/unit/service/test_pipelines_service.py
new file mode 100644
index 00000000..983bd314
--- /dev/null
+++ b/tests/unit/service/test_pipelines_service.py
@@ -0,0 +1,42 @@
+from unittest.mock import AsyncMock, Mock
+from uuid import UUID
+
+from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
+from deepset_cloud_sdk._service.pipeline_service import PipelinesService
+import pytest
+
+from deepset_cloud_sdk._api.config import CommonConfig
+
+
+@pytest.fixture
+def pipelines_service(mocked_pipelines: Mock) -> PipelinesService:
+    return PipelinesService(pipelines=mocked_pipelines)
+
+
+@pytest.mark.asyncio
+class TestUtilsPipelinesService:
+    async def test_factory(self, unit_config: CommonConfig) -> None:
+        async with PipelinesService.factory(unit_config) as pipelines_service:
+            assert isinstance(pipelines_service, PipelinesService)
+
+
+@pytest.mark.asyncio
+class TestGetFileIds:
+    async def test_get_file_ids(
+        self,
+        mocked_pipelines: Mock,
+        pipeline_service: PipelinesService,
+    ) -> None:
+        mocked_pipelines.get_pipeline_file_ids = AsyncMock(return_value=[UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")])
+        returned_file_ids = await pipeline_service.get_pipeline_file_ids(
+            workspace_name="test_workspace",
+            pipeline_name="test_pipeline",
+            status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+        )
+        assert returned_file_ids == [UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")], "Unexpected file ids"
+
+        mocked_pipelines.get_pipeline_file_ids.assert_called_once_with(
+            pipeline_name="test_pipeline",
+            workspace_name="test_workspace",
+            status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+        )

From bb67cf0ca5885e92eadc56757f42d8fc145bc911 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 11:49:52 +0200
Subject: [PATCH 03/11] tests: pipelines api

---
 deepset_cloud_sdk/_api/pipelines.py          | 14 +++--
 tests/conftest.py                            |  2 +-
 tests/unit/api/test_pipelines.py             | 60 ++++++++++++++++++++
 tests/unit/service/test_pipelines_service.py |  4 +-
 4 files changed, 73 insertions(+), 7 deletions(-)
 create mode 100644 tests/unit/api/test_pipelines.py

diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py
index fe984b9b..07a90a47 100644
--- a/deepset_cloud_sdk/_api/pipelines.py
+++ b/deepset_cloud_sdk/_api/pipelines.py
@@ -23,8 +23,12 @@ class FileIndexingStatus(str, Enum):
     INDEXED_NO_DOCUMENTS = "INDEXED_NO_DOCUMENTS"
 
 
-class PipelineFilesNotFound(Exception):
-    """Failed fetching pipeline files"""
+class PipelineNotFoundException(Exception):
+    """Raised if pipeline was not found."""
+
+
+class FailedToFetchFileIdsException(Exception):
+    """Raised if fetching pipeline files failed."""
 
 
 class PipelinesAPI:
@@ -55,7 +59,9 @@ async def get_pipeline_file_ids(
 
         params: Dict[str, str] = {"status": status}
         response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params)
+        if response.status_code == codes.NOT_FOUND:
+            raise PipelineNotFoundException()
         if response.status_code != codes.OK:
-            raise PipelineFilesNotFound(response.text)
-        file_ids: List[UUID] = response.json()
+            raise FailedToFetchFileIdsException(response.text)
+        file_ids: List[UUID] = [UUID(_id) for _id in response.json()]
         return file_ids
diff --git a/tests/conftest.py b/tests/conftest.py
index 4832b47a..e00dc252 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,7 +7,6 @@
 from uuid import uuid4
 
 import httpx
-from deepset_cloud_sdk._api.pipelines import PipelinesAPI
 import pytest
 import structlog
 from dotenv import load_dotenv
@@ -16,6 +16,7 @@
 from deepset_cloud_sdk._api.config import CommonConfig
 from deepset_cloud_sdk._api.deepset_cloud_api import DeepsetCloudAPI
 from deepset_cloud_sdk._api.files import FilesAPI
+from deepset_cloud_sdk._api.pipelines import PipelinesAPI
 from deepset_cloud_sdk._api.upload_sessions import (
     AWSPrefixedRequestConfig,
     UploadSession,
diff --git a/tests/unit/api/test_pipelines.py b/tests/unit/api/test_pipelines.py
new file mode 100644
index 00000000..14c716bd
--- /dev/null
+++ b/tests/unit/api/test_pipelines.py
@@ -0,0 +1,60 @@
+from unittest.mock import Mock
+from uuid import UUID
+
+import httpx
+import pytest
+
+from deepset_cloud_sdk._api.pipelines import (
+    FailedToFetchFileIdsException,
+    FileIndexingStatus,
+    PipelineNotFoundException,
+    PipelinesAPI,
+)
+
+
+@pytest.fixture
+def pipelines_api(mocked_deepset_cloud_api: Mock) -> PipelinesAPI:
+    return PipelinesAPI(mocked_deepset_cloud_api)
+
+
+@pytest.mark.asyncio
+class TestGetPipelineFileIDs:
+    async def test_get_pipeline_file_ids(self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock) -> None:
+        mocked_deepset_cloud_api.get.return_value = httpx.Response(
+            status_code=httpx.codes.OK,
+            json=["cd16435f-f6eb-423f-bf6f-994dc8a36a10", "cd16435f-f6eb-423f-bf6f-994dc8a36a13"],
+        )
+        result = await pipelines_api.get_pipeline_file_ids(
+            workspace_name="test_workspace",
+            pipeline_name="test_pipeline",
+            status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+        )
+        assert result == [UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"), UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a13")]
+
+    async def test_get_pipeline_file_ids_with_not_found_pipeline(
+        self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock
+    ) -> None:
+        mocked_deepset_cloud_api.get.return_value = httpx.Response(
+            status_code=httpx.codes.NOT_FOUND,
+            json=[],
+        )
+        with pytest.raises(PipelineNotFoundException):
+            await pipelines_api.get_pipeline_file_ids(
+                workspace_name="test_workspace",
+                pipeline_name="test_pipeline",
+                status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+            )
+
+    async def test_get_pipeline_file_ids_with_api_error(
+        self, pipelines_api: PipelinesAPI, mocked_deepset_cloud_api: Mock
+    ) -> None:
+        mocked_deepset_cloud_api.get.return_value = httpx.Response(
+            status_code=httpx.codes.SERVICE_UNAVAILABLE,
+            json=[],
+        )
+        with pytest.raises(FailedToFetchFileIdsException):
+            await pipelines_api.get_pipeline_file_ids(
+                workspace_name="test_workspace",
+                pipeline_name="test_pipeline",
+                status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+            )
diff --git a/tests/unit/service/test_pipelines_service.py b/tests/unit/service/test_pipelines_service.py
index 983bd314..54836214 100644
--- a/tests/unit/service/test_pipelines_service.py
+++ b/tests/unit/service/test_pipelines_service.py
@@ -1,11 +1,11 @@
 from unittest.mock import AsyncMock, Mock
 from uuid import UUID
 
-from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
-from deepset_cloud_sdk._service.pipeline_service import PipelinesService
 import pytest
 
 from deepset_cloud_sdk._api.config import CommonConfig
+from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
+from deepset_cloud_sdk._service.pipeline_service import PipelinesService
 
 
 @pytest.fixture

From cdb38f4a1db006167c566c672d3df55d2395b4b5 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 12:07:14 +0200
Subject: [PATCH 04/11] feat: test async workflow

---
 .../async_client/test_async_workflow_files.py | 133 +++++++++++++++++-
 1 file changed, 132 insertions(+), 1 deletion(-)

diff --git a/tests/unit/workflows/async_client/test_async_workflow_files.py b/tests/unit/workflows/async_client/test_async_workflow_files.py
index 8526c630..d5ab0bd1 100644
--- a/tests/unit/workflows/async_client/test_async_workflow_files.py
+++ b/tests/unit/workflows/async_client/test_async_workflow_files.py
@@ -1,7 +1,7 @@
 import datetime
 from pathlib import Path
 from typing import Any, AsyncGenerator, List
-from unittest.mock import AsyncMock
+from unittest.mock import AsyncMock, Mock, call
 from uuid import UUID
 
 import pytest
@@ -10,6 +10,7 @@
 
 from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME
 from deepset_cloud_sdk._api.files import File
+from deepset_cloud_sdk._api.pipelines import FileIndexingStatus
 from deepset_cloud_sdk._api.upload_sessions import (
     UploadSessionDetail,
     UploadSessionIngestionStatus,
@@ -19,9 +20,11 @@
     WriteMode,
 )
 from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService
+from deepset_cloud_sdk._service.pipeline_service import PipelinesService
 from deepset_cloud_sdk.models import UserInfo
 from deepset_cloud_sdk.workflows.async_client.files import (
     download,
+    download_pipeline_files,
     get_upload_session,
     list_files,
     list_upload_sessions,
@@ -280,3 +283,131 @@ async def mocked_get_upload_session(
     monkeypatch.setattr(FilesService, "get_upload_session", mocked_get_upload_session)
     returned_upload_session = await get_upload_session(session_id=UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"))
     assert returned_upload_session == mocked_upload_session
+
+
+@pytest.mark.asyncio
+class TestPipelineFiles:
+    @pytest.fixture
+    def mocked_download(self) -> Mock:
+        return Mock()
+
+    @pytest.fixture
+    def mocked_get_pipeline_file_ids(self) -> Mock:
+        get_file_ids = Mock()
+        get_file_ids.return_value = [
+            UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10"),
+            UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a11"),
+        ]
+        return get_file_ids
+
+    async def test_download_pipeline_files(
+        self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock
+    ) -> None:
+        monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids)
+        monkeypatch.setattr(FilesService, "download", mocked_download)
+
+        await download_pipeline_files(
+            workspace_name="my_workspace",
+            pipeline_name="my_pipeline",
+            batch_size=100,
+            timeout_s=100,
+        )
+        # Check that the pipeline file ids were fetched with
+        # the correct status for filtering
+        assert (
+            call(pipeline_name="my_pipeline", workspace_name="my_workspace", status=FileIndexingStatus.FAILED)
+            in mocked_get_pipeline_file_ids.call_args_list
+        )
+        assert (
+            call(
+                pipeline_name="my_pipeline",
+                workspace_name="my_workspace",
+                status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
+            )
+            in mocked_get_pipeline_file_ids.call_args_list
+        )
+
+        # Assert that the download calls were made with the expected
+        # arguments for failed files
+        assert (
+            call(
+                workspace_name="my_workspace",
+                file_dir=Path("./FAILED"),
+                odata_filter="file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'",
+                include_meta=True,
+                batch_size=100,
+                show_progress=True,
+                timeout_s=100,
+            )
+            in mocked_download.call_args_list
+        )
+        assert (
+            call(
+                workspace_name="my_workspace",
+                file_dir=Path("./INDEXED_NO_DOCUMENTS"),
+                odata_filter="file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'",
+                include_meta=True,
+                batch_size=100,
+                show_progress=True,
+                timeout_s=100,
+            )
+            in mocked_download.call_args_list
+        )
+
+    async def test_download_parses_path_for_string(
+        self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock
+    ) -> None:
+        monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids)
+        monkeypatch.setattr(FilesService, "download", mocked_download)
+
+        await download_pipeline_files(
+            workspace_name="my_workspace",
+            pipeline_name="my_pipeline",
+            file_dir="./my-path",
+            batch_size=100,
+            timeout_s=100,
+        )
+
+        # Assert that the download calls were made with the expected
+        # arguments for failed files
+        assert (
+            call(
+                workspace_name="my_workspace",
+                file_dir=Path("./my-path/FAILED"),  # THIS IS TESTED
+                odata_filter="file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'",
+                include_meta=True,
+                batch_size=100,
+                show_progress=True,
+                timeout_s=100,
+            )
+            in mocked_download.call_args_list
+        )
+
+    async def test_download_parses_path_for_path_type(
+        self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock
+    ) -> None:
+        monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids)
+        monkeypatch.setattr(FilesService, "download", mocked_download)
+
+        await download_pipeline_files(
+            workspace_name="my_workspace",
+            pipeline_name="my_pipeline",
+            file_dir=Path("./my-path"),
+            batch_size=100,
+            timeout_s=100,
+        )
+
+        # Assert that the download calls were made with the expected
+        # arguments for failed files
+        assert (
+            call(
+                workspace_name="my_workspace",
+                file_dir=Path("./my-path/FAILED"),  # THIS IS TESTED
+                odata_filter="file_id in 'cd16435f-f6eb-423f-bf6f-994dc8a36a10,cd16435f-f6eb-423f-bf6f-994dc8a36a11'",
+                include_meta=True,
+                batch_size=100,
+                show_progress=True,
+                timeout_s=100,
+            )
+            in mocked_download.call_args_list
+        )

From d34fd1e0dd971037f58994ff217072ca80a5fbd7 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 12:09:28 +0200
Subject: [PATCH 05/11] tests: sync workflow files

---
 .../sync_client/test_sync_workflow_files.py | 27 +++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/tests/unit/workflows/sync_client/test_sync_workflow_files.py b/tests/unit/workflows/sync_client/test_sync_workflow_files.py
index 73991c85..6a1e5d18 100644
--- a/tests/unit/workflows/sync_client/test_sync_workflow_files.py
+++ b/tests/unit/workflows/sync_client/test_sync_workflow_files.py
@@ -18,6 +18,7 @@
 from deepset_cloud_sdk.models import UserInfo
 from deepset_cloud_sdk.workflows.sync_client.files import (
     download,
+    download_pipeline_files,
     get_upload_session,
     list_files,
     list_upload_sessions,
@@ -151,6 +152,32 @@ def test_download_files() -> None:
     )
 
 
+def test_download_pipeline_files() -> None:
+    mocked_async_download_pipeline_files = AsyncMock()
+    with patch(
+        "deepset_cloud_sdk.workflows.sync_client.files.async_download_pipeline_files",
+        new=mocked_async_download_pipeline_files,
+    ):
+        download_pipeline_files(
+            workspace_name="my_workspace",
+            pipeline_name="my_pipeline",
+            file_dir="./mypath",
+            batch_size=100,
+            timeout_s=100,
+        )
+    mocked_async_download_pipeline_files.assert_called_once_with(
+        api_key=None,
+        api_url=None,
+        workspace_name="my_workspace",
+        pipeline_name="my_pipeline",
+        file_dir="./mypath",
+        include_meta=True,
+        batch_size=100,
+        show_progress=True,
+        timeout_s=100,
+    )
+
+
 def test_list_upload_sessions() -> None:
     async def mocked_async_upload_sessions(
         *args: Any, **kwargs: Any

From 7a70fb2549cc736c3bf32f0740ca90fe69b3ed99 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 12:14:46 +0200
Subject: [PATCH 06/11] tests: cli method

---
 tests/unit/test_cli.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py
index f63acdd8..057fa44e 100644
--- a/tests/unit/test_cli.py
+++ b/tests/unit/test_cli.py
@@ -65,6 +65,23 @@ def test_download_files(self, sync_download_mock: AsyncMock) -> None:
             show_progress=True,
         )
 
+    class TestDownloadPipelineFiles:
+        @patch("deepset_cloud_sdk.cli.sync_download_pipeline_files")
+        def test_download_pipeline_files(self, mocked_sync_download_pipeline_files: AsyncMock) -> None:
+            mocked_sync_download_pipeline_files.side_effect = Mock()
+            result = runner.invoke(cli_app, ["download-pipeline-files", "my-pipeline", "--workspace-name", "default"])
+            assert result.exit_code == 0
+            mocked_sync_download_pipeline_files.assert_called_once_with(
+                workspace_name="default",
+                pipeline_name="my-pipeline",
+                file_dir=None,
+                batch_size=50,
+                api_key=None,
+                api_url=None,
+                show_progress=True,
+                timeout_s=None,
+            )
+
     class TestListFiles:
         @patch("deepset_cloud_sdk.cli.sync_list_files")
         def test_listing_files(self, sync_list_files_mock: AsyncMock) -> None:

From abb328770f68f35db5b9aebe7b7cac504db2a5f8 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sun, 14 Apr 2024 12:33:40 +0200
Subject: [PATCH 07/11] fix: tests

---
 tests/unit/service/test_pipelines_service.py         |  4 ++--
 .../async_client/test_async_workflow_files.py        | 11 ++++++-----
 .../workflows/sync_client/test_sync_workflow_files.py |  1 -
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/tests/unit/service/test_pipelines_service.py b/tests/unit/service/test_pipelines_service.py
index 54836214..875ca7a7 100644
--- a/tests/unit/service/test_pipelines_service.py
+++ b/tests/unit/service/test_pipelines_service.py
@@ -25,10 +25,10 @@ class TestGetFileIds:
     async def test_get_file_ids(
         self,
         mocked_pipelines: Mock,
-        pipeline_service: PipelinesService,
+        pipelines_service: PipelinesService,
     ) -> None:
         mocked_pipelines.get_pipeline_file_ids = AsyncMock(return_value=[UUID("cd16435f-f6eb-423f-bf6f-994dc8a36a10")])
-        returned_file_ids = await pipeline_service.get_pipeline_file_ids(
+        returned_file_ids = await pipelines_service.get_pipeline_file_ids(
             workspace_name="test_workspace",
             pipeline_name="test_pipeline",
             status=FileIndexingStatus.INDEXED_NO_DOCUMENTS,
diff --git a/tests/unit/workflows/async_client/test_async_workflow_files.py b/tests/unit/workflows/async_client/test_async_workflow_files.py
index d5ab0bd1..8c090ec5 100644
--- a/tests/unit/workflows/async_client/test_async_workflow_files.py
+++ b/tests/unit/workflows/async_client/test_async_workflow_files.py
@@ -288,12 +288,12 @@ async def mocked_get_upload_session(
 
 @pytest.mark.asyncio
 class TestPipelineFiles:
     @pytest.fixture
-    def mocked_download(self) -> Mock:
-        return Mock()
+    def mocked_download(self) -> AsyncMock:
+        return AsyncMock()
 
     @pytest.fixture
-    def mocked_get_pipeline_file_ids(self) -> Mock:
-        get_file_ids = Mock()
+    def mocked_get_pipeline_file_ids(self) -> AsyncMock:
+        get_file_ids = AsyncMock()
         get_file_ids.return_value = [
@@ -301,7 +301,7 @@
     async def test_download_pipeline_files(
-        self, monkeypatch: MonkeyPatch, mocked_download: Mock, mocked_get_pipeline_file_ids: Mock
+        self, monkeypatch: MonkeyPatch, mocked_download: AsyncMock, mocked_get_pipeline_file_ids: AsyncMock
     ) -> None:
         monkeypatch.setattr(PipelinesService, "get_pipeline_file_ids", mocked_get_pipeline_file_ids)
         monkeypatch.setattr(FilesService, "download", mocked_download)
@@ -312,6 +312,7 @@ async def test_download_pipeline_files(
             batch_size=100,
             timeout_s=100,
         )
+
         # Check that the pipeline file ids were fetched with
         # the correct status for filtering
         assert (
diff --git a/tests/unit/workflows/sync_client/test_sync_workflow_files.py b/tests/unit/workflows/sync_client/test_sync_workflow_files.py
index 6a1e5d18..be0c4f52 100644
--- a/tests/unit/workflows/sync_client/test_sync_workflow_files.py
+++ b/tests/unit/workflows/sync_client/test_sync_workflow_files.py
@@ -171,7 +171,6 @@ def test_download_pipeline_files() -> None:
         workspace_name="my_workspace",
         pipeline_name="my_pipeline",
         file_dir="./mypath",
-        include_meta=True,
         batch_size=100,
         show_progress=True,
         timeout_s=100,

From 801f2f18000f6523af9c659a342e4a9c843cfe7b Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Mon, 22 Apr 2024 12:08:05 +0200
Subject: [PATCH 08/11] fix: docstrings

---
 deepset_cloud_sdk/_api/pipelines.py            | 3 +--
 deepset_cloud_sdk/_service/pipeline_service.py | 7 +++++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py
index 07a90a47..9e673e2b 100644
--- a/deepset_cloud_sdk/_api/pipelines.py
+++ b/deepset_cloud_sdk/_api/pipelines.py
@@ -17,7 +17,7 @@
 
 
 class FileIndexingStatus(str, Enum):
-    """File indexing status"""
+    """File indexing status."""
 
     FAILED = "FAILED"
     INDEXED_NO_DOCUMENTS = "INDEXED_NO_DOCUMENTS"
@@ -56,7 +56,6 @@ async def get_pipeline_file_ids(
         :param workspace_name: Name of the workspace.
         :param status: Status that should be used for fetching files.
         """
-
         params: Dict[str, str] = {"status": status}
         response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params)
         if response.status_code == codes.NOT_FOUND:
diff --git a/deepset_cloud_sdk/_service/pipeline_service.py b/deepset_cloud_sdk/_service/pipeline_service.py
index 278b2601..6822c2a5 100644
--- a/deepset_cloud_sdk/_service/pipeline_service.py
+++ b/deepset_cloud_sdk/_service/pipeline_service.py
@@ -39,6 +39,13 @@ async def factory(cls, config: CommonConfig) -> AsyncGenerator[PipelinesService,
     async def get_pipeline_file_ids(
         self, pipeline_name: str, workspace_name: str, status: FileIndexingStatus = FileIndexingStatus.FAILED
     ) -> List[UUID]:
+        """Get file ids that failed or did not create documents during indexing.
+
+        :param pipeline_name: Name of the pipeline that indexed files.
+        :param workspace_name: Name of the workspace.
+        :param status: Status that should be used for fetching files.
+        :return: List of file ids.
+        """
         return await self._pipelines.get_pipeline_file_ids(
             pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
         )

From a69ed63af834fcba5d4ee11bd5421e85e316fc27 Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Mon, 22 Apr 2024 12:08:55 +0200
Subject: [PATCH 09/11] fix: lint

---
 deepset_cloud_sdk/workflows/async_client/files.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/deepset_cloud_sdk/workflows/async_client/files.py b/deepset_cloud_sdk/workflows/async_client/files.py
index 2983e332..e03b8624 100644
--- a/deepset_cloud_sdk/workflows/async_client/files.py
+++ b/deepset_cloud_sdk/workflows/async_client/files.py
@@ -234,12 +234,12 @@ async def download_pipeline_files(
     if isinstance(file_dir, str):
         file_dir = Path(file_dir).resolve()
 
-    for status in FileIndexingStatus:
-        status_file_dir = Path(str(file_dir) + f"/{status.value}")
+    for _status in FileIndexingStatus:
+        status_file_dir = Path(str(file_dir) + f"/{_status.value}")
 
         async with PipelinesService.factory(_get_config(api_key=api_key, api_url=api_url)) as pipeline_service:
             file_ids = await pipeline_service.get_pipeline_file_ids(
-                pipeline_name=pipeline_name, workspace_name=workspace_name, status=status
+                pipeline_name=pipeline_name, workspace_name=workspace_name, status=_status
             )
 
         async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:

From 0fe05a6b9cfc76efbe05094b140d6fa0f99cf85c Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Mon, 22 Apr 2024 12:09:28 +0200
Subject: [PATCH 10/11] fix: lint

---
 deepset_cloud_sdk/workflows/async_client/files.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/deepset_cloud_sdk/workflows/async_client/files.py b/deepset_cloud_sdk/workflows/async_client/files.py
index e03b8624..eaeb4ad0 100644
--- a/deepset_cloud_sdk/workflows/async_client/files.py
+++ b/deepset_cloud_sdk/workflows/async_client/files.py
@@ -206,7 +206,6 @@ async def download(
 
 async def download_pipeline_files(
     pipeline_name: str,
-    status: FileIndexingStatus = FileIndexingStatus.FAILED,
     workspace_name: str = DEFAULT_WORKSPACE_NAME,
     file_dir: Optional[Union[Path, str]] = None,
     batch_size: int = 50,

From 43a2af1d3b9fcd59783c7015d3994712e35711cc Mon Sep 17 00:00:00 2001
From: Kristof Herrmann
Date: Sat, 27 Apr 2024 12:10:34 +0200
Subject: [PATCH 11/11] fix: add chunked

---
 deepset_cloud_sdk/_api/pipelines.py |  2 +-
 .../workflows/async_client/files.py | 30 ++++++++++++------
 2 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/deepset_cloud_sdk/_api/pipelines.py b/deepset_cloud_sdk/_api/pipelines.py
index 9e673e2b..e6c87aa3 100644
--- a/deepset_cloud_sdk/_api/pipelines.py
+++ b/deepset_cloud_sdk/_api/pipelines.py
@@ -56,7 +56,7 @@ async def get_pipeline_file_ids(
         :param workspace_name: Name of the workspace.
         :param status: Status that should be used for fetching files.
         """
-        params: Dict[str, str] = {"status": status}
+        params: Dict[str, str] = {"status": status.value}
         response = await self._deepset_cloud_api.get(workspace_name, f"pipelines/{pipeline_name}/files", params=params)
         if response.status_code == codes.NOT_FOUND:
             raise PipelineNotFoundException()
diff --git a/deepset_cloud_sdk/workflows/async_client/files.py b/deepset_cloud_sdk/workflows/async_client/files.py
index 6a825ab8..23f83e09 100644
--- a/deepset_cloud_sdk/workflows/async_client/files.py
+++ b/deepset_cloud_sdk/workflows/async_client/files.py
@@ -1,7 +1,7 @@
 # pylint:disable=too-many-arguments
 """This module contains async functions for uploading files and folders to deepset Cloud."""
 from pathlib import Path
-from typing import AsyncGenerator, List, Optional, Union
+from typing import AsyncGenerator, Generator, List, Optional, Union
 from uuid import UUID
 
 from sniffio import AsyncLibraryNotFoundError
@@ -24,6 +24,12 @@
 from deepset_cloud_sdk._service.pipeline_service import PipelinesService
 
 
+def _chunked(iterable: List, chunk_size: int) -> Generator[List, None, None]:
+    """Yield successive chunk_size-sized chunks from iterable."""
+    for i in range(0, len(iterable), chunk_size):
+        yield iterable[i : i + chunk_size]
+
+
 def _get_config(api_key: Optional[str] = None, api_url: Optional[str] = None) -> CommonConfig:
     return CommonConfig(api_key=api_key or API_KEY, api_url=api_url or API_URL)
 
@@ -247,16 +253,18 @@ async def download_pipeline_files(
         async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
             # WARNING: This filter can become very long if many files failed or did not
             # create documents. We need to use the download method to get the file names here.
-            file_ids_filters = ",".join([str(_id) for _id in file_ids])
-            await file_service.download(
-                workspace_name=workspace_name,
-                file_dir=status_file_dir,
-                odata_filter=f"file_id in '{file_ids_filters}'",
-                include_meta=True,
-                batch_size=batch_size,
-                show_progress=show_progress,
-                timeout_s=timeout_s,
-            )
+
+            # Chunk the file_ids into batches of 10 to avoid an overly long URL.
+            for chunk in _chunked(file_ids, 10):
+                await file_service.download(
+                    workspace_name=workspace_name,
+                    file_dir=status_file_dir,
+                    odata_filter=" or ".join([f"file_id eq '{_id}'" for _id in chunk]),
+                    include_meta=True,
+                    batch_size=batch_size,
+                    show_progress=show_progress,
+                    timeout_s=timeout_s,
+                )
 
 
 async def upload_texts(
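
To illustrate the effect of this final patch, here is a standalone sketch of the chunked filter construction. _chunked and the filter expression are copied from the patch; the ids are dummy values for illustration only:

    from typing import Generator, List
    from uuid import UUID

    def _chunked(iterable: List, chunk_size: int) -> Generator[List, None, None]:
        """Yield successive chunk_size-sized chunks from iterable."""
        for i in range(0, len(iterable), chunk_size):
            yield iterable[i : i + chunk_size]

    file_ids = [UUID(int=n) for n in range(25)]  # dummy ids
    for chunk in _chunked(file_ids, 10):
        odata_filter = " or ".join([f"file_id eq '{_id}'" for _id in chunk])
        # 25 ids produce batches of 10, 10, and 5, so each request URL stays short.
        print(len(chunk), len(odata_filter))
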