Skip to content

Commit 31a0a1a

Browse files
Adding links to Nexus signals (#1593)
* Signal work
* Reviewing
* Renaming to RequestLink and ResponseLink
* Addressing PR comments
* Updating method name
* Removed unused test methods
* Adding tests, fixed an issue
* Adding tests
* Fix a linter issue
* Added entry to Changelog
* Fixing a race condition
* Centralize and simplify start-workflow response link handling. Move the private start context accessor to the nexus package. Update tests to reflect that response links are created when using plain start workflow
* Run formatter, address lint
* Move changelog entry. Remove inaccurate comment
---------
Co-authored-by: Alex Mazzeo <alex.mazzeo@temporal.io>
1 parent 5f1d352 commit 31a0a1a

9 files changed

Lines changed: 1245 additions & 58 deletions

File tree

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ to include examples, links to docs, or any other relevant information.
2020

2121
### Added
2222

23+
- Nexus operation link propagation for signals. When a Nexus operation handler signals a workflow
24+
(including signal-with-start), the inbound Nexus request links are now forwarded onto the signaled
25+
workflow so its history events link back to the caller, and the link the server returns for the
26+
signaled event is attached to the caller workflow's Nexus operation history event. This makes the
27+
caller and callee mutually navigable in the UI for signal-based Nexus operations.
2328
- Exposed `backoff_start_interval` for continue-as-new, to allow the new workflow to start after a delay.
2429

2530
### Changed

temporalio/client/_impl.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ async def start_workflow(
200200
start_workflow_response=resp,
201201
)
202202
setattr(handle, "__temporal_eagerly_started", eagerly_started)
203+
nexus_ctx = temporalio.nexus._operation_context._try_start_operation_context()
204+
if nexus_ctx is not None:
205+
nexus_ctx._add_start_workflow_response_link(handle)
203206
return handle
204207

205208
async def _build_start_workflow_execution_request(
@@ -237,10 +240,24 @@ async def _build_start_workflow_execution_request(
237240
# Links are duplicated on request for compatibility with older server versions.
238241
req.links.extend(links)
239242

240-
if temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
243+
nexus_ctx = temporalio.nexus._operation_context._try_start_operation_context()
244+
if nexus_ctx is not None:
245+
# This start was issued from inside a Nexus operation handler. If the workflow ID
246+
# conflict policy is WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING and a conflict is
247+
# detected, attach this request's request ID, completion callbacks, and links to
248+
# the existing run. The TemporalNexusClient and WorkflowRunOperationContext are
249+
# responsible for setting the callbacks correctly, so it is safe to enable all
250+
# on-conflict options whenever we are invoked from an operation handler.
241251
req.on_conflict_options.attach_request_id = True
242252
req.on_conflict_options.attach_completion_callbacks = True
243253
req.on_conflict_options.attach_links = True
254+
# The nexus-backing workflow already carries its inbound links via input.links
255+
# (start_workflow forwards them as links=...). A plain start_workflow issued from
256+
# inside a Nexus operation handler must forward the inbound Nexus task links
257+
# explicitly so the started callee's WorkflowExecutionStarted event links back to
258+
# the caller.
259+
if not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
260+
req.links.extend(nexus_ctx._get_request_links())
244261

245262
return req
246263

@@ -267,6 +284,15 @@ async def _build_signal_with_start_workflow_execution_request(
267284
await data_converter.encode(input.start_signal_args)
268285
)
269286
await self._populate_start_workflow_execution_request(req, input)
287+
# If this signal-with-start is issued from inside a Nexus operation handler (but not the
288+
# nexus-backing workflow), forward the inbound Nexus task links so both the callee's
289+
# WorkflowExecutionStarted and WorkflowExecutionSignaled events link back to the caller.
290+
if not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
291+
nexus_ctx = (
292+
temporalio.nexus._operation_context._try_start_operation_context()
293+
)
294+
if nexus_ctx is not None:
295+
req.links.extend(nexus_ctx._get_request_links())
270296
return req
271297

272298
async def _build_update_with_start_start_workflow_execution_request(
@@ -500,9 +526,18 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
500526
req.input.payloads.extend(await data_converter.encode(input.args))
501527
if input.headers is not None: # type:ignore[reportUnnecessaryComparison]
502528
await self._apply_headers(input.headers, req.header.fields)
503-
await self._client.workflow_service.signal_workflow_execution(
529+
# If this signal is issued from inside a Nexus operation handler, forward the inbound
530+
# Nexus task links so the WorkflowExecutionSignaled event links back to the caller.
531+
nexus_ctx = temporalio.nexus._operation_context._try_start_operation_context()
532+
if nexus_ctx is not None:
533+
req.links.extend(nexus_ctx._get_request_links())
534+
resp = await self._client.workflow_service.signal_workflow_execution(
504535
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
505536
)
537+
# Server >= 1.31 with EnableCHASMSignalBacklinks returns a response link pointing at the
538+
# signal event; older servers leave it unset. Propagate when present.
539+
if nexus_ctx is not None and resp.HasField("link"):
540+
nexus_ctx._add_response_link(resp.link)
506541

507542
async def terminate_workflow(self, input: TerminateWorkflowInput) -> None:
508543
data_converter = self._client.data_converter._with_contexts(

temporalio/nexus/_link_conversion.py

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
logger = logging.getLogger(__name__)
2121

2222
_NEXUS_OPERATION_LINK_URL_PATH_REGEX = re.compile(
23-
r"^/namespaces/(?P<namespace>[^/]+)/nexus-operations/(?P<operation_id>[^/]+)$"
23+
r"^/namespaces/(?P<namespace>[^/]+)/nexus-operations/(?P<operation_id>[^/]+)/(?P<run_id>[^/]*)/details$"
2424
)
2525

2626
_WORKFLOW_LINK_URL_PATH_REGEX = re.compile(
@@ -38,7 +38,6 @@ class _LinkType(str, Enum):
3838
LINK_EVENT_TYPE_PARAM_NAME = "eventType"
3939
LINK_REQUEST_ID_PARAM_NAME = "requestID"
4040
LINK_REFERENCE_TYPE_PARAM_NAME = "referenceType"
41-
LINK_RUN_ID_PARAM_NAME = "runID"
4241
LINK_REASON_PARAM_NAME = "reason"
4342

4443
EVENT_REFERENCE_TYPE = "EventReference"
@@ -182,18 +181,11 @@ def nexus_operation_to_nexus_link(
182181
"""
183182
namespace = urllib.parse.quote(op_link.namespace, safe="")
184183
operation_id = urllib.parse.quote(op_link.operation_id, safe="")
185-
path = f"/namespaces/{namespace}/nexus-operations/{operation_id}"
186-
187-
query_params = ""
188-
if op_link.run_id:
189-
query_params = urllib.parse.urlencode(
190-
{
191-
LINK_RUN_ID_PARAM_NAME: op_link.run_id,
192-
},
193-
)
184+
run_id = urllib.parse.quote(op_link.run_id, safe="")
185+
path = f"/namespaces/{namespace}/nexus-operations/{operation_id}/{run_id}/details"
194186

195187
return nexusrpc.Link(
196-
url=_temporal_nexus_url(path, query_params=query_params),
188+
url=_temporal_nexus_url(path),
197189
type=_LinkType.NEXUS_OPERATION.value,
198190
)
199191

@@ -333,19 +325,11 @@ def nexus_link_to_nexus_operation_link(
333325
)
334326
return None
335327

336-
query_params = urllib.parse.parse_qs(url.query)
337-
338-
try:
339-
run_id = _optional_single_query_param(query_params, LINK_RUN_ID_PARAM_NAME)
340-
except ValueError as err:
341-
logger.warning(f"Invalid Nexus link: {nexus_link}. {err}")
342-
return None
343-
344328
groups = match.groupdict()
345329
nexus_op_link = temporalio.api.common.v1.Link.NexusOperation(
346330
namespace=urllib.parse.unquote(groups["namespace"]),
347331
operation_id=urllib.parse.unquote(groups["operation_id"]),
348-
run_id=run_id,
332+
run_id=urllib.parse.unquote(groups["run_id"]),
349333
)
350334
return temporalio.api.common.v1.Link(nexus_operation=nexus_op_link)
351335

temporalio/nexus/_operation_context.py

Lines changed: 61 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
overload,
2424
)
2525

26+
import nexusrpc
2627
from nexusrpc.handler import (
2728
CancelOperationContext,
2829
OperationContext,
@@ -44,6 +45,7 @@
4445

4546
from ._link_conversion import (
4647
nexus_link_to_temporal_link,
48+
temporal_link_to_nexus_link,
4749
workflow_event_to_nexus_link,
4850
workflow_execution_started_event_link_from_workflow_handle,
4951
)
@@ -167,6 +169,11 @@ def _try_temporal_context() -> (
167169
return start_ctx or cancel_ctx
168170

169171

172+
def _try_start_operation_context() -> _TemporalStartOperationContext | None: # pyright: ignore[reportUnusedFunction]
173+
"""The Nexus start-operation context if a handler is currently running, else None."""
174+
return _temporal_start_operation_context.get(None)
175+
176+
170177
@contextmanager
171178
def _nexus_backing_workflow_start_context() -> Generator[None]:
172179
token = _temporal_nexus_backing_workflow_start_context.set(True)
@@ -239,44 +246,73 @@ def _get_callbacks(self, token: str) -> list[temporalio.client.Callback]:
239246
else []
240247
)
241248

242-
def _get_links(
243-
self,
244-
) -> list[temporalio.api.common.v1.Link]:
249+
def _get_request_links(self) -> list[temporalio.api.common.v1.Link]:
250+
"""Request links to attach to RPCs the operation handler issues.
251+
252+
These are the inbound Nexus task links. When the operation handler signals,
253+
signal-with-starts, or starts a workflow, these links are added to the request's
254+
``links`` field so the callee's history event links back to whatever scheduled this
255+
Nexus operation.
256+
"""
245257
event_links: list[temporalio.api.common.v1.Link] = []
246258
for inbound_link in self.nexus_context.inbound_links:
247259
if link := nexus_link_to_temporal_link(inbound_link):
248260
event_links.append(link)
249261
return event_links
250262

251-
def _add_outbound_links(
263+
def _add_start_workflow_response_link(
252264
self, workflow_handle: temporalio.client.WorkflowHandle[Any, Any]
253265
):
254-
# If links were not sent in StartWorkflowExecutionResponse then construct them.
255-
wf_event_links: list[temporalio.api.common.v1.Link.WorkflowEvent] = []
256-
try:
257-
if isinstance(
258-
workflow_handle._start_workflow_response,
259-
temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse,
260-
):
261-
if workflow_handle._start_workflow_response.HasField("link"):
262-
if link := workflow_handle._start_workflow_response.link:
263-
if link.HasField("workflow_event"):
264-
wf_event_links.append(link.workflow_event)
265-
if not wf_event_links:
266-
wf_event_links = [
267-
workflow_execution_started_event_link_from_workflow_handle(
266+
response = workflow_handle._start_workflow_response
267+
268+
nexus_link: nexusrpc.Link | None = None
269+
if isinstance(
270+
response, temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse
271+
):
272+
if response.HasField("link"):
273+
nexus_link = temporal_link_to_nexus_link(response.link)
274+
else:
275+
# If a link was not sent in the response, construct it.
276+
link = temporalio.api.common.v1.Link(
277+
workflow_event=workflow_execution_started_event_link_from_workflow_handle(
268278
workflow_handle,
269279
self.nexus_context.request_id,
270280
)
271-
]
272-
self.nexus_context.outbound_links.extend(
273-
workflow_event_to_nexus_link(link) for link in wf_event_links
274-
)
281+
)
282+
nexus_link = temporal_link_to_nexus_link(link)
283+
284+
elif isinstance(
285+
response,
286+
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse,
287+
):
288+
# Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at
289+
# the WorkflowExecutionSignaled event; older servers leave it unset.
290+
if response.HasField("signal_link"):
291+
nexus_link = temporal_link_to_nexus_link(response.signal_link)
292+
293+
try:
294+
if nexus_link is not None:
295+
self.nexus_context.outbound_links.append(nexus_link)
275296
except Exception as e:
276297
logger.warning(
277-
f"Failed to create WorkflowExecutionStarted event links for workflow {workflow_handle}: {e}"
298+
f"Failed to create event links for workflow {workflow_handle}: {e}"
278299
)
279-
return workflow_handle
300+
301+
def _add_response_link(self, link: temporalio.api.common.v1.Link | None) -> None:
302+
"""Append a response link returned by an RPC the operation handler issued.
303+
304+
``link`` is the ``common.v1.Link`` returned on a signal, signal-with-start, or start
305+
response (or ``None`` against a server that did not return one). When present and of the
306+
``workflow_event`` variant, it is converted to a Nexus link and added to the operation's
307+
outbound links so the caller workflow's Nexus history event links to the callee event.
308+
309+
This is only safe to call from the single thread/task that runs the operation handler.
310+
"""
311+
if link is None or not link.HasField("workflow_event"):
312+
return
313+
self.nexus_context.outbound_links.append(
314+
workflow_event_to_nexus_link(link.workflow_event)
315+
)
280316

281317

282318
class WorkflowRunOperationContext(StartOperationContext):
@@ -674,10 +710,8 @@ async def _start_nexus_backing_workflow(
674710
priority=priority,
675711
versioning_override=versioning_override,
676712
callbacks=temporal_context._get_callbacks(token),
677-
links=temporal_context._get_links(),
713+
links=temporal_context._get_request_links(),
678714
request_id=temporal_context.nexus_context.request_id,
679715
)
680716

681-
temporal_context._add_outbound_links(wf_handle)
682-
683717
return WorkflowHandle[ReturnType]._unsafe_from_client_workflow_handle(wf_handle)

temporalio/worker/_workflow.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -518,12 +518,13 @@ async def _handle_activation(
518518

519519
# Log workflow task duration with external storage metrics
520520
self._log_workflow_task_duration(
521-
act, task_start_time, download_metrics, upload_metrics
521+
act, workflow, task_start_time, download_metrics, upload_metrics
522522
)
523523

524524
def _log_workflow_task_duration(
525525
self,
526526
act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
527+
workflow: _RunningWorkflow | None,
527528
task_start_time: float,
528529
download_metrics: temporalio.converter._extstore.StorageOperationMetrics,
529530
upload_metrics: temporalio.converter._extstore.StorageOperationMetrics,
@@ -537,8 +538,7 @@ def _fmt_duration(td: timedelta) -> str:
537538
return f"{secs * 1000:.3f}ms"
538539

539540
completed_event_id = act.history_length + 1
540-
_running = self._running_workflows.get(act.run_id)
541-
_info = _running.get_info() if _running is not None else None
541+
_info = workflow.get_info() if workflow is not None else None
542542
attempt = _info.attempt if _info is not None else "unknown"
543543
log_id = f"{act.run_id}:{completed_event_id}:{attempt}"
544544
msg_details, extra = temporalio.workflow._build_log_context(

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
134134
"--dynamic-config-value",
135135
"history.enableChasmCallbacks=true",
136136
"--dynamic-config-value",
137+
"history.enableCHASMSignalBacklinks=true",
138+
"--dynamic-config-value",
137139
"nexusoperation.enableStandalone=true",
138140
"--dynamic-config-value",
139141
'system.system.refreshNexusEndpointsMinWait="0s"',

tests/nexus/test_link_conversion.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ def test_link_conversion_workflow_to_link_and_back(
277277
),
278278
nexusrpc.Link(
279279
type=temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name,
280-
url="temporal:///namespaces/ns/nexus-operations/op-id?runID=run-id",
280+
url="temporal:///namespaces/ns/nexus-operations/op-id/run-id/details",
281281
),
282282
),
283283
(
@@ -289,7 +289,7 @@ def test_link_conversion_workflow_to_link_and_back(
289289
),
290290
nexusrpc.Link(
291291
type=temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name,
292-
url="temporal:///namespaces/ns/nexus-operations/op-id",
292+
url="temporal:///namespaces/ns/nexus-operations/op-id//details",
293293
),
294294
),
295295
(
@@ -301,7 +301,7 @@ def test_link_conversion_workflow_to_link_and_back(
301301
),
302302
nexusrpc.Link(
303303
type=temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name,
304-
url="temporal:///namespaces/ns/nexus-operations/op%2Fid",
304+
url="temporal:///namespaces/ns/nexus-operations/op%2Fid//details",
305305
),
306306
),
307307
],
@@ -332,10 +332,12 @@ def test_link_conversion_nexus_operation_to_link_and_back(
332332
)
333333

334334

335-
def test_nexus_operation_link_with_duplicate_run_id_is_ignored():
335+
def test_nexus_operation_link_with_unparseable_url_is_ignored():
336+
# The canonical path is /nexus-operations/{op_id}/{run_id}/details; a URL missing the
337+
# run-id/details suffix (e.g. the legacy ?runID= form) does not parse.
336338
link = nexusrpc.Link(
337339
type=temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name,
338-
url="temporal:///namespaces/ns/nexus-operations/op-id?runID=one&runID=two",
340+
url="temporal:///namespaces/ns/nexus-operations/op-id?runID=run-id",
339341
)
340342
assert temporalio.nexus._link_conversion.nexus_link_to_temporal_link(link) is None
341343

0 commit comments

Comments
 (0)