From 254c5cfd9147f2417236cfc6dc3bc7df9bfd54ec Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Wed, 20 May 2026 11:34:10 +0200 Subject: [PATCH 1/3] Add agentic human-in-the-loop pipeline example Adds examples/agentic_hitl_pipeline/, a runnable dynamic pipeline that plans agent tasks, fans them out with step.map, summarizes the results, pauses on zenml.wait for human approval, and branches the final action on the decision. Step outputs are named with Annotated for clean lineage. Adds cross-links from docs/book to the new example: - getting-started/your-first-ai-pipeline.md: include in Related examples - how-to/steps-pipelines/dynamic_pipelines.md: link from map-reduce - how-to/steps-pipelines/wait_resume.md: new Agentic approval section Co-Authored-By: Claude Opus 4.7 (1M context) --- .../getting-started/your-first-ai-pipeline.md | 1 + .../steps-pipelines/dynamic_pipelines.md | 4 + .../how-to/steps-pipelines/wait_resume.md | 55 ++++++ examples/agentic_hitl_pipeline/LICENSE | 6 + examples/agentic_hitl_pipeline/README.md | 184 ++++++++++++++++++ .../agentic_hitl_pipeline/requirements.txt | 2 + examples/agentic_hitl_pipeline/run.py | 133 +++++++++++++ 7 files changed, 385 insertions(+) create mode 100644 examples/agentic_hitl_pipeline/LICENSE create mode 100644 examples/agentic_hitl_pipeline/README.md create mode 100644 examples/agentic_hitl_pipeline/requirements.txt create mode 100644 examples/agentic_hitl_pipeline/run.py diff --git a/docs/book/getting-started/your-first-ai-pipeline.md b/docs/book/getting-started/your-first-ai-pipeline.md index 3159c8042a8..92e9821478c 100644 --- a/docs/book/getting-started/your-first-ai-pipeline.md +++ b/docs/book/getting-started/your-first-ai-pipeline.md @@ -92,6 +92,7 @@ Then follow the guide in [`examples/deploying_agent`](https://github.com/zenml-i ### Related examples - **[agent_outer_loop](https://github.com/zenml-io/zenml/tree/main/examples/agent_outer_loop)**: Combine ML classifiers with agents for hybrid intelligent systems +- **[agentic_hitl_pipeline](https://github.com/zenml-io/zenml/tree/main/examples/agentic_hitl_pipeline)**: Add dynamic fan-out and human approval to an agent workflow - **[agent_comparison](https://github.com/zenml-io/zenml/tree/main/examples/agent_comparison)**: Compare different agent architectures and LLM providers - **[agent_framework_integrations](https://github.com/zenml-io/zenml/tree/main/examples/agent_framework_integrations)**: Integrate with popular agent frameworks - **[llm_finetuning](https://github.com/zenml-io/zenml/tree/main/examples/llm_finetuning)**: Fine-tune LLMs for specialized tasks diff --git a/docs/book/how-to/steps-pipelines/dynamic_pipelines.md b/docs/book/how-to/steps-pipelines/dynamic_pipelines.md index b5088a3369f..43ddab689a2 100644 --- a/docs/book/how-to/steps-pipelines/dynamic_pipelines.md +++ b/docs/book/how-to/steps-pipelines/dynamic_pipelines.md @@ -148,6 +148,10 @@ def map_reduce(): reducer(results) # pass list of artifacts directly ``` +For a complete agentic workflow that combines dynamic mapping, reduction, and a +human approval gate, see the +[`agentic_hitl_pipeline` example](https://github.com/zenml-io/zenml/tree/main/examples/agentic_hitl_pipeline). + Key points: - `step.map(...)` fans out a step over sequence-like inputs. These inputs can be either - a single list-like output artifact (see the code sample above) diff --git a/docs/book/how-to/steps-pipelines/wait_resume.md b/docs/book/how-to/steps-pipelines/wait_resume.md index c9205fc6237..6300f88651d 100644 --- a/docs/book/how-to/steps-pipelines/wait_resume.md +++ b/docs/book/how-to/steps-pipelines/wait_resume.md @@ -79,6 +79,61 @@ def deployment_pipeline() -> None: print(config.environment, config.replicas, config.notify_slack) ``` +## Agentic approval workflow + +`wait(...)` is useful when an agentic workflow should prepare a recommendation +but a human should approve the final action. The example below plans multiple +agent tasks, runs them with dynamic mapping, summarizes the results, and then +continues only if the approval wait condition resolves to `true`. + +```python +from zenml import pipeline, step, wait + + +@step +def plan_agent_tasks(goal: str) -> list[dict[str, str]]: + return [ + {"task_id": "research", "instruction": f"Research {goal}"}, + {"task_id": "draft", "instruction": f"Draft a plan for {goal}"}, + {"task_id": "risk_check", "instruction": f"Review risks for {goal}"}, + ] + + +@step +def execute_agent_task(task: dict[str, str]) -> dict[str, str]: + return {"task_id": task["task_id"], "result": "completed"} + + +@step +def summarize_agent_work(results: list[dict[str, str]]) -> list[dict[str, str]]: + return results + + +@step +def take_final_action(summary: list[dict[str, str]]) -> None: + print(f"Acting on {len(summary)} reviewed tasks.") + + +@pipeline(dynamic=True) +def agentic_approval_pipeline(goal: str) -> None: + tasks = plan_agent_tasks(goal=goal) + results = execute_agent_task.map(task=tasks) + summary = summarize_agent_work(results) + approved = wait( + schema=bool, + question="Approve the agent recommendation and continue?", + metadata={"goal": goal}, + name="human_approval", + ) + + if approved: + take_final_action(summary) +``` + +For a runnable version that also logs metadata and returns tabular artifacts, +see the +[`agentic_hitl_pipeline` example](https://github.com/zenml-io/zenml/tree/main/examples/agentic_hitl_pipeline). + ## Timeouts and pausing `wait(...)` accepts a `timeout` (default: 600 seconds). When the timeout diff --git a/examples/agentic_hitl_pipeline/LICENSE b/examples/agentic_hitl_pipeline/LICENSE new file mode 100644 index 00000000000..7c4d97fda6f --- /dev/null +++ b/examples/agentic_hitl_pipeline/LICENSE @@ -0,0 +1,6 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +This example is part of the ZenML project and is licensed under the Apache +License, Version 2.0. See the root repository LICENSE file for the full terms. diff --git a/examples/agentic_hitl_pipeline/README.md b/examples/agentic_hitl_pipeline/README.md new file mode 100644 index 00000000000..91acf7658b1 --- /dev/null +++ b/examples/agentic_hitl_pipeline/README.md @@ -0,0 +1,184 @@ +# Agentic Human-in-the-Loop Pipeline + +Pause an agentic ZenML pipeline for human approval before it takes a final action, using `zenml.wait(...)` inside a dynamic pipeline. + +**ZenML version**: 0.94+ (Python 3.10+) + +## ๐ŸŽฏ What You'll Learn + +- Build a `@pipeline(dynamic=True)` that drives runtime control flow with regular Python +- Fan out mapped work with `step.map(...)` and reduce the results +- Pause a run with `zenml.wait(...)` and branch on the human decision +- Surface a concise approval prompt + a structured report on the dashboard +- Name step outputs with `Annotated` for clean lineage + +## ๐Ÿƒ Quickstart + +```bash +pip install -r requirements.txt +zenml init +python run.py +``` + +When the run pauses, open the dashboard URL printed in the logs and resolve the +`human_approval` wait condition (Continue / Abort). + +Pass a custom goal: + +```bash +python run.py --goal "prepare a production rollout plan" +``` + +## ๐Ÿ“‹ Prerequisites + +- Python 3.10 or higher +- A ZenML server (`zenml login --local` for local mode, or `zenml login ` for ZenML Pro) + +## ๐Ÿ—๏ธ What's Inside + +``` +๐Ÿ“ agentic_hitl_pipeline/ +โ”œโ”€โ”€ run.py - Dynamic pipeline + step definitions +โ”œโ”€โ”€ requirements.txt +โ””โ”€โ”€ README.md +``` + +Four steps, one dynamic pipeline: + +- `plan_agent_tasks` โ†’ returns the task list (`agent_tasks`) +- `execute_agent_task` โ†’ mapped over the task list, returns a per-task trace (`task_trace`) +- `summarize_agent_work` โ†’ reduces mapped outputs into a single summary (`agent_summary`) +- `finalize_decision` โ†’ runs after approval resolves; deploys on `True`, records rejection on `False` (`decision_record`) + +## ๐Ÿ”‘ Key Concepts + +### Dynamic fan-out + reduce + +The planning step returns a list of tasks; `step.map(...)` runs `execute_agent_task` once per task in parallel, then `summarize_agent_work` reduces the mapped outputs: + +```python +tasks = plan_agent_tasks(goal=goal) +task_results = execute_agent_task.map(task=tasks) +summary = summarize_agent_work(task_results) +``` + +### Artifact naming with `Annotated` + +Every step output is named so lineage and dashboards stay readable: + +```python +from typing import Annotated + +@step +def summarize_agent_work( + results: list[pd.DataFrame], +) -> Annotated[pd.DataFrame, "agent_summary"]: + return pd.concat(results, ignore_index=True) +``` + +### Human approval with `zenml.wait(...)` + +`wait()` pauses the dynamic pipeline until an external actor resolves the condition. The `question` is a short prompt shown on the wait card; rich context goes into `metadata`, which the dashboard exposes on the Metadata tab. + +```python +report = summary.load() +avg_confidence = round(float(report["confidence"].mean()), 3) + +approved = wait( + schema=bool, + question=( + f"Approve and deploy? {len(report)} tasks, " + f"avg confidence {avg_confidence}." + ), + metadata={ + "goal": goal, + "tasks_completed": len(report), + "avg_confidence": avg_confidence, + "report": report.to_dict(orient="records"), + }, + name="human_approval", +) + +finalize_decision(summary=summary, approved=approved) +``` + +If using the Kubernetes orchestrator, `wait(...)` doesn't just block โ€” once the timeout elapses and the tree quiesces, ZenML transitions the run to `PAUSED` and **tears down the orchestration pod** so it stops consuming cluster resources while waiting on the human. When the wait condition is resolved (via UI or CLI), the run is rehydrated and continues from the wait point. This makes long-running approvals practically free. + +You can resolve the wait condition from the CLI as well: + +```bash +zenml pipeline runs wait-conditions resolve --run --interactive +``` + +### Branching on the decision + +`finalize_decision` always runs after the wait resolves; it inspects `approved` and either deploys or records the rejection: + +```python +@step +def finalize_decision( + summary: pd.DataFrame, approved: bool +) -> Annotated[pd.DataFrame, "decision_record"]: + if approved: + action, status = "deploy_agent_recommendation", "completed" + else: + action, status = "skip_agent_recommendation", "rejected" + return pd.DataFrame([{"action": action, "status": status, "reviewed_tasks": len(summary)}]) +``` + +Aborting the wait via the dashboard is handled by the framework โ€” it terminates the run; you don't catch it in user code. + +### Isolated task execution + +`execute_agent_task` uses `runtime="isolated"` so each mapped task can run in its own environment when the orchestrator supports it (e.g. Kubernetes). On the local orchestrator, ZenML logs a warning and falls back to inline execution โ€” useful for trying the example without a remote stack. + +```python +@step(runtime="isolated") +def execute_agent_task( + task: dict[str, str], +) -> Annotated[pd.DataFrame, "task_trace"]: + ... +``` + +## ๐Ÿš€ Run the Example + +1. **Install dependencies** + ```bash + pip install -r requirements.txt + ``` + +2. **Initialize ZenML in this directory** + ```bash + zenml init + ``` + +3. **Run the pipeline** + ```bash + python run.py + ``` + + The logs print a `Dashboard URL for Pipeline Run: ...` line โ€” open it. The run will execute through `summarize_agent_work` and pause on the `human_approval` wait card. + +4. **Resolve the wait condition** + - **Dashboard**: toggle the `Value` switch (True approves and deploys, False rejects), then click **Continue** โ€” or click **Abort** to terminate the run. + - **CLI** (alternative): + ```bash + zenml pipeline runs wait-conditions resolve --run --interactive + ``` + +5. **Inspect the artifacts** in the dashboard โ€” `agent_tasks`, `task_trace` (one per mapped task), `agent_summary`, and `decision_record` are all named outputs you can drill into. + +## ๐Ÿงช Customization Ideas + +- **Real agents**: Replace the placeholder logic in `execute_agent_task` with calls to your actual agent (LangChain, your own LLM client, etc.). Keep the typed I/O so the rest of the pipeline doesn't change. +- **Structured decisions**: Swap `schema=bool` for a Pydantic model so the human can return richer input โ€” e.g. `DeploymentConfig(environment, replicas, notify_slack)` โ€” and have `finalize_decision` consume it directly. +- **Per-task approval**: Move the `wait(...)` *inside* the map (one wait per task) instead of after `summarize_agent_work`, so the human approves each task before it acts. +- **Run on Kubernetes**: Set the active stack to a Kubernetes orchestrator and observe the pod actually being torn down at the wait point and rehydrated on resume. +- **Add a timeout policy**: Set `timeout=` on `wait(...)` and decide what `finalize_decision` should do if approval times out (e.g. auto-reject). + +## ๐Ÿ“š Learn More + +- [Dynamic pipelines](https://docs.zenml.io/how-to/steps-pipelines/dynamic_pipelines) +- [Wait for external input and resume](https://docs.zenml.io/how-to/steps-pipelines/wait_resume) +- [Artifact management with `Annotated`](https://docs.zenml.io/concepts/artifacts) +- [Kubernetes orchestrator](https://docs.zenml.io/stack-components/orchestrators/kubernetes) diff --git a/examples/agentic_hitl_pipeline/requirements.txt b/examples/agentic_hitl_pipeline/requirements.txt new file mode 100644 index 00000000000..eddfd2f2b20 --- /dev/null +++ b/examples/agentic_hitl_pipeline/requirements.txt @@ -0,0 +1,2 @@ +pandas>=2.0 +zenml diff --git a/examples/agentic_hitl_pipeline/run.py b/examples/agentic_hitl_pipeline/run.py new file mode 100644 index 00000000000..6da8426c9a7 --- /dev/null +++ b/examples/agentic_hitl_pipeline/run.py @@ -0,0 +1,133 @@ +"""Agentic human-in-the-loop workflow using ZenML dynamic pipelines.""" + +import argparse +from typing import Annotated + +import pandas as pd + +from zenml import pipeline, step, wait +from zenml.config import DockerSettings + + +@step +def plan_agent_tasks( + goal: str, +) -> Annotated[list[dict[str, str]], "agent_tasks"]: + """Create the task list that the agent workflow will execute.""" + return [ + { + "task_id": "research", + "instruction": f"Research constraints for: {goal}", + "tool": "search", + }, + { + "task_id": "draft", + "instruction": f"Draft a candidate plan for: {goal}", + "tool": "planner", + }, + { + "task_id": "risk_check", + "instruction": f"Identify risks before acting on: {goal}", + "tool": "reviewer", + }, + ] + + +@step(runtime="isolated") +def execute_agent_task( + task: dict[str, str], +) -> Annotated[pd.DataFrame, "task_trace"]: + """Execute one planned task and return a tabular trace.""" + return pd.DataFrame( + [ + { + "task_id": task["task_id"], + "tool": task["tool"], + "instruction": task["instruction"], + "result": f"Completed {task['task_id']} with {task['tool']}", + "confidence": 0.82 + if task["task_id"] == "risk_check" + else 0.9, + } + ] + ) + + +@step +def summarize_agent_work( + results: list[pd.DataFrame], +) -> Annotated[pd.DataFrame, "agent_summary"]: + """Combine mapped task results into a single summary.""" + return pd.concat(results, ignore_index=True) + + +@step +def finalize_decision( + summary: pd.DataFrame, approved: bool +) -> Annotated[pd.DataFrame, "decision_record"]: + """Act on the human decision: deploy when approved, record rejection otherwise.""" + if approved: + action, status = "deploy_agent_recommendation", "completed" + else: + action, status = "skip_agent_recommendation", "rejected" + return pd.DataFrame( + [ + { + "action": action, + "status": status, + "reviewed_tasks": len(summary), + } + ] + ) + + +docker = DockerSettings( + requirements=[ + "pandas>=2.0", + ], +) + + +@pipeline(dynamic=True, enable_cache=False, settings={"docker": docker}) +def agentic_hitl_pipeline( + goal: str = "prepare an agent workflow for production", +) -> None: + """Fan out agent tasks, pause for approval, then conditionally act.""" + tasks = plan_agent_tasks(goal=goal) + task_results = execute_agent_task.map(task=tasks) + summary = summarize_agent_work(task_results) + + report = summary.load() + avg_confidence = round(float(report["confidence"].mean()), 3) + approved = wait( + schema=bool, + question=( + f"Approve and deploy? {len(report)} tasks, " + f"avg confidence {avg_confidence}." + ), + metadata={ + "goal": goal, + "tasks_completed": len(report), + "avg_confidence": avg_confidence, + "report": report.to_dict(orient="records"), + }, + name="human_approval", + ) + + finalize_decision(summary=summary, approved=approved) + + +def main() -> None: + """Run the example pipeline.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--goal", + default="prepare an agent workflow for production", + help="Goal used to plan the agent tasks.", + ) + args = parser.parse_args() + agentic_hitl_pipeline(goal=args.goal) + + +if __name__ == "__main__": + main() From 1453218c73723fb66f436ad40f06fa6a8674b957 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Wed, 20 May 2026 11:36:36 +0200 Subject: [PATCH 2/3] removed livecense --- examples/agentic_hitl_pipeline/LICENSE | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 examples/agentic_hitl_pipeline/LICENSE diff --git a/examples/agentic_hitl_pipeline/LICENSE b/examples/agentic_hitl_pipeline/LICENSE deleted file mode 100644 index 7c4d97fda6f..00000000000 --- a/examples/agentic_hitl_pipeline/LICENSE +++ /dev/null @@ -1,6 +0,0 @@ -Apache License -Version 2.0, January 2004 -http://www.apache.org/licenses/ - -This example is part of the ZenML project and is licensed under the Apache -License, Version 2.0. See the root repository LICENSE file for the full terms. From 467f4bb276455dc99fa37b757c730f72abbda071 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Wed, 20 May 2026 11:43:26 +0200 Subject: [PATCH 3/3] Linting --- examples/agentic_hitl_pipeline/run.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/agentic_hitl_pipeline/run.py b/examples/agentic_hitl_pipeline/run.py index 6da8426c9a7..a37e6570ca2 100644 --- a/examples/agentic_hitl_pipeline/run.py +++ b/examples/agentic_hitl_pipeline/run.py @@ -45,9 +45,7 @@ def execute_agent_task( "tool": task["tool"], "instruction": task["instruction"], "result": f"Completed {task['task_id']} with {task['tool']}", - "confidence": 0.82 - if task["task_id"] == "risk_check" - else 0.9, + "confidence": 0.82 if task["task_id"] == "risk_check" else 0.9, } ] )