Skip to content

Commit 400101f

Browse files
authored
feat: add pipeline versions (#205)
* feat: add pipeline versions * fix lint * fix ci * fix ci * fix ci * fix ci * fix ci * upgrade lock file
1 parent b750f7a commit 400101f

17 files changed

Lines changed: 1945 additions & 1289 deletions

File tree

.github/workflows/ci.yml

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,8 @@ jobs:
2121
with:
2222
python-version-file: ".python-version"
2323

24-
- name: Install project
25-
run: uv sync --locked --all-extras --dev
26-
2724
- name: Run tests
28-
run: uv run --dev pytest -m "not integration"
25+
run: make test-unit
2926

3027
lint:
3128
name: Run Linting
@@ -41,14 +38,11 @@ jobs:
4138
with:
4239
python-version-file: ".python-version"
4340

44-
- name: Install project
45-
run: uv sync --locked --all-extras --group lint
46-
4741
- name: Run lint
48-
run: uv run ruff check
42+
run: make lint
4943

5044
- name: Run format
51-
run: uv run ruff format --check
45+
run: make format-check
5246

5347
types:
5448
name: Run mypy
@@ -64,11 +58,8 @@ jobs:
6458
with:
6559
python-version-file: ".python-version"
6660

67-
- name: Install project
68-
run: uv sync --locked --all-extras --group types
69-
7061
- name: Run types
71-
run: uv run mypy src/ test/
62+
run: make types
7263

7364
license_headers:
7465
name: Check License Headers

.python-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.11
1+
3.13

pyproject.toml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,13 @@ dev = [
6464
"pytest",
6565
"pytest-asyncio",
6666
"python-dotenv",
67-
"docker"
68-
]
69-
lint = [
67+
"docker",
7068
"ruff",
71-
]
72-
73-
types = [
7469
"mypy",
7570
"types-PyYAML",
71+
"types-redis",
7672
"pandas-stubs",
73+
"redis>=4.0.0",
7774
]
7875

7976
docs = [

src/deepset_mcp/api/pipeline/models.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,31 @@ class LogLevel(StrEnum):
275275
INFO = "info"
276276
WARNING = "warning"
277277
ERROR = "error"
278+
279+
280+
class PipelineVersion(BaseModel):
281+
"""Model representing a version of a pipeline."""
282+
283+
version_id: UUID
284+
"Unique identifier for the pipeline version"
285+
version_number: int | None = None
286+
"Sequential version number"
287+
config_yaml: str
288+
"YAML configuration for this version"
289+
description: str | None = None
290+
"Optional description of the version"
291+
is_draft: bool = False
292+
"Whether this version is a draft"
293+
created_at: datetime
294+
"Timestamp when the version was created"
295+
created_by: DeepsetUser | None = None
296+
"User who created the version"
297+
updated_at: datetime | None = None
298+
"Timestamp when the version was last updated"
299+
updated_by: DeepsetUser | None = None
300+
"User who last updated the version"
301+
302+
class Config:
303+
"""Configuration for serialization and deserialization."""
304+
305+
populate_by_name = True

src/deepset_mcp/api/pipeline/protocols.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
LogLevel,
1313
PipelineLog,
1414
PipelineValidationResult,
15+
PipelineVersion,
1516
)
1617
from deepset_mcp.api.shared_models import NoContentResponse, PaginatedResponse
1718

@@ -35,13 +36,42 @@ async def create(self, name: str, yaml_config: str) -> NoContentResponse:
3536
"""Create a new pipeline with a name and YAML config."""
3637
...
3738

38-
async def update(
39+
async def list_versions(
3940
self,
4041
pipeline_name: str,
41-
updated_pipeline_name: str | None = None,
42-
yaml_config: str | None = None,
43-
) -> NoContentResponse:
44-
"""Update name and/or YAML config of an existing pipeline."""
42+
limit: int = 10,
43+
after: str | None = None,
44+
) -> PaginatedResponse[PipelineVersion]:
45+
"""List versions of a pipeline."""
46+
...
47+
48+
async def create_version(
49+
self,
50+
pipeline_name: str,
51+
config_yaml: str,
52+
description: str | None = None,
53+
is_draft: bool = False,
54+
) -> PipelineVersion:
55+
"""Create a new version of a pipeline."""
56+
...
57+
58+
async def get_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
59+
"""Fetch a specific version of a pipeline."""
60+
...
61+
62+
async def restore_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
63+
"""Restore a pipeline to a previous version."""
64+
...
65+
66+
async def patch_version(
67+
self,
68+
pipeline_name: str,
69+
version_id: str,
70+
config_yaml: str | None = None,
71+
description: str | None = None,
72+
is_draft: bool | None = None,
73+
) -> PipelineVersion:
74+
"""Patch fields of an existing pipeline version."""
4575
...
4676

4777
async def get_logs(

src/deepset_mcp/api/pipeline/resource.py

Lines changed: 133 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
LogLevel,
1717
PipelineLog,
1818
PipelineValidationResult,
19+
PipelineVersion,
1920
ValidationError,
2021
)
2122
from deepset_mcp.api.pipeline.protocols import PipelineResourceProtocol
@@ -161,54 +162,149 @@ async def create(self, pipeline_name: str, yaml_config: str) -> NoContentRespons
161162

162163
return NoContentResponse(message="Pipeline created successfully.")
163164

164-
async def update(
165+
async def list_versions(
165166
self,
166167
pipeline_name: str,
167-
updated_pipeline_name: str | None = None,
168-
yaml_config: str | None = None,
169-
) -> NoContentResponse:
170-
"""Update name and/or YAML config of an existing pipeline.
171-
172-
:param pipeline_name: Current name of the pipeline.
173-
:param updated_pipeline_name: New name for the pipeline (optional).
174-
:param yaml_config: New YAML configuration (optional).
175-
:returns: NoContentResponse indicating successful update.
176-
:raises ValueError: If neither updated_pipeline_name nor yaml_config is provided.
168+
limit: int = 10,
169+
after: str | None = None,
170+
) -> PaginatedResponse[PipelineVersion]:
171+
"""List versions of a pipeline, most recent first.
172+
173+
:param pipeline_name: Name of the pipeline.
174+
:param limit: Maximum number of versions to return per page.
175+
:param after: Cursor (version_id UUID) for the next page.
176+
:returns: A `PaginatedResponse` containing pipeline versions.
177177
"""
178-
# Handle name update first if any
179-
if updated_pipeline_name is not None:
180-
name_resp = await self._client.request(
181-
endpoint=f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}",
182-
method="PATCH",
183-
data={"name": updated_pipeline_name},
184-
)
178+
request_params: dict[str, Any] = {"limit": limit, "order": "DESC", "field": "version_number"}
179+
if after is not None:
180+
request_params["after"] = after
185181

186-
raise_for_status(name_resp)
182+
page = await self._list_versions_api_call(pipeline_name, **request_params)
183+
page._inject_paginator(
184+
fetch_func=lambda **kwargs: self._list_versions_api_call(pipeline_name, **kwargs),
185+
base_args={"limit": limit, "order": "DESC", "field": "version_number"},
186+
cursor_param="after",
187+
)
188+
return page
187189

188-
pipeline_name = updated_pipeline_name
190+
async def _list_versions_api_call(self, pipeline_name: str, **kwargs: Any) -> PaginatedResponse[PipelineVersion]:
191+
resp = await self._client.request(
192+
endpoint=(
193+
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/versions"
194+
),
195+
method="GET",
196+
params=kwargs,
197+
)
198+
raise_for_status(resp)
199+
if resp.json is None:
200+
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
201+
return PaginatedResponse[PipelineVersion].create_with_cursor_field(resp.json, "version_id")
189202

190-
if yaml_config is None:
191-
return NoContentResponse(message="Pipeline name updated successfully.")
203+
async def create_version(
204+
self,
205+
pipeline_name: str,
206+
config_yaml: str,
207+
description: str | None = None,
208+
is_draft: bool = False,
209+
) -> PipelineVersion:
210+
"""Create a new version of a pipeline.
211+
212+
:param pipeline_name: Name of the pipeline.
213+
:param config_yaml: YAML configuration for the new version.
214+
:param description: Optional description of the version.
215+
:param is_draft: Whether to create the version as a draft.
216+
:returns: The newly created PipelineVersion.
217+
"""
218+
data: dict[str, Any] = {"config_yaml": config_yaml, "is_draft": is_draft}
219+
if description is not None:
220+
data["description"] = description
192221

193-
if yaml_config is not None:
194-
yaml_resp = await self._client.request(
195-
endpoint=(
196-
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/yaml"
197-
),
198-
method="PUT",
199-
data={"query_yaml": yaml_config},
200-
)
222+
resp = await self._client.request(
223+
endpoint=(
224+
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/{quote(pipeline_name, safe='')}/versions"
225+
),
226+
method="POST",
227+
data=data,
228+
)
229+
raise_for_status(resp)
230+
if resp.json is None:
231+
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
232+
return PipelineVersion.model_validate(resp.json)
201233

202-
raise_for_status(yaml_resp)
234+
async def get_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
235+
"""Fetch a specific version of a pipeline.
203236
204-
if updated_pipeline_name is not None:
205-
response = NoContentResponse(message="Pipeline name and YAML updated successfully.")
206-
else:
207-
response = NoContentResponse(message="Pipeline YAML updated successfully.")
237+
:param pipeline_name: Name of the pipeline.
238+
:param version_id: UUID of the version to fetch.
239+
:returns: The requested PipelineVersion.
240+
"""
241+
resp = await self._client.request(
242+
endpoint=(
243+
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
244+
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}"
245+
),
246+
)
247+
raise_for_status(resp)
248+
if resp.json is None:
249+
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
250+
return PipelineVersion.model_validate(resp.json)
251+
252+
async def restore_version(self, pipeline_name: str, version_id: str) -> PipelineVersion:
253+
"""Restore a pipeline to a previous version.
254+
255+
:param pipeline_name: Name of the pipeline.
256+
:param version_id: UUID of the version to restore.
257+
:returns: The restored PipelineVersion.
258+
"""
259+
resp = await self._client.request(
260+
endpoint=(
261+
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
262+
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}/restore"
263+
),
264+
method="POST",
265+
)
266+
raise_for_status(resp)
267+
if resp.json is None:
268+
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
269+
return PipelineVersion.model_validate(resp.json)
208270

209-
return response
271+
async def patch_version(
272+
self,
273+
pipeline_name: str,
274+
version_id: str,
275+
config_yaml: str | None = None,
276+
description: str | None = None,
277+
is_draft: bool | None = None,
278+
) -> PipelineVersion:
279+
"""Patch fields of an existing pipeline version.
280+
281+
:param pipeline_name: Name of the pipeline.
282+
:param version_id: UUID of the version to patch.
283+
:param config_yaml: New YAML configuration (optional).
284+
:param description: New description (optional).
285+
:param is_draft: New draft status (optional).
286+
:returns: The updated PipelineVersion.
287+
"""
288+
data: dict[str, Any] = {}
289+
if config_yaml is not None:
290+
data["config_yaml"] = config_yaml
291+
if description is not None:
292+
data["description"] = description
293+
if is_draft is not None:
294+
data["is_draft"] = is_draft
210295

211-
raise ValueError("Either `updated_pipeline_name` or `yaml_config` must be provided.")
296+
resp = await self._client.request(
297+
endpoint=(
298+
f"v1/workspaces/{quote(self._workspace, safe='')}/pipelines/"
299+
f"{quote(pipeline_name, safe='')}/versions/{quote(version_id, safe='')}"
300+
),
301+
method="PATCH",
302+
data=data,
303+
)
304+
raise_for_status(resp)
305+
if resp.json is None:
306+
raise UnexpectedAPIError(status_code=resp.status_code, message="Empty response", detail=None)
307+
return PipelineVersion.model_validate(resp.json)
212308

213309
async def get_logs(
214310
self,

0 commit comments

Comments
 (0)