Skip to content

Commit 66d86ca

Browse files
committed
Signal work
1 parent 53ae9fc commit 66d86ca

5 files changed

Lines changed: 915 additions & 1 deletion

File tree

temporalio/client/_impl.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,42 @@ async def start_workflow(
200200
start_workflow_response=resp,
201201
)
202202
setattr(handle, "__temporal_eagerly_started", eagerly_started)
203+
# If this start / signal-with-start is issued from inside a Nexus operation handler (but
204+
# not as the nexus-backing workflow, whose links are handled separately by
205+
# WorkflowRunOperationContext.start_workflow), capture the backlink the server returned so
206+
# the caller workflow's Nexus history event links to the callee event.
207+
nexus_ctx = self._try_nexus_start_operation_context()
208+
if (
209+
nexus_ctx is not None
210+
and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context()
211+
):
212+
if isinstance(
213+
resp,
214+
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse,
215+
):
216+
# Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at
217+
# the WorkflowExecutionSignaled event; older servers leave it unset.
218+
if resp.HasField("signal_link"):
219+
nexus_ctx._add_backlink(resp.signal_link)
220+
else:
221+
if resp.HasField("link"):
222+
nexus_ctx._add_backlink(resp.link)
223+
else:
224+
# Older servers (pre-1.31) don't return a link on the start response.
225+
# Fabricate one pointing at the started workflow's WorkflowExecutionStarted
226+
# event so the caller still gets a backlink.
227+
nexus_ctx._add_backlink(
228+
temporalio.api.common.v1.Link(
229+
workflow_event=temporalio.api.common.v1.Link.WorkflowEvent(
230+
namespace=self._client.namespace,
231+
workflow_id=req.workflow_id,
232+
run_id=resp.run_id,
233+
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
234+
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
235+
),
236+
)
237+
)
238+
)
203239
return handle
204240

205241
async def _build_start_workflow_execution_request(
@@ -241,6 +277,14 @@ async def _build_start_workflow_execution_request(
241277
req.on_conflict_options.attach_request_id = True
242278
req.on_conflict_options.attach_completion_callbacks = True
243279
req.on_conflict_options.attach_links = True
280+
else:
281+
# If this is a plain start_workflow issued from inside a Nexus operation handler
282+
# (not the nexus-backing workflow, which already carries inbound links via
283+
# input.links), forward the inbound Nexus task links so the started callee's
284+
# WorkflowExecutionStarted event links back to the caller.
285+
nexus_ctx = self._try_nexus_start_operation_context()
286+
if nexus_ctx is not None:
287+
req.links.extend(nexus_ctx._get_outgoing_request_links())
244288

245289
return req
246290

@@ -267,6 +311,13 @@ async def _build_signal_with_start_workflow_execution_request(
267311
await data_converter.encode(input.start_signal_args)
268312
)
269313
await self._populate_start_workflow_execution_request(req, input)
314+
# If this signal-with-start is issued from inside a Nexus operation handler (but not the
315+
# nexus-backing workflow), forward the inbound Nexus task links so both the callee's
316+
# WorkflowExecutionStarted and WorkflowExecutionSignaled events link back to the caller.
317+
if not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
318+
nexus_ctx = self._try_nexus_start_operation_context()
319+
if nexus_ctx is not None:
320+
req.links.extend(nexus_ctx._get_outgoing_request_links())
270321
return req
271322

272323
async def _build_update_with_start_start_workflow_execution_request(
@@ -500,9 +551,18 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
500551
req.input.payloads.extend(await data_converter.encode(input.args))
501552
if input.headers is not None: # type:ignore[reportUnnecessaryComparison]
502553
await self._apply_headers(input.headers, req.header.fields)
503-
await self._client.workflow_service.signal_workflow_execution(
554+
# If this signal is issued from inside a Nexus operation handler, forward the inbound
555+
# Nexus task links so the WorkflowExecutionSignaled event links back to the caller.
556+
nexus_ctx = self._try_nexus_start_operation_context()
557+
if nexus_ctx is not None:
558+
req.links.extend(nexus_ctx._get_outgoing_request_links())
559+
resp = await self._client.workflow_service.signal_workflow_execution(
504560
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
505561
)
562+
# Server >= 1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the
563+
# signal event; older servers leave it unset. Propagate when present.
564+
if nexus_ctx is not None and resp.HasField("link"):
565+
nexus_ctx._add_backlink(resp.link)
506566

507567
async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
508568
data_converter = self._client.data_converter._with_contexts(
@@ -1636,6 +1696,17 @@ async def count_nexus_operations(
16361696
)
16371697
)
16381698

1699+
@staticmethod
1700+
def _try_nexus_start_operation_context() -> (
1701+
temporalio.nexus._operation_context._TemporalStartOperationContext | None
1702+
):
1703+
"""The Nexus start-operation context if a handler is currently running, else None."""
1704+
return (
1705+
temporalio.nexus._operation_context._temporal_start_operation_context.get(
1706+
None
1707+
)
1708+
)
1709+
16391710
async def _apply_headers(
16401711
self,
16411712
source: Mapping[str, temporalio.api.common.v1.Payload] | None,

temporalio/nexus/_operation_context.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,31 @@ def _add_outbound_links(
278278
)
279279
return workflow_handle
280280

281+
def _get_outgoing_request_links(self) -> list[temporalio.api.common.v1.Link]:
282+
"""Inbound Nexus task links to attach to RPCs the operation handler issues.
283+
284+
When the operation handler signals, signal-with-starts, or starts a workflow, these
285+
links are added to the request's ``links`` field so the callee's history event links
286+
back to the caller workflow that scheduled this Nexus operation.
287+
"""
288+
return self._get_links()
289+
290+
def _add_backlink(self, link: temporalio.api.common.v1.Link | None) -> None:
291+
"""Append a backlink returned by an RPC the operation handler issued.
292+
293+
``link`` is the ``common.v1.Link`` returned on a signal, signal-with-start, or start
294+
response (or ``None`` against a server that did not return one). When present and of the
295+
``workflow_event`` variant, it is converted to a Nexus link and added to the operation's
296+
outbound links so the caller workflow's Nexus history event links to the callee event.
297+
298+
This is only safe to call from the single thread/task that runs the operation handler.
299+
"""
300+
if link is None or not link.HasField("workflow_event"):
301+
return
302+
self.nexus_context.outbound_links.append(
303+
workflow_event_to_nexus_link(link.workflow_event)
304+
)
305+
281306

282307
class WorkflowRunOperationContext(StartOperationContext):
283308
"""Context received by a workflow run operation."""

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
133133
"--dynamic-config-value",
134134
"history.enableChasmCallbacks=true",
135135
"--dynamic-config-value",
136+
"history.enableCHASMSignalBacklinks=true",
137+
"--dynamic-config-value",
136138
"nexusoperation.enableStandalone=true",
137139
"--dynamic-config-value",
138140
'system.system.refreshNexusEndpointsMinWait="0s"',

0 commit comments

Comments
 (0)