Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions clarifai/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,17 @@ def pipeline():
is_flag=True,
help='Skip creating config-lock.yaml file.',
)
def upload(path, no_lockfile):
@click.option(
'--user_id',
default=None,
help='Override the user_id from the Clarifai context.',
)
@click.option(
'--app_id',
default=None,
help='Override the app_id from the Clarifai context.',
)
def upload(path, no_lockfile, user_id, app_id):
"""Upload a pipeline with associated pipeline steps to Clarifai.

PATH: Path to the pipeline configuration file or directory containing config.yaml. If not specified, the current directory is used by default.
Expand All @@ -110,6 +120,10 @@ def upload(path, no_lockfile):

if os.path.isfile(path) and path.endswith('.py'):
pipeline_obj = load_pipeline_from_file(path)
if user_id:
pipeline_obj.user_id = user_id
if app_id:
pipeline_obj.app_id = app_id
output_dir = os.path.join(
os.path.dirname(os.path.abspath(path)), f'generated-{pipeline_obj.id}'
)
Expand All @@ -128,16 +142,42 @@ def upload(path, no_lockfile):
required=True,
help='Directory to write the compiled pipeline config and step folders.',
)
@click.option('--user_id', default=None, help='Override the user_id from the Clarifai context.')
@click.option('--app_id', default=None, help='Override the app_id from the Clarifai context.')
def compile(path, output_dir, user_id, app_id):
    """Compile YAML/config-based pipeline assets from a Python pipeline definition.

    Generates config.yaml, step directories (with requirements.txt and
    pipeline_step.py), and a Dockerfile for each locally managed step.
    """
    # NOTE: the docstring above doubles as the `--help` text of the command,
    # so parameter notes live in comments instead:
    #   path       -- Python file that defines the pipeline (must end in .py).
    #   output_dir -- directory receiving config.yaml and the step folders.
    #   user_id / app_id -- optional overrides applied to the loaded pipeline
    #                       before generation; falsy values leave it untouched.
    # Raises click.UsageError when `path` is not an existing .py file.

    # Imported lazily, as in the rest of this command.
    from clarifai.runners.pipeline_steps.pipeline_step_builder import PipelineStepBuilder
    from clarifai.runners.pipelines import load_pipeline_from_file

    if not os.path.isfile(path) or not path.endswith('.py'):
        raise click.UsageError('clarifai pipeline compile expects a Python file path.')

    pipeline_obj = load_pipeline_from_file(path)
    if user_id:
        pipeline_obj.user_id = user_id
    if app_id:
        pipeline_obj.app_id = app_id
    config_path = pipeline_obj.generate(output_dir)

    # Generate Dockerfiles for all locally managed step directories.
    # dict.fromkeys keeps first-seen order while dropping steps that are
    # referenced by more than one node.
    managed_step_ids = dict.fromkeys(
        node.step_definition.id for node in pipeline_obj.nodes if node.step_definition.is_managed
    )
    for step_id in managed_step_ids:
        step_dir = os.path.join(output_dir, step_id)
        if not os.path.isdir(step_dir):
            continue
        # create_dockerfile() leaves an existing Dockerfile alone, so only
        # report a Dockerfile as generated when it was not there beforehand.
        already_present = os.path.exists(os.path.join(step_dir, 'Dockerfile'))
        PipelineStepBuilder(step_dir).create_dockerfile()
        if not already_present:
            logger.info(f"Generated Dockerfile for step '{step_id}'")

    logger.info(f'Generated pipeline assets at {config_path}')


@pipeline.command()
Expand Down Expand Up @@ -1049,7 +1089,7 @@ def validate_lock(lockfile_path):
raise click.Abort()


@pipeline.command(['ls'])
@pipeline.command(name='list', aliases=['ls'])
@click.option('--page_no', required=False, help='Page number to list.', default=1)
@click.option('--per_page', required=False, help='Number of items per page.', default=16)
@click.option(
Expand All @@ -1063,7 +1103,7 @@ def validate_lock(lockfile_path):
help='User ID to list pipelines from. If not provided, uses current user.',
)
@click.pass_context
def list(ctx, page_no, per_page, app_id, user_id):
def list_pipelines(ctx, page_no, per_page, app_id, user_id):
"""List all pipelines for the user."""
validate_context(ctx)

Expand Down
46 changes: 25 additions & 21 deletions clarifai/runners/pipeline_steps/pipeline_step_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
import tarfile
import time
from string import Template
from typing import List, Optional

import yaml
Expand Down Expand Up @@ -240,41 +239,46 @@ def create_pipeline_step(self):

def create_dockerfile(self):
    """Create a Dockerfile for the pipeline step.

    Reads the optional ``build_info`` section of ``self.config``:

    * ``python_version`` -- defaults to ``'3.12'``; used for the default base
      image tag and for the ``uv venv --python`` argument.
    * ``base_image`` -- when set, replaces the default Clarifai python-base image.
    * ``platform`` -- when set, emitted as ``--platform=<value>`` on the FROM line.

    ``self._ensure_clarifai_requirement()`` is always invoked first, even when
    the Dockerfile already exists. An existing ``Dockerfile`` in ``self.folder``
    is never overwritten.
    """
    # `build_info:` may be present but empty in the config, which yields None.
    build_info = self.config.get('build_info') or {}
    python_version = build_info.get('python_version', '3.12')
    base_image = build_info.get('base_image')
    platform = build_info.get('platform')

    # Ensure requirements.txt has clarifai
    self._ensure_clarifai_requirement()

    image = (
        base_image
        or f'public.ecr.aws/clarifai-models/python-base:{python_version}-df565436eea93efb3e8d1eb558a0a46df29523ec'
    )

    # Assemble the FROM line from its non-empty parts so that an unset
    # platform does not leave a stray double space behind.
    from_parts = ['FROM']
    if platform:
        from_parts.append(f'--platform={platform}')
    from_parts.extend([image, 'as', 'final'])
    from_line = ' '.join(from_parts)

    dockerfile_content = (
        f'{from_line}\n'
        '\n'
        'COPY --link requirements.txt /home/nonroot/requirements.txt\n'
        '\n'
        '# Install uv, create a venv, and install requirements\n'
        f'RUN pip install uv && uv venv /tmp/venv --python {python_version} --clear\n'
        'ENV VIRTUAL_ENV=/tmp/venv\n'
        'ENV PATH="/tmp/venv/bin:$PATH"\n'
        'RUN uv pip install --no-cache-dir -r /home/nonroot/requirements.txt\n'
        '\n'
        '# Copy in the actual files like config.yaml, requirements.txt, and most importantly 1/pipeline_step.py for the actual pipeline step.\n'
        'COPY --link=true 1 /home/nonroot/main/1\n'
        '# At this point we only need these for validation in the SDK.\n'
        'COPY --link=true requirements.txt config.yaml /home/nonroot/main/\n'
    )

    # Write Dockerfile if it doesn't exist
    dockerfile_path = os.path.join(self.folder, 'Dockerfile')
    if os.path.exists(dockerfile_path):
        logger.info(f'Dockerfile already exists at {dockerfile_path}, skipping creation.')
        return
    # Explicit encoding keeps the output independent of the platform locale.
    with open(dockerfile_path, 'w', encoding='utf-8') as dockerfile:
        dockerfile.write(dockerfile_content)

    logger.info(f'Created Dockerfile at {dockerfile_path}')

def _ensure_clarifai_requirement(self):
"""Ensure clarifai is in requirements.txt with proper version."""
Expand Down
10 changes: 9 additions & 1 deletion clarifai/runners/pipelines/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,15 @@ def generate_step_directory(step_definition, output_dir: str, user_id: str, app_
'app_id': app_id,
},
'pipeline_step_input_params': step_definition.get_input_params(),
'build_info': {'python_version': step_definition.python_version},
'build_info': {
k: v
for k, v in [
('python_version', step_definition.python_version),
('base_image', step_definition.base_image),
('platform', step_definition.platform),
]
if v is not None
},
'pipeline_step_compute_info': MessageToDict(
step_definition.compute, preserving_proto_field_name=True
),
Expand Down
15 changes: 10 additions & 5 deletions clarifai/runners/pipelines/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ def __init__(
visibility: str = 'PRIVATE',
):
user_id, app_id = self._resolve_from_context(user_id, app_id)
if not user_id or not app_id:
raise ValueError(
"Pipeline(...) needs user_id and app_id. Pass them explicitly, "
"or run `clarifai login` to set them in your CLI context."
)
self.id = id
self.user_id = user_id
self.app_id = app_id
Expand Down Expand Up @@ -112,6 +107,14 @@ def _generate_task_name(self, step_id: str) -> str:
suffix += 1
return candidate

def _validate_identity(self):
    """Fail fast when user_id or app_id is still missing at the point of use.

    Raises:
        ValueError: if either ``self.user_id`` or ``self.app_id`` is falsy.
    """
    if self.user_id and self.app_id:
        return
    raise ValueError(
        "Pipeline(...) needs user_id and app_id. Pass them explicitly, "
        "set --user_id/--app_id on the CLI, or run `clarifai login`."
    )

def validate(self):
nodes_by_name = {node.name: node for node in self.nodes}
for node in self.nodes:
Expand Down Expand Up @@ -247,6 +250,7 @@ def to_config(self) -> Dict[str, Any]:
return config

def generate(self, output_dir: str) -> str:
self._validate_identity()
os.makedirs(output_dir, exist_ok=True)
step_definitions = OrderedDict()
for node in self.nodes:
Expand All @@ -262,6 +266,7 @@ def generate(self, output_dir: str) -> str:
return config_path

def upload(self, no_lockfile: bool = False) -> Optional[str]:
self._validate_identity()
from clarifai.runners.pipelines.pipeline_builder import PipelineBuilder

with tempfile.TemporaryDirectory(prefix='clarifai-pipeline-') as temp_dir:
Expand Down
8 changes: 8 additions & 0 deletions clarifai/runners/pipelines/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ def __init__(
assets=None,
compute: Optional[ComputeInfo] = None,
python_version: str = '3.12',
base_image: Optional[str] = None,
platform: Optional[str] = None,
secrets: Optional[Dict[str, str]] = None,
):
self.func = func
Expand All @@ -106,6 +108,8 @@ def __init__(
self.assets = assets or []
self.compute = compute or ComputeInfo()
self.python_version = python_version
self.base_image = base_image
self.platform = platform
self.secrets = secrets or {}
self.signature = inspect.signature(func)

Expand Down Expand Up @@ -215,6 +219,8 @@ def step(
assets=None,
compute: Optional[ComputeInfo] = None,
python_version: str = '3.12',
base_image: Optional[str] = None,
platform: Optional[str] = None,
secrets: Optional[Dict[str, str]] = None,
):
def decorator(func: Callable[..., Any]) -> StepDefinition:
Expand All @@ -225,6 +231,8 @@ def decorator(func: Callable[..., Any]) -> StepDefinition:
assets=assets,
compute=compute,
python_version=python_version,
base_image=base_image,
platform=platform,
secrets=secrets,
)

Expand Down
22 changes: 19 additions & 3 deletions tests/cli/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import yaml
from click.testing import CliRunner

from clarifai.cli.pipeline import compile as compile_command
from clarifai.cli.pipeline import init, run, upload
from clarifai.cli.pipeline_template import info, list_templates
from clarifai.runners.pipelines.pipeline_builder import (
Expand Down Expand Up @@ -384,6 +385,21 @@ def test_cli_upload_help(self):
assert result.exit_code == 0
assert "Upload a pipeline with associated pipeline steps" in result.output
assert "PATH" in result.output
assert '--user_id' in result.output
assert '--app_id' in result.output
assert '--user-id' not in result.output
assert '--app-id' not in result.output

def test_cli_compile_help_uses_underscore_identity_flags(self):
    """Compile --help advertises the underscore identity flags and not the dashed ones."""
    help_result = CliRunner().invoke(compile_command, ['--help'])

    assert help_result.exit_code == 0
    help_text = help_result.output
    # Underscore spellings follow the existing CLI flag convention.
    for expected_flag in ('--user_id', '--app_id'):
        assert expected_flag in help_text
    # Dashed spellings must not appear in the help output.
    for unexpected_flag in ('--user-id', '--app-id'):
        assert unexpected_flag not in help_text

def test_cli_upload_missing_config(self):
"""Test CLI upload with missing config file."""
Expand Down Expand Up @@ -2115,7 +2131,7 @@ def test_list_command_requires_app_id(self):
ctx_obj.current.api_base = 'https://api.clarifai.com'

# Import here to avoid circular imports in testing
from clarifai.cli.pipeline import list as list_command
from clarifai.cli.pipeline import list_pipelines as list_command

result = runner.invoke(
list_command,
Expand Down Expand Up @@ -2152,7 +2168,7 @@ def test_list_command_success_with_app_id(self, mock_display, mock_app_class, mo
ctx_obj.current.api_base = 'https://api.clarifai.com'

# Import here to avoid circular imports in testing
from clarifai.cli.pipeline import list as list_command
from clarifai.cli.pipeline import list_pipelines as list_command

result = runner.invoke(
list_command,
Expand Down Expand Up @@ -2185,7 +2201,7 @@ def test_list_command_default_parameters(self, mock_validate):
ctx_obj.current.api_base = 'https://api.clarifai.com'

# Import here to avoid circular imports in testing
from clarifai.cli.pipeline import list as list_command
from clarifai.cli.pipeline import list_pipelines as list_command

with patch('clarifai.client.app.App') as mock_app_class:
mock_app_instance = Mock()
Expand Down
25 changes: 25 additions & 0 deletions tests/cli/test_pipeline_dsl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def test_generate_python_pipeline_file_writes_output(tmp_path: Path):
with patch('clarifai.runners.pipelines.load_pipeline_from_file') as mock_loader:
mock_pipeline = Mock()
mock_pipeline.generate.return_value = str(output_dir / 'config.yaml')
mock_pipeline.nodes = [] # no managed steps → no Dockerfiles expected
mock_loader.return_value = mock_pipeline

result = runner.invoke(compile, [str(pipeline_file), '--output-dir', str(output_dir)])
Expand Down Expand Up @@ -68,3 +69,27 @@ def test_generate_real_example_pipeline_writes_mixed_step_config(tmp_path: Path)
assert (output_dir / 'prepare-text' / '1' / 'text_utils.py').exists()
assert not (output_dir / 'summarize').exists()
assert not (output_dir / 'classify-sentiment').exists()
# compile must also generate Dockerfiles for locally managed steps
assert (output_dir / 'prepare-text' / 'Dockerfile').exists()
assert (output_dir / 'assemble-report' / 'Dockerfile').exists()


def test_compile_generates_dockerfiles_for_managed_steps(tmp_path: Path):
    """Each locally managed step directory produced by compile contains a Dockerfile."""
    examples_dir = Path(__file__).resolve().parents[2] / 'examples'
    pipeline_file = examples_dir / 'pipeline_dsl_text_pipeline.py'
    output_dir = tmp_path / 'compiled'

    result = CliRunner().invoke(compile, [str(pipeline_file), '--output-dir', str(output_dir)])

    assert result.exit_code == 0, result.output
    managed_steps = ('prepare-text', 'assemble-report')
    for step_id in managed_steps:
        dockerfile = output_dir / step_id / 'Dockerfile'
        assert dockerfile.exists(), f'Dockerfile missing for step {step_id!r}'
        content = dockerfile.read_text(encoding='utf-8')
        assert 'FROM ' in content
        assert 'COPY --link=true 1 /home/nonroot/main/1' in content
    # Pre-existing (non-managed) steps must NOT get a Dockerfile; compile
    # should not even create a directory for them.
    for unmanaged_step in ('summarize', 'classify-sentiment'):
        assert not (output_dir / unmanaged_step).exists()
Loading
Loading