Skip to content

Commit 7dfbd7a

Browse files
[agentserver] address feedback: tags internal, single shutdown-grace env var, simpler CHANGELOG, simpler README
Per review feedback: 1. **Single shutdown-grace env var.** Removed AGENTSERVER_TASK_MANAGER_SHUTDOWN_GRACE_SECONDS; the framework now reads only AGENTSERVER_SHUTDOWN_GRACE_SECONDS. 2. **Tags is internal.** Removed the public 'tags=' keyword from the @task decorator and Task.options(...). The framework still uses the internal TaskOptions.tags field for source-stamping; developers no longer have a way to set arbitrary tags from the public surface. - Removed: samples/durable_source (only demonstrated the removed tags= public keyword) - Removed: tests/durable/test_callable_factories.py (only tested the removed tags-callable factory feature) - Removed: tests/durable/test_sample_e2e.py:: test_reserved_tag_cannot_be_overridden - Updated: tests/durable/test_decorator.py to remove tags= cases and add 'tags' to the retired-args parametrize list - Updated: docs/durable-task-guide.md to drop the tags row from the @task reference table and the 'tags' mention in the recovery-safe options paragraph 3. **CHANGELOG simplified.** Trimmed the 2.0.0b6 entry to two bullets (durable-task primitive + httpx removal) — end-developer-facing, not a duplicate of the guide. Same treatment for invocations 1.0.0b5 (one bullet covering the four durable samples). 4. **README simplified.** Replaced the two durable examples (one with timeout but no cancellation hook; one storing conversation history in ctx.metadata — both anti-patterns per our own guide) with a single minimal example showing the @task decorator and .run() call. Pointers to the developer guide for streaming/suspend/retry/timeout. Verified: 433 core tests pass, pylint 10.00/10, mypy 0 new errors. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 24dc2c5 commit 7dfbd7a

12 files changed

Lines changed: 61 additions & 525 deletions

File tree

sdk/agentserver/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@ specs/
33
.specify/
44
.github/
55
.vscode/
6+
7+
# Demo session state — regenerated each time the demo runs
8+
.demo-session

sdk/agentserver/azure-ai-agentserver-core/CHANGELOG.md

Lines changed: 19 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -4,126 +4,25 @@
44

55
### Features Added
66

