diff --git a/clarifai/cli/pipeline_run.py b/clarifai/cli/pipeline_run.py index 25a86166..1497b54d 100644 --- a/clarifai/cli/pipeline_run.py +++ b/clarifai/cli/pipeline_run.py @@ -320,6 +320,85 @@ def resume( raise click.ClickException(str(e)) +@pipelinerun.command(['ls']) +@click.option('--page_no', required=False, type=int, help='Page number to list.', default=1) +@click.option('--per_page', required=False, type=int, help='Number of items per page.', default=16) +@click.option('--user_id', required=False, help='User ID that owns the pipeline.') +@click.option('--app_id', required=False, help='App ID that contains the pipeline.') +@click.option('--pipeline_id', required=False, help='Pipeline ID.') +@click.option('--pipeline_version_id', required=False, help='Pipeline Version ID.') +@click.pass_context +def list(ctx, page_no, per_page, user_id, app_id, pipeline_id, pipeline_version_id): + """List pipeline version runs for a pipeline version. + + \b + Examples: + # List runs for a pipeline version (parameters loaded from config-lock.yaml) + clarifai pr list + + \b + # With explicit parameters + clarifai pr ls \\ + --user_id=USER_ID \\ + --app_id=APP_ID \\ + --pipeline_id=PIPELINE_ID \\ + --pipeline_version_id=VERSION_ID + """ + from clarifai.utils.cli import convert_timestamp_to_string, display_co_resources + + validate_context(ctx) + + # Load parameters from config-lock.yaml if not provided. + user_id, app_id, pipeline_id, pipeline_version_id = _load_pipeline_params_from_config( + user_id, app_id, pipeline_id, pipeline_version_id + ) + + # All four IDs are required by the backend's ListPipelineVersionRuns endpoint. + _validate_pipeline_params(user_id, app_id, pipeline_id, pipeline_version_id) + + pipeline = _create_pipeline(ctx, user_id, app_id, pipeline_id, pipeline_version_id) + + try: + response = pipeline.list_pipeline_version_runs(page_no=page_no, per_page=per_page) + + def _status(run): + status = run.get('orchestration_status') or {} + inner = status.get('status') or {} + code = inner.get('code') + if code is None: + return '' + try: + from clarifai_grpc.grpc.api.status import status_code_pb2 + + if isinstance(code, int): + return status_code_pb2.StatusCode.Name(code) + return str(code) + except Exception: + return str(code) + + display_co_resources( + response, + custom_columns={ + 'ID': lambda r: r.get('pipeline_version_run_id', '') or r.get('id', ''), + 'USER_ID': lambda r: r.get('user_id', ''), + 'APP_ID': lambda r: r.get('app_id', ''), + 'PIPELINE_VERSION_ID': lambda r: (r.get('pipeline_version') or {}).get('id', ''), + 'STATUS': _status, + 'CREATED_AT': lambda r: convert_timestamp_to_string(r.get('created_at', '')), + 'MODIFIED_AT': lambda r: convert_timestamp_to_string(r.get('modified_at', '')), + 'STARTED_AT': lambda r: r.get('started_at', '') or '', + 'ENDED_AT': lambda r: r.get('ended_at', '') or '', + }, + sort_by_columns=[ + ('CREATED_AT', 'desc'), + ('ID', 'asc'), + ], + ) + except Exception as e: + logger.error(f"Failed to list pipeline version runs: {e}") + raise click.ClickException(str(e)) + + @pipelinerun.command() @click.argument('pipeline_version_run_id', required=False) @click.option( diff --git a/clarifai/client/pipeline.py b/clarifai/client/pipeline.py index c7111aa9..93061d9f 100644 --- a/clarifai/client/pipeline.py +++ b/clarifai/client/pipeline.py @@ -1,7 +1,7 @@ import json import time import uuid -from typing import Dict, List, Optional +from typing import Any, Dict, Generator, List, Optional from clarifai_grpc.grpc.api import resources_pb2, service_pb2 from clarifai_grpc.grpc.api.status import status_code_pb2, status_pb2 @@ -355,6 +355,66 @@ def _display_new_logs(self, run_id: str, seen_logs: set, current_page: int = 1) # Return current page on error to retry the same page next fetch return current_page + def list_pipeline_version_runs( + self, + pipeline_version_id: Optional[str] = None, + filter_by: Dict[str, Any] = None, + page_no: int = None, + per_page: int = None, + ) -> Generator[Dict[str, Any], None, None]: + """Lists all pipeline version runs for a pipeline version. + + Args: + pipeline_version_id (str): Pipeline Version ID whose runs should be listed. + If not provided, falls back to ``self.pipeline_version_id``. + filter_by (dict): Additional filters to apply to the list request. May include + ``status_codes``, ``compute_cluster_id``, or ``nodepool_id``. + page_no (int): The page number to list. + per_page (int): The number of items per page. + + Yields: + Dict: Dictionaries containing information about each pipeline version run. + + Raises: + UserError: If neither ``pipeline_version_id`` is provided nor set on this + Pipeline instance. + + Example: + >>> from clarifai.client.pipeline import Pipeline + >>> pipeline = Pipeline(pipeline_id="pid", user_id="uid", app_id="aid") + >>> for run in pipeline.list_pipeline_version_runs(pipeline_version_id="vid"): + ... print(run) + + Note: + Defaults to 16 per page if page_no is specified and per_page is not specified. + If both page_no and per_page are None, then lists all the resources. + """ + version_id = pipeline_version_id or self.pipeline_version_id + if not version_id: + raise UserError( + "pipeline_version_id is required to list pipeline version runs. " + "Pass it as an argument or set it on the Pipeline instance." + ) + + request_data = dict( + user_app_id=self.user_app_id, + pipeline_id=self.pipeline_id, + pipeline_version_id=version_id, + ) + if filter_by: + request_data.update(filter_by) + + all_runs_info = self.list_pages_generator( + self.STUB.ListPipelineVersionRuns, + service_pb2.ListPipelineVersionRunsRequest, + request_data, + per_page=per_page, + page_no=page_no, + ) + + for run_info in all_runs_info: + yield run_info + def patch_pipeline_version_run( self, pipeline_version_run_id: str, diff --git a/tests/cli/test_pipeline_run.py b/tests/cli/test_pipeline_run.py index aa3591f8..1e41148e 100644 --- a/tests/cli/test_pipeline_run.py +++ b/tests/cli/test_pipeline_run.py @@ -403,3 +403,120 @@ def test_monitor_without_run_id(self, mock_validate, runner, mock_context): assert result.exit_code != 0 assert 'pipeline_version_run_id is required' in result.output + + +class TestPipelineRunList: + """Test cases for list command.""" + + @patch('clarifai.cli.pipeline_run.validate_context') + @patch('clarifai.client.pipeline.Pipeline') + def test_list_with_flags(self, mock_pipeline_class, mock_validate, runner, mock_context): + """Test list command with explicit flags.""" + from clarifai.cli.pipeline_run import list as list_cmd + + mock_pipeline = MagicMock() + mock_pipeline_class.return_value = mock_pipeline + mock_pipeline.list_pipeline_version_runs.return_value = iter( + [ + { + 'pipeline_version_run_id': 'run-1', + 'user_id': 'test-user', + 'app_id': 'test-app', + 'pipeline_version': {'id': 'v1'}, + 'orchestration_status': {'status': {'code': status_code_pb2.JOB_RUNNING}}, + } + ] + ) + + result = runner.invoke( + list_cmd, + [ + '--user_id=test-user', + '--app_id=test-app', + '--pipeline_id=test-pipeline', + '--pipeline_version_id=v1', + ], + obj=mock_context.obj, + ) + + assert result.exit_code == 0, result.output + mock_pipeline_class.assert_called_once_with( + pipeline_id='test-pipeline', + pipeline_version_id='v1', + user_id='test-user', + app_id='test-app', + pat='test-pat', + base_url='https://api.clarifai.com', + ) + mock_pipeline.list_pipeline_version_runs.assert_called_once_with(page_no=1, per_page=16) + assert 'run-1' in result.output + + @patch('clarifai.cli.pipeline_run.validate_context') + @patch('clarifai.cli.pipeline_run.from_yaml') + @patch('os.path.exists') + @patch('clarifai.client.pipeline.Pipeline') + def test_list_with_config_lock( + self, + mock_pipeline_class, + mock_exists, + mock_from_yaml, + mock_validate, + runner, + mock_context, + config_lock_data, + ): + """Test list command loading parameters from config-lock.yaml.""" + from clarifai.cli.pipeline_run import list as list_cmd + + mock_exists.return_value = True + mock_from_yaml.return_value = config_lock_data + mock_pipeline = MagicMock() + mock_pipeline_class.return_value = mock_pipeline + mock_pipeline.list_pipeline_version_runs.return_value = iter([]) + + result = runner.invoke(list_cmd, [], obj=mock_context.obj) + + assert result.exit_code == 0, result.output + mock_pipeline_class.assert_called_once_with( + pipeline_id='test-pipeline', + pipeline_version_id='v1', + user_id='test-user', + app_id='test-app', + pat='test-pat', + base_url='https://api.clarifai.com', + ) + + @patch('clarifai.cli.pipeline_run.validate_context') + @patch('os.path.exists') + def test_list_without_required_params(self, mock_exists, mock_validate, runner, mock_context): + """Test list command fails without required parameters and no config-lock.yaml.""" + from clarifai.cli.pipeline_run import list as list_cmd + + mock_exists.return_value = False + + result = runner.invoke(list_cmd, [], obj=mock_context.obj) + + assert result.exit_code != 0 + assert 'Missing required parameters' in result.output + + @patch('clarifai.cli.pipeline_run.validate_context') + @patch('clarifai.client.pipeline.Pipeline') + def test_list_without_pipeline_version_id( + self, mock_pipeline_class, mock_validate, runner, mock_context + ): + """Test list command fails when pipeline_version_id is missing (required by backend).""" + from clarifai.cli.pipeline_run import list as list_cmd + + result = runner.invoke( + list_cmd, + [ + '--user_id=test-user', + '--app_id=test-app', + '--pipeline_id=test-pipeline', + ], + obj=mock_context.obj, + ) + + assert result.exit_code != 0 + assert 'Missing required parameters' in result.output + mock_pipeline_class.assert_not_called()