Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [Unreleased]

### Features

* **hitl:** add production-ready Human-in-the-Loop approval gateway for ADK agents — includes `@hitl_tool` decorator, FastAPI approval service with SQLite persistence, ADK 1.x adapter, and reference Streamlit dashboard (`contributing/samples/hitl_approval`)

## [0.4.1](https://github.com/google/adk-python-community/compare/v0.4.0...v0.4.1) (2026-02-18)


Expand Down
91 changes: 91 additions & 0 deletions contributing/samples/hitl_approval/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# ADK HITL Approval Dashboard

A drop-in **production-ready Human-in-the-Loop (HITL) approval middleware** for Google Agent Development Kit (ADK) agents — complete with an API backend and a demo Streamlit dashboard UI.

## The Problem Solved

ADK 1.x ships with an experimental `require_confirmation=True` feature that handles pausing the LLM loop for human verification. However, it is fundamentally built for local debugging and introduces major blockers to an enterprise environment:
1. **Incompatible with Persistent Sessions:** Native confirmations intentionally do not serialize well and will completely fail to resume your agent if you use `DatabaseSessionService`, `SpannerSessionService`, or `VertexAiSessionService` (the mandatory session backends for production deployments).
2. **Single-Agent Limitations:** They silently break across `AgentTool` nested bounds and true multi-agent (A2A) topologies, causing missing events or infinitely looping models.
3. **No Resilient Audit Log:** The Native confirmation tool leaves no easily queryable paper trail linking the human supervisor to a precise LLM request.

