
Commit bb433be

jssmithclaudetconley1428
authored
Add Workflow Streams library (#1423)
* Add temporalio.contrib.pubsub — reusable pub/sub for workflows

A workflow mixin (PubSubMixin) that turns any workflow into a pub/sub broker. Activities and starters publish via batched signals; external clients subscribe via long-poll updates exposed as an async iterator.

Key design decisions:
- Payloads are opaque bytes for cross-language compatibility
- Topics are plain strings, no hierarchy or prefix matching
- Global monotonic offsets (not per-topic) for simple continuation
- Batching built into PubSubClient with a Nagle-like timer + priority flush
- Structured concurrency: no fire-and-forget tasks, trio-compatible
- Continue-as-new support: drain_pubsub() + get_pubsub_state() + a validator to cleanly drain polls, plus follow_continues on the subscriber side

Module layout:
- _types.py — PubSubItem, PublishInput, PollInput, PollResult, PubSubState
- _mixin.py — PubSubMixin (signal, update, query handlers)
- _client.py — PubSubClient (batcher, async iterator, CAN resilience)

9 E2E integration tests covering: activity publish + subscribe, topic filtering, offset-based replay, interleaved workflow/activity publish, priority flush, iterator cancellation, context-manager flush, concurrent subscribers, and mixin coexistence with application signals/queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix PubSubState CAN serialization and simplify subscribe error handling

PubSubState is now a Pydantic model so it survives serialization through Pydantic-based data converters when embedded in Any-typed fields. Without this, continue-as-new would fail with "'dict' object has no attribute 'log'" because Pydantic deserializes Any fields as plain dicts.
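The global-offset design above reduces to a small amount of state: one append-only log and a single integer cursor per subscriber. A minimal sketch, with `Broker` and `Item` as illustrative stand-ins rather than the module's real classes:

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Item:
    topic: str
    data: bytes

@dataclass
class Broker:
    """Toy in-memory model: one global append-only log, one integer cursor."""
    log: list = field(default_factory=list)

    def publish(self, topic: str, data: bytes) -> None:
        self.log.append(Item(topic, data))

    def poll(self, from_offset: int, topic: Optional[str] = None):
        """Return (matching items, next_offset). next_offset always advances
        past the whole log even when the topic filter skips items, so a
        single global cursor is enough to continue from."""
        items = [it for it in self.log[from_offset:] if topic is None or it.topic == topic]
        return items, len(self.log)

broker = Broker()
broker.publish("a", b"1")
broker.publish("b", b"2")
broker.publish("a", b"3")
items, next_offset = broker.poll(0, topic="a")
```

This is why per-topic offsets are unnecessary here: the subscriber resumes with one number regardless of how many topics it filters on.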
Added two CAN tests:
- test_continue_as_new_any_typed_fails: documents that Any-typed fields lose PubSubState type information (negative test)
- test_continue_as_new_properly_typed: verifies CAN works with properly typed PubSubState | None fields

Simplified subscribe() exception handling: removed the broad except Exception clause that tried _follow_continue_as_new() on every error. Now only WorkflowUpdateRPCTimeoutOrCancelledError is caught for CAN follow.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Polish pub/sub contrib: README, flush safety, init guard, factory method

README.md: usage-oriented documentation covering the workflow mixin, activity publishing, subscribing, continue-as-new, and the cross-language protocol.

flush() safety: items are now removed from the buffer only after the signal succeeds. Previously, buffer.clear() ran before the signal, losing items on failure. Added test_flush_retains_items_on_signal_failure.

init_pubsub() guard: publish() and the _pubsub_publish signal handler now check for initialization and raise a clear RuntimeError instead of a cryptic AttributeError.

PubSubClient.for_workflow() factory: the preferred constructor, taking a Client + workflow_id. Enables follow_continues in subscribe() without accessing the private WorkflowHandle._client. The handle-based constructor remains for simple cases that don't need CAN following.

activity_pubsub_client() now uses for_workflow() internally with properly typed keyword-only arguments instead of **kwargs: object.

CAN test timing: replaced asyncio.sleep(2) with assert_eq_eventually polling for a different run_id, matching sdk-python test patterns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add init guards to poll/query handlers and fix README CAN example

_pubsub_poll and _pubsub_offset now call _check_initialized() for a clear RuntimeError instead of a cryptic AttributeError when init_pubsub() is forgotten.
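The init-guard pattern described above can be sketched in isolation. This is a hypothetical reduction (the class and message text are illustrative, not the module's actual code): every entry point checks initialization first, so a forgotten init_pubsub() fails with an actionable RuntimeError rather than an AttributeError on a missing private field.

```python
class PubSubWorkflowSketch:
    """Hypothetical reduction of the init guard described above."""

    def init_pubsub(self) -> None:
        self._pubsub_log: list = []

    def _check_initialized(self) -> None:
        # Guard: fail with a clear, actionable message instead of letting a
        # later attribute access raise a cryptic AttributeError.
        if not hasattr(self, "_pubsub_log"):
            raise RuntimeError(
                "pub/sub is not initialized: call init_pubsub() in the workflow first"
            )

    def publish(self, data: bytes) -> None:
        self._check_initialized()
        self._pubsub_log.append(data)

wf = PubSubWorkflowSketch()
try:
    wf.publish(b"x")            # guard fires: clear error, not AttributeError
    guard_fired = False
except RuntimeError:
    guard_fired = True
wf.init_pubsub()
wf.publish(b"x")                # works after initialization
```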
The README CAN example now includes the required imports (@dataclass, workflow) and the @workflow.init decorator.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Guard validator against missing init_pubsub, fix PubSubState docstring

The poll validator accesses _pubsub_draining, which would raise AttributeError if init_pubsub() was never called. Added a _check_initialized() guard.

Fixed the PubSubState docstring: the field must be typed as PubSubState | None, not Any. The old docstring incorrectly implied Any-typed fields would work.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Guard get_pubsub_state/drain_pubsub, add replay and max_batch_size tests

get_pubsub_state() and drain_pubsub() now call _check_initialized(). Previously drain_pubsub() could silently set _pubsub_draining on an uninitialized instance, which init_pubsub() would then reset to False.

New tests:
- test_max_batch_size: verifies auto-flush when the buffer reaches the limit, using max_cached_workflows=0 to also test replay safety
- test_replay_safety: interleaved workflow/activity publish with max_cached_workflows=0, proving the mixin is determinism-safe

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add review comments and design addenda for pubsub redesign

Review comments (#@AGENT: annotations) capture design questions on:
- Topic offset model and information leakage (resolved: global offsets with BFF-layer containment, per the NATS JetStream model)
- Exactly-once publish delivery (resolved: publisher ID + sequence number dedup, per the Kafka producer model)
- Flush concurrency (resolved: asyncio.Lock with buffer swap)
- CAN follow behavior, poll rate limiting, activity context detection, validator purpose, pyright errors, API ergonomics

DESIGN-ADDENDUM-TOPICS.md: a full exploration of per-topic vs. global offsets with an industry survey (Kafka, Redis, NATS, PubNub, Google Pub/Sub, RabbitMQ).
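The publisher-ID + sequence-number dedup resolved above (the Kafka-producer-style protocol) fits in a few lines. A minimal sketch — class and method names here are hypothetical, not the module's API:

```python
class BrokerDedup:
    """Illustrative model of the dedup protocol: each publisher sends
    (publisher_id, sequence) with every batch; the broker rejects a batch
    whose sequence is <= the last one already applied for that publisher."""

    def __init__(self) -> None:
        self.log: list = []
        self.publisher_sequences: dict = {}  # publisher_id -> last applied seq

    def on_publish(self, publisher_id: str, sequence: int, items: list) -> bool:
        last = self.publisher_sequences.get(publisher_id, 0)
        if sequence <= last:
            return False  # duplicate delivery of an already-applied batch
        self.publisher_sequences[publisher_id] = sequence
        self.log.extend(items)
        return True

wf = BrokerDedup()
applied_first = wf.on_publish("pub-1", 1, [b"x"])
applied_retry = wf.on_publish("pub-1", 1, [b"x"])  # retried signal, deduped
applied_next = wf.on_publish("pub-1", 2, [b"y"])
```

Note the state is dict[str, int], so it stays bounded by the number of publishers, not the number of messages.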
Concludes global offsets are correct for workflow-scoped pub/sub; leakage is contained at the BFF trust boundary.

DESIGN-ADDENDUM-DEDUP.md: exactly-once delivery via publisher ID + monotonic sequence number. Workflow dedup state is dict[str, int], bounded by publisher count. Buffer-swap pattern with sequence reuse on failure. PubSubState carries publisher_sequences through CAN.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Implement pubsub redesign: dedup, base_offset, flush safety, API cleanup

Types:
- Remove offset from PubSubItem (the global offset is now derived)
- Add publisher_id + sequence to PublishInput for exactly-once dedup
- Add base_offset + publisher_sequences to PubSubState for CAN
- Use Field(default_factory=...) for Pydantic mutable defaults

Mixin:
- Add _pubsub_base_offset for future log truncation support
- Add _pubsub_publisher_sequences for signal deduplication
- Dedup in the signal handler: reject if sequence <= last seen
- Poll uses base_offset arithmetic for offset translation
- Class-body type declarations for basedpyright compatibility
- Validator docstring explaining the drain/CAN interaction
- Module docstring gives specific init_pubsub() guidance

Client:
- asyncio.Lock + buffer swap for flush concurrency safety
- Publisher ID (uuid) + monotonic sequence for exactly-once delivery
- Sequence advances on failure to prevent data loss when new items merge with a retry batch (found via Codex review)
- Remove the follow_continues param — always follow CAN via describe()
- Configurable poll_interval (default 0.1s) for rate limiting
- Merge activity_pubsub_client() into for_workflow() with auto-detect
- _follow_continue_as_new is async with a describe() check

Tests:
- New test_dedup_rejects_duplicate_signal
- Updated the flush failure test for the new sequence semantics
- All activities use PubSubClient.for_workflow()
- Remove PubSubItem.offset assertions
- poll_interval=0 in the test helper for speed

Docs:
- DESIGN-v2.md: consolidated design doc superseding the original + addenda
- README.md: updated API reference
- DESIGN-ADDENDUM-DEDUP.md: corrected flush failure semantics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* TLA+-verified dedup rewrite, TTL pruning, truncation, API improvements

Rewrite the client-side dedup algorithm to match the formally verified TLA+ protocol: failed flushes keep a separate _pending batch and retry with the same sequence number; only advance the confirmed sequence on success. TLC proves NoDuplicates and OrderPreserved for the correct algorithm, and finds duplicates in the old algorithm.

Add TTL-based pruning of publisher dedup entries during continue-as-new (default 15 min). Add max_retry_duration (default 600s) to bound client retries — it must be less than publisher_ttl for safety. Both constraints are formally verified in PubSubDedupTTL.tla.

Add truncate_pubsub() for explicit log prefix truncation. Add publisher_last_seen timestamps for TTL tracking. Preserve legacy state without timestamps during upgrade.

API changes: for_workflow → create, flush removed (use priority=True), poll_interval → poll_cooldown, publisher ID shortened to 16 hex chars.

Includes TLA+ specs (correct, broken, inductive, multi-publisher TTL) and PROOF.md with per-action preservation arguments, scope, and limitations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove TLA+ proof references from implementation code

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Update uv.lock

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add signal vs update dedup analysis; clarify ordering guarantees

A new analysis document evaluates whether publishing should use signals or updates, examining Temporal's native dedup (Update ID per run, request_id for RPCs) vs. the application-level (publisher_id, sequence) protocol. Conclusion: app-level dedup is permanent for signals, but could be dropped for updates once temporal/temporal#6375 is fixed.
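The verified client-side protocol above — a failed flush keeps its batch in a separate pending slot and retries with the same sequence; only a confirmed send advances the sequence — can be sketched as a small state machine. The names (`FlushState`, `next_batch`) are illustrative, assuming the buffer/pending split described in the commits:

```python
class FlushState:
    """Sketch of the verified protocol: retry a failed batch with the SAME
    sequence number; new publishes accumulate in a separate buffer and are
    never merged into the in-flight batch."""

    def __init__(self) -> None:
        self._buffer: list = []      # items not yet in any batch
        self._pending: list = []     # in-flight (possibly failing) batch
        self._pending_seq = 0
        self._sequence = 0           # last CONFIRMED sequence

    def publish(self, data: bytes) -> None:
        self._buffer.append(data)

    def next_batch(self):
        # Promote the buffer only when no batch is pending; a retry reuses
        # the pending batch and its sequence unchanged.
        if not self._pending:
            self._pending, self._buffer = self._buffer, []
            self._pending_seq = self._sequence + 1
        return self._pending_seq, list(self._pending)

    def on_success(self) -> None:
        self._sequence = self._pending_seq
        self._pending = []

fs = FlushState()
fs.publish(b"a")
seq1, batch1 = fs.next_batch()       # first attempt: seq 1, [a]
fs.publish(b"b")                     # arrives while the signal is failing
seq2, batch2 = fs.next_batch()       # retry: same seq, same items — safe to dedup
fs.on_success()
seq3, batch3 = fs.next_batch()       # next batch gets a fresh sequence
```

Paired with the broker-side "reject if sequence <= last seen" rule, a retried delivery of seq 1 is dropped exactly once and b is never lost into a reused sequence.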
Non-blocking flush keeps signals as the right choice for streaming.

Updates DESIGN-v2.md section 6 to be precise about the two Temporal guarantees that signal ordering relies on: sequential send order and history-order handler invocation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add end-to-end dedup analysis: proper layering for three duplicate types

Analyzes deduplication through the end-to-end-principle lens. Three types of duplicates exist in the pipeline, each handled at the layer that introduces them:
- Type A (duplicate LLM work): belongs at the application layer — data escapes to consumers before the duplicate exists, so only the application can resolve it
- Type B (duplicate signal batches): belongs in the pub/sub workflow — it encapsulates transport details and is the only layer that can detect them correctly
- Type C (duplicate SSE delivery): belongs at the BFF/browser layer

Concludes the (publisher_id, sequence) protocol is correctly placed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Expand DESIGN-v2 with offset model rationale and BFF/SSE reconnection design

Fill gaps identified during design review:
- Document why per-topic offsets were rejected (trust model, cursor portability, unjustified complexity) inline rather than only in the historical addendum
- Expand the BFF section with the four reconnection options considered and the decision to use SSE Last-Event-ID with BFF-assigned gapless IDs
- Add poll efficiency characteristics (O(new items) in the common case)
- Document the BFF restart fallback (replay from turn start)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: use base64 wire format with native bytes API

Wire types (PublishEntry, _WireItem, PollResult, PubSubState) encode data as base64 strings for cross-language compatibility across all Temporal SDKs. User-facing types (PubSubItem) use native bytes.
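The bytes-inside / base64-on-the-wire boundary above amounts to a simple encode/decode pair at the handler edge. A minimal sketch (the dict wire shape here is illustrative, not the actual _WireItem schema):

```python
import base64
import json

def to_wire(topic: str, data: bytes) -> dict:
    """Outbound: bytes -> base64 string, so the wire form is plain JSON —
    matching JSON serializers in other SDKs that encode byte arrays as
    base64 by default."""
    return {"topic": topic, "data": base64.b64encode(data).decode("ascii")}

def from_wire(item: dict):
    """Inbound: base64 string -> native bytes for the user-facing type."""
    return item["topic"], base64.b64decode(item["data"])

wire = to_wire("tokens", b"\xf0\x9f\x91\x8b hello")   # arbitrary binary payload
roundtrip = from_wire(wire)
encoded = json.dumps(wire)    # JSON-safe: no raw bytes on the wire
```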
Conversion happens inside handlers:
- Signal handler decodes base64 → bytes on ingest
- Poll handler encodes bytes → base64 on response
- Client publish() accepts bytes, encodes for the signal
- Client subscribe() decodes the poll response, yields bytes

This means Go/Java/.NET ports get cross-language compat for free, since their JSON serializers encode byte[] as base64 by default.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: remove poll timeout and update design doc

Remove the bounded poll wait from PubSubMixin and trim trailing whitespace from types. Update DESIGN-v2.md with the streaming-plugin rationale (no fencing needed, UI handles repeat delivery).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add token-level streaming to OpenAI and ADK Temporal plugins

Add an opt-in streaming code path to both agent framework plugins. When enabled, the model activity calls the streaming LLM endpoint, publishes TEXT_DELTA/THINKING_DELTA/TOOL_CALL_START events via PubSubClient as a side channel, and returns the complete response for the workflow to process (unchanged interface).

OpenAI Agents SDK:
- ModelActivityParameters.enable_streaming flag
- New invoke_model_activity_streaming method on ModelActivity
- ModelResponse reconstructed from ResponseCompletedEvent
- Uses @_auto_heartbeater for periodic heartbeats
- Routing in _temporal_model_stub (rejects local activities)

Google ADK:
- TemporalModel(streaming=True) constructor parameter
- New invoke_model_streaming activity using stream=True
- Registered in GoogleAdkPlugin

Both use batch_interval=0.1s for near-real-time token delivery. No pubsub module changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: replace PubSubState Pydantic model with plain dataclass

The Pydantic BaseModel was introduced as a workaround for Any-typed fields losing type information during continue-as-new serialization.
The actual fix is using concrete type annotations (PubSubState | None), which the default data converter handles correctly for dataclasses — no Pydantic dependency needed.

This removes the pydantic import from the pubsub contrib module entirely, making it work out of the box with the default data converter. All 18 tests pass, including both continue-as-new tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: add per-item offsets to PubSubItem and _WireItem

Implements DESIGN-ADDENDUM-ITEM-OFFSET.md. The poll handler now annotates each item with its global offset (base_offset + position in log), enabling subscribers to track fine-grained consumption progress for truncation. This is needed for the voice-terminal agent, where audio chunks must not be truncated until actually played, not merely received.

- Add an offset field to PubSubItem and _WireItem (default 0)
- Poll handler computes offset from base_offset + log_offset + enumerate index
- subscribe() passes wire_item.offset through to the yielded PubSubItem
- Tests: per-item offsets, offsets with topic filtering, offsets after truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: add design addendum for per-item offsets

Documents the motivation and design for adding offset fields to PubSubItem and _WireItem, enabling subscribers to track consumption at item granularity rather than batch boundaries. Driven by the voice-terminal agent's need to truncate only after audio playback, not just after receipt.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: fix truncated offset crash and add recovery

Three changes:
1. Poll handler: replace ValueError with ApplicationError(non_retryable=True) when the requested offset has been truncated. This fails the update (the client gets the error) without crashing the workflow task — avoiding the poison pill during replay that caused permanent workflow failures.
2. Poll handler: treat from_offset=0 as "from the beginning of whatever exists" (i.e., from base_offset). This lets subscribers recover from truncation by resubscribing from 0 without knowing the current base.
3. PubSubClient.subscribe(): catch WorkflowUpdateFailedError with type TruncatedOffset and retry from offset 0, auto-recovering.

New tests:
- test_poll_truncated_offset_returns_application_error
- test_poll_offset_zero_after_truncation
- test_subscribe_recovers_from_truncation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add cross-workflow and cross-namespace pub/sub tests

Verify that PubSubClient can subscribe to events from a different workflow (same namespace) and that Nexus operations can start pub/sub broker workflows in a separate namespace, with cross-namespace subscription working end to end. No library changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* pubsub: cap poll response at ~1MB and skip cooldown when more data ready

Poll responses now estimate wire size (base64 data + topic) and stop adding items once the response exceeds 1MB. The new `more_ready` flag on PollResult tells the subscriber that more data is available, so it skips the poll_cooldown sleep and immediately re-polls. This avoids unnecessary latency during big reloads or catch-up scenarios while keeping individual update payloads within Temporal's recommended limits.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add compatibility contract to pub/sub design doc

Codify the four wire evolution rules that have been followed implicitly through four addenda: additive-only fields with defaults, immutable handler names, forward-compatible PubSubState, and no application-level version negotiation. Includes a precedent table showing all past changes and the reasoning for why version fields in payloads would cause silent data loss on signals.
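The base_offset arithmetic, per-item offsets, and truncation recovery described in the commits above combine into one small poll routine. A hedged sketch — `Log` and its methods are hypothetical stand-ins, and a plain ValueError stands in for the non-retryable ApplicationError the real handler raises:

```python
class Log:
    """Truncation-aware poll sketch: the in-memory list holds only items at
    global offsets >= base_offset; each returned item carries its global
    offset; from_offset=0 means "from whatever still exists"."""

    def __init__(self) -> None:
        self.items: list = []
        self.base_offset = 0

    def truncate(self, up_to: int) -> None:
        # Drop the prefix below global offset `up_to` and remember the base.
        del self.items[: up_to - self.base_offset]
        self.base_offset = up_to

    def poll(self, from_offset: int):
        if from_offset == 0:
            from_offset = self.base_offset      # recovery path after truncation
        if from_offset < self.base_offset:
            # Stand-in for ApplicationError(non_retryable=True) in the real
            # handler: fails the update without poisoning the workflow task.
            raise ValueError("TruncatedOffset")
        start = from_offset - self.base_offset
        return [(from_offset + i, d) for i, d in enumerate(self.items[start:])]

log = Log()
log.items.extend(["a", "b", "c", "d", "e"])
log.truncate(2)               # "a", "b" gone; base_offset = 2
recovered = log.poll(0)       # resubscribe from 0 -> from base_offset
tail = log.poll(3)            # normal continuation mid-log
```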
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix sequence reuse after retry timeout (TLA+-verified)

After max_retry_duration expired, the client dropped the pending batch without advancing _sequence. The next batch reused the same sequence number, which could be silently deduplicated by the workflow if the timed-out signal was actually delivered — causing permanent data loss for those items.

The fix advances _sequence to _pending_seq before clearing _pending, ensuring subsequent batches always get a fresh sequence number.

TLA+ verification:
- Added DropPendingBuggy/DropPendingFixed actions to PubSubDedup.tla
- Added a SequenceFreshness invariant: (pending = <<>>) => (confirmed_seq >= wf_last_seq)
- BuggyDropSpec FAILS SequenceFreshness (confirmed_seq=0 < wf_last_seq=1)
- FixedDropSpec PASSES all invariants (489 distinct states)
- NoDuplicates passes for both — the bug causes data loss, not duplicates

Python test:
- test_retry_timeout_sequence_reuse_causes_data_loss demonstrates the end-to-end consequence: reused seq=1 is rejected, fresh seq=2 accepted

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove backward-compat code and historical design docs from pubsub

This is a new release with no legacy to support. Changes:
- _mixin.py: Remove the ts-is-None fallback that retained publishers without timestamps. All publishers always have timestamps, so this was dead code.
- _types.py: Clean up docstrings referencing addendum docs
- DESIGN-v2.md: Remove backward-compat framing, addendum references, and the historical file listing. Keep the actual evolution rules.
- PROOF.md: "Legacy publisher_id" → "Empty publisher_id"
- README.md: Reference DESIGN-v2.md instead of the deleted addendum
- Delete DESIGN.md and 4 DESIGN-ADDENDUM-*.md files (preserved in the top-level streaming-comparisons repo)
- Delete stale TLA+ trace .bin files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Update pubsub README: rename for_workflow → create, streamline docs

Simplify the README to focus on essential API patterns. Rename for_workflow() to create() throughout, condense the topics section, remove the exactly-once and type-warning sections (these details belong in DESIGN-v2.md), and update the API reference table with current parameter signatures. Also fix whitespace alignment in the DESIGN-v2.md diagram.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix continue-as-new example to show application state carried alongside pubsub state

The CAN example only showed pubsub_state being passed through, which could mislead readers into thinking that's all that's needed. Updated to include a representative application field (items_processed) to make it clear that your own workflow state must also be carried across the CAN boundary.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add motivation and architectural context to pubsub README intro

Replace the terse opening with two paragraphs that explain why this module exists (boilerplate around batching, offsets, topics, CAN), ground it in concrete use cases (order updates, AI streaming, pipeline progress), and call out the Temporal primitives it builds on (signals for publish, updates for subscribe, client-side batching for compaction).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Move bytes/base64 payload detail to Cross-Language Protocol section

This is an implementation detail more relevant to cross-language interop than to the introductory overview.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Move analysis docs and TLA+ verification out of pubsub module

Design analysis (end-to-end dedup, signal-vs-update) and TLA+ formal verification specs are reference material, not part of the distributed module. Moved to worktree-level docs/.

DESIGN-v2.md updated with three additions:
- Decision #12: signals for publish, updates for poll (rationale)
- Dedup scope section: Type A/B/C taxonomy with the end-to-end principle
- Session ordering: the flush_lock mechanism and a Temporal docs citation

Removed file-path references to verification/ specs from DESIGN-v2.md since they no longer live in the module.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove TLA+ references, document opaque-bytes and JSON converter rationale

Remove references to PubSubDedup.tla from code comments, test docstrings, and the design doc — the TLA+ spec was moved out of the published module.

Add design rationale for opaque bytes vs. typed payloads (decoupling, layering, type hints). Document the JSON data converter requirement for cross-language interop in both the design doc and the README.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Clean up pubsub tests: remove redundant cases, de-flake barriers

Review pass over tests/contrib/pubsub/test_pubsub.py.

Delete redundant tests:
- test_poll_offset_zero_after_truncation and test_per_item_offsets_after_truncation were fully covered by test_truncate_pubsub and test_subscribe_recovers_from_truncation.
- test_small_response_more_ready_false was the trivial branch of the big-response test; fold a single more_ready=False assertion into test_poll_more_ready_when_response_exceeds_size_limit instead of standing up a separate workflow.
- test_subscribe_from_offset merged into test_per_item_offsets, renamed to test_subscribe_from_offset_and_per_item_offsets.
- test_retry_timeout_sequence_reuse_causes_data_loss was effectively a rename of test_dedup_rejects_duplicate_signal and asserted the BUG (silent dedup) rather than the FIX, so it would fail if the behavior became stricter.

Rewrite white-box tests to be behavioral:
- test_flush_keeps_pending_on_signal_failure and test_max_retry_duration_expiry asserted on the private _buffer, _pending, _pending_seq, and _sequence fields — any refactor of the retry state machine broke them even with preserved behavior. Replaced with test_flush_retry_preserves_items_after_failures and test_flush_raises_after_max_retry_duration, which use patch.object(handle, "signal", ...) to inject delivery failures against a real workflow and assert observable outcomes.
- test_continue_as_new_any_typed_fails used an absence-timeout assertion (len == 0 within 3s) that would flake on slow CI and pass for the wrong reason. Switched to assert_task_fail_eventually on the new run, which asserts the specific failure mode.

Remove the sleep-as-barrier anti-pattern: drop ~10 asyncio.sleep(0.3-0.5) barriers after __pubsub_publish / truncate signals. A subsequent query or update naturally waits for prior signals to be processed by the worker, so the sleeps were both redundant and brittle. Replace the while True: sleep(0.1) describe-poll in the cross-namespace test with assert_eq_eventually.

Fix test_priority_flush to actually test priority: the 0.5s sleep at the end of the publish_with_priority activity made the test pass regardless — __aexit__ would always flush before the 10s external collect timeout elapsed. Extended the activity hold to ~10s and tightened the collect timeout to 5s, so a priority-wakeup regression surfaces as a missing item instead of a pass via exit-time flush. The hold is long enough that worker teardown outraces activity completion, so tests still finish in sub-second wall time.

Result: 30 → 25 tests, 1848 → ~1590 lines, all passing in 5s.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Replace remaining brittle sleeps in pubsub tests and type handle helpers

Follow-up to the prior cleanup. The two remaining timing-based sleeps are replaced with explicit coordination, and helper functions taking a handle now carry proper type annotations.

test_iterator_cancellation: publish a seed item and wait for an asyncio.Event set on first yield (bounded by asyncio.timeout), then cancel. The iterator is provably active at cancel time, so the test no longer races against an arbitrary sleep.

test_flush_raises_after_max_retry_duration: inject a controllable clock via a patch of temporalio.contrib.pubsub._client.time.monotonic. Advance the clock between the failing flush and the retry check so the timeout fires deterministically, without depending on wall-clock speed or clock resolution.

_is_different_run and collect_items now annotate their handle parameters as WorkflowHandle[Any, Any] (WorkflowHandle is generic over the workflow class and return type; the helpers are polymorphic).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Clarify that pubsub truncation is workflow-side only

No external truncate API exists — truncation is a workflow-internal decision (retention policy, consumer progress), so external callers must define their own signal or update that invokes truncate_pubsub.

- Expand the TruncateSignalWorkflow docstring to call out that it's test scaffolding and to point to the integration pattern.
- Note the workflow-side-only nature in the README table row.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Switch test truncate from signal to update for explicit completion

Signals are fire-and-forget. The truncate tests relied on "a subsequent update acts as a barrier for prior signals" — true, but implicit. An update handler returns only after it completes, making the contract explicit and removing a class of reader confusion.
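The controllable-clock pattern from the test cleanup above generalizes beyond this suite. A standalone sketch under stated assumptions: `Retrier` is a hypothetical reduction of the retry loop, and it patches `time.monotonic` directly, whereas the real test patches the module-local `temporalio.contrib.pubsub._client.time.monotonic`:

```python
import time
from unittest.mock import patch

class Retrier:
    """Hypothetical reduction of the retry loop: give up once
    max_retry_duration has elapsed, measured with time.monotonic."""

    def __init__(self, max_retry_duration: float) -> None:
        self.max_retry_duration = max_retry_duration

    def should_give_up(self, started_at: float) -> bool:
        return time.monotonic() - started_at >= self.max_retry_duration

# Deterministic clock: each call to the patched monotonic() returns the
# next tick, so the timeout fires without sleeping or wall-clock races.
ticks = iter([0.0, 400.0, 800.0])
with patch("time.monotonic", lambda: next(ticks)):
    r = Retrier(max_retry_duration=600.0)
    started = time.monotonic()          # tick 0.0
    first = r.should_give_up(started)   # 400.0 elapsed: keep retrying
    second = r.should_give_up(started)  # 800.0 elapsed: give up
```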
Rename TruncateSignalWorkflow → TruncateWorkflow, change truncate from @workflow.signal to @workflow.update, and switch the three call sites from handle.signal("truncate", ...) to handle.execute_update(...). Drop stale barrier comments now that completion is intrinsic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Delete test_mixin_coexistence

Every other workflow in this file mixes PubSubMixin with a user-defined close signal (and custom init/run), so coexistence is proven implicitly by the full suite. The only unique claim here was that an app query coexists with the mixin's __pubsub_offset query — a vanishingly small risk, given the Temporal SDK registers handlers by explicit name and there is no shared registry. If a future conflict did arise, dozens of tests would fail, not just this one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Force interleaving in test_concurrent_subscribers

The prior version started two subscribe tasks via asyncio.gather and asserted each received its expected items. That passes even if subscriber A fully drains its items before subscriber B's first poll goes out — the test never observed interleaving, only topic filtering under parallel calls.

Reshape the test as a ping-pong: publish A-0, wait (via asyncio.Event) for A to receive it; publish B-0, wait for B to receive it. At that point both subscribers are mid-subscription and polling for item 2, so both __pubsub_poll updates are in flight simultaneously. Repeat for item 2. A sequential execution cannot satisfy the publish order, because B's first item isn't published until after A has received its first.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Strengthen CAN test, widen TTL margins, document Any-field pitfall

Three related test-quality changes after a Codex challenge pass.

Delete test_continue_as_new_any_typed_fails (and its workflow/input classes).
It exercised the default Temporal data converter behavior (an Any-typed dataclass field deserializes as a dict) rather than a pubsub concern, and relied on a weak assert_task_fail_eventually that would pass for any task failure. Replace it with a doc note on init_pubsub() warning about Any-typed pubsub_state fields, keeping the guidance where a user looks when wiring up CAN.

Strengthen test_continue_as_new_properly_typed. Previously it only verified that log contents and offsets survived CAN. Now it also verifies that publisher dedup state survives: it seeds publisher_id="pub" sequence=1, CANs, and asserts on publisher_sequences directly via a new query handler. Three assertions — after CAN, after a duplicate publish, and after a fresh-sequence publish — bracket the dedup contract without inferring it from log length. Inline the previously shared _run_can_test helper, since only one caller remained.

Widen TTL test margins from (0.3s sleep, 0.1s TTL) to (1.0s sleep, 0.5s TTL). The tighter margin left ~100ms headroom on each side for pub-old to prune and pub-new to survive — borderline on slow CI, where worker scheduling between publish and query can itself exceed 100ms. The new margins tolerate multi-hundred-ms scheduling jitter in both directions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Hoist inline imports to module level in pubsub tests

Four sets of function-local imports had no technical justification — no circular imports, no optional dependencies, no heavy-module deferral benefit for a test file. They were editorial drift from incremental additions.
Move them to the top of the file:
- WorkflowUpdateFailedError (was local in the truncate-error test)
- unittest.mock.patch (was duplicated in two retry tests)
- temporalio.api.nexus.v1, temporalio.api.operatorservice.v1 (were local in the create_cross_namespace_endpoint helper)
- google.protobuf.duration_pb2, temporalio.api.workflowservice.v1 (were local in the cross-namespace Nexus test)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix __aexit__ drain race and strengthen pubsub tests

PubSubClient.__aexit__ could silently drop items on context-manager exit. A single _flush() processes either pending OR buffer (if/elif), so when the flusher task was cancelled mid-signal (pending set) while the producer had added more items (buffer non-empty), the final flush handled pending and left the buffered items orphaned. Real impact: agent streaming that publishes a last token and immediately exits the context manager could silently drop trailing tokens, depending on timing.

Fix by draining both in a loop until pending and buffer are empty.

This bug was latent in test_max_batch_size because that test's activity loop had no awaits — the flusher never ran during the loop, so pending never accumulated concurrently with buffer. Strengthening the test exposed it.

Test changes:
- test_max_batch_size: add an await asyncio.sleep(0) between publishes (matching real agent workloads that yield on every LLM token) and assert via a publisher_sequences query that max_batch_size actually triggers >=2 mid-loop flushes, not a single exit flush. Without this, the test passed even if max_batch_size were ignored entirely.
- test_replay_safety: assert the full ordered 7-item sequence and offsets rather than just the endpoints. Endpoint-only checks would miss mid-stream replay corruption (reordering, duplication, drops).
- test_poll_truncated_offset_returns_application_error: add a comment explaining why pytest.raises(WorkflowUpdateFailedError) suffices to prove the handler raised ApplicationError — Temporal's update protocol completes with this error only for ApplicationError; other exceptions fail the workflow task instead, causing execute_update to hang rather than raise.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Style + docstring cleanups in pubsub contrib module

Address a small set of stylistic issues flagged during review.

Fix the stale docstring on PollResult: the field is more_ready, not has_more. Readers following the docstring would have looked for a non-existent attribute.

Add generic parameters to the WorkflowHandle annotation in PubSubClient.__init__ (WorkflowHandle[Any, Any]). This matches the treatment applied earlier in the tests; PubSubClient is polymorphic over workflow types.

Rename the signal/update handler parameters in PubSubMixin from `input` (which shadowed the builtin) to `payload`. The type names (PublishInput, PollInput) already convey "input", so the parameter name was redundant. Drop the now-unnecessary `# noqa: A002` on the validator.

Clarify the PubSubClient.__init__ docstring about continue-as-new: it previously said "prefer create() when you need CAN following"; it now explicitly notes that the direct-handle form does not follow CAN and will stop yielding once the original run ends.

Run `ruff check --select I --fix` and `ruff format` to bring the module and tests in line with project lint.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Apply pubsub review feedback: init pattern, force_flush, from_activity

Four changes responding to review comments on sdk-python PR #1423.

C1 (init_pubsub pattern).
Docstrings, README, and DESIGN-v2.md now advise a single call site from @workflow.init with prior_state threaded through the workflow input, instead of the previous "call in __init__ for fresh, in run() for CAN" split. The signature is unchanged (prior_state is still optional and defaults to None) — the change is to the blessed pattern. C2 (rename priority -> force_flush). PubSubClient.publish() renames the kwarg to force_flush. The kwarg never implied ordering — it just forces an immediate flush of the buffer — so the new name is accurate. Internal test helpers, comments, and docs updated. C3 (split create / from_activity). PubSubClient.create() now requires explicit (client, workflow_id); the silent auto-detect path is gone. A new PubSubClient.from_activity() classmethod pulls both from the current activity context. This removes the failure mode where omitting args outside an activity produced a confusing runtime error. Activity-side test helpers migrated to from_activity(). C5 (truncation rationale). DESIGN-v2.md section 10 no longer describes truncation as "deferred to a future iteration" — the feature is implemented, and voice streaming workflows have shown it's needed in practice. Because CAN is the standard pattern for long-running workflows, workflow history size is not the binding constraint; in-memory log growth between CAN boundaries is. The section now says so. Tests pass (23/23, pytest tests/contrib/pubsub/). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Migrate pubsub payloads from opaque bytes to Temporal Payload Addresses PR #1423 review comment C4: expose Temporal Payload at the PubSubItem / PublishEntry boundary so subscribers can decode via subscribe(result_type=T), matching execute_update(result_type=...). API changes: - PubSubMixin.publish(topic, value): value is any payload-convertible object or a pre-built Payload (zero-copy). 
- PubSubClient.publish(topic, value, force_flush=False): same shape; defers conversion to flush time, batching cost amortized.
- PubSubClient.subscribe(topics, *, result_type=None, ...): yields PubSubItem whose data is a Payload by default, or the decoded result_type when one is supplied.
- PubSubItem.data is now Any (Payload | decoded value).

Wire format and codec decisions:

- PublishEntry.data / _WireItem.data are base64(Payload.SerializeToString()). Nested Payload inside a dataclass fails with "Object of type Payload is not JSON serializable" because the default JSON converter only special-cases top-level Payloads on signal/update args. The base64-of-serialized-proto wire format keeps the JSON envelope while preserving Payload.metadata end-to-end. Round-trip is guarded by the new test_payload_roundtrip_prototype.py tests.
- Per-item encoding uses the SYNC payload converter (workflow.payload_converter() on the mixin, client.data_converter.payload_converter on the client). The codec chain (encryption, PII-redaction, compression) is NOT invoked per item — Temporal already runs the user's DataConverter.encode on the __pubsub_publish signal envelope and the __pubsub_poll update response, so running the codec per item as well would double-encrypt/compress (and compressing already-encrypted bytes defeats the codec). The per-item Payload still carries encoding metadata ("encoding: json/plain", "messageType: ...") which is what the subscribe(result_type=T) decode path actually needs.
- Workflow-side and client-side are now codec-symmetric; the previously-feared asymmetry does not exist.

Tests:

- Existing pubsub tests updated: collect_items takes the Client (needed to reach the payload converter), subscribe calls pass result_type=bytes where they compare against raw bytes.
- Added test_structured_type_round_trip: workflow publishes dataclass values, subscriber decodes via result_type= — exercises the primary value-add of the migration.
- Added test_payload_roundtrip_prototype.py as a regression guard for the wire-format choice: one test asserts nested Payload in a dataclass fails, another asserts base64(proto(Payload)) round-trips.

All 26 pubsub tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Bump sdk-core submodule to match temporalio-client 0.2.0

The bridge's Cargo.toml requires temporalio-client = "0.2.0" (set in 68561ee7), but commit c4ec6e70 ("Update pubsub README: rename for_workflow → create") inadvertently reverted the sdk-core submodule pointer to f188eb53, a commit that still had the client crate at 0.1.0. This left uv/maturin unable to build the Rust bridge on this branch: Cargo resolves the requirement against the vendored crate and rejects 0.1.0 for the "^0.2.0" spec.

Restore the pointer to b544f95d — the commit origin/main uses with the same Cargo.toml, so the bridge and its sdk-core workspace are consistent again. No Python code changes; purely a submodule pointer fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Port Notion narrative into DESIGN-v2.md and add sync-policy note

Reconciles DESIGN-v2.md with the "Streaming API Design Considerations" Notion page so both track the authoritative Python implementation. The Notion page had richer narrative (durable-streams framing, pull-vs-push reasoning, one-way-door callouts, offset-options comparison table, alternatives-considered list for wire evolution, end-to-end-principle writeup). This change brings that into the in-repo doc.

Changes:

- New top-of-doc note establishing that the Python code in sdk-python/temporalio/contrib/pubsub/ is authoritative; both DESIGN-v2.md and the Notion page track it.
- New Decision #1 "Durable streams" explaining the durable-by-default choice vs ephemeral streams (simpler model, reliability, correctness). Existing decisions renumbered.
- Decision #4 (Global offsets) gains the 6-option ecosystem comparison table and a one-way-door callout flagging the wire-protocol commitment.
- Decision #9 (Subscription is poll-based) expanded with the pull-vs-push trade-off (back-pressure, subscriber-controlled read position, data-at-rest) and explicit "both layers are exposed" framing.
- New "Design Principles" section with the Saltzer/Reed/Clark end-to-end-dedup framing and the "retries remain in the log" contract, with a one-way-door callout on the append-only-of-attempts contract.
- Compatibility section gains a full alternatives-considered list (version field, versioned handler names, protocol negotiation, SDK version embedding, accepting silent incompatibility) and a two-part one-way-door callout on immutable handler names + no version field.
- New "Ecosystem analogs" section: a compact one-paragraph summary (NATS JetStream for offsets, Kafka for idempotent producers, Redis for blocking pull, Workflow SDK as the durable-execution peer) with a pointer to the Notion page for the full comparison tables.

The Notion page itself is still behind on the Payload migration (Decision #5 "Opaque message payloads" needs rewriting, API signatures still show priority= and data: bytes). That update is deferred pending resolution of an open reviewer discussion on activity-retry/dedup (discussion 34a8fc56-7738-808c-b29b-001c5066e9d2) whose substance overlaps with the Decision #5 rewrite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Apply pubsub API renames to ADK/OpenAI streaming plugins

Follow-ups missed when the contrib/pubsub refactor renamed PubSubClient.create(batch_interval=...) → PubSubClient.from_activity(...) and publish(..., priority=True) → publish(..., force_flush=True). Both plugin activities still called the old signatures and failed at runtime with TypeError on the first publish.
Also update the streaming tests to pass result_type=bytes to pubsub.subscribe(); after the bytes→Payload migration, item.data is a raw Payload unless a result_type is specified, so json.loads(item.data) was TypeErroring.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Replace PubSubMixin with PubSub dynamic handler registration

Users no longer inherit a mixin class. Instead, they construct `PubSub(prior_state=...)` from `@workflow.init`; the constructor registers the `__pubsub_publish` signal, `__pubsub_poll` update (with validator), and `__pubsub_offset` query handlers dynamically via `workflow.set_signal_handler`, `set_update_handler`, and `set_query_handler`. The pub/sub wire contract (handler names, payload shapes, offset semantics) is unchanged. This matches how other-language SDKs will express the same pattern — imperative handler registration from inside the workflow body rather than inheritance — and lets the workflow retain its normal single base class.

The constructor raises RuntimeError in two misuse cases:

1. Called twice on the same workflow — detected via `workflow.get_signal_handler("__pubsub_publish") is not None`.
2. Called from anywhere other than `__init__` — detected by inspecting the immediate caller's frame.

History-length based detection was tried first but has two false positives (pre-start signals inflate first-task history length beyond 3, and cache eviction legitimately re-runs `__init__` with a higher current history length), so frame inspection is the correct mechanism.

Method renames on the broker (no longer needed as `_pubsub_*` prefixes now that they live on a dedicated object):

init_pubsub(prior_state=None) -> PubSub(prior_state=None)
self.publish(topic, value) -> self.pubsub.publish(topic, value)
self.get_pubsub_state(...) -> self.pubsub.get_state(...)
self.drain_pubsub() -> self.pubsub.drain()
self.truncate_pubsub(up_to) -> self.pubsub.truncate(up_to)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Document per-poll fan-out and list future-work items in DESIGN-v2

Fan-out: add a subsection under Design Decision 9 explaining that each __pubsub_poll is an independent update RPC with no shared delivery, so items destined for N subscribers cross the wire N times. Spells out the three concurrent-subscriber shapes (same topic/offset, different offsets, disjoint topics) and the rationale for the per-poll model.

Future Work: new top-level section with three items — shared workflow fan-out (optimization of the above), workflow-defined filters and transforms, and a safe workflow-side subscribe() API. Each entry names the relevant design questions left open rather than prescribing an implementation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* openai_agents: publish raw stream events, drop normalization layer

The streaming activity previously maintained a normalization layer: ~50 lines of if/elif mapping OpenAI event types (response.output_text.delta, response.reasoning_summary_*, etc.) to custom app event names (TEXT_DELTA, THINKING_*, LLM_CALL_START/COMPLETE), plus text-delta accumulation into a synthesized TEXT_COMPLETE, plus a function-call filter on output_item.added. That normalization made sense when a shared UI consumed events from multiple providers, but each provider-plugin should expose its native event stream and let consumers render idiomatically. The activity now publishes each yielded OpenAI event as its Pydantic JSON and returns the ModelResponse built from the final ResponseCompletedEvent — three lines inside the stream loop.
Also factored out three helpers shared between the streaming and non-streaming activities (both paths were duplicating them verbatim):

- _build_tools_and_handoffs — tool/handoff reconstruction from dataclass form
- _build_tool — single tool-by-type dispatch
- _raise_for_openai_status — APIStatusError -> retry-posture translation

The local-activity guard in _temporal_model_stub.py gains a comment explaining the two reasons streaming can't use local activities (no heartbeat channel, no pubsub signal context from the activity).

Tests: replaced the normalized-event assertions with raw-event assertions; dropped the rich-dispatcher coverage test since there's no dispatcher left to cover. 115 passing / 16 skipped.

Downstream impact: consumers that depend on the normalized event names (temporal-streaming-agents-samples frontend, shared-frontend hooks) need to switch on raw OpenAI event types instead.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Fix lint findings from CI (ruff format, pyright, pydocstyle)

- ruff format: apply formatter to auto-generated style changes.
- pyright: replace dict literals for Response.text/usage with the pydantic model types (ResponseTextConfig, ResponseUsage, InputTokensDetails, OutputTokensDetails).
- basedpyright: suppress reportUnusedFunction on the private _encode_payload/_decode_payload helpers in pubsub._types (they are used from sibling modules, which basedpyright does not credit) and reportUnusedParameter on the CAN workflow run() input arg.
- pydocstyle: add docstrings to PubSubClient.__aenter__/__aexit__.

* Fix Python 3.10 lint/type errors in pubsub tests

- typing.Self requires 3.11; import from typing_extensions like the rest of the SDK does.
- asyncio.timeout requires 3.11; fall back to async_timeout.timeout on 3.10 (async_timeout is an aiohttp transitive dep there).
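The 3.10 fallback described above follows a standard version-gated import. A minimal runnable sketch of the pattern (the alias name `async_timeout` and the `bounded_wait` helper are illustrative, not the test file's actual names):

```python
import asyncio
import sys

if sys.version_info >= (3, 11):
    from asyncio import timeout as async_timeout  # stdlib since 3.11
else:
    # Python 3.10: asyncio.timeout does not exist; use the backport package
    from async_timeout import timeout as async_timeout


async def bounded_wait() -> str:
    # Both forms are async context managers with the same usage shape,
    # so call sites do not need to care which branch was taken.
    async with async_timeout(1.0):
        await asyncio.sleep(0)
        return "ok"
```

Because both names resolve to an async context manager, the rest of the test code is identical on every interpreter in the CI matrix.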
* pubsub tests: also suppress reportUnreachable on the 3.11 import branch

On Python 3.10 CI, the `if sys.version_info >= (3, 11):` branch is what basedpyright flags as unreachable. The ignore needs to be on both branches so it is silent under every Python version in the matrix.

* pubsub tests: attach reportUnreachable ignore to the import-stmt line

The previous attempt placed the pragma on the indented `timeout as _async_timeout` line, but basedpyright reports reportUnreachable against the outer `from ... import (` line (the block-opening statement), so the pragma had no effect. Move the ignore up to the import line and combine with reportMissingImports there.

Locally verified clean on Python 3.10, 3.11, and 3.14 via `uv run --python <ver> poe lint`.

* pubsub: fix dynamic-signal-vs-update race and pydoctor cross-ref

Under parallel test load we saw test_poll_truncated_offset_returns_application_error fail with "Cannot truncate to offset 3: only 0 items exist" — traced to an activation-ordering race. When a workflow receives an activation containing [InitializeWorkflow, Signal(__pubsub_publish), Update(truncate)] in one batch, _WorkflowInstanceImpl.activate groups signals and updates into job_sets[1] and init into job_sets[2]. During _apply of job_sets[1], __pubsub_publish (a dynamic signal registered inside PubSub.__init__) has no handler yet, so it is buffered; truncate is class-level @workflow.update, found in self._updates at activation time, and its task is created immediately and queued in self._ready. _run_once then lazy-instantiates the workflow, __init__ runs set_signal_handler which dispatches the buffered signal via a new task appended to self._ready after the update task. FIFO event-loop dispatch runs truncate against an empty log first; the handler raised ValueError which poisoned the whole workflow task.

Fixes:

1. temporalio/contrib/pubsub/_broker.py — PubSub.truncate now raises ApplicationError(type="TruncateOutOfRange", non_retryable=True) instead of ValueError when the offset is past the end of the log. Matches what _on_poll already does for TruncatedOffset and lets update handlers surface the error cleanly without failing the task.
2. tests/contrib/pubsub/test_pubsub.py — TruncateWorkflow seeds the log from @workflow.init with a prepub_count arg. Three tests (test_poll_truncated_offset_returns_application_error, test_subscribe_recovers_from_truncation, test_truncate_pubsub) now pass prepub_count=5 to start_workflow rather than sending a client-side __pubsub_publish signal, sidestepping the dynamic-signal-before-init race entirely.
3. Tighten the poll-after-truncation assertion to check cause.type == "TruncatedOffset", and add test_truncate_past_end_raises_application_error to cover the new TruncateOutOfRange branch of PubSub.truncate.
4. temporalio/contrib/pubsub/_client.py — pydoctor couldn't resolve :class:`~temporalio.api.common.v1.Payload` against the generated proto module and was failing the docs build; switched that one cross-ref to plain backticks.

Verified locally on Python 3.10 and 3.14: full lint clean, docs build clean, and pubsub tests pass 27/27 across three parallel runs.

* pubsub: document sync-handler/publish race with asyncio.sleep(0) recipe

Add a visible "Gotcha" section to the contrib/pubsub README covering the case where a custom synchronous update or signal handler reads PubSub state and races a same-activation __pubsub_publish signal. The race is inherent to registering __pubsub_publish dynamically from @workflow.init: on the first activation the signal is buffered until __init__ runs, and any class-level sync handler scheduled in the same activation observes pre-publish state.
Framing in the README distinguishes the two cases where users do or don't need to care:

- Independent producer/consumer shape (the common PubSub use): the handler already has to tolerate out-of-order arrival for reasons unrelated to this race, so no recipe is required.
- Sequential same-client publish->update ordering: use the recipe.

Recipe is a one-line "await asyncio.sleep(0)" at the top of the handler, which is a pure asyncio yield with no Temporal timer, no history events, and no server round trip. Explicit call-out that workflow.sleep(0) is not a substitute.

Also extend SIGNAL-UPDATE-RACE.md with a "Zooming out" section that explains why the application layer typically subsumes this race, and update the Recommendation to treat the SDK-level dispatch fix (option 4) as optional follow-up rather than a must-fix. The PubSub class docstring gets a short note pointing at the README.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub tests: switch TruncateWorkflow.truncate to the async recipe

The existing TruncateWorkflow sidestepped the dynamic-signal-vs-update race by seeding the log from @workflow.init via prepub_count. That kept CI green but meant the test workflow did not exercise the pattern the README now asks users to follow (await asyncio.sleep(0) at the top of sync-shaped handlers reading PubSub state). Make truncate async with the recipe so the test workflow is a living example of the documented pattern, and simplify the docstring now that the race is closed in the handler rather than avoided via init-time seeding. prepub_count is kept as a convenience for the error-path tests that just need deterministic log content.

All four truncate tests still pass.
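The effect of the "await asyncio.sleep(0)" recipe can be shown with plain asyncio, no Temporal required. This is a toy model of the ordering only (the names `state`, `handler`, and `buffered_publish` are illustrative, not the library's API):

```python
import asyncio

state = {"log": []}


async def buffered_publish() -> None:
    # Toy stand-in for the publish signal whose dispatch task is queued
    # AFTER the handler task within the same activation batch.
    state["log"].append("item")


async def handler() -> int:
    # The recipe: a pure asyncio yield (no Temporal timer, no history
    # event) lets the later-queued publish task run before we read.
    await asyncio.sleep(0)
    return len(state["log"])


async def main() -> int:
    h = asyncio.create_task(handler())  # handler task queued first
    p = asyncio.create_task(buffered_publish())  # publish dispatched second
    result = await h
    await p
    return result
```

Running `asyncio.run(main())` yields 1: the single yield reschedules the handler behind the publish task. Delete the `await asyncio.sleep(0)` line and the handler observes the pre-publish state instead.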
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: add public async flush() barrier

flush() is an explicit synchronization point: it returns once items buffered at call time have been signaled to the workflow and acknowledged by the server, and returns immediately when the buffer is empty. It complements the two existing flush mechanisms (force_flush=True on publish, context-manager exit) for the case where the caller needs proof that prior publications landed but the moment doesn't naturally correspond to a specific event.

Implementation reuses _flush() under the existing flush_lock, looped while either _pending or _buffer is non-empty so the pending-vs-buffer staging in _flush() can drain in one call.

DESIGN-v2 updates the API table and replaces the "no public flush()" paragraph with a section framing the three complementary flush mechanisms and when each is appropriate.

Test test_explicit_flush_barrier exercises the documented contract: empty-buffer no-op, flush as a barrier with batch_interval=60s so a regression hangs rather than passing on the timer, and idempotent second flush.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: document migration to server-side request_id dedup

Workflow-side (publisher_id, sequence) dedup is a polyfill for two gaps in Temporal's built-in signal request_id dedup:

1. The Python SDK does not expose request_id on WorkflowHandle.signal(), so cross-_flush() retries always allocate a fresh request_id and bypass server-side dedup even within a single run.
2. pendingSignalRequestedIDs is per-run mutable state and is not copied across continue-as-new, so retries that straddle CAN are accepted as fresh signals (verified empirically on dev server and Temporal Cloud — see experiments/can-signal-dup/README.md).

When (1) and (2) are both fixed, the workflow-side check becomes redundant.
The dedup keys at both layers already align on (publisher_id, sequence), so the migration is mechanical — pin request_id=f"{publisher_id}:{seq}" in _flush(), drop the dedup branch in _on_publish, retire publisher_sequences / publisher_last_seen / publisher_ttl from PubSubState in a follow-up wire-format pass.

Adds a "Future Work" subsection in DESIGN-v2 capturing the prerequisites, the diff (what changes / stays / goes), and the rollout sequencing. Adds short pointer comments at the two code sites that would change so a future maintainer encounters the design note at the right place. No behavior change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: accept a single string for subscribe(topics=...)

Convenience for single-topic subscribers — the common case. The previous signature required wrapping a single topic in a list, which is noisy at every call site. Internally we normalize to a list before issuing the poll update; behavior for None / empty list / multi-topic list is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: prefix internal handler names with __temporal_

Rename the wire-level handler identifiers to follow the existing __temporal_ convention (__temporal_workflow_metadata, __temporal_activity_definition, etc.) so they are clearly recognizable as Temporal-internal and won't collide with user-defined handlers:

__pubsub_publish -> __temporal_pubsub_publish
__pubsub_poll -> __temporal_pubsub_poll
__pubsub_offset -> __temporal_pubsub_offset

Updates the broker/client implementation, tests, and design docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: clean up three lint suppressions flagged by codex review

- _broker.py:_validate_poll — rename `payload` to `_payload`, drop `del payload` and `# noqa: ARG002`. The noqa was dead code: CI runs only `ruff check --select I` (import sort), so ARG rules never fire. Underscore prefix silences basedpyright's reportUnusedParameter cleanly.
- test_pubsub.py:ContinueAsNewTypedWorkflow.run — rename `input` to `_input` with `del _input`, drop the `type:ignore`. Now matches the existing `_prepub_count` pattern at TruncateWorkflow.run for the same @workflow.init/@workflow.run signature constraint.
- test_pubsub.py async_timeout import — declare `async-timeout` as an explicit dev dep gated on `python_version < '3.11'`, drop the `reportMissingImports` half of the test pragma. Closes the audit gap of relying on aiohttp's transitive on 3.10.

Kept the `reportUnreachable` ignores — still needed because basedpyright resolves `sys.version_info` against its own runtime, not the matrix Python.

Verified `poe lint` clean on Python 3.10, 3.11, 3.14.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: add PubSub.continue_as_new helper

Packages the drain + wait-for-handlers + workflow.continue_as_new recipe behind `await self.pubsub.continue_as_new(build_args)`. The builder is typed `Callable[[PubSubState], Sequence[Any]]` and is invoked after drain stabilizes with the post-drain state as its single argument, so the snapshot ordering is structural rather than documented-by-prose.

The helper deliberately does not mirror workflow.continue_as_new's 12-param signature; workflows that need to override task_queue, retry_policy, etc. fall back to the explicit drain/wait/CAN recipe. Reverses the 2026-04-24 rejection in DESIGN-v2 Future Work: the state-bound-builder shape resolves the "second footgun" objection to the zero-arg-lambda form (caller could still write self.pubsub.get_state() inside the lambda; with a state parameter the helper controls the read).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* pubsub: switch timing parameters to timedelta

Brian noted on PR #1423 that timedelta is the convention in this codebase for duration parameters.
Migrating the public API surface:

- ``PubSubClient`` constructor / ``create`` / ``from_activity``: ``batch_interval`` and ``max_retry_duration`` now take ``timedelta`` (previously ``float`` seconds).
- ``PubSubClient.subscribe``: ``poll_cooldown`` now takes ``timedelta`` (previously ``float`` seconds).
- ``PubSub.get_state`` and ``PubSub.continue_as_new``: ``publisher_ttl`` now takes ``timedelta`` (previously ``float`` seconds).

Internals continue to use ``.total_seconds()`` where needed (asyncio timeouts, comparisons against ``workflow.time()``). The TTL test workflow query keeps its arg as ``float`` seconds and constructs the ``timedelta`` inside the handler — query payloads use the default JSON converter, which does not serialize ``timedelta``. Docs and examples in DESIGN-v2.md and README.md updated to use ``timedelta(...)`` literals.

This is contrib/preview, so no float-compat shim — callers that previously passed numeric seconds need to migrate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* openai-agents: hook stream_response, opt in via Runner.run_streamed

Previously, streaming was a plugin-level flag (``enable_streaming``) that silently rerouted ``Runner.run`` to a streaming activity which synthesized a ``ModelResponse`` from the terminal ``ResponseCompletedEvent`` and dropped intermediate stream events. Reviewers flagged two problems with that shape:

1. ``Runner.run`` callers did not opt into streaming behavior — flipping the plugin flag elsewhere changed what a workflow saw at runtime. That is the kind of spooky-action-at-a-distance that produces non-determinism if the flag is changed mid-history.
2. ``Model.stream_response`` is the natural hookpoint for streaming in the agents SDK. ``Runner.run_streamed`` already exposes the correct user-facing API — we just had not implemented it.
This commit reworks both:

- ``_TemporalModelStub.stream_response`` now executes the streaming activity and yields each event from its return list (an async generator). ``get_response`` keeps the non-streaming path; the ``enable_streaming`` branch is gone.
- ``invoke_model_activity_streaming`` returns ``list[TResponseStreamEvent]`` rather than a synthesized ``ModelResponse``, and publishes the raw events to the configured pub/sub topic via ``pubsub.publish(topic, event)`` (relying on the payload converter rather than manual JSON encoding).
- ``TemporalOpenAIRunner.run_streamed`` performs the same agent conversion + sandbox checks as ``run`` and forwards to the underlying ``AgentRunner.run_streamed``. Its ``run_loop_task`` is wrapped to mirror the ``AgentsException -> AgentsWorkflowError`` rewrap done in ``run`` (the plugin registers ``AgentsWorkflowError`` in ``workflow_failure_exception_types``; without the wrap, durable failures would surface as retrying workflow-task errors instead of terminal workflow failures).
- The shared ``ActivityModelInput``-building logic is factored into ``_TemporalModelStub._build_activity_input`` so the two methods do not duplicate it.

New plugin config on ``ModelActivityParameters``:

- ``streaming_event_topic: str | None = "events"`` — set to ``None`` to skip pub/sub entirely (no ``PubSubClient`` constructed; workflows that consume only via ``stream_events()`` then need no broker).
- ``streaming_event_batch_interval: timedelta = timedelta(milliseconds=100)`` — interval for the pub/sub publisher's flusher.

The streaming activity keeps the ``@_auto_heartbeater`` decorator so long initial-token latency or pauses between chunks do not trip ``heartbeat_timeout``. Explicit per-event ``activity.heartbeat()`` is removed as redundant.

Status-code retry block in ``_raise_for_openai_status`` now carries a short comment explaining 408/409/429 (Brian's review note).
Tests (``test_openai_streaming.py``) switch to ``Runner.run_streamed`` and verify that both the workflow-side iteration (via ``stream_events()`` exposed through a query) and the pub/sub side channel observe the same native OpenAI events. A separate test covers ``streaming_event_topic=None``.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* google-adk: honor stream=True, publish raw LlmResponse chunks

Mirrors the OpenAI-side rework: streaming opt-in moves from a constructor flag to the SDK-native API, and the streaming activity publishes raw response objects rather than synthesizing custom event types.

- ``TemporalModel.generate_content_async(stream=True)`` is now honored. Users opt into streaming via the ADK-native API path — e.g. ``RunConfig(streaming_mode=StreamingMode.SSE)`` on ``runner.run_async`` — rather than a plugin-level ``streaming`` flag (which is removed).
- ``invoke_model_streaming`` publishes each ``LlmResponse`` directly via ``pubsub.publish(topic, response)``. The previously-synthesized ``LLM_CALL_START`` / ``TEXT_DELTA`` / ``TOOL_CALL_START`` / ``TEXT_COMPLETE`` / ``LLM_CALL_COMPLETE`` events are gone — those semantic distinctions are speculative until the lifecycle hook design is settled (deferred to a follow-up). Raw publishes also remove the redundant double ``force_flush`` and the unused ``logger`` import that the review flagged.

New constructor config on ``TemporalModel``:

- ``streaming_event_topic: str | None = "events"`` — set to ``None`` to skip pub/sub entirely.
- ``streaming_event_batch_interval: timedelta = timedelta(milliseconds=100)`` — interval for the publisher's flusher.

``_plugin.py`` annotates the activities list as ``list[Callable[..., Any]]`` because ``invoke_model`` and ``invoke_model_streaming`` now have different signatures (streaming takes the topic and batch interval), so type inference on the bare list literal would not satisfy ``SimplePlugin``'s parameter type.
Tests (``test_adk_streaming.py``) opt into streaming via ``RunConfig(streaming_mode=StreamingMode.SSE)`` and subscribe to the pub/sub topic with ``result_type=LlmResponse``, asserting the raw chunks round-trip intact. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * pubsub: README CAN example uses generic AppState carrier Replace the ad-hoc items_processed counter with a nested AppState dataclass so the snapshot pattern reads symmetrically: app_state beside pubsub_state, each round-tripped the same way. Also rename the build_args lambda parameter to pubsub_state to disambiguate which snapshot it carries. Co…
1 parent 370608c commit bb433be

11 files changed

Lines changed: 4219 additions & 2 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -86,6 +86,7 @@ dev = [
     "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
     "opentelemetry-semantic-conventions>=0.40b0,<1",
     "opentelemetry-sdk-extension-aws>=2.0.0,<3",
+    "async-timeout>=4.0,<6; python_version < '3.11'",
 ]

 [tool.poe.tasks]
```
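The new ``async-timeout`` dependency carries a ``python_version < '3.11'`` environment marker, so it is only installed where the stdlib lacks ``asyncio.timeout`` (added in Python 3.11). Code that needs a timeout context manager on both sides of that line typically uses an import fallback like the following sketch (illustrative of the marker's purpose, not necessarily how the SDK wires it up):

```python
import asyncio
import sys

if sys.version_info >= (3, 11):
    from asyncio import timeout  # stdlib since Python 3.11
else:
    from async_timeout import timeout  # backport from the async-timeout package


async def wait_briefly() -> str:
    # Bound a slow operation (standing in for a long-poll) to 50ms.
    try:
        async with timeout(0.05):
            await asyncio.sleep(10)
        return "completed"
    except asyncio.TimeoutError:  # on 3.11+ this is an alias of TimeoutError
        return "timed out"
```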
Lines changed: 33 additions & 0 deletions
README.md (new file):

```markdown
# Temporal Workflow Streams

> ⚠️ **This package is currently at an experimental release stage.** ⚠️

**Workflow Streams** is a Temporal Python SDK contrib library that gives a
Workflow a durable, offset-addressed event channel for keeping outside
observers updated on the progress of the Workflow and its Activities.
Typical uses include driving a UI for a long-running AI agent, surfacing
status during in-flight payment or order processing, and reporting progress
from data pipelines. It is not designed for ultra-low-latency applications
such as real-time voice; per-roundtrip latency is around 100ms, and cost
scales with durable batches rather than tokens.

Under the hood the stream is built directly on Temporal's existing
message-passing primitives: Signals carry publishes, Updates serve
long-poll subscriptions, and a Query exposes the current global offset.
The library packages the boilerplate that turns those primitives into
a usable stream: batching to amortize per-event overhead, deduplication
for exactly-once delivery, topic filtering, and continue-as-new helpers
that hand stream state across Workflow runs.

## Documentation

📖 **The full guide lives in the Temporal documentation site:**
**[Workflow Streams — Python SDK](https://docs.temporal.io/develop/python/libraries/workflow-streams)**

It covers installation, enabling streaming on a Workflow, publishing from
Workflows and Activities, subscribing, continue-as-new, delivery semantics,
codec and payload encoding, architecture, and caveats — with runnable code
snippets throughout.

For runnable end-to-end examples, see the
[Workflow Streams samples](https://github.com/temporalio/samples-python/tree/main/workflow-streams).
```
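The mechanics the README describes (global monotonic offsets rather than per-topic ones, topic filtering, and offset-based continuation) can be illustrated with a small in-memory analogue. This is purely illustrative; the real channel is built on Signals and Updates, and none of these names are the library's API:

```python
from dataclasses import dataclass


@dataclass
class Item:
    offset: int  # global and monotonic across all topics
    topic: str
    payload: bytes  # payloads are opaque bytes for cross-language use


class OffsetLog:
    """In-memory analogue of an offset-addressed event channel."""

    def __init__(self) -> None:
        self._items: list[Item] = []

    def publish(self, topic: str, payload: bytes) -> int:
        # Offsets are global (not per-topic), so a subscriber needs only
        # one number to resume, regardless of which topics it follows.
        item = Item(offset=len(self._items), topic=topic, payload=payload)
        self._items.append(item)
        return item.offset

    def poll(self, topic: str, after: int = -1) -> list[Item]:
        """Return items on `topic` with offset > `after` (continuation)."""
        return [i for i in self._items if i.topic == topic and i.offset > after]


log = OffsetLog()
log.publish("events", b"step 1")
log.publish("metrics", b"cpu=3%")   # interleaved topic; bumps the global offset
log.publish("events", b"step 2")

first = log.poll("events")                           # both "events" items
resumed = log.poll("events", after=first[0].offset)  # continue past the first
```

Because the offset is global, a reader that last saw offset 0 on ``"events"`` resumes at offset 2, skipping the interleaved ``"metrics"`` item without needing per-topic bookkeeping.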
Lines changed: 43 additions & 0 deletions
``temporalio/contrib/workflow_streams/__init__.py`` (new file):

```python
"""Workflow Streams for Temporal workflows.

.. warning::
    This package is experimental and may change in future versions.

The Workflow Streams contrib library gives a workflow a durable,
offset-addressed event channel built from Signals and polling Updates
with an SSE bridge. Cost scales with durable batches, not tokens.
Latency is around 100ms per roundtrip; not for ultra-low-latency voice.

See :py:class:`WorkflowStream` for the workflow-side stream object and
:py:class:`WorkflowStreamClient` for the external client interface.
"""

from temporalio.contrib.workflow_streams._client import WorkflowStreamClient
from temporalio.contrib.workflow_streams._stream import WorkflowStream
from temporalio.contrib.workflow_streams._topic_handle import (
    TopicHandle,
    WorkflowTopicHandle,
)
from temporalio.contrib.workflow_streams._types import (
    PollInput,
    PollResult,
    PublishEntry,
    PublisherState,
    PublishInput,
    WorkflowStreamItem,
    WorkflowStreamState,
)

__all__ = [
    "PollInput",
    "PollResult",
    "PublishEntry",
    "PublishInput",
    "PublisherState",
    "TopicHandle",
    "WorkflowStream",
    "WorkflowStreamClient",
    "WorkflowStreamItem",
    "WorkflowStreamState",
    "WorkflowTopicHandle",
]
```
