Skip to content

Commit 9000725

Browse files
brianstrauchclaude
andauthored
Add Strands Agents plugin (contrib) (#1539)
* Bump ruff to 0.15 and reformat Also bump `[tool.ruff] target-version` from py39 to py310 to match `requires-python`; the old setting caused 0.15 to reject `match` statements in the codebase. * contrib/strands: add Strands Agents plugin * contrib/strands: split activity helper into _TemporalActivityTool * contrib/strands: import strands_tools at module level in wrapper * contrib/strands: run models as activities via TemporalModel * contrib/strands: patch Agent.__init__ to route models through activities * contrib/strands: explicit TemporalModel/TemporalMCPClient, drop monkey-patch, add MCP * contrib/strands: rewrite README, default TemporalModel to BedrockModel Restructure the README into Quickstart + per-feature sections (Model, Structured Output, Streaming, Tools, MCP), add an experimental warning, installation instructions, and a link to strandsagents.com. Also default `TemporalModel.model_factory` to `BedrockModel`, matching the Strands `Agent` default, so the common case doesn't need a factory lambda. * contrib/strands: show worker activity registration in Tools snippet * contrib/strands: add activity_as_hook helper, document Hooks Adds activity_as_hook(activity_fn, *, extract, **options): wraps a Temporal activity as a Strands HookCallback so I/O-doing hook callbacks (audit logs, metrics) dispatch off the workflow. Co-locates with activity_as_tool in a new _workflow.py. * contrib/strands: rename activity_as_hook extract to activity_input * contrib/strands: document HITL interrupts, add integration test * contrib/strands: document continue-as-new for long chats * contrib/strands: document OpenTelemetryPlugin compatibility * contrib/strands: fix lint warnings and add missing docstrings * disable tiktoken and other sandbox warnings * contrib/strands: disable Strands retries, route via Temporal RetryPolicy Patch Agent.__init__ at import time to force retry_strategy=None and raise ValueError when a strategy is supplied, so retries happen at the Temporal activity layer (RetryPolicy on activity options) rather than blocking inside the activity body. Documents the behavior in README. * contrib/strands: move activity_as_tool/hook under workflow.* submodule * contrib/strands: disable Agent.take_snapshot/load_snapshot * contrib/strands: add CODEOWNERS entries * contrib/strands: type _InvokeModelInput fields as Any Python < 3.11's get_type_hints leaks NotRequired[...] through TypedDict fields, which the default JSON converter can't deserialize. Strands Message and ToolSpec use NotRequired, so loosen the activity input fields to Any; values pass through unchanged to Model.stream. * contrib/common: extract _heartbeat_decorator for cross-plugin use * contrib/strands: auto-heartbeat model activities, loosen input types Switch invoke_model/_streaming to the _auto_heartbeater pattern (matches openai_agents) so the heartbeat clock doesn't depend on event cadence and the non-streaming activity is covered too. Type _InvokeModelInput fields as Any: strands Message/ToolSpec use NotRequired, which Python < 3.11's get_type_hints leaks through and the default JSON converter then fails to deserialize. Values pass through unchanged to Model.stream. Drop the explicit activity name= override so the activities use their function names (invoke_model, invoke_model_streaming), matching the naming convention used by other contrib model activities. * contrib/strands: clarify tiktoken comment * contrib/common: drop leading underscore from auto_heartbeater Cross-module consumers can't be seen by basedpyright when the function is underscore-prefixed, producing a false unused-function warning. Promote the name and add a package docstring. * contrib/strands: drop redundant passthrough entries `pydantic` and `temporalio.contrib.strands` are already covered by the SDK's default passthrough (via `pydantic` and `temporalio` in `passthrough_modules_with_temporal`). Update the stale comment in _temporal_mcp_client.py that explained the redundant entry. * contrib/strands: support multiple models and MCP servers per worker Replace the singular model= / mcp_clients=[...] plugin args with name-keyed dicts: StrandsPlugin(models={name: factory}, mcp_clients={name: transport}). TemporalModel and TemporalMCPClient become pure workflow-side handles that reference the worker registration by name and carry only per-call activity options. A single pair of model activities now dispatches to any number of backing models by resolving model_name from the activity input. * contrib/strands: introduce TemporalAgent, drop Agent monkey-patches TemporalAgent(Agent) is the primary user-facing class: it takes model="name" to select a factory registered with StrandsPlugin(models=...), accepts the per-call activity options, and forwards all other kwargs to Strands' Agent. Construction-time validation of retry_strategy and overrides of take_snapshot/load_snapshot replace the previous Agent.__init__ and snapshot monkey-patches in StrandsPlugin. TemporalModel is no longer exported; it remains as internal plumbing for TemporalAgent. * contrib/strands: register OpenTelemetryPlugin on the client in the README Per the OpenTelemetry plugin's own guidance, plugins register on the client so workers built from that client pick them up automatically. Update the Observability section accordingly, plus minor wording polish in the Models and Structured Output sections. * contrib/strands: set max_cached_workflows=0 in tests Force every workflow task to replay from full history so the strands tests double as a continuous determinism check on the plugin and TemporalAgent. All 7 tests pass under the stricter setting. Also trims a redundant paragraph from StrandsPlugin's docstring. * contrib/strands: appease poe lint Drop the leading underscore from populate_cache / clear_cache / build_call_tool_activity in _temporal_mcp_client.py — basedpyright flagged them as unused because it doesn't follow cross-module imports for underscore-prefixed names. Add docstrings since pydocstyle now treats them as public. Also pick up a one-line ruff format fix in _model_activity.py. * contrib/strands: propagate InterruptException across activity boundary Install a failure converter on the plugin's data converter that translates strands InterruptException into an ApplicationError carrying the Interrupt payload in details. TemporalActivityTool.stream() catches the matching ApplicationError, reconstructs the Interrupt, and yields ToolInterruptEvent so AgentResult.interrupts is populated just like the in-workflow case. The path requires StrandsPlugin on the client (not just the worker), since _ActivityWorker reads the data converter from client_config. README HITL section is restructured to cover both hook-based and tool-body surfaces, with a note on the client-attachment requirement. New test_interrupt_exception.py exercises both surfaces end-to-end with signal-driven resume. * contrib/strands: forward invocation_state; default StrandsPlugin to BedrockModel Forward agent invocation_state across the model activity boundary so the worker-side model receives it via model.stream(invocation_state=...). Entries that aren't JSON-serializable are dropped before dispatch with a debug log naming the dropped keys. Make model selection optional. StrandsPlugin() with no args registers a single BedrockModel() factory under the name "bedrock" (matching Strands' own implicit default in agent.py:221), and TemporalAgent() with no model resolves to the sole registered factory at activity time. Multi-model setups continue to require an explicit model= on TemporalAgent. README quickstart shrinks accordingly: no BedrockModel import, no models= argument on StrandsPlugin, no model= on TemporalAgent. Model= remains in the multi-model example where it's load-bearing. * contrib/strands: gate implicit model resolution behind the plugin default Drop the single-entry guess for TemporalAgent(model=None). Implicit resolution is now valid only when StrandsPlugin auto-registers its own BedrockModel default; any user-supplied models= forces every TemporalAgent to pass model= explicitly. Track the gate via a default_name field on ModelActivity that the plugin sets only on the auto-registered path. * contrib/strands: mark terminal Strands exceptions non-retryable Extend StrandsFailureConverter.to_failure to translate Strands' terminal model/session exceptions into ApplicationError(non_retryable=True, type=...): MaxTokensReachedException, ContextWindowOverflowException, StructuredOutputException, SessionException. These deterministic failures won't succeed on retry, so the typed annotation stops Temporal's retry policy from churning on them. ModelThrottledException stays retryable. * contrib/strands: accept MCPClient factories in mcp_clients * contrib/strands: drop _get_encoding monkey patch strands-agents 1.39.0 removed _get_encoding and routes count_tokens straight to the chars-per-token heuristic, so the patch is a no-op. Bump the floor pin to 1.39.0 to keep that assumption true. * contrib/strands: swap current_time for shell in demos and tests * contrib/strands: ruff format populate_cache signature * contrib/strands: switch test_structured_output to TemporalAgent Also refresh stale Agent(...) references in _temporal_mcp_client and _temporal_model docstrings to point at TemporalAgent(...). * contrib/strands: rename optional dependency to strands-agents * contrib/strands: fix pydoctor docstring errors `warnings-as-errors = true` was failing CI's gen-docs step on invalid RST inline literals and unresolvable cross-references to the optional strands package. * tests: fix Windows test collection failures * test_type_errors.py: open test files with encoding="utf-8" so the rglob scan doesn't choke on UTF-8 characters (e.g. the strawberry emoji in test_tool.py) when the host's default codec is cp1252. * test_tool.py: skip the module on Windows; strands_tools.shell pulls in pty -> tty -> termios at import time, which is Unix-only. * tests/contrib/strands: swap shell tool for file_read strands_tools.shell imports pty/tty/termios at module load, which is Unix-only and broke Windows test collection. file_read on a tmp_path fixture is also non-deterministic (depends on filesystem state), has no in-workflow equivalent, and imports cleanly on every platform. * Fix Strands MCP client on Python 3.10 * contrib: inline _heartbeat_decorator into strands and openai_agents Removes the temporalio/contrib/common shared module by duplicating _heartbeat_decorator.py into each plugin and updating the two import sites. * contrib/strands: shorten strands import paths * contrib/strands: cache MCP connections across tool calls The per-server {server}-call-tool activity opened a fresh MCP session on every invocation (open transport + initialize + call + teardown), so an agent making several successive MCP calls paid that handshake per call -- and for stdio servers, a subprocess spawn per call. Hold a lazily-opened MCP session per server in the activity worker process so successive call-tool activities reuse one connection. A dedicated owner task enters and exits the anyio transport/ClientSession context managers in the same task (the cancel-scope rule); call-tool activities on the same event loop invoke session.call_tool directly, which MCP multiplexes by request id. Evict on idle timeout, on a call error (so a broken session reconnects), and on worker shutdown -- scoped to the servers the plugin registered rather than every cached connection. A reused session now carries server-side state across workflows sharing a worker, a behavior change from the previous per-call isolation. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * contrib/strands: make MCP connection idle timeout configurable The per-server call-tool activity keeps a worker-process MCP connection open between calls and evicts it after a fixed 5-minute idle window. Expose that window as a `mcp_connection_idle_timeout` plugin option (a timedelta); the 5-minute module default is unchanged. Arm the idle timer only once no calls remain in flight, rather than at the start of each call. The timer now measures genuine idle time between calls, and a call running longer than the timeout is never torn down underneath itself -- which also removes the connection-establishment race a short timeout previously exposed. A record only arms a timer while it is the cached connection, and the timer re-checks for in-flight calls before evicting. --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent d53a604 commit 9000725