7-
**Durable tasks — new primitive.** This release introduces the durable-task
8-
primitive: a small decorator-driven API that lets a hosted agent run long
9-
operations as **named tasks** that survive process crashes, OOM kills, and
10-
container redeployments. Tasks pick up exactly where they were after recovery,
11-
without the developer writing any explicit checkpoint or replay code.
12-
13-
The full developer learning arc lives in
14-
[`docs/durable-task-guide.md`](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md).
15-
A short tour:
16-
17-
```python
18-
from azure.ai.agentserver.core.durable import task
19-
20-
@task(name="long_research")
21-
async def do_research(ctx, prompt: str) -> dict:
22-
# ctx.entry_mode tells you "fresh" | "resumed" | "recovered"
23-
if ctx.entry_mode == "recovered":
24-
# Pick up from where you were, using ctx.metadata
25-
...
26-
await ctx.stream({"phase": "searching"})
27-
...
28-
return {"summary": "..."}
29-
30-
# In your handler:
31-
run = await do_research.run(task_id="task-123", input={"prompt": "..."})
32-
async for chunk in run:
33-
...
34-
result = await run.result()
35-
```
36-
37-
**Concepts shipping in this release**
38-
39-
- `@task(...)` decorator + `Task` returned object with `.run()`, `.start()`,
40-
`.options(...)`, `.get_active_run(task_id)`.
41-
- `TaskContext` — passed to every handler invocation. Exposes `entry_mode`,
42-
`input`, `metadata` (with auto-flush at lifecycle boundaries), `cancel`
43-
(an `asyncio.Event`), the cause booleans `timeout_exceeded` and
44-
`cancel_requested`, the steering signals `pending_input_count` and
45-
`is_steered_turn`, the shutdown signal `shutdown`, the retry counter
46-
`retry_attempt`, and the recovery counter `recovery_count`. Provides
47-
`ctx.suspend(output=...)`, `ctx.stream(chunk)`, and
48-
`ctx.exit_for_recovery()` (used during graceful shutdown to leave the
49-
stored status `in_progress` so the next process recovers the task).
50-
- `TaskResult.status``Literal["completed", "suspended"]`. Failure paths
51-
surface as exceptions (`TaskFailed`, `TaskCancelled`,
52-
`TaskConflictError`); the stored status is `"completed"` plus an error
53-
payload.
54-
- `TaskConflictError` — single error type for any "task is busy / not
55-
available" state (live elsewhere, recovered elsewhere, evicted under
56-
split-brain protection, terminal with queued steerer). Carries
57-
`current_status` so callers can branch.
58-
- `RetryPolicy` — exponential / fixed / linear backoff presets, durable
59-
across crash and recovery (failures-only; crash recovery does not
60-
consume retry budget).
61-
- `EntryMode` Literal: `"fresh" | "resumed" | "recovered"`.
62-
- `Suspended` (sentinel for `.run()` of a suspended task),
63-
`TaskStatus` Literal (`"pending" | "in_progress" | "suspended" | "completed"`),
64-
`TaskMetadata`, `StreamHandler`, `StreamHandlerFactory`, `QueueStreamHandler`.
65-
66-
**Behavior shipping in this release**
67-
68-
- **Automatic recovery.** Crashed-mid-task records are detected at three
69-
layers — startup scan, periodic background scan, and inline reclaim at
70-
every scheduling primitive (`.run`, `.start`, `.get_active_run`). The
71-
developer sees `ctx.entry_mode == "recovered"` and otherwise the same
72-
`TaskContext` surface as on a fresh entry.
73-
- **Split-brain protection.** A new agent process that takes over a session
74-
cancels stranded executions in the previous process cleanly via
75-
`HTTP 409 binding_mismatch`. The previous process classifies this as
76-
*evicted*, cancels its execution, suppresses its terminal write, and
77-
signals its awaiters with `TaskConflictError`. Caller-observable behavior
78-
matches the live-elsewhere case.
79-
- **Steering as plain multi-turn.** `Task.start(...)` on an already-active
80-
steerable task queues the new input. The first turn's `ctx.suspend(...)`
81-
call resolves the steerer's `.result()` with the *next* turn's outcome,
82-
delivering plain pipelined-handler semantics with no separate "superseded"
83-
status. The first turn's caller observes the natural outcome of the first
84-
turn unchanged.
85-
- **Per-turn wall-clock timeout.** `@task(timeout=...)` is anchored to a
86-
persisted per-turn-start timestamp. A crash mid-turn does NOT reset the
87-
budget; the recovered watchdog computes
88-
`remaining = max(0, timeout - (now - turn_started_at))`. The watchdog is
89-
cooperative-only: it sets `ctx.timeout_exceeded = True` then
90-
`ctx.cancel.set()` — handlers that ignore both run until the process ends
91-
or `TaskRun.cancel()` is called externally.
92-
- **Metadata auto-flush at lifecycle boundaries.** `ctx.metadata` is
93-
flushed automatically at every terminal-of-turn boundary (suspend,
94-
complete, cooperative cancel, exception, suspend-with-queued-steering,
95-
return-with-queued-steering, raise-with-queued-steering, shutdown via
96-
`exit_for_recovery`). Explicit `ctx.metadata.flush()` remains available
97-
as a fence before at-most-once side effects.
98-
- **Bookkeeping is durable.** Suspended-resume input patches are
99-
ETag-protected. Steerable input data is cleared from the stored record at
100-
the suspend transition (data minimization). The lease owner string
101-
incorporates both `FOUNDRY_AGENT_NAME` and session ID, so two different
102-
agents sharing a session ID cannot collide on lease ownership.
103-
104-
**Transport**
105-
106-
- `HostedTaskProvider` is built on `azure.core.AsyncPipelineClient` with
107-
the standard policy chain (request-id, headers, user-agent, retry,
108-
`AsyncBearerTokenCredentialPolicy`, task-API logging, distributed
109-
tracing). `ContentDecodePolicy` is intentionally excluded; body parsing
110-
happens at the call site with defensive error handling. The retry policy
111-
retries on 5xx / 408 / 429 only — never on 409 regardless of body. The
112-
`credential` parameter on `HostedTaskProvider.__init__` is typed
113-
`AsyncTokenCredential`.
114-
- `httpx` is no longer a production dependency.
115-
116-
**Documentation & tests**
117-
118-
- New developer guide at `docs/durable-task-guide.md` is the end-to-end
119-
learning arc for the primitive (concepts → reference → patterns →
120-
troubleshooting).
121-
- New doc-review meta-test (`tests/durable/test_dev_guide_review.py`) keeps
122-
the guide and the public surface in sync.
123-
- The durable-task test suite (`tests/durable/`) covers the public surface
124-
contract, recovery in all three layers, eviction, steering as
125-
multi-turn, the per-turn-wall-clock-durable timeout, and metadata
126-
auto-flush invariants.
7+
- **Durable tasks** — new `@task` decorator and supporting types
8+
(`TaskContext`, `TaskResult`, `TaskRun`, `RetryPolicy`,
9+
`TaskConflictError`, `TaskFailed`, `TaskCancelled`) for
10+
crash-resilient long-running agents. Tasks survive container
11+
restarts, OOM kills, and redeployments; the framework re-enters the
12+
handler with `ctx.entry_mode == "recovered"` and a populated
13+
`ctx.metadata` after a crash. Supports streaming output via
14+
`ctx.stream()`, multi-turn suspend/resume via `ctx.suspend()`,
15+
cooperative cancel via `ctx.cancel`, per-turn wall-clock timeout via
16+
`@task(timeout=...)`, and steering of in-flight tasks via
17+
`@task(steerable=True)`. See the
18+
[developer guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md)
19+
for the full API and patterns reference.
20+
21+
### Other Changes
22+
23+
- The hosted task-store transport is now built on
24+
`azure.core.AsyncPipelineClient` instead of `httpx`; `httpx` is no
25+
longer a production dependency.
12726

