Skip to content

Commit a580fae

Browse files
committed
Reviewing
1 parent 66d86ca commit a580fae

3 files changed

Lines changed: 42 additions & 54 deletions

File tree

temporalio/client/_impl.py

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -200,42 +200,25 @@ async def start_workflow(
200200
start_workflow_response=resp,
201201
)
202202
setattr(handle, "__temporal_eagerly_started", eagerly_started)
203-
# If this start / signal-with-start is issued from inside a Nexus operation handler (but
204-
# not as the nexus-backing workflow, whose links are handled separately by
205-
# WorkflowRunOperationContext.start_workflow), capture the backlink the server returned so
206-
# the caller workflow's Nexus history event links to the callee event.
203+
# If this signal-with-start is issued from inside a Nexus operation handler (but not as the
204+
# nexus-backing workflow, whose links are handled separately by
205+
# WorkflowRunOperationContext.start_workflow), capture the signal backlink the server
206+
# returned so the caller workflow's Nexus history event links to the signaled event. A
207+
# plain start does not capture a backlink: it only forwards the inbound links onto the
208+
# start request.
207209
nexus_ctx = self._try_nexus_start_operation_context()
208210
if (
209211
nexus_ctx is not None
210212
and not temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context()
211-
):
212-
if isinstance(
213+
and isinstance(
213214
resp,
214215
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse,
215-
):
216-
# Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at
217-
# the WorkflowExecutionSignaled event; older servers leave it unset.
218-
if resp.HasField("signal_link"):
219-
nexus_ctx._add_backlink(resp.signal_link)
220-
else:
221-
if resp.HasField("link"):
222-
nexus_ctx._add_backlink(resp.link)
223-
else:
224-
# Older servers (pre-1.31) don't return a link on the start response.
225-
# Fabricate one pointing at the started workflow's WorkflowExecutionStarted
226-
# event so the caller still gets a backlink.
227-
nexus_ctx._add_backlink(
228-
temporalio.api.common.v1.Link(
229-
workflow_event=temporalio.api.common.v1.Link.WorkflowEvent(
230-
namespace=self._client.namespace,
231-
workflow_id=req.workflow_id,
232-
run_id=resp.run_id,
233-
event_ref=temporalio.api.common.v1.Link.WorkflowEvent.EventReference(
234-
event_type=temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
235-
),
236-
)
237-
)
238-
)
216+
)
217+
):
218+
# Server >= 1.31 with EnableCHASMSignalBacklinks returns signal_link pointing at
219+
# the WorkflowExecutionSignaled event; older servers leave it unset.
220+
if resp.HasField("signal_link"):
221+
nexus_ctx._add_backlink(resp.signal_link)
239222
return handle
240223

241224
async def _build_start_workflow_execution_request(

tests/nexus/test_signal_link_propagation.py

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
SignalWorkflowInput,
3838
StartWorkflowInput,
3939
)
40-
from temporalio.nexus._link_conversion import _LinkType
4140
from temporalio.worker._nexus import _NexusTaskCancellation, _NexusWorker
4241