30 files changed

Lines changed: 3048 additions & 20 deletions

.github/CODEOWNERS

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,18 @@
66

77

88
# Below are owners for modules in the temporalio/contrib/
9-
# and tests/contrib/ directories that are owned by teams
10-
# other than the SDK team. For each one, we add the owning team,
9+
# and tests/contrib/ directories that are owned by teams
10+
# other than the SDK team. For each one, we add the owning team,
1111
# as well as @temporalio/sdk, so the SDK team can continue to
1212
# manage repo-wide concerns.
13+
/temporalio/contrib/common/ @temporalio/ai-sdk @temporalio/sdk
1314
/temporalio/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
1415
/temporalio/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
1516
/temporalio/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
1617
/temporalio/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
18+
/temporalio/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk
1719
/tests/contrib/google_adk_agents/ @temporalio/ai-sdk @temporalio/sdk
1820
/tests/contrib/langgraph/ @temporalio/ai-sdk @temporalio/sdk
1921
/tests/contrib/langsmith/ @temporalio/ai-sdk @temporalio/sdk
2022
/tests/contrib/openai_agents/ @temporalio/ai-sdk @temporalio/sdk
23+
/tests/contrib/strands/ @temporalio/ai-sdk @temporalio/sdk

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ lambda-worker-otel = [
4040
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
4141
]
4242
aioboto3 = ["aioboto3>=10.4.0", "types-aioboto3[s3]>=10.4.0"]
43+
strands-agents = ["strands-agents>=1.39.0"]
4344