12827
## 2.0.0b5 (2026-05-25)
12928

sdk/agentserver/azure-ai-agentserver-core/README.md

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -118,48 +118,23 @@ python my_agent.py
118118
The `@task` decorator builds crash-resilient agents that survive container restarts, OOM kills, and redeployments. Task state is persisted to a task store, enabling automatic recovery and multi-turn suspend/resume patterns.
119119

120120
```python
121-
from datetime import timedelta
122-
from azure.ai.agentserver.core.durable import task, TaskContext, RetryPolicy
121+
from azure.ai.agentserver.core.durable import task, TaskContext
123122

124-
@task(
125-
timeout=timedelta(minutes=30),
126-
retry=RetryPolicy.exponential_backoff(max_attempts=3),
127-
tags={"priority": "high"},
128-
)
123+
@task
129124
async def process_document(ctx: TaskContext[dict]) -> dict:
130-
ctx.metadata["phase"] = "processing"
131-
result = await analyze(ctx.input["document_url"])
132-
ctx.metadata["phase"] = "complete"
133-
return {"summary": result}
134-
```
135-
136-
**Start and await a task:**
137-
138-
```python
139-
result = await process_document.run(task_id="doc-42", input={"document_url": "..."})
125+
# ctx.entry_mode is "fresh" | "resumed" | "recovered".
126+
# The framework re-invokes the handler from the top after a
127+
# crash; ctx.input survives, so the handler picks up.
128+
summary = await analyze(ctx.input["document_url"])
129+
return {"summary": summary}
130+
131+
result = await process_document.run(
132+
task_id="doc-42", input={"document_url": "..."},
133+
)
140134
print(result.output) # {"summary": "..."}
141135
```
142136

