Skip to content

Commit 97a5c43

Browse files
SharangC96Copilot
andauthored
Add 'pipelinerun list' command to CLI (#1045)
* Add 'pipelinerun list' command to CLI Adds a new `clarifai pipelinerun list` (alias `pr ls`) command that lists runs for a pipeline version via the existing `ListPipelineVersionRuns` gRPC endpoint. - New `Pipeline.list_pipeline_version_runs(pipeline_version_id=None, ...)` client method using `list_pages_generator`. Falls back to the instance's `pipeline_version_id` when not passed; raises `UserError` if neither is set. - New CLI command supports `--user_id`, `--app_id`, `--pipeline_id`, `--pipeline_version_id`, `--page_no`, `--per_page`. Missing IDs are loaded from `config-lock.yaml` (consistent with pause/cancel/resume/monitor). All four IDs are required after lockfile resolution (matches the backend validator). - Renders results with `display_co_resources`: ID, USER_ID, APP_ID, PIPELINE_VERSION_ID, STATUS, CREATED_AT, MODIFIED_AT, STARTED_AT, ENDED_AT, sorted by CREATED_AT desc. - Tests: 4 new tests covering flag invocation, lockfile fallback, missing required params, and missing pipeline_version_id. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Apply ruff format Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 7c17de0 commit 97a5c43

3 files changed

Lines changed: 257 additions & 1 deletion

File tree

clarifai/cli/pipeline_run.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,85 @@ def resume(
320320
raise click.ClickException(str(e))
321321

322322

323+
@pipelinerun.command(['ls'])
324+
@click.option('--page_no', required=False, type=int, help='Page number to list.', default=1)
325+
@click.option('--per_page', required=False, type=int, help='Number of items per page.', default=16)
326+
@click.option('--user_id', required=False, help='User ID that owns the pipeline.')
327+
@click.option('--app_id', required=False, help='App ID that contains the pipeline.')
328+
@click.option('--pipeline_id', required=False, help='Pipeline ID.')
329+
@click.option('--pipeline_version_id', required=False, help='Pipeline Version ID.')
330+
@click.pass_context
331+
def list(ctx, page_no, per_page, user_id, app_id, pipeline_id, pipeline_version_id):
332+
"""List pipeline version runs for a pipeline version.
333+
334+
\b
335+
Examples:
336+
# List runs for a pipeline version (parameters loaded from config-lock.yaml)
337+
clarifai pr list
338+
339+
\b
340+
# With explicit parameters
341+
clarifai pr ls \\
342+
--user_id=USER_ID \\
343+
--app_id=APP_ID \\
344+
--pipeline_id=PIPELINE_ID \\
345+
--pipeline_version_id=VERSION_ID
346+
"""
347+
from clarifai.utils.cli import convert_timestamp_to_string, display_co_resources
348+
349+
validate_context(ctx)
350+
351+
# Load parameters from config-lock.yaml if not provided.
352+
user_id, app_id, pipeline_id, pipeline_version_id = _load_pipeline_params_from_config(
353+
user_id, app_id, pipeline_id, pipeline_version_id
354+
)
355+
356+
# All four IDs are required by the backend's ListPipelineVersionRuns endpoint.
357+
_validate_pipeline_params(user_id, app_id, pipeline_id, pipeline_version_id)
358+
359+
pipeline = _create_pipeline(ctx, user_id, app_id, pipeline_id, pipeline_version_id)
360+
361+
try:
362+
response = pipeline.list_pipeline_version_runs(page_no=page_no, per_page=per_page)
363+
364+
def _status(run):
365+
status = run.get('orchestration_status') or {}
366+
inner = status.get('status') or {}
367+
code = inner.get('code')
368+
if code is None:
369+
return ''
370+
try:
371+
from clarifai_grpc.grpc.api.status import status_code_pb2
372+
373+
if isinstance(code, int):
374+
return status_code_pb2.StatusCode.Name(code)
375+
return str(code)
376+
except Exception:
377+
return str(code)
378+
379+
display_co_resources(
380+
response,
381+
custom_columns={
382+
'ID': lambda r: r.get('pipeline_version_run_id', '') or r.get('id', ''),
383+
'USER_ID': lambda r: r.get('user_id', ''),
384+
'APP_ID': lambda r: r.get('app_id', ''),
385+
'PIPELINE_VERSION_ID': lambda r: (r.get('pipeline_version') or {}).get('id', ''),
386+
'STATUS': _status,
387+
'CREATED_AT': lambda r: convert_timestamp_to_string(r.get('created_at', '')),
388+
'MODIFIED_AT': lambda r: convert_timestamp_to_string(r.get('modified_at', '')),
389+
'STARTED_AT': lambda r: r.get('started_at', '') or '',
390+
'ENDED_AT': lambda r: r.get('ended_at', '') or '',
391+
},
392+
sort_by_columns=[
393+
('CREATED_AT', 'desc'),
394+
('ID', 'asc'),
395+
],
396+
)
397+
except Exception as e:
398+
logger.error(f"Failed to list pipeline version runs: {e}")
399+
raise click.ClickException(str(e))
400+
401+
323402
@pipelinerun.command()
324403
@click.argument('pipeline_version_run_id', required=False)
325404
@click.option(

clarifai/client/pipeline.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import time
33
import uuid
4-
from typing import Dict, List, Optional
4+
from typing import Any, Dict, Generator, List, Optional
55

66
from clarifai_grpc.grpc.api import resources_pb2, service_pb2
77
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
@@ -355,6 +355,66 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1)
355355
# Return current page on error to retry the same page next fetch
356356
return current_page
357357

358+
def list_pipeline_version_runs(
359+
self,
360+
pipeline_version_id: Optional[str] = None,
361+
filter_by: Dict[str, Any] = None,
362+
page_no: int = None,
363+
per_page: int = None,
364+
) -> Generator[Dict[str, Any], None, None]:
365+
"""Lists all pipeline version runs for a pipeline version.
366+
367+
Args:
368+
pipeline_version_id (str): Pipeline Version ID whose runs should be listed.
369+
If not provided, falls back to ``self.pipeline_version_id``.
370+
filter_by (dict): Additional filters to apply to the list request. May include
371+
``status_codes``, ``compute_cluster_id``, or ``nodepool_id``.
372+
page_no (int): The page number to list.
373+
per_page (int): The number of items per page.
374+
375+
Yields:
376+
Dict: Dictionaries containing information about each pipeline version run.
377+
378+
Raises:
379+
UserError: If neither ``pipeline_version_id`` is provided nor set on this
380+
Pipeline instance.
381+
382+
Example:
383+
>>> from clarifai.client.pipeline import Pipeline
384+
>>> pipeline = Pipeline(pipeline_id="pid", user_id="uid", app_id="aid")
385+
>>> for run in pipeline.list_pipeline_version_runs(pipeline_version_id="vid"):
386+
... print(run)
387+
388+
Note:
389+
Defaults to 16 per page if page_no is specified and per_page is not specified.
390+
If both page_no and per_page are None, then lists all the resources.
391+
"""
392+
version_id = pipeline_version_id or self.pipeline_version_id
393+
if not version_id:
394+
raise UserError(
395+
"pipeline_version_id is required to list pipeline version runs. "
396+
"Pass it as an argument or set it on the Pipeline instance."
397+
)
398+
399+
request_data = dict(
400+
user_app_id=self.user_app_id,
401+
pipeline_id=self.pipeline_id,
402+
pipeline_version_id=version_id,
403+
)
404+
if filter_by:
405+
request_data.update(filter_by)
406+
407+
all_runs_info = self.list_pages_generator(
408+
self.STUB.ListPipelineVersionRuns,
409+
service_pb2.ListPipelineVersionRunsRequest,
410+
request_data,
411+
per_page=per_page,
412+
page_no=page_no,
413+
)
414+
415+
for run_info in all_runs_info:
416+
yield run_info
417+
358418
def patch_pipeline_version_run(
359419
self,
360420
pipeline_version_run_id: str,

tests/cli/test_pipeline_run.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,120 @@ def test_monitor_without_run_id(self, mock_validate, runner, mock_context):
403403

404404
assert result.exit_code != 0
405405
assert 'pipeline_version_run_id is required' in result.output
406+
407+
408+
class TestPipelineRunList:
409+
"""Test cases for list command."""
410+
411+
@patch('clarifai.cli.pipeline_run.validate_context')
412+
@patch('clarifai.client.pipeline.Pipeline')
413+
def test_list_with_flags(self, mock_pipeline_class, mock_validate, runner, mock_context):
414+
"""Test list command with explicit flags."""
415+
from clarifai.cli.pipeline_run import list as list_cmd
416+
417+
mock_pipeline = MagicMock()
418+
mock_pipeline_class.return_value = mock_pipeline
419+
mock_pipeline.list_pipeline_version_runs.return_value = iter(
420+
[
421+
{
422+
'pipeline_version_run_id': 'run-1',
423+
'user_id': 'test-user',
424+
'app_id': 'test-app',
425+
'pipeline_version': {'id': 'v1'},
426+
'orchestration_status': {'status': {'code': status_code_pb2.JOB_RUNNING}},
427+
}
428+
]
429+
)
430+
431+
result = runner.invoke(
432+
list_cmd,
433+
[
434+
'--user_id=test-user',
435+
'--app_id=test-app',
436+
'--pipeline_id=test-pipeline',
437+
'--pipeline_version_id=v1',
438+
],
439+
obj=mock_context.obj,
440+
)
441+
442+
assert result.exit_code == 0, result.output
443+
mock_pipeline_class.assert_called_once_with(
444+
pipeline_id='test-pipeline',
445+
pipeline_version_id='v1',
446+
user_id='test-user',
447+
app_id='test-app',
448+
pat='test-pat',
449+
base_url='https://api.clarifai.com',
450+
)
451+
mock_pipeline.list_pipeline_version_runs.assert_called_once_with(page_no=1, per_page=16)
452+
assert 'run-1' in result.output
453+
454+
@patch('clarifai.cli.pipeline_run.validate_context')
455+
@patch('clarifai.cli.pipeline_run.from_yaml')
456+
@patch('os.path.exists')
457+
@patch('clarifai.client.pipeline.Pipeline')
458+
def test_list_with_config_lock(
459+
self,
460+
mock_pipeline_class,
461+
mock_exists,
462+
mock_from_yaml,
463+
mock_validate,
464+
runner,
465+
mock_context,
466+
config_lock_data,
467+
):
468+
"""Test list command loading parameters from config-lock.yaml."""
469+
from clarifai.cli.pipeline_run import list as list_cmd
470+
471+
mock_exists.return_value = True
472+
mock_from_yaml.return_value = config_lock_data
473+
mock_pipeline = MagicMock()
474+
mock_pipeline_class.return_value = mock_pipeline
475+
mock_pipeline.list_pipeline_version_runs.return_value = iter([])
476+
477+
result = runner.invoke(list_cmd, [], obj=mock_context.obj)
478+
479+
assert result.exit_code == 0, result.output
480+
mock_pipeline_class.assert_called_once_with(
481+
pipeline_id='test-pipeline',
482+
pipeline_version_id='v1',
483+
user_id='test-user',
484+
app_id='test-app',
485+
pat='test-pat',
486+
base_url='https://api.clarifai.com',
487+
)
488+
489+
@patch('clarifai.cli.pipeline_run.validate_context')
490+
@patch('os.path.exists')
491+
def test_list_without_required_params(self, mock_exists, mock_validate, runner, mock_context):
492+
"""Test list command fails without required parameters and no config-lock.yaml."""
493+
from clarifai.cli.pipeline_run import list as list_cmd
494+
495+
mock_exists.return_value = False
496+
497+
result = runner.invoke(list_cmd, [], obj=mock_context.obj)
498+
499+
assert result.exit_code != 0
500+
assert 'Missing required parameters' in result.output
501+
502+
@patch('clarifai.cli.pipeline_run.validate_context')
503+
@patch('clarifai.client.pipeline.Pipeline')
504+
def test_list_without_pipeline_version_id(
505+
self, mock_pipeline_class, mock_validate, runner, mock_context
506+
):
507+
"""Test list command fails when pipeline_version_id is missing (required by backend)."""
508+
from clarifai.cli.pipeline_run import list as list_cmd
509+
510+
result = runner.invoke(
511+
list_cmd,
512+
[
513+
'--user_id=test-user',
514+
'--app_id=test-app',
515+
'--pipeline_id=test-pipeline',
516+
],
517+
obj=mock_context.obj,
518+
)
519+
520+
assert result.exit_code != 0
521+
assert 'Missing required parameters' in result.output
522+
mock_pipeline_class.assert_not_called()

0 commit comments

Comments
 (0)