Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions clarifai/cli/pipeline_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,85 @@ def resume(
raise click.ClickException(str(e))


@pipelinerun.command(['ls'])
@click.option('--page_no', required=False, type=int, help='Page number to list.', default=1)
@click.option('--per_page', required=False, type=int, help='Number of items per page.', default=16)
@click.option('--user_id', required=False, help='User ID that owns the pipeline.')
@click.option('--app_id', required=False, help='App ID that contains the pipeline.')
@click.option('--pipeline_id', required=False, help='Pipeline ID.')
@click.option('--pipeline_version_id', required=False, help='Pipeline Version ID.')
@click.pass_context
def list(ctx, page_no, per_page, user_id, app_id, pipeline_id, pipeline_version_id):
"""List pipeline version runs for a pipeline version.

\b
Examples:
# List runs for a pipeline version (parameters loaded from config-lock.yaml)
clarifai pr list

\b
# With explicit parameters
clarifai pr ls \\
--user_id=USER_ID \\
--app_id=APP_ID \\
--pipeline_id=PIPELINE_ID \\
--pipeline_version_id=VERSION_ID
"""
from clarifai.utils.cli import convert_timestamp_to_string, display_co_resources

validate_context(ctx)

# Load parameters from config-lock.yaml if not provided.
user_id, app_id, pipeline_id, pipeline_version_id = _load_pipeline_params_from_config(
user_id, app_id, pipeline_id, pipeline_version_id
)

# All four IDs are required by the backend's ListPipelineVersionRuns endpoint.
_validate_pipeline_params(user_id, app_id, pipeline_id, pipeline_version_id)

pipeline = _create_pipeline(ctx, user_id, app_id, pipeline_id, pipeline_version_id)

try:
response = pipeline.list_pipeline_version_runs(page_no=page_no, per_page=per_page)

def _status(run):
status = run.get('orchestration_status') or {}
inner = status.get('status') or {}
code = inner.get('code')
if code is None:
return ''
try:
from clarifai_grpc.grpc.api.status import status_code_pb2

if isinstance(code, int):
return status_code_pb2.StatusCode.Name(code)
return str(code)
except Exception:
return str(code)

display_co_resources(
response,
custom_columns={
'ID': lambda r: r.get('pipeline_version_run_id', '') or r.get('id', ''),
'USER_ID': lambda r: r.get('user_id', ''),
'APP_ID': lambda r: r.get('app_id', ''),
'PIPELINE_VERSION_ID': lambda r: (r.get('pipeline_version') or {}).get('id', ''),
'STATUS': _status,
'CREATED_AT': lambda r: convert_timestamp_to_string(r.get('created_at', '')),
'MODIFIED_AT': lambda r: convert_timestamp_to_string(r.get('modified_at', '')),
'STARTED_AT': lambda r: r.get('started_at', '') or '',
'ENDED_AT': lambda r: r.get('ended_at', '') or '',
},
sort_by_columns=[
('CREATED_AT', 'desc'),
('ID', 'asc'),
],
)
except Exception as e:
logger.error(f"Failed to list pipeline version runs: {e}")
raise click.ClickException(str(e))


@pipelinerun.command()
@click.argument('pipeline_version_run_id', required=False)
@click.option(
Expand Down
62 changes: 61 additions & 1 deletion clarifai/client/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import time
import uuid
from typing import Dict, List, Optional
from typing import Any, Dict, Generator, List, Optional

from clarifai_grpc.grpc.api import resources_pb2, service_pb2
from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2
Expand Down Expand Up @@ -355,6 +355,66 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1)
# Return current page on error to retry the same page next fetch
return current_page

def list_pipeline_version_runs(
self,
pipeline_version_id: Optional[str] = None,
filter_by: Dict[str, Any] = None,
page_no: int = None,
per_page: int = None,
) -> Generator[Dict[str, Any], None, None]:
"""Lists all pipeline version runs for a pipeline version.

Args:
pipeline_version_id (str): Pipeline Version ID whose runs should be listed.
If not provided, falls back to ``self.pipeline_version_id``.
filter_by (dict): Additional filters to apply to the list request. May include
``status_codes``, ``compute_cluster_id``, or ``nodepool_id``.
page_no (int): The page number to list.
per_page (int): The number of items per page.

Yields:
Dict: Dictionaries containing information about each pipeline version run.

Raises:
UserError: If neither ``pipeline_version_id`` is provided nor set on this
Pipeline instance.

Example:
>>> from clarifai.client.pipeline import Pipeline
>>> pipeline = Pipeline(pipeline_id="pid", user_id="uid", app_id="aid")
>>> for run in pipeline.list_pipeline_version_runs(pipeline_version_id="vid"):
... print(run)

Note:
Defaults to 16 per page if page_no is specified and per_page is not specified.
If both page_no and per_page are None, then lists all the resources.
"""
version_id = pipeline_version_id or self.pipeline_version_id
if not version_id:
raise UserError(
"pipeline_version_id is required to list pipeline version runs. "
"Pass it as an argument or set it on the Pipeline instance."
)

request_data = dict(
user_app_id=self.user_app_id,
pipeline_id=self.pipeline_id,
pipeline_version_id=version_id,
)
if filter_by:
request_data.update(filter_by)

all_runs_info = self.list_pages_generator(
self.STUB.ListPipelineVersionRuns,
service_pb2.ListPipelineVersionRunsRequest,
request_data,
per_page=per_page,
page_no=page_no,
)

for run_info in all_runs_info:
yield run_info

def patch_pipeline_version_run(
self,
pipeline_version_run_id: str,
Expand Down
117 changes: 117 additions & 0 deletions tests/cli/test_pipeline_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,120 @@ def test_monitor_without_run_id(self, mock_validate, runner, mock_context):

assert result.exit_code != 0
assert 'pipeline_version_run_id is required' in result.output


class TestPipelineRunList:
"""Test cases for list command."""

@patch('clarifai.cli.pipeline_run.validate_context')
@patch('clarifai.client.pipeline.Pipeline')
def test_list_with_flags(self, mock_pipeline_class, mock_validate, runner, mock_context):
"""Test list command with explicit flags."""
from clarifai.cli.pipeline_run import list as list_cmd

mock_pipeline = MagicMock()
mock_pipeline_class.return_value = mock_pipeline
mock_pipeline.list_pipeline_version_runs.return_value = iter(
[
{
'pipeline_version_run_id': 'run-1',
'user_id': 'test-user',
'app_id': 'test-app',
'pipeline_version': {'id': 'v1'},
'orchestration_status': {'status': {'code': status_code_pb2.JOB_RUNNING}},
}
]
)

result = runner.invoke(
list_cmd,
[
'--user_id=test-user',
'--app_id=test-app',
'--pipeline_id=test-pipeline',
'--pipeline_version_id=v1',
],
obj=mock_context.obj,
)

assert result.exit_code == 0, result.output
mock_pipeline_class.assert_called_once_with(
pipeline_id='test-pipeline',
pipeline_version_id='v1',
user_id='test-user',
app_id='test-app',
pat='test-pat',
base_url='https://api.clarifai.com',
)
mock_pipeline.list_pipeline_version_runs.assert_called_once_with(page_no=1, per_page=16)
assert 'run-1' in result.output

@patch('clarifai.cli.pipeline_run.validate_context')
@patch('clarifai.cli.pipeline_run.from_yaml')
@patch('os.path.exists')
@patch('clarifai.client.pipeline.Pipeline')
def test_list_with_config_lock(
self,
mock_pipeline_class,
mock_exists,
mock_from_yaml,
mock_validate,
runner,
mock_context,
config_lock_data,
):
"""Test list command loading parameters from config-lock.yaml."""
from clarifai.cli.pipeline_run import list as list_cmd

mock_exists.return_value = True
mock_from_yaml.return_value = config_lock_data
mock_pipeline = MagicMock()
mock_pipeline_class.return_value = mock_pipeline
mock_pipeline.list_pipeline_version_runs.return_value = iter([])

result = runner.invoke(list_cmd, [], obj=mock_context.obj)

assert result.exit_code == 0, result.output
mock_pipeline_class.assert_called_once_with(
pipeline_id='test-pipeline',
pipeline_version_id='v1',
user_id='test-user',
app_id='test-app',
pat='test-pat',
base_url='https://api.clarifai.com',
)

@patch('clarifai.cli.pipeline_run.validate_context')
@patch('os.path.exists')
def test_list_without_required_params(self, mock_exists, mock_validate, runner, mock_context):
"""Test list command fails without required parameters and no config-lock.yaml."""
from clarifai.cli.pipeline_run import list as list_cmd

mock_exists.return_value = False

result = runner.invoke(list_cmd, [], obj=mock_context.obj)

assert result.exit_code != 0
assert 'Missing required parameters' in result.output

@patch('clarifai.cli.pipeline_run.validate_context')
@patch('clarifai.client.pipeline.Pipeline')
def test_list_without_pipeline_version_id(
self, mock_pipeline_class, mock_validate, runner, mock_context
):
"""Test list command fails when pipeline_version_id is missing (required by backend)."""
from clarifai.cli.pipeline_run import list as list_cmd

result = runner.invoke(
list_cmd,
[
'--user_id=test-user',
'--app_id=test-app',
'--pipeline_id=test-pipeline',
],
obj=mock_context.obj,
)

assert result.exit_code != 0
assert 'Missing required parameters' in result.output
mock_pipeline_class.assert_not_called()
Loading