Skip to content

Commit f6f21e6

Browse files
authored
feat: add teardown functionality for deleting pipelines and indexes (#83)
* feat: add teardown_actions module for deleting pipelines and indexes * feat: import teardown actions in CLI module * feat: add teardown CLI commands for deleting pipelines and indexes * fix: format
1 parent 0910138 commit f6f21e6

2 files changed

Lines changed: 350 additions & 0 deletions

File tree

src/deepset_mcp/benchmark/runner/cli.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
setup_pipeline,
1313
setup_test_case,
1414
)
15+
from deepset_mcp.benchmark.runner.teardown_actions import (
16+
teardown_all,
17+
teardown_index,
18+
teardown_pipeline,
19+
teardown_test_case,
20+
)
1521

1622
app = typer.Typer(help="Short commands for listing/creating test cases, pipelines, and indexes.")
1723

@@ -199,6 +205,154 @@ def create_index(
199205
raise typer.Exit(code=1)
200206

201207

208+
@app.command("delete-case")
209+
def delete_case(
210+
test_name: str = typer.Argument(..., help="Test-case name (without .yml)."),
211+
workspace_name: str = typer.Option(
212+
"default", "--workspace", "-w", help="Workspace from which to delete pipelines and indexes."
213+
),
214+
api_key: str | None = typer.Option(
215+
None,
216+
"--api-key",
217+
"-k",
218+
help="Explicit DP_API_KEY to use (overrides environment).",
219+
),
220+
task_dir: str | None = typer.Option(
221+
None,
222+
help="Directory where test-case YAMLs are stored.",
223+
),
224+
) -> None:
225+
"""Load a single test-case by name and delete its pipeline + index (if any) from `workspace_name`."""
226+
try:
227+
test_cfg = load_test_case_by_name(name=test_name, task_dir=task_dir)
228+
except FileNotFoundError:
229+
typer.secho(f"Test-case '{test_name}' not found under {task_dir}.", fg=typer.colors.RED)
230+
raise typer.Exit(code=1)
231+
except Exception as e:
232+
typer.secho(f"Failed to load test-case '{test_name}': {e}", fg=typer.colors.RED)
233+
raise typer.Exit(code=1)
234+
235+
typer.secho(f"→ Deleting resources for '{test_name}' from '{workspace_name}'…", fg=typer.colors.GREEN)
236+
try:
237+
teardown_test_case(test_cfg=test_cfg, workspace_name=workspace_name, api_key=api_key)
238+
except Exception as e:
239+
typer.secho(f"✘ Failed to teardown '{test_name}': {e}", fg=typer.colors.RED)
240+
raise typer.Exit(code=1)
241+
242+
typer.secho(f"✔ '{test_name}' resources deleted.", fg=typer.colors.GREEN)
243+
244+
245+
@app.command("delete-all")
246+
def delete_all(
247+
workspace_name: str = typer.Option(
248+
"default", "--workspace", "-w", help="Workspace from which to delete pipelines and indexes."
249+
),
250+
api_key: str | None = typer.Option(
251+
None,
252+
"--api-key",
253+
"-k",
254+
help="Explicit DP_API_KEY to use (overrides environment).",
255+
),
256+
concurrency: int = typer.Option(
257+
5,
258+
"--concurrency",
259+
"-c",
260+
help="Maximum number of test-cases to teardown in parallel.",
261+
),
262+
task_dir: str | None = typer.Option(
263+
None,
264+
help="Directory where test-case YAMLs are stored.",
265+
),
266+
) -> None:
267+
"""Load every test-case under `task_dir` and delete pipelines + indexes from `workspace_name` in parallel."""
268+
paths = find_all_test_case_paths(task_dir)
269+
if not paths:
270+
typer.secho(f"No test-case files found in {task_dir}", fg=typer.colors.RED)
271+
raise typer.Exit(code=1)
272+
273+
# 1) Load all configs
274+
test_cfgs: list[TestCaseConfig] = []
275+
for p in paths:
276+
try:
277+
cfg = load_test_case_from_path(path=p)
278+
test_cfgs.append(cfg)
279+
except Exception as e:
280+
typer.secho(f"Skipping '{p.stem}' (load error: {e})", fg=typer.colors.YELLOW)
281+
282+
if not test_cfgs:
283+
typer.secho("No valid test-case configs to delete.", fg=typer.colors.RED)
284+
raise typer.Exit(code=1)
285+
286+
typer.secho(
287+
f"→ Deleting {len(test_cfgs)} test-cases from '{workspace_name}' (concurrency={concurrency})…",
288+
fg=typer.colors.GREEN,
289+
)
290+
try:
291+
teardown_all(
292+
test_cfgs=test_cfgs,
293+
workspace_name=workspace_name,
294+
api_key=api_key,
295+
concurrency=concurrency,
296+
)
297+
except Exception as e:
298+
typer.secho(f"✘ Some test-cases failed during deletion: {e}", fg=typer.colors.RED)
299+
raise typer.Exit(code=1)
300+
301+
typer.secho("✔ All test-cases teardown attempted.", fg=typer.colors.GREEN)
302+
303+
304+
@app.command("delete-pipe")
305+
def delete_pipe(
306+
pipeline_name: str = typer.Option(..., "--name", "-n", help="Name of the pipeline to delete."),
307+
workspace_name: str = typer.Option(
308+
"default", "--workspace", "-w", help="Workspace from which to delete the pipeline."
309+
),
310+
api_key: str | None = typer.Option(
311+
None,
312+
"--api-key",
313+
"-k",
314+
help="Explicit DP_API_KEY to use (overrides environment).",
315+
),
316+
) -> None:
317+
"""Delete a single pipeline from `workspace_name`."""
318+
try:
319+
teardown_pipeline(
320+
pipeline_name=pipeline_name,
321+
workspace_name=workspace_name,
322+
api_key=api_key,
323+
)
324+
typer.secho(f"✔ Pipeline '{pipeline_name}' deleted from '{workspace_name}'.", fg=typer.colors.GREEN)
325+
except Exception as e:
326+
typer.secho(f"✘ Failed to delete pipeline '{pipeline_name}': {e}", fg=typer.colors.RED)
327+
raise typer.Exit(code=1)
328+
329+
330+
@app.command("delete-index")
331+
def delete_index(
332+
index_name: str = typer.Option(..., "--name", "-n", help="Name of the index to delete."),
333+
workspace_name: str = typer.Option(
334+
"default", "--workspace", "-w", help="Workspace from which to delete the index."
335+
),
336+
api_key: str | None = typer.Option(
337+
None,
338+
"--api-key",
339+
"-k",
340+
help="Explicit DP_API_KEY to use (overrides environment).",
341+
),
342+
) -> None:
343+
"""Delete a single index from `workspace_name`."""
344+
try:
345+
teardown_index(
346+
index_name=index_name,
347+
workspace_name=workspace_name,
348+
api_key=api_key,
349+
)
350+
typer.secho(f"✔ Index '{index_name}' deleted from '{workspace_name}'.", fg=typer.colors.GREEN)
351+
except Exception as e:
352+
typer.secho(f"✘ Failed to delete index '{index_name}': {e}", fg=typer.colors.RED)
353+
raise typer.Exit(code=1)
354+
355+
202356
def cli() -> None:
203357
"""Entrypoint for the benchmark CLI."""
204358
app()
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import asyncio
2+
import os
3+
from typing import Any
4+
5+
from deepset_mcp.api.client import AsyncDeepsetClient
6+
from deepset_mcp.benchmark.runner.models import TestCaseConfig
7+
8+
9+
def _get_api_key(explicit_key: str | None) -> str:
10+
"""
11+
Return whichever API key to use: explicit_key takes precedence, otherwise read DP_API_KEY from the environment.
12+
13+
If still missing, raise ValueError.
14+
"""
15+
if explicit_key:
16+
return explicit_key
17+
env_key = os.getenv("DP_API_KEY")
18+
if not env_key:
19+
raise ValueError("No API key provided: pass --api-key or set DP_API_KEY in env.")
20+
return env_key
21+
22+
23+
# ─────────────────────────────────────────────────────────────────────────────
24+
# 1) LOW-LEVEL: "teardown_pipeline" and "teardown_index" using AsyncDeepsetClient as a context manager
25+
# ─────────────────────────────────────────────────────────────────────────────
26+
27+
28+
async def teardown_pipeline_async(
29+
*,
30+
pipeline_name: str,
31+
workspace_name: str,
32+
api_key: str | None = None,
33+
) -> None:
34+
"""
35+
Delete a pipeline in the given workspace.
36+
37+
Uses DP_API_KEY or explicit api_key.
38+
"""
39+
key_to_use = _get_api_key(api_key)
40+
async with AsyncDeepsetClient(api_key=key_to_use) as client:
41+
await client.pipelines(workspace=workspace_name).delete(pipeline_name)
42+
return None
43+
44+
45+
async def teardown_index_async(
46+
*,
47+
index_name: str,
48+
workspace_name: str,
49+
api_key: str | None = None,
50+
) -> None:
51+
"""
52+
Delete an index in the given workspace.
53+
54+
Uses DP_API_KEY or explicit api_key.
55+
"""
56+
key_to_use = _get_api_key(api_key)
57+
async with AsyncDeepsetClient(api_key=key_to_use) as client:
58+
await client.indexes(workspace=workspace_name).delete(index_name)
59+
return None
60+
61+
62+
# ─────────────────────────────────────────────────────────────────────────────
63+
# 2) MID-LEVEL: teardown a full test-case (pipeline + index if present)
64+
# ─────────────────────────────────────────────────────────────────────────────
65+
66+
67+
async def teardown_test_case_async(
68+
*,
69+
test_cfg: TestCaseConfig,
70+
workspace_name: str,
71+
api_key: str | None = None,
72+
) -> None:
73+
"""
74+
Given a TestCaseConfig, delete the index and the pipeline in the specified workspace.
75+
76+
Uses DP_API_KEY or explicit api_key.
77+
"""
78+
# 1) If there's a "query pipeline" to delete:
79+
if test_cfg.query_yaml:
80+
assert test_cfg.query_name is not None # already validated by Pydantic model; added to satisfy mypy
81+
await teardown_pipeline_async(
82+
pipeline_name=test_cfg.query_name,
83+
workspace_name=workspace_name,
84+
api_key=api_key,
85+
)
86+
87+
# 2) If there's an index to delete:
88+
if test_cfg.index_yaml:
89+
assert test_cfg.index_name is not None # already validated by Pydantic model; added to satisfy mypy
90+
await teardown_index_async(
91+
index_name=test_cfg.index_name,
92+
workspace_name=workspace_name,
93+
api_key=api_key,
94+
)
95+
96+
return None
97+
98+
99+
# ─────────────────────────────────────────────────────────────────────────────
100+
# 3) HIGH-LEVEL: parallel "teardown all" with configurable concurrency
101+
# ─────────────────────────────────────────────────────────────────────────────
102+
103+
104+
async def teardown_all_async(
105+
*,
106+
test_cfgs: list[TestCaseConfig],
107+
workspace_name: str,
108+
api_key: str | None = None,
109+
concurrency: int = 5,
110+
) -> None:
111+
"""
112+
Given a list of TestCaseConfig, delete all indexes and pipelines in parallel.
113+
114+
Uses DP_API_KEY or explicit api_key.
115+
"""
116+
semaphore = asyncio.Semaphore(concurrency)
117+
tasks: list[asyncio.Task[Any]] = []
118+
119+
async def sem_task(cfg: TestCaseConfig) -> str:
120+
async with semaphore:
121+
await teardown_test_case_async(test_cfg=cfg, workspace_name=workspace_name, api_key=api_key)
122+
return cfg.name
123+
124+
for cfg in test_cfgs:
125+
tasks.append(asyncio.create_task(sem_task(cfg)))
126+
127+
done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
128+
errors: list[Exception] = []
129+
for t in done:
130+
if t.exception():
131+
errors.append(t.exception()) # type: ignore
132+
133+
if errors:
134+
raise RuntimeError(f"Errors during teardown: {errors}")
135+
136+
return None
137+
138+
139+
# ─────────────────────────────────────────────────────────────────────────────
140+
# 4) SYNC WRAPPERS for all of the above (now accept api_key)
141+
# ─────────────────────────────────────────────────────────────────────────────
142+
143+
144+
def teardown_pipeline(
145+
*,
146+
pipeline_name: str,
147+
workspace_name: str,
148+
api_key: str | None = None,
149+
) -> None:
150+
"""Synchronous wrapper for teardown_pipeline_async. Blocks until the pipeline is deleted."""
151+
return asyncio.run(
152+
teardown_pipeline_async(
153+
pipeline_name=pipeline_name,
154+
workspace_name=workspace_name,
155+
api_key=api_key,
156+
)
157+
)
158+
159+
160+
def teardown_index(
161+
*,
162+
index_name: str,
163+
workspace_name: str,
164+
api_key: str | None = None,
165+
) -> None:
166+
"""Synchronous wrapper for teardown_index_async. Blocks until the index is deleted."""
167+
return asyncio.run(
168+
teardown_index_async(
169+
index_name=index_name,
170+
workspace_name=workspace_name,
171+
api_key=api_key,
172+
)
173+
)
174+
175+
176+
def teardown_test_case(
177+
*,
178+
test_cfg: TestCaseConfig,
179+
workspace_name: str,
180+
api_key: str | None = None,
181+
) -> None:
182+
"""Synchronous wrapper: blocks until both pipeline and index (if any) are deleted."""
183+
return asyncio.run(teardown_test_case_async(test_cfg=test_cfg, workspace_name=workspace_name, api_key=api_key))
184+
185+
186+
def teardown_all(
187+
*,
188+
test_cfgs: list[TestCaseConfig],
189+
workspace_name: str,
190+
api_key: str | None = None,
191+
concurrency: int = 5,
192+
) -> None:
193+
"""Synchronous wrapper for teardown_all_async. Blocks until all test-cases are deleted."""
194+
return asyncio.run(
195+
teardown_all_async(test_cfgs=test_cfgs, workspace_name=workspace_name, api_key=api_key, concurrency=concurrency)
196+
)

0 commit comments

Comments
 (0)