4445
[project.urls]
4546
Homepage = "https://github.com/temporalio/sdk-python"
@@ -85,6 +86,8 @@ dev = [
8586
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
8687
"pytest-flakefinder>=1.1.0",
8788
"async-timeout>=4.0,<6; python_version < '3.11'",
89+
"strands-agents>=1.39.0",
90+
"strands-agents-tools>=0.5.2",
8891
]
8992

9093
[tool.poe.tasks]

temporalio/contrib/openai_agents/_heartbeat_decorator.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,22 @@
88
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
99

1010

11-
def _auto_heartbeater(fn: F) -> F: # type:ignore[reportUnusedClass]
12-
# Propagate type hints from the original callable.
11+
def auto_heartbeater(fn: F) -> F:
12+
"""Decorator that heartbeats at half the activity's heartbeat timeout."""
13+
1314
@wraps(fn)
1415
async def wrapper(*args: Any, **kwargs: Any) -> Any:
1516
heartbeat_timeout = activity.info().heartbeat_timeout
1617
heartbeat_task = None
1718
if heartbeat_timeout:
18-
# Heartbeat twice as often as the timeout
1919
heartbeat_task = asyncio.create_task(
20-
heartbeat_every(heartbeat_timeout.total_seconds() / 2)
20+
_heartbeat_every(heartbeat_timeout.total_seconds() / 2)
2121
)
2222
try:
2323
return await fn(*args, **kwargs)
2424
finally:
2525
if heartbeat_task:
2626
heartbeat_task.cancel()
27-
# Wait for heartbeat cancellation to complete
2827
try:
2928
await heartbeat_task
3029
except asyncio.CancelledError:
@@ -33,8 +32,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
3332
return cast(F, wrapper)
3433

