Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions clarifai/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,19 @@ def compile(path, output_dir):
help='Upload local code to an ephemeral dev pipeline before running. '
'Only changed steps are re-uploaded.',
)
@click.option(
'--local',
is_flag=True,
default=False,
help='Run the pipeline locally on a Minikube/K3s/kind cluster. '
'Bypasses the Clarifai API and submits directly to the local K8s cluster via Argo Workflows.',
)
@click.option(
'--namespace',
'local_namespace',
default='clarifai-local',
help='Kubernetes namespace for local pipeline runs. Default: clarifai-local.',
)
@click.pass_context
def run(
ctx,
Expand All @@ -248,6 +261,8 @@ def run(
override_params,
overrides_file,
dev,
local,
local_namespace,
):
"""Run a pipeline and monitor its progress.

Expand All @@ -256,7 +271,34 @@ def run(
\tWhen provided, config precedence is config-lock.yaml > config.yaml.
The --config option is accepted for backwards compatibility but PATH
is preferred.

\bWith --local, the pipeline runs directly on a local K8s cluster
(Minikube/K3s/kind) via Argo Workflows, bypassing the Clarifai API.
Requires: Docker, kubectl, and Argo Workflows installed on the cluster.
"""
# Local execution path — bypass the Clarifai API entirely
if local:
from clarifai.runners.pipelines.local.runner import run_local_pipeline

run_path = path or os.getcwd()
if not os.path.isdir(run_path):
raise click.ClickException(f'--local requires a pipeline directory, got: {run_path}')

pat = ctx.obj.current.pat if ctx.obj and ctx.obj.current else None
api_base = ctx.obj.current.api_base if ctx.obj and ctx.obj.current else None

phase = run_local_pipeline(
pipeline_dir=run_path,
namespace=local_namespace,
pat=pat,
api_base=api_base,
timeout=timeout,
poll_interval=monitor_interval,
)
if phase != 'Succeeded':
sys.exit(1)
return

import json

from clarifai.client.pipeline import Pipeline
Expand Down
Empty file.
92 changes: 92 additions & 0 deletions clarifai/runners/pipelines/local/image_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Build Docker images for pipeline steps and load them into the local K8s cluster."""

import os
import subprocess

from clarifai.runners.pipeline_steps.pipeline_run_locally import PipelineStepRunLocally
from clarifai.utils.logging import logger


def _image_name_for_step(step_id, step_path):
    """Return the local image reference ``clarifai-local/<step_id>:<tag>`` for a step.

    The tag is whatever the step runner's ``_docker_hash()`` yields. The
    Dockerfile is created through the runner's builder first so that it is
    present when the hash is computed.
    """
    step_runner = PipelineStepRunLocally(step_path)
    # The Dockerfile has to exist before the hash is taken.
    step_runner.builder.create_dockerfile()
    image_tag = step_runner._docker_hash()
    return f'clarifai-local/{step_id}:{image_tag}'


def build_step_image(step_path):
    """Make sure a Docker image exists for one pipeline step.

    The step id is read from the step's config (``pipeline_step.id``) and
    lower-cased before it is used in the image name. The image is only built
    when ``docker_image_exists`` reports that it is not already present.

    Returns the image name (including its tag).
    """
    step_runner = PipelineStepRunLocally(step_path)
    image_ref = _image_name_for_step(
        step_runner.config['pipeline_step']['id'].lower(),
        step_path,
    )

    if not step_runner.docker_image_exists(image_ref):
        logger.info(f'Building Docker image {image_ref} ...')
        step_runner.build_docker_image(image_name=image_ref)
        return image_ref

    logger.info(f'Image {image_ref} already exists, skipping build.')
    return image_ref


def _pipe_docker_save_into_minikube(image_name):
    """Run ``docker save <image> | minikube image load --daemon=false -`` without a shell.

    The two processes are connected with an OS pipe, so the image name is never
    interpreted by a shell, and the exit status of *both* sides is checked.

    Returns a ``(returncode, stderr_text)`` tuple; ``returncode`` is non-zero when
    either command failed or could not be started.
    """
    import tempfile

    # docker's stderr goes to a temp file rather than a pipe so that it can never
    # fill up and stall the transfer while we are waiting on minikube.
    with tempfile.TemporaryFile() as save_err_file:
        try:
            saver = subprocess.Popen(
                ['docker', 'save', image_name],
                stdout=subprocess.PIPE,
                stderr=save_err_file,
            )
        except OSError as exc:
            # 127 mirrors the shell's "command not found" status.
            return 127, str(exc)

        try:
            loader = subprocess.Popen(
                ['minikube', 'image', 'load', '--daemon=false', '-'],
                stdin=saver.stdout,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except OSError as exc:
            saver.kill()
            saver.stdout.close()
            saver.wait()
            return 127, str(exc)

        # Drop the parent's copy of the read end so `docker save` receives
        # SIGPIPE (instead of blocking forever) if minikube exits early.
        saver.stdout.close()
        _, load_err = loader.communicate()
        save_rc = saver.wait()
        save_err_file.seek(0)
        save_err = save_err_file.read().decode(errors='replace')

    if loader.returncode != 0:
        return loader.returncode, load_err or save_err
    if save_rc != 0:
        return save_rc, save_err
    return 0, ''


def load_image_into_cluster(image_name, cluster_type):
    """Load a Docker image into the local K8s cluster.

    Args:
        image_name: Docker image reference to load.
        cluster_type: 'minikube', 'k3d' or 'kind'. Any other value only logs a
            warning and returns without loading anything.

    Raises:
        RuntimeError: If the load command exits with a non-zero status. For
            minikube this also covers a failing ``docker save`` and a missing
            ``docker``/``minikube`` executable.
    """
    logger.info(f'Loading image {image_name} into {cluster_type} cluster ...')

    if cluster_type == 'minikube':
        # Use docker save | minikube image load - for reliability
        returncode, stderr = _pipe_docker_save_into_minikube(image_name)
    elif cluster_type in ('k3d', 'kind'):
        if cluster_type == 'k3d':
            command = ['k3d', 'image', 'import', image_name]
        else:
            command = ['kind', 'load', 'docker-image', image_name]
        result = subprocess.run(command, capture_output=True, text=True)
        returncode, stderr = result.returncode, result.stderr
    else:
        logger.warning(
            f'Unknown cluster type {cluster_type}. Assuming image is accessible via Docker daemon.'
        )
        return

    if returncode != 0:
        raise RuntimeError(
            f'Failed to load image {image_name} into {cluster_type}: {stderr}'
        )
    logger.info(f'Loaded {image_name} into cluster.')


def build_and_load_all_steps(pipeline_dir, step_directories, cluster_type):
    """Build every step's image and load it into the cluster.

    Each entry of ``step_directories`` is resolved relative to ``pipeline_dir``.
    A missing directory raises ``FileNotFoundError`` at that entry; entries
    before it have already been built and loaded.

    Returns a dict mapping step_id (as written in the step config) -> image_name.
    """
    images_by_step = {}

    for dir_name in step_directories:
        step_path = os.path.join(pipeline_dir, dir_name)
        if not os.path.isdir(step_path):
            raise FileNotFoundError(f'Step directory not found: {step_path}')

        # The key is the id exactly as configured (not lower-cased).
        configured_id = PipelineStepRunLocally(step_path).config['pipeline_step']['id']

        built_image = build_step_image(step_path)
        load_image_into_cluster(built_image, cluster_type)
        images_by_step[configured_id] = built_image

    return images_by_step
147 changes: 147 additions & 0 deletions clarifai/runners/pipelines/local/log_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Stream pod logs from an Argo Workflow running on the local K8s cluster."""

import subprocess
import sys
import threading
import time

from clarifai.utils.logging import logger

_TERMINAL_PHASES = frozenset({'Succeeded', 'Failed', 'Error'})


def _get_workflow_status(wf_name, namespace):
    """Return the ``.status.phase`` of an Argo Workflow as reported by kubectl.

    An empty string is returned when the kubectl call exits with a non-zero
    status.
    """
    command = [
        'kubectl', 'get', 'workflow', wf_name,
        '-n', namespace,
        '-o', 'jsonpath={.status.phase}',
    ]
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode != 0:
        return ''
    return completed.stdout.strip()


def _get_workflow_pods(wf_name, namespace):
    """Get pod names associated with a workflow, mapped to a display name.

    Pods are selected by the ``workflows.argoproj.io/workflow=<wf_name>`` label.
    For each pod the display name is the ``workflows.argoproj.io/node-name``
    value, looked up first in the pod's annotations and then in its labels
    (kept as a fallback for backward compatibility); if neither is set the pod
    name itself is used.

    Returns:
        dict mapping pod name -> display name. Empty when kubectl prints
        nothing (including when the command fails).
    """
    node_name_key = 'workflows\\.argoproj\\.io/node-name'
    # One line per pod: <name> TAB <annotation node-name> TAB <label node-name>
    jsonpath = (
        'jsonpath={range .items[*]}{.metadata.name}{"\\t"}'
        '{.metadata.annotations.' + node_name_key + '}{"\\t"}'
        '{.metadata.labels.' + node_name_key + '}{"\\n"}{end}'
    )
    result = subprocess.run(
        [
            'kubectl',
            'get',
            'pods',
            '-n',
            namespace,
            '-l',
            f'workflows.argoproj.io/workflow={wf_name}',
            '-o',
            jsonpath,
        ],
        capture_output=True,
        text=True,
    )

    pods = {}
    for line in result.stdout.split('\n'):
        fields = [field.strip() for field in line.split('\t')]
        pod_name = fields[0]
        if not pod_name:
            continue
        # First non-empty node-name column wins; otherwise fall back to the pod name.
        pods[pod_name] = next((field for field in fields[1:] if field), pod_name)
    return pods


def _stream_pod_logs(pod_name, display_name, namespace):
    """Stream logs from a single pod's ``main`` container to stdout with a prefix.

    Args:
        pod_name: Name of the pod to follow.
        display_name: Label printed in square brackets before every log line.
        namespace: Kubernetes namespace the pod lives in.

    Any exception raised while streaming is logged at debug level and swallowed;
    the ``kubectl logs`` child process is always reaped before returning.
    """
    # Wait (up to 30 attempts) for the pod to become Ready, or to have already
    # finished, before attaching to its logs.
    for _ in range(30):
        check = subprocess.run(
            ['kubectl', 'wait', '--for=condition=Ready', 'pod', pod_name,
             '-n', namespace, '--timeout=5s'],
            capture_output=True, text=True,
        )
        if check.returncode == 0:
            break
        # Also check if the pod already completed
        phase_check = subprocess.run(
            ['kubectl', 'get', 'pod', pod_name, '-n', namespace,
             '-o', 'jsonpath={.status.phase}'],
            capture_output=True, text=True,
        )
        if phase_check.stdout.strip() in ('Succeeded', 'Failed'):
            break
        time.sleep(1)

    try:
        process = subprocess.Popen(
            [
                'kubectl',
                'logs',
                '-f',
                pod_name,
                '-c',
                'main',
                '-n',
                namespace,
            ],
            stdout=subprocess.PIPE,
            # stderr was never read; discarding it (instead of piping it) means a
            # chatty kubectl can no longer block on a full, undrained pipe.
            stderr=subprocess.DEVNULL,
            text=True,
        )
        try:
            for line in process.stdout:
                print(f'[{display_name}] {line}', end='', file=sys.stdout, flush=True)
            process.wait()
        finally:
            # Never leave a `kubectl logs -f` child or its pipe behind, even when
            # printing fails part-way through.
            if process.poll() is None:
                process.kill()
                process.wait()
            process.stdout.close()
    except Exception as e:
        logger.debug(f'Log streaming ended for {pod_name}: {e}')


def stream_workflow_logs(wf_name, namespace='clarifai-local', poll_interval=3, timeout=3600):
    """Follow a workflow until it finishes, echoing pod logs as they appear.

    Every ``poll_interval`` seconds the workflow's pods are listed; each pod
    not seen before gets its own daemon thread running ``_stream_pod_logs``.
    The call blocks until the workflow phase is one of ``_TERMINAL_PHASES`` or
    more than ``timeout`` seconds have elapsed.

    Returns the final workflow phase (e.g. 'Succeeded', 'Failed'), or the
    string 'Timeout' when the time limit is exceeded.
    """
    logger.info(f'Monitoring workflow {wf_name} in namespace {namespace} ...')

    started_at = time.time()
    streamed_pods = set()
    workers = []

    while time.time() - started_at <= timeout:
        # Attach a log streamer to every pod that showed up since the last poll.
        for pod_name, node_name in _get_workflow_pods(wf_name, namespace).items():
            if pod_name in streamed_pods:
                continue
            streamed_pods.add(pod_name)
            # Prefer the node (step) name as the prefix; fall back to the pod name.
            display_name = node_name or pod_name
            logger.info(f'Streaming logs from pod {pod_name} ({display_name}) ...')
            worker = threading.Thread(
                target=_stream_pod_logs,
                args=(pod_name, display_name, namespace),
                daemon=True,
            )
            worker.start()
            workers.append(worker)

        phase = _get_workflow_status(wf_name, namespace)
        if phase in _TERMINAL_PHASES:
            # Allow each streamer up to 5 seconds to flush remaining output.
            for worker in workers:
                worker.join(timeout=5)
            return phase

        time.sleep(poll_interval)

    logger.error(f'Timeout waiting for workflow {wf_name} after {timeout}s.')
    return 'Timeout'
Loading
Loading