Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ jobs:
with:
python-version-file: ".python-version"

- name: Install project
run: uv sync --locked --all-extras --dev

- name: Run tests
run: uv run --dev pytest -m "not integration"
run: make test-unit

lint:
name: Run Linting
Expand All @@ -41,14 +38,11 @@ jobs:
with:
python-version-file: ".python-version"

- name: Install project
run: uv sync --locked --all-extras --group lint

- name: Run lint
run: uv run ruff check
run: make lint

- name: Run format
run: uv run ruff format --check
run: make format-check

types:
name: Run mypy
Expand All @@ -64,11 +58,8 @@ jobs:
with:
python-version-file: ".python-version"

- name: Install project
run: uv sync --locked --all-extras --group types

- name: Run types
run: uv run mypy src/ test/
run: make types

license_headers:
name: Check License Headers
Expand Down
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.11
3.13
9 changes: 3 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,13 @@ dev = [
"pytest",
"pytest-asyncio",
"python-dotenv",
"docker"
]
lint = [
"docker",
"ruff",
]

types = [
"mypy",
"types-PyYAML",
"types-redis",
"pandas-stubs",
"redis>=4.0.0",
]

docs = [
Expand Down
28 changes: 28 additions & 0 deletions src/deepset_mcp/api/pipeline/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,31 @@ class LogLevel(StrEnum):
INFO = "info"
WARNING = "warning"
ERROR = "error"


class PipelineVersion(BaseModel):
"""Model representing a version of a pipeline."""

version_id: UUID
"Unique identifier for the pipeline version"
version_number: int | None = None
"Sequential version number"
config_yaml: str
"YAML configuration for this version"
description: str | None = None
"Optional description of the version"
is_draft: bool = False
"Whether this version is a draft"
created_at: datetime
"Timestamp when the version was created"
created_by: DeepsetUser | None = None
"User who created the version"
updated_at: datetime | None = None
"Timestamp when the version was last updated"
updated_by: DeepsetUser | None = None
"User who last updated the version"

class Config:
"""Configuration for serialization and deserialization."""

populate_by_name = True
40 changes: 35 additions & 5 deletions src/deepset_mcp/api/pipeline/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
LogLevel,
PipelineLog,
PipelineValidationResult,
PipelineVersion,
)
from deepset_mcp.api.shared_models import NoContentResponse, PaginatedResponse

Expand All @@ -35,13 +36,42 @@ async def create(self, name: str, yaml_config: str) -> NoContentResponse:
"""Create a new pipeline with a name and YAML config."""
...

async def update(
async def list_versions(
self,
pipeline_name: str,
updated_pipeline_name: str | None = None,
yaml_config: str | None = None,
) -> NoContentResponse:
"""Update name and/or YAML config of an existing pipeline."""
limit: int = 10,
after: str | None = None,
) -> PaginatedResponse[PipelineVersion]:
"""List versions of a pipeline."""
...

async def create_version(
self,
pipeline_name: str,
config_yaml: str,
description: str | None = None,
is_draft: bool = False,
) -> PipelineVersion:
"""Create a new version of a pipeline."""
...

async def get_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
"""Fetch a specific version of a pipeline."""
...

async def restore_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
"""Restore a pipeline to a previous version."""
...

async def patch_version(
self,
pipeline_name: str,
version_id: str,
config_yaml: str | None = None,
description: str | None = None,
is_draft: bool | None = None,
) -> PipelineVersion:
"""Patch fields of an existing pipeline version."""
...

async def get_logs(
Expand Down
170 changes: 133 additions & 37 deletions src/deepset_mcp/api/pipeline/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
LogLevel,
PipelineLog,
PipelineValidationResult,
PipelineVersion,
ValidationError,
)
from deepset_mcp.api.pipeline.protocols import PipelineResourceProtocol
Expand Down Expand Up @@ -161,54 +162,149 @@ async def create(self, pipeline_name: str, yaml_config: str) -> NoContentRespons

return NoContentResponse(message="Pipeline created successfully.")

