diff --git a/clarifai/cli/pipeline.py b/clarifai/cli/pipeline.py index 1cfe2e29..3aebeba1 100644 --- a/clarifai/cli/pipeline.py +++ b/clarifai/cli/pipeline.py @@ -225,6 +225,19 @@ def compile(path, output_dir): help='Upload local code to an ephemeral dev pipeline before running. ' 'Only changed steps are re-uploaded.', ) +@click.option( + '--local', + is_flag=True, + default=False, + help='Run the pipeline locally on a Minikube/K3s/kind cluster. ' + 'Bypasses the Clarifai API and submits directly to the local K8s cluster via Argo Workflows.', +) +@click.option( + '--namespace', + 'local_namespace', + default='clarifai-local', + help='Kubernetes namespace for local pipeline runs. Default: clarifai-local.', +) @click.pass_context def run( ctx, @@ -248,6 +261,8 @@ def run( override_params, overrides_file, dev, + local, + local_namespace, ): """Run a pipeline and monitor its progress. @@ -256,7 +271,34 @@ def run( \tWhen provided, config precedence is config-lock.yaml > config.yaml. The --config option is accepted for backwards compatibility but PATH is preferred. + + \bWith --local, the pipeline runs directly on a local K8s cluster + (Minikube/K3s/kind) via Argo Workflows, bypassing the Clarifai API. + Requires: Docker, kubectl, and Argo Workflows installed on the cluster. """ + # Local execution path — bypass the Clarifai API entirely + if local: + from clarifai.runners.pipelines.local.runner import run_local_pipeline + + run_path = path or os.getcwd() + if not os.path.isdir(run_path): + raise click.ClickException(f'--local requires a pipeline directory, got: {run_path}') + + pat = ctx.obj.current.pat if ctx.obj and ctx.obj.current else None + api_base = ctx.obj.current.api_base if ctx.obj and ctx.obj.current else None + + phase = run_local_pipeline( + pipeline_dir=run_path, + namespace=local_namespace, + pat=pat, + api_base=api_base, + timeout=timeout, + poll_interval=monitor_interval, + ) + if phase != 'Succeeded': + sys.exit(1) + return + import json from clarifai.client.pipeline import Pipeline diff --git a/clarifai/runners/pipelines/local/__init__.py b/clarifai/runners/pipelines/local/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/clarifai/runners/pipelines/local/image_loader.py b/clarifai/runners/pipelines/local/image_loader.py new file mode 100644 index 00000000..baf9eb37 --- /dev/null +++ b/clarifai/runners/pipelines/local/image_loader.py @@ -0,0 +1,92 @@ +"""Build Docker images for pipeline steps and load them into the local K8s cluster.""" + +import os +import subprocess + +from clarifai.runners.pipeline_steps.pipeline_run_locally import PipelineStepRunLocally +from clarifai.utils.logging import logger + + +def _image_name_for_step(step_id, step_path): + """Generate a deterministic local image name for a pipeline step.""" + manager = PipelineStepRunLocally(step_path) + # Ensure Dockerfile exists before computing hash + manager.builder.create_dockerfile() + tag = manager._docker_hash() + return f'clarifai-local/{step_id}:{tag}' + + +def build_step_image(step_path): + """Build a Docker image for a single pipeline step. + + Returns the image name (tag). + """ + manager = PipelineStepRunLocally(step_path) + step_id = manager.config['pipeline_step']['id'].lower() + image_name = _image_name_for_step(step_id, step_path) + + if manager.docker_image_exists(image_name): + logger.info(f'Image {image_name} already exists, skipping build.') + else: + logger.info(f'Building Docker image {image_name} ...') + manager.build_docker_image(image_name=image_name) + + return image_name + + +def load_image_into_cluster(image_name, cluster_type): + """Load a Docker image into the local K8s cluster.""" + logger.info(f'Loading image {image_name} into {cluster_type} cluster ...') + + if cluster_type == 'minikube': + # Use docker save | minikube image load - for reliability + result = subprocess.run( + f'docker save {image_name} | minikube image load --daemon=false -', + shell=True, + capture_output=True, + text=True, + ) + elif cluster_type == 'k3d': + result = subprocess.run( + ['k3d', 'image', 'import', image_name], + capture_output=True, + text=True, + ) + elif cluster_type == 'kind': + result = subprocess.run( + ['kind', 'load', 'docker-image', image_name], + capture_output=True, + text=True, + ) + else: + logger.warning( + f'Unknown cluster type {cluster_type}. Assuming image is accessible via Docker daemon.' + ) + return + + if result.returncode != 0: + raise RuntimeError( + f'Failed to load image {image_name} into {cluster_type}: {result.stderr}' + ) + logger.info(f'Loaded {image_name} into cluster.') + + +def build_and_load_all_steps(pipeline_dir, step_directories, cluster_type): + """Build and load images for all pipeline steps. + + Returns a dict mapping step_id -> image_name. + """ + step_images = {} + for step_dir_name in step_directories: + step_path = os.path.join(pipeline_dir, step_dir_name) + if not os.path.isdir(step_path): + raise FileNotFoundError(f'Step directory not found: {step_path}') + + manager = PipelineStepRunLocally(step_path) + step_id = manager.config['pipeline_step']['id'] + + image_name = build_step_image(step_path) + load_image_into_cluster(image_name, cluster_type) + step_images[step_id] = image_name + + return step_images diff --git a/clarifai/runners/pipelines/local/log_streamer.py b/clarifai/runners/pipelines/local/log_streamer.py new file mode 100644 index 00000000..809932b2 --- /dev/null +++ b/clarifai/runners/pipelines/local/log_streamer.py @@ -0,0 +1,147 @@ +"""Stream pod logs from an Argo Workflow running on the local K8s cluster.""" + +import subprocess +import sys +import threading +import time + +from clarifai.utils.logging import logger + +_TERMINAL_PHASES = frozenset({'Succeeded', 'Failed', 'Error'}) + + +def _get_workflow_status(wf_name, namespace): + """Get the current phase of an Argo Workflow.""" + result = subprocess.run( + [ + 'kubectl', + 'get', + 'workflow', + wf_name, + '-n', + namespace, + '-o', + 'jsonpath={.status.phase}', + ], + capture_output=True, + text=True, + ) + return result.stdout.strip() if result.returncode == 0 else '' + + +def _get_workflow_pods(wf_name, namespace): + """Get pod names associated with a workflow, mapped by node name.""" + result = subprocess.run( + [ + 'kubectl', + 'get', + 'pods', + '-n', + namespace, + '-l', + f'workflows.argoproj.io/workflow={wf_name}', + '-o', + 'jsonpath={range .items[*]}{.metadata.name}{"\\t"}{.metadata.labels.workflows\\.argoproj\\.io/node-name}{"\\n"}{end}', + ], + capture_output=True, + text=True, + ) + pods = {} + for line in result.stdout.strip().split('\n'): + parts = line.strip().split('\t') + if len(parts) >= 2 and parts[0]: + pods[parts[0]] = parts[1] + elif len(parts) == 1 and parts[0]: + pods[parts[0]] = parts[0] + return pods + + +def _stream_pod_logs(pod_name, display_name, namespace): + """Stream logs from a single pod to stdout with a prefix.""" + # Wait for the pod's main container to be ready + for _ in range(30): + check = subprocess.run( + ['kubectl', 'wait', '--for=condition=Ready', 'pod', pod_name, + '-n', namespace, '--timeout=5s'], + capture_output=True, text=True, + ) + if check.returncode == 0: + break + # Also check if the pod already completed + phase_check = subprocess.run( + ['kubectl', 'get', 'pod', pod_name, '-n', namespace, + '-o', 'jsonpath={.status.phase}'], + capture_output=True, text=True, + ) + if phase_check.stdout.strip() in ('Succeeded', 'Failed'): + break + time.sleep(1) + + try: + process = subprocess.Popen( + [ + 'kubectl', + 'logs', + '-f', + pod_name, + '-c', + 'main', + '-n', + namespace, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + for line in process.stdout: + print(f'[{display_name}] {line}', end='', file=sys.stdout, flush=True) + process.wait() + except Exception as e: + logger.debug(f'Log streaming ended for {pod_name}: {e}') + + +def stream_workflow_logs(wf_name, namespace='clarifai-local', poll_interval=3, timeout=3600): + """Monitor a workflow and stream pod logs in real-time. + + Polls for new pods and streams their logs in background threads. + Blocks until the workflow reaches a terminal phase or times out. + + Returns the final workflow phase (e.g. 'Succeeded', 'Failed'). + """ + logger.info(f'Monitoring workflow {wf_name} in namespace {namespace} ...') + + seen_pods = set() + log_threads = [] + start_time = time.time() + + while True: + elapsed = time.time() - start_time + if elapsed > timeout: + logger.error(f'Timeout waiting for workflow {wf_name} after {timeout}s.') + return 'Timeout' + + # Check for new pods and start log streaming + pods = _get_workflow_pods(wf_name, namespace) + for pod_name, node_name in pods.items(): + if pod_name not in seen_pods: + seen_pods.add(pod_name) + # Use the node name (step name) as the display prefix + display_name = node_name or pod_name + logger.info(f'Streaming logs from pod {pod_name} ({display_name}) ...') + thread = threading.Thread( + target=_stream_pod_logs, + args=(pod_name, display_name, namespace), + daemon=True, + ) + thread.start() + log_threads.append(thread) + + # Check workflow status + phase = _get_workflow_status(wf_name, namespace) + if phase in _TERMINAL_PHASES: + # Give log threads a moment to flush + for thread in log_threads: + thread.join(timeout=5) + return phase + + time.sleep(poll_interval) diff --git a/clarifai/runners/pipelines/local/preflight.py b/clarifai/runners/pipelines/local/preflight.py new file mode 100644 index 00000000..9c4cc94a --- /dev/null +++ b/clarifai/runners/pipelines/local/preflight.py @@ -0,0 +1,108 @@ +"""Preflight checks for local pipeline execution. + +Validates that the local K8s cluster and Argo Workflows are ready. +""" + +import shutil +import subprocess + +from clarifai.utils.logging import logger + +ARGO_WORKFLOW_CRD = 'workflows.argoproj.io' + + +def check_docker(): + """Verify Docker is installed and running.""" + if not shutil.which('docker'): + raise EnvironmentError('Docker is not installed or not on PATH.') + result = subprocess.run( + ['docker', 'info'], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise EnvironmentError('Docker daemon is not running. Please start Docker.') + logger.info('Docker is running.') + + +def check_kubectl(): + """Verify kubectl is available and a cluster is reachable.""" + if not shutil.which('kubectl'): + raise EnvironmentError('kubectl is not installed or not on PATH.') + result = subprocess.run( + ['kubectl', 'cluster-info'], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise EnvironmentError( + 'Cannot connect to Kubernetes cluster. Is minikube/k3s running?\n' + f' stderr: {result.stderr.strip()}' + ) + logger.info('Kubernetes cluster is reachable.') + + +def check_argo_crds(): + """Verify Argo Workflow CRDs are registered in the cluster.""" + result = subprocess.run( + ['kubectl', 'get', 'crd', ARGO_WORKFLOW_CRD], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise EnvironmentError( + 'Argo Workflows CRDs not found in cluster.\n' + 'Install Argo Workflows first:\n' + ' kubectl create namespace argo\n' + ' kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.5.12/quick-start-minimal.yaml' + ) + logger.info('Argo Workflows CRDs found.') + + +def detect_cluster_type(): + """Detect the local K8s cluster type for image loading strategy. + + Returns one of: 'minikube', 'k3d', 'kind', or 'generic'. + """ + result = subprocess.run( + ['kubectl', 'config', 'current-context'], + capture_output=True, + text=True, + ) + context = result.stdout.strip() if result.returncode == 0 else '' + + if 'minikube' in context: + return 'minikube' + if 'k3d' in context: + return 'k3d' + if 'kind' in context: + return 'kind' + return 'generic' + + +def ensure_namespace(namespace): + """Create the namespace if it doesn't exist.""" + result = subprocess.run( + ['kubectl', 'get', 'namespace', namespace], + capture_output=True, + text=True, + ) + if result.returncode != 0: + subprocess.run( + ['kubectl', 'create', 'namespace', namespace], + check=True, + capture_output=True, + text=True, + ) + logger.info(f'Created namespace {namespace}.') + + +def run_all_checks(namespace='clarifai-local'): + """Run all preflight checks. Returns the detected cluster type.""" + check_docker() + check_kubectl() + check_argo_crds() + ensure_namespace(namespace) + cluster_type = detect_cluster_type() + logger.info(f'Detected cluster type: {cluster_type}') + return cluster_type diff --git a/clarifai/runners/pipelines/local/runner.py b/clarifai/runners/pipelines/local/runner.py new file mode 100644 index 00000000..8056b2cf --- /dev/null +++ b/clarifai/runners/pipelines/local/runner.py @@ -0,0 +1,115 @@ +"""Orchestrate a full local pipeline run. + +Ties together preflight checks, image building, spec adaptation, +secret creation, workflow submission, and log streaming. +""" + +import os +from typing import Dict, Optional + +import yaml + +from clarifai.runners.pipelines.local.image_loader import build_and_load_all_steps +from clarifai.runners.pipelines.local.log_streamer import stream_workflow_logs +from clarifai.runners.pipelines.local.preflight import run_all_checks +from clarifai.runners.pipelines.local.secrets import ( + SECRET_NAME, + build_env_vars, + create_env_secret, +) +from clarifai.runners.pipelines.local.spec_adapter import adapt_spec_for_local, load_argo_spec_from_config +from clarifai.runners.pipelines.local.submitter import submit_workflow +from clarifai.utils.logging import logger + + +def run_local_pipeline( + pipeline_dir: str, + namespace: str = 'clarifai-local', + pat: Optional[str] = None, + api_base: Optional[str] = None, + timeout: int = 3600, + poll_interval: int = 3, +): + """Execute a pipeline locally on the current K8s cluster. + + Args: + pipeline_dir: Path to the pipeline directory containing config.yaml and step subdirectories. + namespace: K8s namespace to use. + pat: Clarifai PAT for step env vars. Falls back to CLARIFAI_PAT env var. + api_base: Clarifai API base URL. + timeout: Max wait time in seconds. + poll_interval: Seconds between status polls. + + Returns: + Final workflow phase string (e.g. 'Succeeded', 'Failed'). + """ + pipeline_dir = os.path.abspath(pipeline_dir) + + # Load pipeline config + config_path = _resolve_config(pipeline_dir) + with open(config_path, 'r') as f: + config_data = yaml.safe_load(f) + + pipeline_config = config_data.get('pipeline', config_data) + step_directories = pipeline_config.get('step_directories', []) + user_id = pipeline_config.get('user_id', '') + app_id = pipeline_config.get('app_id', '') + + if not step_directories: + raise ValueError('No step_directories found in pipeline config.') + + # 1. Preflight checks + logger.info('Running preflight checks ...') + cluster_type = run_all_checks(namespace=namespace) + + # 2. Build and load step images + logger.info('Building and loading step images ...') + step_images = build_and_load_all_steps(pipeline_dir, step_directories, cluster_type) + + # 3. Parse Argo spec from config + argo_spec = load_argo_spec_from_config(config_data) + if not argo_spec: + raise ValueError('No orchestration_spec.argo_orchestration_spec found in pipeline config.') + + # 4. Create K8s secret with env vars + env_vars = build_env_vars(pat=pat, api_base=api_base, user_id=user_id, app_id=app_id) + secret_name = None + if env_vars: + secret_name = create_env_secret(namespace, env_vars) + + # 5. Adapt spec for local execution + adapted_spec = adapt_spec_for_local( + argo_spec, + step_images, + namespace=namespace, + env_secret_name=secret_name, + ) + + # 6. Submit workflow + wf_name = submit_workflow(adapted_spec, namespace=namespace) + + # 7. Stream logs and wait for completion + phase = stream_workflow_logs( + wf_name, + namespace=namespace, + poll_interval=poll_interval, + timeout=timeout, + ) + + if phase == 'Succeeded': + logger.info(f'Pipeline completed successfully!') + else: + logger.error(f'Pipeline finished with phase: {phase}') + + return phase + + +def _resolve_config(pipeline_dir): + """Find the config file in a pipeline directory.""" + for name in ('config-lock.yaml', 'config.yaml'): + path = os.path.join(pipeline_dir, name) + if os.path.isfile(path): + return path + raise FileNotFoundError( + f'No config.yaml or config-lock.yaml found in {pipeline_dir}' + ) diff --git a/clarifai/runners/pipelines/local/secrets.py b/clarifai/runners/pipelines/local/secrets.py new file mode 100644 index 00000000..76cae4bc --- /dev/null +++ b/clarifai/runners/pipelines/local/secrets.py @@ -0,0 +1,57 @@ +"""Create K8s Secrets for pipeline step environment variables.""" + +import json +import subprocess + +from clarifai.utils.logging import logger + +SECRET_NAME = 'clarifai-local-pipeline-env' + + +def create_env_secret(namespace, env_vars, secret_name=SECRET_NAME): + """Create or update a K8s Secret with the given environment variables. + + Args: + namespace: K8s namespace. + env_vars: Dict of env var name -> value. + secret_name: Name of the K8s Secret. + """ + # Delete existing secret if present (idempotent) + subprocess.run( + ['kubectl', 'delete', 'secret', secret_name, '-n', namespace, '--ignore-not-found'], + capture_output=True, + text=True, + ) + + # Build --from-literal args + cmd = ['kubectl', 'create', 'secret', 'generic', secret_name, '-n', namespace] + for key, value in env_vars.items(): + cmd.append(f'--from-literal={key}={value}') + + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f'Failed to create secret {secret_name}: {result.stderr}') + + logger.info(f'Created K8s secret {secret_name} in namespace {namespace}.') + return secret_name + + +def build_env_vars(pat=None, api_base=None, user_id=None, app_id=None): + """Build the standard Clarifai env vars dict for pipeline steps.""" + import os + + env = {} + + pat = pat or os.environ.get('CLARIFAI_PAT', '') + if pat: + env['CLARIFAI_PAT'] = pat + + api_base = api_base or os.environ.get('CLARIFAI_API_BASE', 'https://api.clarifai.com') + env['CLARIFAI_API_BASE'] = api_base + + if user_id: + env['CLARIFAI_USER_ID'] = user_id + if app_id: + env['CLARIFAI_APP_ID'] = app_id + + return env diff --git a/clarifai/runners/pipelines/local/spec_adapter.py b/clarifai/runners/pipelines/local/spec_adapter.py new file mode 100644 index 00000000..67506196 --- /dev/null +++ b/clarifai/runners/pipelines/local/spec_adapter.py @@ -0,0 +1,169 @@ +"""Adapt an Argo Workflow spec for local execution. + +Converts remote templateRef-based specs into self-contained inline templates. +""" + +import copy +import os +from typing import Any, Dict, List, Optional + +import yaml + +from clarifai.utils.logging import logger + + +def _build_inline_template(step_id, image_name, input_params): + """Build an inline Argo template for a pipeline step. + + The step runs ``python /home/nonroot/main/1/pipeline_step.py`` with + argparse-style arguments derived from input parameters. + """ + args = [] + for param in input_params: + name = param['name'] + args.extend([f'--{name}', f'{{{{inputs.parameters.{name}}}}}']) + + template = { + 'name': step_id, + 'inputs': { + 'parameters': [{'name': p['name']} for p in input_params], + }, + 'outputs': { + 'parameters': [ + { + 'name': 'result', + 'valueFrom': {'path': '/tmp/result'}, + 'globalName': f'{step_id}-result', + } + ], + }, + 'container': { + 'image': image_name, + 'imagePullPolicy': 'Never', + 'command': ['python', '/home/nonroot/main/1/pipeline_step.py'], + 'args': args, + }, + } + + if not input_params: + del template['inputs'] + template['container']['args'] = [] + + return template + + +def _extract_step_id_from_template_ref(template_ref_name): + """Extract step_id from a templateRef name like 'users/X/apps/Y/pipeline_steps/Z'.""" + parts = template_ref_name.split('/') + # Pattern: users/{user}/apps/{app}/pipeline_steps/{step_id}[/versions/{ver}] + try: + idx = parts.index('pipeline_steps') + return parts[idx + 1] + except (ValueError, IndexError): + return template_ref_name + + +def _collect_step_params_from_spec(argo_spec): + """Collect parameter names per step from the workflow spec's step arguments.""" + step_params = {} + templates = argo_spec.get('spec', {}).get('templates', []) + for template in templates: + for step_group in template.get('steps', []): + for step_entry in step_group: + ref = step_entry.get('templateRef', {}) + ref_name = ref.get('name', '') + step_id = _extract_step_id_from_template_ref(ref_name) + + params = [] + for p in step_entry.get('arguments', {}).get('parameters', []): + params.append({'name': p['name']}) + step_params[step_id] = params + return step_params + + +def adapt_spec_for_local( + argo_spec, + step_images, + namespace='clarifai-local', + env_secret_name=None, +): + """Transform a remote Argo Workflow spec for local execution. + + - Replaces ``templateRef`` entries with references to inline templates + - Adds inline container templates for each step + - Sets ``imagePullPolicy: Never`` + - Removes cloud-specific affinity/tolerations + - Adds metadata (name, namespace, labels) + + Args: + argo_spec: The parsed Argo Workflow spec dict. + step_images: Dict mapping step_id -> local Docker image name. + namespace: K8s namespace to run in. + env_secret_name: Optional K8s Secret name to inject as envFrom. + + Returns: + The adapted workflow spec dict, ready for K8s submission. + """ + spec = copy.deepcopy(argo_spec) + + # Collect parameter info from step arguments in the spec + step_params = _collect_step_params_from_spec(spec) + + # Build inline templates for each step + inline_templates = [] + for step_id, image_name in step_images.items(): + params = step_params.get(step_id, []) + tmpl = _build_inline_template(step_id, image_name, params) + + # Inject env secret if provided + if env_secret_name: + tmpl['container']['envFrom'] = [ + {'secretRef': {'name': env_secret_name}}, + ] + + inline_templates.append(tmpl) + + # Replace templateRef with template in step entries + for template in spec.get('spec', {}).get('templates', []): + for step_group in template.get('steps', []): + for step_entry in step_group: + if 'templateRef' in step_entry: + ref_name = step_entry['templateRef'].get('name', '') + step_id = _extract_step_id_from_template_ref(ref_name) + del step_entry['templateRef'] + step_entry['template'] = step_id + + # Add inline templates + spec['spec']['templates'].extend(inline_templates) + + # Remove cloud-specific scheduling constraints + for key in ('affinity', 'tolerations', 'nodeSelector'): + spec['spec'].pop(key, None) + + # Set metadata + entrypoint = spec['spec'].get('entrypoint', 'pipeline') + wf_name = f'local-{entrypoint}'[:63] # K8s name limit + spec.setdefault('metadata', {}) + spec['metadata']['name'] = wf_name + spec['metadata']['namespace'] = namespace + spec['metadata'].setdefault('labels', {}) + spec['metadata']['labels']['clarifai.com/local-run'] = 'true' + + # Ensure generateName isn't set (conflicts with name) + spec['metadata'].pop('generateName', None) + + # Set service account for Argo + spec['spec'].setdefault('serviceAccountName', 'argo') + + return spec + + +def load_argo_spec_from_config(config_data): + """Extract and parse the Argo orchestration spec from a pipeline config dict.""" + pipeline_config = config_data.get('pipeline', config_data) + orch_spec = pipeline_config.get('orchestration_spec', {}) + argo_spec_raw = orch_spec.get('argo_orchestration_spec', '') + + if isinstance(argo_spec_raw, str): + return yaml.safe_load(argo_spec_raw) + return argo_spec_raw diff --git a/clarifai/runners/pipelines/local/submitter.py b/clarifai/runners/pipelines/local/submitter.py new file mode 100644 index 00000000..f294e340 --- /dev/null +++ b/clarifai/runners/pipelines/local/submitter.py @@ -0,0 +1,40 @@ +"""Submit an Argo Workflow to the local K8s cluster.""" + +import json +import subprocess + +from clarifai.utils.logging import logger + + +def submit_workflow(workflow_spec, namespace='clarifai-local'): + """Submit an Argo Workflow CR to the local K8s cluster via kubectl. + + Args: + workflow_spec: The full Argo Workflow dict (with apiVersion, kind, metadata, spec). + namespace: K8s namespace. + + Returns: + The workflow name. + """ + spec_json = json.dumps(workflow_spec) + + # Delete existing workflow with the same name if present (for re-runs) + wf_name = workflow_spec.get('metadata', {}).get('name', '') + if wf_name: + subprocess.run( + ['kubectl', 'delete', 'workflow', wf_name, '-n', namespace, '--ignore-not-found'], + capture_output=True, + text=True, + ) + + result = subprocess.run( + ['kubectl', 'apply', '-n', namespace, '-f', '-'], + input=spec_json, + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError(f'Failed to submit Argo Workflow: {result.stderr}') + + logger.info(f'Submitted Argo Workflow: {wf_name}') + return wf_name