4342
NAMESPACE = "test-namespace"
@@ -316,7 +315,9 @@ async def test_signal_with_start_against_older_server_captures_no_backlink(
316315
# ── start ─────────────────────────────────────────────────────────────────────────────────
317316

318317

319-
async def test_start_uses_server_link_when_present(nexus_ctx: Any) -> None:
318+
async def test_start_forwards_inbound_links_and_captures_no_backlink(
319+
nexus_ctx: Any,
320+
) -> None:
320321
server_link = _workflow_event_link(
321322
WORKFLOW_ID,
322323
"target-run",
@@ -333,15 +334,18 @@ async def test_start_uses_server_link_when_present(nexus_ctx: Any) -> None:
333334

334335
await impl.start_workflow(_start_input())
335336

336-
assert len(nexus_ctx.nexus_context.outbound_links) == 1
337-
[link] = nexus_ctx.nexus_context.outbound_links
338-
assert link.type == _LinkType.WORKFLOW.value
339-
assert "wf-target" in link.url
340-
# WorkflowExecutionStarted event type round-trips through the URL.
341-
assert "WorkflowExecutionStarted" in link.url
337+
# Forward: the start request carries the single inbound link.
338+
sent = workflow_service.start_workflow_execution.call_args.args[0]
339+
assert len(sent.links) == 1
340+
assert sent.links[0] == _inbound_nexus_link()
341+
342+
# Backward: a plain start does not capture a backlink, even when the server returns one.
343+
assert nexus_ctx.nexus_context.outbound_links == []
342344

343345

344-
async def test_start_fabricates_link_when_server_omits_it(nexus_ctx: Any) -> None:
346+
async def test_start_against_older_server_captures_no_backlink(
347+
nexus_ctx: Any,
348+
) -> None:
345349
workflow_service = mock.MagicMock()
346350
workflow_service.start_workflow_execution = mock.AsyncMock(
347351
return_value=temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse(
@@ -352,12 +356,13 @@ async def test_start_fabricates_link_when_server_omits_it(nexus_ctx: Any) -> Non
352356

353357
await impl.start_workflow(_start_input())
354358

355-
assert len(nexus_ctx.nexus_context.outbound_links) == 1
356-
[link] = nexus_ctx.nexus_context.outbound_links
357-
# Fabricated link points at the started workflow's WorkflowExecutionStarted event.
358-
assert "wf-target" in link.url
359-
assert "target-run" in link.url
360-
assert "WorkflowExecutionStarted" in link.url
359+
# Forward direction still works regardless of server version.
360+
sent = workflow_service.start_workflow_execution.call_args.args[0]
361+
assert len(sent.links) == 1
362+
assert sent.links[0] == _inbound_nexus_link()
363+
364+
# Backward: a plain start never fabricates a backlink.
365+
assert nexus_ctx.nexus_context.outbound_links == []
361366

362367

363368
async def test_start_outside_nexus_context_does_not_touch_links() -> None:

tests/nexus/test_signal_link_propagation_e2e.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class SignalingService:
5959

6060

6161
@workflow.defn
62-
class SignalCalleeWorkflow:
62+
class CalleeWorkflow:
6363
def __init__(self) -> None:
6464
self._received: list[str] = []
6565
self._expected = 1
@@ -79,7 +79,7 @@ def ping(self, msg: str) -> None:
7979

8080

8181
@workflow.defn
82-
class SignalCallerWorkflow:
82+
class CallerWorkflow:
8383
@workflow.run
8484
async def run(self, mode: str, callee_id: str, task_queue: str) -> str:
8585
client = workflow.create_nexus_client(
@@ -133,7 +133,7 @@ async def op(self, _ctx: StartOperationContext, input: str) -> str:
133133
await (
134134
nexus.client()
135135
.get_workflow_handle(callee_id)
136-
.signal(SignalCalleeWorkflow.ping, "second")
136+
.signal(CalleeWorkflow.ping, "second")
137137
)
138138
return "ok:sync"
139139

@@ -155,7 +155,7 @@ async def _signal_with_start(callee_id: str, payload: str) -> None:
155155
# signal-with-start exercises the SignalWithStartWorkflowExecutionResponse.signal_link
156156
# backlink path in temporalio.client._impl.
157157
await nexus.client().start_workflow(
158-
SignalCalleeWorkflow.run,
158+
CalleeWorkflow.run,
159159
2 if payload == "first" else 1,
160160
id=callee_id,
161161
task_queue=nexus.info().task_queue,
@@ -251,17 +251,17 @@ async def test_sync_signal_operation_links(
251251

252252
task_queue = str(uuid.uuid4())
253253
await env.create_nexus_endpoint(make_nexus_endpoint_name(task_queue), task_queue)
254-
callee_id = f"sync-callee-{uuid.uuid4()}"
255-
caller_id = f"sync-caller-{uuid.uuid4()}"
254+
callee_id = f"callee-{uuid.uuid4()}"
255+
caller_id = f"caller-{uuid.uuid4()}"
256256

257257
async with Worker(
258258
client,
259259
task_queue=task_queue,
260260
nexus_service_handlers=[SignalingServiceHandler()],
261-
workflows=[SignalCallerWorkflow, SignalCalleeWorkflow],
261+
workflows=[CallerWorkflow, CalleeWorkflow],
262262
):
263263
caller_handle = await client.start_workflow(
264-
SignalCallerWorkflow.run,
264+
CallerWorkflow.run,
265265
args=[MODE_SYNC, callee_id, task_queue],
266266
id=caller_id,
267267
task_queue=task_queue,
@@ -307,7 +307,7 @@ async def test_async_signal_operation_links(
307307
client,
308308
task_queue=task_queue,
309309
nexus_service_handlers=[AsyncSignalingServiceHandler()],
310-
workflows=[AsyncSignalCallerWorkflow, SignalCalleeWorkflow],
310+
workflows=[AsyncSignalCallerWorkflow, CalleeWorkflow],
311311
):
312312
caller_handle = await client.start_workflow(
313313
AsyncSignalCallerWorkflow.run,

0 commit comments

Comments
 (0)