async def update(
async def list_versions(
self,
pipeline_name: str,
updated_pipeline_name: str | None = None,
yaml_config: str | None = None,
) -> NoContentResponse:
"""Update name and/or YAML config of an existing pipeline.

:param pipeline_name: Current name of the pipeline.
:param updated_pipeline_name: New name for the pipeline (optional).
:param yaml_config: New YAML configuration (optional).
:returns: NoContentResponse indicating successful update.
:raises ValueError: If neither updated_pipeline_name nor yaml_config is provided.
limit: int = 10,
after: str | None = None,
) -> PaginatedResponse[PipelineVersion]:
"""List versions of a pipeline, most recent first.

:param pipeline_name: Name of the pipeline.
:param limit: Maximum number of versions to return per page.
:param after: Cursor (version_id UUID) for the next page.
:returns: A `PaginatedResponse` containing pipeline versions.
"""
# Handle name update first if any
if updated_pipeline_name is not None:
name_resp = await self._client.request(
endpoint=f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}",
method="PATCH",
data={"name": updated_pipeline_name},
)
request_params: dict[str, Any] = {"limit": limit, "order": "DESC", "field": "version_number"}
if after is not None:
request_params["after"] = after

raise_for_status(name_resp)
page = await self._list_versions_api_call(pipeline_name, **request_params)
page._inject_paginator(
fetch_func=lambda **kwargs: self._list_versions_api_call(pipeline_name, **kwargs),
base_args={"limit": limit, "order": "DESC", "field": "version_number"},
cursor_param="after",
)
return page

pipeline_name = updated_pipeline_name
async def _list_versions_api_call(self, pipeline_name: str, **kwargs: Any) -> PaginatedResponse[PipelineVersion]:
resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/versions"
),
method="GET",
params=kwargs,
)
raise_for_status(resp)
if resp.json is None:
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
return PaginatedResponse[PipelineVersion].create_with_cursor_field(resp.json, "version_id")

if yaml_config is None:
return NoContentResponse(message="Pipeline name updated successfully.")
async def create_version(
self,
pipeline_name: str,
config_yaml: str,
description: str | None = None,
is_draft: bool = False,
) -> PipelineVersion:
"""Create a new version of a pipeline.

:param pipeline_name: Name of the pipeline.
:param config_yaml: YAML configuration for the new version.
:param description: Optional description of the version.
:param is_draft: Whether to create the version as a draft.
:returns: The newly created PipelineVersion.
"""
data: dict[str, Any] = {"config_yaml": config_yaml, "is_draft": is_draft}
if description is not None:
data["description"] = description

if yaml_config is not None:
yaml_resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/yaml"
),
method="PUT",
data={"query_yaml": yaml_config},
)
resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/versions"
),
method="POST",
data=data,
)
raise_for_status(resp)
if resp.json is None:
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
return PipelineVersion.model_validate(resp.json)

raise_for_status(yaml_resp)
async def get_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
"""Fetch a specific version of a pipeline.

if updated_pipeline_name is not None:
response = NoContentResponse(message="Pipeline name and YAML updated successfully.")
else:
response = NoContentResponse(message="Pipeline YAML updated successfully.")
:param pipeline_name: Name of the pipeline.
:param version_id: UUID of the version to fetch.
:returns: The requested PipelineVersion.
"""
resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}"
),
)
raise_for_status(resp)
if resp.json is None:
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
return PipelineVersion.model_validate(resp.json)

async def restore_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
"""Restore a pipeline to a previous version.

:param pipeline_name: Name of the pipeline.
:param version_id: UUID of the version to restore.
:returns: The restored PipelineVersion.
"""
resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}/restore"
),
method="POST",
)
raise_for_status(resp)
if resp.json is None:
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
return PipelineVersion.model_validate(resp.json)

return response
async def patch_version(
self,
pipeline_name: str,
version_id: str,
config_yaml: str | None = None,
description: str | None = None,
is_draft: bool | None = None,
) -> PipelineVersion:
"""Patch fields of an existing pipeline version.

:param pipeline_name: Name of the pipeline.
:param version_id: UUID of the version to patch.
:param config_yaml: New YAML configuration (optional).
:param description: New description (optional).
:param is_draft: New draft status (optional).
:returns: The updated PipelineVersion.
"""
data: dict[str, Any] = {}
if config_yaml is not None:
data["config_yaml"] = config_yaml
if description is not None:
data["description"] = description
if is_draft is not None:
data["is_draft"] = is_draft

raise ValueError("Either `updated_pipeline_name` or `yaml_config` must be provided.")
resp = await self._client.request(
endpoint=(
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}"
),
method="PATCH",
data=data,
)
raise_for_status(resp)
if resp.json is None:
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
return PipelineVersion.model_validate(resp.json)

async def get_logs(
self,
Expand Down
Loading
Loading