Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 55 additions & 5 deletions sdk/agentserver/azure-ai-agentserver-responses/CHANGELOG.md

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions sdk/agentserver/azure-ai-agentserver-responses/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ The library orchestrates the complete response lifecycle: `created` → `in_prog

For detailed handler implementation guidance, see [docs/handler-implementation-guide.md](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-responses/docs/handler-implementation-guide.md).

### Durability

Background responses with `store=True` are automatically crash-recoverable. If the server crashes mid-response, the handler is re-invoked on restart — no code changes needed. Stream events are persisted incrementally so clients can reconnect and resume from where they left off. For advanced scenarios (metadata checkpointing, multi-turn steering), see the [Durable Responses Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-responses/docs/durable-responses-developer-guide.md).

## Examples

### Echo handler
Expand Down Expand Up @@ -214,6 +218,10 @@ Visit the [Samples](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/
| [File Inputs](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/sample_14_file_inputs.py) | Receive files via base64 data URL, URL, or file ID |
| [Annotations](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/sample_15_annotations.py) | Attach file_path, file_citation, and url_citation annotations |
| [Structured Outputs](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/sample_16_structured_outputs.py) | Return structured JSON as a `structured_outputs` item |
| [Durable Claude](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/durable_claude/agent.py) | Claude Agent SDK with stateful sessions and three-phase cancel |
| [Durable Copilot](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/durable_copilot/agent.py) | Copilot SDK with session lifecycle and steering |
| [Durable LangGraph](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/durable_langgraph/agent.py) | LangGraph multi-step graph with per-node checkpointing |
| [Durable Multi-turn](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-responses/samples/durable_multiturn/agent.py) | Multi-turn conversation with bounded metadata |

- [Handler implementation guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-responses/docs/handler-implementation-guide.md) — Detailed reference for building handlers

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_input_expanded,
to_output_item,
)
from .models.runtime import CancellationReason
from .store._base import ResponseProviderProtocol, ResponseStreamProviderProtocol
from .store._foundry_errors import (
FoundryApiError,
Expand All @@ -32,6 +33,7 @@
__all__ = [
"__version__",
"data_url", # pylint: disable=naming-mismatch
"CancellationReason",
"ResponsesAgentServerHost",
"ResponseContext",
"IsolationContext",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""DurabilityContext — recovery-awareness state exposed to response handlers.

Per spec 015 FR-040 / FR-005, the handler-facing metadata wrapper rejects
any key (or named-namespace name) starting with ``_`` so that response
handlers cannot accidentally collide with framework-reserved namespaces
(e.g. ``_responses``). The framework layer reaches those namespaces via
the underlying :class:`~azure.ai.agentserver.core.durable.TaskContext`
directly — the primitive itself does not enforce the convention.
"""

from __future__ import annotations

from collections.abc import Iterator, MutableMapping
from typing import Any, Literal, Optional

# Closed set of values describing how a handler invocation was entered:
# "fresh" for a normal invocation, "recovered" for a re-invocation after
# a crash.
DurabilityEntryMode = Literal["fresh", "recovered"]


class _DeveloperMetadataFacade(MutableMapping[str, Any]):
"""Handler-facing wrapper over a ``TaskMetadata``-like backing store.

Provides the same dict-like + callable shape as
:class:`~azure.ai.agentserver.core.durable.TaskMetadata` but rejects
any key (or namespace name) starting with ``_``. Framework layers
that need to write into reserved namespaces (e.g. ``_responses``)
must use the underlying ``TaskContext.metadata`` directly — they do
NOT go through this wrapper.
"""

def __init__(self, raw: Any, _namespaces: Optional[dict[str, Any]] = None) -> None:
self._raw = raw
# For plain-dict backing stores (used in unit tests where the
# backing object isn't a real TaskMetadata), maintain a private
# per-namespace dict registry so ``facade(name)`` returns a
# genuinely isolated store. For real TaskMetadata stores (callable),
# the underlying primitive owns the registry.
self._namespaces: dict[str, Any] = _namespaces if _namespaces is not None else {}

@staticmethod
def _check_key(key: Any) -> None:
if isinstance(key, str) and key.startswith("_"):
raise ValueError(
f"metadata keys starting with '_' are reserved for "
f"framework-internal namespaces (got {key!r}). Pick a "
f"non-underscore-prefixed name."
)

def __getitem__(self, key: str) -> Any:
self._check_key(key)
return self._raw[key]

def __setitem__(self, key: str, value: Any) -> None:
self._check_key(key)
self._raw[key] = value

def __delitem__(self, key: str) -> None:
self._check_key(key)
del self._raw[key]

def __iter__(self) -> Iterator[str]:
return iter(k for k in self._raw if not (isinstance(k, str) and k.startswith("_")))

def __len__(self) -> int:
return sum(1 for k in self._raw if not (isinstance(k, str) and k.startswith("_")))

def __contains__(self, key: object) -> bool:
if isinstance(key, str) and key.startswith("_"):
return False
return key in self._raw

def get(self, key: str, default: Any = None) -> Any:
if isinstance(key, str) and key.startswith("_"):
return default
return self._raw.get(key, default)

def __call__(self, name: Optional[str] = None) -> "_DeveloperMetadataFacade":
"""Return a sibling namespace facade.

``ctx.metadata`` accesses the default (unnamed) namespace.
``ctx.metadata(name)`` accesses a named namespace.

:raises ValueError: If ``name`` starts with ``_`` (reserved).
"""
if name is None:
return self
if not isinstance(name, str):
raise TypeError(
f"namespace name must be a str, got {type(name).__name__}"
)
if name.startswith("_"):
raise ValueError(
f"named namespace {name!r} starts with '_', which is "
f"reserved for framework-internal layers (e.g. "
f"'_responses'). Pick a non-underscore-prefixed name."
)
raw = self._raw
if callable(raw):
sub = raw(name)
return _DeveloperMetadataFacade(sub)
# Plain-dict fallback: keep an isolated sub-dict per namespace
sub = self._namespaces.setdefault(name, {})
return _DeveloperMetadataFacade(sub)

async def flush(self) -> None:
"""Force-persist any pending metadata writes for this namespace.

Delegates to the underlying ``TaskMetadata.flush()`` when present.
For non-durable / transient contexts (e.g. ``store=false`` responses
or unit tests where the backing store is a plain ``dict``), this
is a no-op.
"""
flush = getattr(self._raw, "flush", None)
if callable(flush):
import asyncio # local import to avoid top-level cycle # noqa: PLC0415

result = flush()
if asyncio.iscoroutine(result):
await result


class DurabilityContext:
    """Read-mostly view of recovery state handed to response handlers.

    Every attribute is exposed through a read-only property. The one
    mutable piece is :attr:`metadata`: a mapping (also callable, to reach
    named namespaces) that handlers use for their own checkpointing.

    :param entry_mode: ``"fresh"`` for a normal invocation, ``"recovered"``
        when the handler is re-entered after a crash.
    :param retry_attempt: Durable retry counter; see :attr:`retry_attempt`.
    :param was_steered: True when a steering input triggered this invocation.
    :param pending_inputs: How many steering inputs are still queued behind
        the current one.
    :param metadata: Checkpoint store for the developer. An existing
        ``_DeveloperMetadataFacade`` is used as-is; any other object is
        wrapped in one.
    """

    __slots__ = (
        "_entry_mode",
        "_retry_attempt",
        "_was_steered",
        "_pending_inputs",
        "_metadata",
    )

    def __init__(
        self,
        *,
        entry_mode: DurabilityEntryMode,
        retry_attempt: int,
        was_steered: bool,
        pending_inputs: int,
        metadata: Any,
    ) -> None:
        # Wrap raw stores exactly once; a facade passed in is reused so the
        # caller and the context share the same object.
        if isinstance(metadata, _DeveloperMetadataFacade):
            facade = metadata
        else:
            facade = _DeveloperMetadataFacade(metadata)

        self._entry_mode = entry_mode
        self._retry_attempt = retry_attempt
        self._was_steered = was_steered
        self._pending_inputs = pending_inputs
        self._metadata = facade

    @property
    def entry_mode(self) -> DurabilityEntryMode:
        """Entry mode of this invocation: ``'fresh'`` or ``'recovered'``."""
        return self._entry_mode

    @property
    def is_recovery(self) -> bool:
        """True when the handler is being re-invoked after a crash.

        Shorthand for ``entry_mode == "recovered"``.
        """
        return self._entry_mode == "recovered"

    @property
    def retry_attempt(self) -> int:
        """Durable retry counter for this response.

        Starts again from 0 after a successful invocation and goes up only
        when the handler is re-invoked because of a retryable failure. It
        is persisted to the task store at lifecycle boundaries, which keeps
        it stable across in-process retries as well as post-crash recovery.

        Per spec 015 FR-001/FR-002, this single counter replaces the
        earlier per-process ``run_attempt`` and covers the cross-lifetime
        intent too.
        """
        return self._retry_attempt

    @property
    def was_steered(self) -> bool:
        """True when a steering input triggered this invocation."""
        return self._was_steered

    @property
    def pending_inputs(self) -> int:
        """Count of steering inputs still queued after the current one."""
        return self._pending_inputs

    @property
    def metadata(self) -> _DeveloperMetadataFacade:
        """Checkpoint store available to the developer.

        ``ctx.metadata["key"] = value`` writes to the default namespace;
        ``ctx.metadata("my_namespace")["key"] = value`` writes to a named
        one. Keys and namespace names beginning with ``_`` are rejected
        because they are reserved for framework-internal layers.
        """
        return self._metadata
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ def __init__(
sse_keep_alive_interval_seconds: int | None = None,
shutdown_grace_period_seconds: int = 10,
create_span_hook: "CreateSpanHook | None" = None,
durable_background: bool = True,
steerable_conversations: bool = False,
store_disabled: bool = False,
max_pending: int = 10,
replay_event_ttl_seconds: float = 600,
) -> None:
if additional_server_version is not None:
normalized = additional_server_version.strip()
Expand All @@ -34,7 +39,10 @@ def __init__(
default_model = normalized_model or None
self.default_model = default_model

if sse_keep_alive_interval_seconds is not None and sse_keep_alive_interval_seconds <= 0:
if (
sse_keep_alive_interval_seconds is not None
and sse_keep_alive_interval_seconds <= 0
):
raise ValueError("sse_keep_alive_interval_seconds must be > 0 when set")
self.sse_keep_alive_interval_seconds = sse_keep_alive_interval_seconds

Expand All @@ -48,8 +56,30 @@ def __init__(

self.create_span_hook = create_span_hook

# Durability options (developer-controlled, baked into container image)
if steerable_conversations and store_disabled:
raise ValueError(
"steerable_conversations=True requires store to be enabled "
"(store_disabled must be False)"
)
if steerable_conversations and not durable_background:
raise ValueError(
"steerable_conversations=True requires durable_background=True "
"for background responses"
)
if max_pending <= 0:
raise ValueError("max_pending must be > 0")

self.durable_background = durable_background
self.steerable_conversations = steerable_conversations
self.store_disabled = store_disabled
self.max_pending = max_pending
self.replay_event_ttl_seconds = replay_event_ttl_seconds

@classmethod
def from_env(cls, environ: Mapping[str, str] | None = None) -> "ResponsesServerOptions":
def from_env(
cls, environ: Mapping[str, str] | None = None
) -> "ResponsesServerOptions":
"""Create options from environment variables.

:param environ: Optional mapping of environment variables. Defaults to ``os.environ``.
Expand Down
Loading
Loading