143-
**Multi-turn suspend/resume (e.g., conversational agents):**
144-
145-
```python
146-
@task()
147-
async def chat_session(ctx: TaskContext[dict]) -> dict:
148-
message = ctx.input["message"]
149-
history = ctx.metadata.get("history", [])
150-
reply = await generate_reply(message, history)
151-
history.append({"role": "user", "content": message})
152-
history.append({"role": "assistant", "content": reply})
153-
ctx.metadata["history"] = history
154-
return await ctx.suspend(output={"reply": reply})
155-
156-
# Each call resumes the same session:
157-
result = await chat_session.run(task_id="session-1", input={"message": "Hello"})
158-
print(result.output) # {"reply": "Hi! How can I help?"}
159-
print(result.is_suspended) # True
160-
```
161-
162-
See the [Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md) for the full API reference.
137+
See the [Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md) for streaming, multi-turn suspend/resume, retries, timeouts, steering, and the patterns reference.
163138

164139
## Troubleshooting
165140

sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/_base.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,16 @@
4040
def _read_task_manager_shutdown_grace() -> float:
4141
"""Return TaskManager shutdown grace in seconds (env-driven, default 25.0).
4242
43-
Reads ``AGENTSERVER_TASK_MANAGER_SHUTDOWN_GRACE_SECONDS`` if set,
44-
falling back to ``AGENTSERVER_SHUTDOWN_GRACE_SECONDS`` (used by the
45-
responses layer for in-process draining). Defaults to 25.0 when
46-
neither is set, matching the original behaviour. Allows tests
47-
(and operators) to keep shutdown fast when no long-running durable
48-
handlers need to checkpoint — for example the conformance suite
49-
runs with a 1s grace so the in-process shutdown marker fires
50-
before the handler completes naturally.
43+
Reads ``AGENTSERVER_SHUTDOWN_GRACE_SECONDS``. Defaults to 25.0 when
44+
unset. Allows tests (and operators) to keep shutdown fast when no
45+
long-running durable handlers need to checkpoint — for example the
46+
conformance suite runs with a 1s grace so the in-process shutdown
47+
marker fires before the handler completes naturally.
5148
5249
:return: Grace period in seconds (non-negative).
5350
:rtype: float
5451
"""
55-
raw = (
56-
os.environ.get("AGENTSERVER_TASK_MANAGER_SHUTDOWN_GRACE_SECONDS")
57-
or os.environ.get("AGENTSERVER_SHUTDOWN_GRACE_SECONDS")
58-
)
52+
raw = os.environ.get("AGENTSERVER_SHUTDOWN_GRACE_SECONDS")
5953
if raw is None:
6054
return 25.0
6155
try:

sdk/agentserver/azure-ai-agentserver-core/azure/ai/agentserver/core/durable/_decorator.py

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,6 @@ def options(
11261126
self,
11271127
*,
11281128
title: str | Callable[[Any, str], str] | None = None,
1129-
tags: dict[str, str] | Callable[[Any, str], dict[str, str]] | None = None,
11301129
timeout: timedelta | None = None,
11311130
ephemeral: bool | None = None,
11321131
retry: RetryPolicy | None = None,
@@ -1138,8 +1137,6 @@ def options(
11381137
11391138
:keyword title: Title override.
11401139
:paramtype title: str | Callable[[Any, str], str] | None
1141-
:keyword tags: Tag overrides.
1142-
:paramtype tags: dict[str, str] | Callable[[Any, str], dict[str, str]] | None
11431140
:keyword timeout: Execution timeout override.
11441141
:paramtype timeout: timedelta | None
11451142
:keyword ephemeral: Whether to delete task on terminal exit.
@@ -1151,27 +1148,10 @@ def options(
11511148
:return: A new Task with overridden options.
11521149
:rtype: Task[Input, Output]
11531150
"""
1154-
# For tags: if both old and new are dicts, merge them.
1155-
# Mixing callable and dict is not supported — use one or the other.
1156-
resolved_tags: dict[str, str] | Callable[[Any, str], dict[str, str]] | None
1157-
if tags is not None:
1158-
if callable(tags) != callable(self._opts.tags) and self._opts.tags:
1159-
raise TypeError(
1160-
"Cannot mix callable and dict tags in options(). "
1161-
"Pass a callable to replace a callable, or a dict to merge with a dict."
1162-
)
1163-
if callable(tags):
1164-
resolved_tags = tags
1165-
else:
1166-
existing = self._opts.tags if isinstance(self._opts.tags, dict) else {}
1167-
resolved_tags = _strip_reserved_tags({**existing, **(tags or {})})
1168-
else:
1169-
resolved_tags = self._opts.tags
1170-
11711151
new_opts = TaskOptions(
11721152
name=self._opts.name,
11731153
title=title if title is not None else self._opts.title,
1174-
tags=resolved_tags,
1154+
tags=self._opts.tags,
11751155
timeout=timeout if timeout is not None else self._opts.timeout,
11761156
ephemeral=(ephemeral if ephemeral is not None else self._opts.ephemeral),
11771157
retry=retry if retry is not None else self._opts.retry,
@@ -1197,7 +1177,6 @@ def task(
11971177
*,
11981178
name: str | None = ...,
11991179
title: str | Callable[[Any, str], str] | None = ...,
1200-
tags: dict[str, str] | Callable[[Any, str], dict[str, str]] | None = ...,
12011180
timeout: timedelta | None = ...,
12021181
ephemeral: bool = ...,
12031182
retry: RetryPolicy | None = ...,
@@ -1214,7 +1193,6 @@ def task(
12141193
*,
12151194
name: str | None = None,
12161195
title: str | Callable[[Any, str], str] | None = None,
1217-
tags: dict[str, str] | Callable[[Any, str], dict[str, str]] | None = None,
12181196
timeout: timedelta | None = None,
12191197
ephemeral: bool = True,
12201198
retry: RetryPolicy | None = None,
@@ -1239,8 +1217,6 @@ async def my_task(ctx: TaskContext[MyInput]) -> MyOutput: ...
12391217
existing in-flight tasks are still recovered correctly because the
12401218
framework matches on this name, not the Python function name.
12411219
:keyword title: Human-readable title (string or callable).
1242-
:keyword tags: Default tags (static dict or callable factory receiving
1243-
``(input, task_id)``).
12441220
:keyword timeout: Per-turn, wall-clock, durable, cooperative-only
12451221
execution budget. When the budget elapses for the current turn,
12461222
``ctx.timeout_exceeded`` is set then ``ctx.cancel`` is set; the
@@ -1275,15 +1251,10 @@ def _wrap(
12751251

12761252
input_type, output_type = _extract_generic_args(func)
12771253

1278-
# Preserve callable tags as-is (stripped at resolve time); strip static dicts now
1279-
resolved_tags = (
1280-
tags if callable(tags) else _strip_reserved_tags(dict(tags) if tags else {})
1281-
)
1282-
12831254
opts = TaskOptions(
12841255
name=name or func.__qualname__,
12851256
title=title,
1286-
tags=resolved_tags,
1257+
tags={},
12871258
timeout=timeout,
12881259
ephemeral=ephemeral,
12891260
retry=retry,

sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-guide.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,6 @@ changing it strands existing tasks.
659659
|--------------------------|-------------------------------------------|---------|-------------|
660660
| `name` | `str` | `fn.__qualname__` | Stable identity for recovery routing. Always set this explicitly for production tasks. |
661661
| `title` | `str \| Callable[[T, str], str] \| None` | `None` | Human-readable title (template or callable). |
662-
| `tags` | `dict[str, str] \| Callable[[T, str], dict[str, str]] \| None` | `None` | Default tags (static dict or callable factory). |
663662
| `timeout` | `timedelta \| None` | `None` | Execution timeout. When elapsed, `ctx.cancel` is set cooperatively. |
664663
| `ephemeral` | `bool` | `True` | Delete the persisted record on terminal exit. |
665664
| `retry` | `RetryPolicy \| None` | `None` | Retry policy for handler-raised exceptions. Recovery-safe (applied on every entry, including post-crash). |
@@ -695,7 +694,7 @@ you can stream from or `await handle.result()` on. Both accept the
695694
same `input_id` / `if_last_input_id` sequential-input preconditions
696695
(see §4).
697696

698-
Everything else that characterises a task — `title`, `tags`, `retry`,
697+
Everything else that characterises a task — `title`, `retry`,
699698
`stream_handler_factory`, `steerable`, `ephemeral`,
700699
`timeout`is configured once on the `@task(...)` decorator (or via
701700
`Task.options(...)` for a derived `Task`). There is no per-call

0 commit comments

Comments
 (0)