3534

36-
async def heartbeat_every(delay: float, *details: Any) -> None:
37-
"""Heartbeat every so often while not cancelled"""
35+
async def _heartbeat_every(delay: float) -> None:
3836
while True:
3937
await asyncio.sleep(delay)
40-
activity.heartbeat(*details)
38+
activity.heartbeat()

temporalio/contrib/openai_agents/_invoke_model_activity.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
from typing_extensions import Required, TypedDict
4444

4545
from temporalio import activity
46-
from temporalio.contrib.openai_agents._heartbeat_decorator import _auto_heartbeater
46+
from temporalio.contrib.openai_agents._heartbeat_decorator import auto_heartbeater
4747
from temporalio.contrib.workflow_streams import WorkflowStreamClient
4848
from temporalio.exceptions import ApplicationError
4949

@@ -314,7 +314,7 @@ def __init__(self, model_provider: ModelProvider | None = None):
314314
)
315315

316316
@activity.defn
317-
@_auto_heartbeater
317+
@auto_heartbeater
318318
async def invoke_model_activity(self, input: ActivityModelInput) -> ModelResponse:
319319
"""Activity that invokes a model with the given input."""
320320
model = self._model_provider.get_model(input.get("model_name"))
@@ -337,7 +337,7 @@ async def invoke_model_activity(self, input: ActivityModelInput) -> ModelRespons
337337
_raise_for_openai_status(e)
338338

339339
@activity.defn
340-
@_auto_heartbeater
340+
@auto_heartbeater
341341
async def invoke_model_activity_streaming(
342342
self, input: StreamingActivityModelInput
343343
) -> list[TResponseStreamEvent]:
@@ -357,7 +357,7 @@ async def invoke_model_activity_streaming(
357357
``streaming_topic`` so external consumers (UIs, tracing,
358358
etc.) can observe events as they arrive.
359359
360-
Heartbeats run on a background task via ``_auto_heartbeater`` so
360+
Heartbeats run on a background task via ``auto_heartbeater`` so
361361
long initial-token latency or long pauses between chunks do not
362362
trip ``heartbeat_timeout``.
363363
"""

0 commit comments

Comments
 (0)