|
| 1 | +# Durable Research Agent — Crash-Resilient Demo |
| 2 | + |
| 3 | +This sample demonstrates a **long-running research agent** that survives process |
| 4 | +crashes and automatically resumes from its last checkpoint. It uses the |
| 5 | +`@task` decorator from `azure-ai-agentserver-core` to provide built-in |
| 6 | +crash resilience without any manual state management. |
| 7 | + |
| 8 | +## What it showcases |
| 9 | + |
| 10 | +1. **12-stage deep research pipeline** — each stage is a distinct LLM call with real-time token streaming |
| 11 | +2. **Crash resilience** — send `{"message": "crash"}` to kill the process; the supervisor |
| 12 | + restarts it, and the task resumes from its last checkpoint |
| 13 | +3. **Fire-and-forget POST** — `POST /invocations` dispatches the task and returns 202 immediately |
| 14 | +4. **GET streaming with resume** — `GET /invocations/{id}?last_event_id=N` streams SSE, skipping already-seen events |
| 15 | +5. **Cancel support** — `POST /invocations/{id}/cancel` stops the task gracefully |
| 16 | +6. **File-backed streaming** — stream items persist to disk for replay after crashes |
| 17 | + |
| 18 | +## Architecture |
| 19 | + |
| 20 | +``` |
| 21 | +┌────────────────────────────────────────────────────────────┐ |
| 22 | +│ Hosted Agent Sandbox (port 8088) │ |
| 23 | +│ │ |
| 24 | +│ supervisor.py (PID 1 — always responds to /readiness) │ |
| 25 | +│ └── python app.py (port 8089, restarted on crash) │ |
| 26 | +│ │ |
| 27 | +│ POST /invocations (fire-and-forget) │ |
| 28 | +│ ├── {"message": "crash"} → 202, then exit 💥 │ |
| 29 | +│ └── {"message": "<topic>"} → │ |
| 30 | +│ deep_research.start() → 202 JSON response │ |
| 31 | +│ { invocation_id, session_id, task_id, status } │ |
| 32 | +│ │ |
| 33 | +│ GET /invocations/{id}?last_event_id=N │ |
| 34 | +│ └── Streams SSE from active task (skips first N events) │ |
| 35 | +│ or replays from persisted file │ |
| 36 | +│ │ |
| 37 | +│ POST /invocations/{id}/cancel │ |
| 38 | +│ └── Signals cancellation to running task │ |
| 39 | +│ │ |
| 40 | +│ Local disk: ~/.durable-tasks/ (persists across restarts) │ |
| 41 | +└────────────────────────────────────────────────────────────┘ |
| 42 | +``` |
| 43 | + |
| 44 | +## Prerequisites |
| 45 | + |
| 46 | +- Python 3.11+ |
| 47 | +- Azure subscription with AI Foundry access |
| 48 | +- [Azure Developer CLI (azd)](https://learn.microsoft.com/azure/developer/azure-developer-cli/install-azd) |
| 49 | +- `azd` AI agents extension: `azd extension install azure.ai.agents` |
| 50 | + |
| 51 | +## Quick Start (Deploy to Foundry) |
| 52 | + |
| 53 | +```bash |
| 54 | +# 1. Build wheels (included in Docker image) |
| 55 | +./build.sh |
| 56 | + |
| 57 | +# 2. Login and deploy |
| 58 | +azd auth login |
| 59 | +azd up |
| 60 | +``` |
| 61 | + |
| 62 | +## Demo Script — Crash Recovery & Reconnection |
| 63 | + |
| 64 | +This walkthrough demonstrates the full durability story. Total time: ~3 minutes. |
| 65 | + |
| 66 | +### Quick Demo (recommended) |
| 67 | + |
| 68 | +Use the included `demo-client.sh` which handles token refresh, session sharing, |
| 69 | +auto-reconnection, and event resumption: |
| 70 | + |
| 71 | +```bash |
| 72 | +# Terminal 1 — start research (auto-reconnects after crashes) |
| 73 | +./demo-client.sh start "quantum computing" |
| 74 | + |
| 75 | +# Terminal 2 — crash the agent while it's running |
| 76 | +./demo-client.sh crash |
| 77 | + |
| 78 | +# Watch Terminal 1 auto-reconnect and resume from where it left off! |
| 79 | +# Crash again, as many times as you want: |
| 80 | +./demo-client.sh crash |
| 81 | + |
| 82 | +# Terminal 3 — stream container logs (optional) |
| 83 | +./demo-client.sh logs |
| 84 | + |
| 85 | +# Or cancel: |
| 86 | +./demo-client.sh cancel |
| 87 | + |
| 88 | +# Reset session to start fresh: |
| 89 | +./demo-client.sh reset |
| 90 | +``` |
| 91 | + |
| 92 | +### How it works (client flow) |
| 93 | + |
| 94 | +1. **POST** `/invocations?agent_session_id=X` → returns 202 with `invocation_id` |
| 95 | +2. **GET** `/invocations/{inv_id}` → streams SSE events (`id: N\ndata: {...}\n\n`) |
| 96 | +3. Client tracks `last_event_id` (the `id:` field of the last received event) |
| 97 | +4. On disconnect (crash): **POST** same session → new `invocation_id` → **GET** with `?last_event_id=N` |
| 98 | +5. Server skips first N events → client sees only new content from the recovery point |
| 99 | + |
| 100 | +### Manual Demo (curl) |
| 101 | + |
| 102 | +```bash |
| 103 | +# Get access token |
| 104 | +TOKEN=$(az account get-access-token --resource https://ai.azure.com --query accessToken -o tsv) |
| 105 | + |
| 106 | +# Endpoint |
| 107 | +ENDPOINT="https://e2e-tests-westus2-account.services.ai.azure.com/api/projects/e2e-tests-westus2/agents/durable-research-agent/endpoint/protocols" |
| 108 | + |
| 109 | +# Generate a unique session ID (reuse across all calls in this demo) |
| 110 | +SESSION_ID="demo-$(uuidgen | tr '[:upper:]' '[:lower:]')" |
| 111 | +echo "Session: $SESSION_ID" |
| 112 | +``` |
| 113 | + |
| 114 | +### Step 1: Start the research task (fire-and-forget) |
| 115 | + |
| 116 | +```bash |
| 117 | +# POST dispatches the task and returns immediately with IDs |
| 118 | +curl -s -X POST "${ENDPOINT}/invocations?api-version=2025-11-15-preview&agent_session_id=${SESSION_ID}" \ |
| 119 | + -H "Authorization: Bearer $TOKEN" \ |
| 120 | + -H "Content-Type: application/json" \ |
| 121 | + -d '{"message": "Research the history and future of quantum computing"}' |
| 122 | +``` |
| 123 | + |
| 124 | +Response (202): |
| 125 | +```json |
| 126 | +{"status": "started", "invocation_id": "inv_abc123...", "session_id": "demo-..."} |
| 127 | +``` |
| 128 | + |
| 129 | +Save the invocation ID: |
| 130 | +```bash |
| 131 | +INV_ID="inv_abc123..." # from response above |
| 132 | +``` |
| 133 | + |
| 134 | +### Step 2: Stream results via GET |
| 135 | + |
| 136 | +```bash |
| 137 | +curl -N -X GET "${ENDPOINT}/invocations/${INV_ID}?api-version=2025-11-15-preview" \ |
| 138 | + -H "Authorization: Bearer $TOKEN" |
| 139 | +``` |
| 140 | + |
| 141 | +You'll see SSE events with sequential IDs: |
| 142 | +``` |
| 143 | +id: 1 |
| 144 | +data: {"type": "token", "content": "\n\n**[Stage 1/12]** Decomposing topic...\n"} |
| 145 | +
|
| 146 | +id: 2 |
| 147 | +data: {"type": "token", "content": "Quantum"} |
| 148 | +
|
| 149 | +id: 3 |
| 150 | +data: {"type": "token", "content": " computing"} |
| 151 | +... |
| 152 | +``` |
| 153 | + |
| 154 | +### Step 3: Crash the agent! 💥 |
| 155 | + |
| 156 | +While the research is running, send a crash trigger (same session): |
| 157 | + |
| 158 | +```bash |
| 159 | +curl -s -X POST "${ENDPOINT}/invocations?api-version=2025-11-15-preview&agent_session_id=${SESSION_ID}" \ |
| 160 | + -H "Authorization: Bearer $TOKEN" \ |
| 161 | + -H "Content-Type: application/json" \ |
| 162 | + -d '{"message": "crash"}' |
| 163 | +``` |
| 164 | + |
| 165 | +Response (202): |
| 166 | +```json |
| 167 | +{"status": "crashing", "message": "💥 Process will crash now"} |
| 168 | +``` |
| 169 | + |
| 170 | +The process exits. The supervisor immediately restarts it and recovers the task. |
| 171 | + |
| 172 | +### Step 4: Reconnect with resume |
| 173 | + |
| 174 | +Wait ~10 seconds, then POST again to get a new invocation ID, and GET with `last_event_id`: |
| 175 | + |
| 176 | +```bash |
| 177 | +# Get new invocation ID (task is already in progress) |
| 178 | +NEW_RESPONSE=$(curl -s -X POST "${ENDPOINT}/invocations?api-version=2025-11-15-preview&agent_session_id=${SESSION_ID}" \ |
| 179 | + -H "Authorization: Bearer $TOKEN" \ |
| 180 | + -H "Content-Type: application/json" \ |
| 181 | + -d '{"message": "quantum computing"}') |
| 182 | +NEW_INV_ID=$(echo "$NEW_RESPONSE" | python3 -c "import sys,json; print(json.loads(sys.stdin.read())['invocation_id'])") |
| 183 | + |
| 184 | +# Resume from where we left off (e.g., last_event_id=370) |
| 185 | +curl -N -X GET "${ENDPOINT}/invocations/${NEW_INV_ID}?api-version=2025-11-15-preview&last_event_id=370" \ |
| 186 | + -H "Authorization: Bearer $TOKEN" |
| 187 | +``` |
| 188 | + |
| 189 | +You'll see only NEW events (stages after the crash): |
| 190 | +``` |
| 191 | +id: 371 |
| 192 | +data: {"type": "token", "content": "\n\n⚡ **Recovered from crash!** Resuming from stage 5/12...\n\n"} |
| 193 | +
|
| 194 | +id: 372 |
| 195 | +data: {"type": "token", "content": "\n\n**[Stage 5/12]** Examining competing theories...\n"} |
| 196 | +... |
| 197 | +``` |
| 198 | + |
| 199 | +### Step 5: Cancel the task (optional) |
| 200 | + |
| 201 | +```bash |
| 202 | +curl -X POST "${ENDPOINT}/invocations/${NEW_INV_ID}/cancel?api-version=2025-11-15-preview" \ |
| 203 | + -H "Authorization: Bearer $TOKEN" \ |
| 204 | + -H "Content-Type: application/json" \ |
| 205 | + -d '{}' |
| 206 | +``` |
| 207 | + |
| 208 | +Response: |
| 209 | +```json |
| 210 | +{"status": "cancelled", "message": "Task cancellation requested."} |
| 211 | +``` |
| 212 | + |
| 213 | +## Container Logs |
| 214 | + |
| 215 | +Stream real-time container logs (stdout/stderr) in a separate terminal: |
| 216 | + |
| 217 | +```bash |
| 218 | +# Via demo-client.sh (uses session from .demo-session file) |
| 219 | +./demo-client.sh logs |
| 220 | + |
| 221 | +# Or directly via azd: |
| 222 | +azd ai agent monitor --session-id <session-id> --follow |
| 223 | + |
| 224 | +# Recent logs (last 20 lines): |
| 225 | +azd ai agent monitor --tail 20 |
| 226 | + |
| 227 | +# System events (container start/stop): |
| 228 | +azd ai agent monitor --type system |
| 229 | +``` |
| 230 | + |
| 231 | +## How it works |
| 232 | + |
| 233 | +The `@task` decorator provides: |
| 234 | + |
| 235 | +- **Automatic persistence** — task state is checkpointed after each stage via |
| 236 | + `ctx.metadata.flush()` |
| 237 | +- **Crash recovery** — on startup, stale (in-flight) tasks are automatically |
| 238 | + detected by lease owner and re-executed, with `ctx.metadata` containing all |
| 239 | + previously saved progress |
| 240 | +- **Entry mode awareness** — `ctx.entry_mode` tells the function why it was |
| 241 | + called: `"fresh"`, `"resumed"`, or `"recovered"` |
| 242 | +- **File-backed streaming** — stream items are persisted to disk via a custom |
| 243 | + `FileStreamHandler` so GET can replay them after a crash |
| 244 | +- **Event IDs** — each SSE event has a sequential `id:` field; clients use |
| 245 | + `last_event_id` query param to skip already-seen events on reconnect |
| 246 | + |
| 247 | +Key code pattern: |
| 248 | +```python |
| 249 | +@task(name="deep_research", stream_handler_factory=file_stream_factory) |
| 250 | +async def deep_research(ctx: TaskContext[dict]) -> dict: |
| 251 | + completed = ctx.metadata.get("completed_stages", 0) |
| 252 | + |
| 253 | + if ctx.entry_mode == "recovered": |
| 254 | + await ctx.stream(json.dumps({"type": "token", "content": "⚡ Recovered!"})) |
| 255 | + |
| 256 | + for i in range(completed, len(STAGES)): |
| 257 | + # Stream LLM tokens in real-time |
| 258 | + async for event in llm_stream: |
| 259 | + await ctx.stream(json.dumps({"type": "token", "content": event.delta})) |
| 260 | + |
| 261 | + # CHECKPOINT — survives crashes |
| 262 | + ctx.metadata["completed_stages"] = i + 1 |
| 263 | + await ctx.metadata.flush() |
| 264 | + |
| 265 | + return final_result |
| 266 | +``` |
| 267 | + |
| 268 | +## Environment Variables |
| 269 | + |
| 270 | +| Variable | Description | Default | |
| 271 | +|---|---|---| |
| 272 | +| `FOUNDRY_PROJECT_ENDPOINT` | AI Foundry project endpoint (set by platform) | Required | |
| 273 | +| `AZURE_AI_MODEL_DEPLOYMENT_NAME` | Model deployment to use | `gpt-4.1-mini` | |
| 274 | +| `FOUNDRY_TASK_API_ENABLED` | Use platform Task Storage (vs local file) | `0` (local) | |
| 275 | +| `STAGE_DURATION` | Seconds between stages (for demo pacing) | `5` | |
| 276 | + |
| 277 | +## File Structure |
| 278 | + |
| 279 | +``` |
| 280 | +durable-agent-demo/ |
| 281 | +├── demo-client.sh # ⭐ Demo client (handles sessions, reconnect, crash) |
| 282 | +├── azure.yaml # azd service config |
| 283 | +├── build.sh # Build local wheels for Docker |
| 284 | +├── infra/ # Bicep templates |
| 285 | +├── src/durable-research-agent/ |
| 286 | +│ ├── agent.py # ⭐ The durable task (12-stage research pipeline) |
| 287 | +│ ├── app.py # HTTP handlers (POST fire-and-forget, GET stream, cancel) |
| 288 | +│ ├── supervisor.py # PID 1 reverse proxy (keeps /readiness alive) |
| 289 | +│ ├── agent.yaml # Agent definition for Foundry |
| 290 | +│ ├── Dockerfile |
| 291 | +│ ├── requirements.txt |
| 292 | +│ └── wheels/ # Local package wheels (built by build.sh) |
| 293 | +└── README.md |
| 294 | +``` |
0 commit comments