*This project is the production implementation of the HITL pattern covered in the [ADK Multi-Agent Patterns Guide (Advent of Agents Day 13)](#).*

## What This Library Provides

This project solves the production gaps by explicitly decoupling the human approval payload from ADK's internal session memory. It introduces a session-agnostic REST API layer using an Adapter pattern.

### The 3-Layer Architecture

```text
┌─────────────────────────────────────────┐
│ Dashboard UI (Streamlit) │ Layer 3: Demo/reference UI
│ Approval inbox, audit log viewer │ (Easily replaced by Zendesk/etc.)
└──────────────────┬──────────────────────┘
┌──────────────────▼──────────────────────┐
│ ApprovalRequest Model (Pydantic) │ Layer 2: Normalised Contract API
│ FastAPI backend + SQLite store │ Session-agnostic persistence
└────────────────┬────────────────────────┘
┌──────────┴───────────┐
┌─────▼──────┐ ┌──────────▼──────┐
│ ADK 1.x │ │ ADK 2.0 │ Layer 1: Adapters
│ Adapter │ │ Adapter │ Only this changes between versions
└────────────┘ └─────────────────┘
```

By retaining HITL state inside an independent FastAPI engine and SQLite database, an active agent can pause safely. When a human supervisor hits "Approve" inside a centralized web portal hours later, the middleware simply posts the decision back into the agent's `/run_sse` stream seamlessly.

## Quick Start (Local Sandbox)

We have provided a demo customer service agent (`credit_agent`) alongside a launch script to test the interaction end-to-end.

1. Create your python virtual environment and sync dependencies using `uv` (requires Python 3.11+):
```bash
uv venv --python "python3.11" ".venv"
source .venv/bin/activate
uv sync --all-extras
```
2. Start the FastAPI backend, Streamlit dashboard, and ADK Live Chat agent all at once:
```bash
./start_servers.sh
```
3. Open `http://localhost:8080` to chat with the agent and ask for a $75 account credit.
4. When the agent pauses and asks for a supervisor, open `http://localhost:8501` to approve or reject the request.

## How to use in your own ADK application

Wrapping an ADK agent with a formal enterprise HITL checkpoint takes under 5 lines of code:

1. Import the `hitl_tool` gateway wrapper.
2. Decorate your function tool.
3. Attach it to your ADK Agent initialization using a standard `FunctionTool`.

```python
from google.adk.tools import FunctionTool
from google.adk_community.tools.hitl.gateway import hitl_tool

# 1. Wrap your function with the decorator
@hitl_tool(agent_name="my_billing_agent")
async def issue_refund(user_id: str, amount: float):
# This block won't execute until explicitly approved inside the FastAPI dashboard
return {"status": "success", "amount_refunded": amount}

# 2. Attach to ADK Agent
root_agent = Agent(
name="my_billing_agent",
tools=[FunctionTool(issue_refund)]
)
```

## Production Integration Strategies

This repository acts as the production baseline for a contact center or enterprise orchestration grid. Once deployed to staging, consider swapping out:
* **Storage Layer:** Replace the local `SQLite` engine in `app/api/store.py` with `PostgreSQL` or `Cloud Spanner`.
* **Proactive Notification:** Hook the FastAPI `POST /approvals/` route into Slack, PagerDuty, or Microsoft Teams to actively ping channels when a high-risk request pops up.
* **Remove Streamlit:** Bypass the Streamlit frontend completely and point your existing support portal interface (like Salesforce Service Cloud) directly to `GET /approvals/pending` and `POST /approvals/{id}/decide`.

## ADK 2.0 Compatibility

This project currently uses ADK 1.x conventions and event triggers. Because it strictly implements an `adapters` layer, all the Pydantic API schemas and Streamlit logic are completely forward-compatible with ADK 2.0 `RequestInput` workflow yielding. You'll simply need to switch the adapter layer translation once ADK 2.0 exits Alpha.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import agent
80 changes: 80 additions & 0 deletions contributing/samples/hitl_approval/credit_agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Credit agent — external supervisor HITL demo.

This agent demonstrates the cross-user approval pattern:
- Customer chats in ADK web (:8080)
- Agent wants to apply a credit → submits request to HITL API (:8000)
- Agent blocks (non-blocking async poll) waiting for a decision
- Supervisor opens Streamlit dashboard (:8501), reviews and approves/rejects
- Agent resumes and informs the customer of the outcome

Make sure all three services are running before chatting (see start_servers.sh):
HITL API: uvicorn google.adk_community.services.hitl_approval.api:app --port 8000
Dashboard: streamlit run dashboard/app.py --server.headless true
ADK web: adk web credit_agent/ --port 8080
"""

from __future__ import annotations

import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

from google.adk.agents import Agent
from google.adk.tools import FunctionTool

from google.adk_community.tools.hitl.gateway import hitl_tool


@hitl_tool(agent_name="credit_agent")
async def apply_account_credit(account_id: str, amount: float, reason: str) -> dict:
"""Apply a credit to a customer account. Requires supervisor approval.

Args:
account_id: The customer account ID to credit.
amount: Credit amount in USD.
reason: Business justification for the credit.

Returns:
Confirmation with the updated account balance.
"""
# Real implementation would call your billing/CRM API here
return {
"status": "credited",
"account_id": account_id,
"amount_credited": amount,
"new_balance": f"${amount:.2f} credit applied successfully.",
}


root_agent = Agent(
name="credit_agent",
model="gemini-2.5-flash",
description=(
"Customer support agent that can apply account credits. "
"Every credit requires supervisor approval via the HITL dashboard."
),
instruction=(
"You are a customer support agent. When a customer requests an account credit, "
"call apply_account_credit with their account ID, the amount, and the reason. "
"Let them know their request is being reviewed by a supervisor and that you will "
"update them once a decision is made. "
"If the credit is approved, confirm it to the customer. "
"If rejected, apologise and explain that the supervisor did not approve it."
),
tools=[FunctionTool(apply_account_credit)],
)
126 changes: 126 additions & 0 deletions contributing/samples/hitl_approval/dashboard/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Streamlit HITL Approval Dashboard.

Run:
streamlit run contributing/samples/hitl_approval/dashboard/app.py
"""

import httpx
import streamlit as st

API_BASE = "http://localhost:8000"


def _resolve(request_id: str, decision: str, note: str):
try:
r = httpx.post(
f"{API_BASE}/approvals/{request_id}/decide",
json={
"decision": decision,
"reviewer_id": "dashboard_admin",
"notes": note or None,
},
timeout=5,
)
r.raise_for_status()
st.success(f"Request {request_id[:8]}… marked as {decision}.")
st.rerun()
except Exception as e:
st.error(f"Failed to resolve: {e}")


st.set_page_config(page_title="ADK HITL Dashboard", page_icon="🔍", layout="wide")
st.title("ADK HITL Approval Dashboard")

# ── Sidebar filters ───────────────────────────────────────────────────────────

status_filter = st.sidebar.selectbox(
"Filter by status", ["All", "pending", "approved", "rejected", "escalated"]
)

if st.sidebar.button("Refresh"):
st.rerun()

# ── Fetch approvals ───────────────────────────────────────────────────────────

try:
if status_filter == "pending":
resp = httpx.get(f"{API_BASE}/approvals/pending", timeout=5)
else:
params = {}
if status_filter != "All":
params["decision"] = status_filter
resp = httpx.get(f"{API_BASE}/approvals/audit", params=params, timeout=5)

resp.raise_for_status()
requests = resp.json()
except Exception as e:
st.error(f"Could not connect to API: {e}")
st.stop()

# ── Render approval cards ─────────────────────────────────────────────────────

if not requests:
st.info("No approval requests found.")
else:
for req in requests:
status = req["status"]
color = {
"pending": "🟡",
"approved": "🟢",
"rejected": "🔴",
"escalated": "🟠",
}.get(status, "⚪")

with st.expander(
f"{color} [{status.upper()}] {req['tool_name']} — {req['agent_name']} ({req['id'][:8]}…)"
):
col1, col2 = st.columns(2)
col1.markdown(
f"**App:** `{req.get('app_name', 'N/A')}` | **User:** `{req.get('user_id', 'N/A')}`"
)
col1.markdown(f"**Agent:** `{req['agent_name']}`")
col1.markdown(f"**Tool:** `{req['tool_name']}`")
col1.markdown(f"**Session:** `{req['session_id']}`")
col2.markdown(f"**Created:** {req['created_at']}")
if req.get("decided_at"):
col2.markdown(
f"**Resolved:** {req['decided_at']} by `{req.get('decided_by', 'unknown')}`"
)

st.markdown(f"**Message / Hint:**")
st.info(req.get("message", "No message provided."))

st.markdown("**Payload / Arguments:**")
st.json(req.get("payload", {}))

if req.get("decision_notes"):
st.markdown(f"**Reviewer note:** {req['decision_notes']}")

if status == "pending":
note = st.text_input(
"Reviewer note (optional)", key=f"note_{req['id']}"
)
c1, c2, c3 = st.columns(3)

if c1.button("Approve", key=f"approve_{req['id']}", type="primary"):
_resolve(req["id"], "approved", note)

if c2.button("Reject", key=f"reject_{req['id']}"):
_resolve(req["id"], "rejected", note)

if c3.button("Escalate", key=f"escalate_{req['id']}"):
_resolve(req["id"], "escalated", note)
14 changes: 14 additions & 0 deletions contributing/samples/hitl_approval/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Sample-specific dependencies for the HITL Approval demo.
# Install into the repo virtualenv after `uv sync --all-extras`:
#
# uv pip install -r contributing/samples/hitl_approval/requirements.txt
#
# The core package (google-adk-community) and its deps (google-adk, httpx)
# are already installed by `uv sync`. Only the service and dashboard extras
# are listed here.

fastapi>=0.111.0
uvicorn[standard]>=0.30.0
sqlalchemy>=2.0.0
aiosqlite>=0.20.0
streamlit>=1.35.0
42 changes: 42 additions & 0 deletions contributing/samples/hitl_approval/start_servers.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash
# Copyright 2026 Google LLC

# 1. Kill any lingering local servers from previous runs to free up ports
killall python uvicorn streamlit adk 2>/dev/null || true
sleep 1

# 2. Ensure we're running from the repo root so imports resolve correctly
cd "$(git rev-parse --show-toplevel)"

# 3. Load GOOGLE_GENAI_API_KEY from .env if present
if [ -f .env ]; then
source .env
fi

echo "Starting FastAPI HITL Backend (:8000)..."
export HITL_DB_PATH="./contributing/samples/hitl_approval/hitl.db"
.venv/bin/uvicorn google.adk_community.services.hitl_approval.api:app --port 8000 &
API_PID=$!

echo "Starting Streamlit Dashboard (:8501)..."
STREAMLIT_BROWSER_GATHER_USAGE_STATS=false \
.venv/bin/streamlit run contributing/samples/hitl_approval/dashboard/app.py \
--server.headless true &
STREAMLIT_PID=$!

echo "Starting ADK Web Chat (:8080)..."
.venv/bin/adk web contributing/samples/hitl_approval --port 8080 &
ADK_PID=$!

echo ""
echo "All services launched."
echo "=========================================="
echo "Backend API: http://localhost:8000/docs"
echo "Dashboard UI: http://localhost:8501"
echo "ADK Agent Chat: http://localhost:8080"
echo "=========================================="
echo "Press Ctrl+C to shut down all servers."

trap "kill $API_PID $STREAMLIT_PID $ADK_PID 2>/dev/null; exit" EXIT

wait
Empty file.
Loading