Skip to content

Commit 7f260fe

Browse files
committed
Add SAA Temporal Nexus operation handling
Expose customizable Temporal Nexus operation handlers and client support. Add activity link conversion, operation token handling, and related tests.
1 parent d53a604 commit 7f260fe

14 files changed

Lines changed: 1219 additions & 32 deletions

temporalio/client/_activity.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,8 @@ def __init__(
691691
*,
692692
run_id: str | None = None,
693693
result_type: type | None = None,
694+
start_activity_response: None
695+
| temporalio.api.workflowservice.v1.StartActivityExecutionResponse = None,
694696
) -> None:
695697
"""Create activity handle."""
696698
self._client = client
@@ -700,6 +702,7 @@ def __init__(
700702
self._known_outcome: (
701703
temporalio.api.activity.v1.ActivityExecutionOutcome | None
702704
) = None
705+
self._start_activity_response = start_activity_response
703706

704707
@functools.cached_property
705708
def _data_converter(self) -> temporalio.converter.DataConverter:

temporalio/client/_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,12 @@ async def start_activity(
14841484
start_delay: timedelta | None = None,
14851485
rpc_metadata: Mapping[str, str | bytes] = {},
14861486
rpc_timeout: timedelta | None = None,
1487+
# The following options should not be considered part of the public API. They
1488+
# are deliberately not exposed in overloads, and are not subject to any
1489+
# backwards compatibility guarantees.
1490+
callbacks: Sequence[Callback] = [],
1491+
links: Sequence[temporalio.api.common.v1.Link] = [],
1492+
request_id: str | None = None,
14871493
) -> ActivityHandle[ReturnType]:
14881494
"""Start an activity and return its handle.
14891495
@@ -1542,6 +1548,9 @@ async def start_activity(
15421548
rpc_metadata=rpc_metadata,
15431549
rpc_timeout=rpc_timeout,
15441550
priority=priority,
1551+
callbacks=callbacks,
1552+
links=links,
1553+
request_id=request_id,
15451554
)
15461555
)
15471556

temporalio/client/_impl.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ async def _build_start_workflow_execution_request(
237237
# Links are duplicated on request for compatibility with older server versions.
238238
req.links.extend(links)
239239

240-
if temporalio.nexus._operation_context._in_nexus_backing_workflow_start_context():
240+
if temporalio.nexus._operation_context._in_nexus_backing_start_context():
241241
req.on_conflict_options.attach_request_id = True
242242
req.on_conflict_options.attach_completion_callbacks = True
243243
req.on_conflict_options.attach_links = True
@@ -566,6 +566,7 @@ async def start_activity(self, input: StartActivityInput) -> ActivityHandle[Any]
566566
input.id,
567567
run_id=resp.run_id,
568568
result_type=input.result_type,
569+
start_activity_response=resp,
569570
)
570571

571572
async def _build_start_activity_execution_request(
@@ -609,6 +610,8 @@ async def _build_start_activity_execution_request(
609610
),
610611
)
611612

613+
if input.request_id:
614+
req.request_id = input.request_id
612615
if input.schedule_to_close_timeout is not None:
613616
req.schedule_to_close_timeout.FromTimedelta(input.schedule_to_close_timeout)
614617
if input.start_to_close_timeout is not None:
@@ -644,6 +647,23 @@ async def _build_start_activity_execution_request(
644647
# Set priority
645648
req.priority.CopyFrom(input.priority._to_proto())
646649

650+
req.completion_callbacks.extend(
651+
temporalio.api.common.v1.Callback(
652+
nexus=temporalio.api.common.v1.Callback.Nexus(
653+
url=callback.url,
654+
header=callback.headers,
655+
),
656+
links=input.links,
657+
)
658+
for callback in input.callbacks
659+
)
660+
req.links.extend(input.links)
661+
662+
if temporalio.nexus._operation_context._in_nexus_backing_start_context():
663+
req.on_conflict_options.attach_request_id = True
664+
req.on_conflict_options.attach_completion_callbacks = True
665+
req.on_conflict_options.attach_links = True
666+
647667
return req
648668

649669
async def cancel_activity(self, input: CancelActivityInput) -> None:

temporalio/client/_interceptor.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ class StartActivityInput:
230230
headers: Mapping[str, temporalio.api.common.v1.Payload]
231231
rpc_metadata: Mapping[str, str | bytes]
232232
rpc_timeout: timedelta | None
233+
# The following options are experimental and unstable.
234+
callbacks: Sequence[Callback]
235+
links: Sequence[temporalio.api.common.v1.Link]
236+
request_id: str | None
233237

234238

235239
@dataclass

temporalio/nexus/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
wait_for_worker_shutdown_sync,
2626
)
2727
from ._operation_handlers import (
28+
CancelActivityOptions,
2829
CancelWorkflowRunOptions,
2930
TemporalNexusOperationHandler,
3031
)
@@ -33,6 +34,7 @@
3334

3435
__all__ = (
3536
"workflow_run_operation",
37+
"CancelActivityOptions",
3638
"CancelWorkflowRunOptions",
3739
"Info",
3840
"LoggerAdapter",

temporalio/nexus/_link_conversion.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
r"^/namespaces/(?P<namespace>[^/]+)/nexus-operations/(?P<operation_id>[^/]+)$"
2424
)
2525

26+
_ACTIVITY_LINK_URL_PATH_REGEX = re.compile(
27+
r"^/namespaces/(?P<namespace>[^/]+)/activities/(?P<activity_id>[^/]+)$"
28+
)
29+
2630
_WORFKLOW_LINK_URL_PATH_REGEX = re.compile(
2731
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
2832
)
@@ -31,6 +35,7 @@
3135
class _LinkType(str, Enum):
3236
WORKFLOW = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name
3337
NEXUS_OPERATION = temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name
38+
ACTIVITY = temporalio.api.common.v1.Link.Activity.DESCRIPTOR.full_name
3439

3540

3641
LINK_EVENT_ID_PARAM_NAME = "eventID"
@@ -84,6 +89,9 @@ def nexus_link_to_temporal_link(
8489
case _LinkType.NEXUS_OPERATION:
8590
return nexus_link_to_nexus_operation_link(nexus_link)
8691

92+
case _LinkType.ACTIVITY:
93+
return nexus_link_to_activity_link(nexus_link)
94+
8795

8896
def temporal_link_to_nexus_link(
8997
temporal_link: temporalio.api.common.v1.Link,
@@ -99,7 +107,10 @@ def temporal_link_to_nexus_link(
99107
case "nexus_operation":
100108
return nexus_operation_to_nexus_link(temporal_link.nexus_operation)
101109

102-
case "activity" | "batch_job":
110+
case "activity":
111+
return activity_link_to_nexus_link(temporal_link.activity)
112+
113+
case "batch_job":
103114
raise NotImplementedError("only workflow links are supported")
104115

105116
case None:
@@ -165,6 +176,25 @@ def nexus_operation_to_nexus_link(
165176
return nexusrpc.Link(url=url, type=_LinkType.NEXUS_OPERATION.value)
166177

167178

179+
def activity_link_to_nexus_link(
180+
activity: temporalio.api.common.v1.Link.Activity,
181+
) -> nexusrpc.Link:
182+
"""Convert an Activity link into a nexusrpc link."""
183+
scheme = "temporal"
184+
namespace = urllib.parse.quote(activity.namespace, safe="")
185+
activity_id = urllib.parse.quote(activity.activity_id, safe="")
186+
path = f"/namespaces/{namespace}/activities/{activity_id}"
187+
188+
if activity.run_id:
189+
query_params = urllib.parse.urlencode({LINK_RUN_ID_PARAM_NAME: activity.run_id})
190+
else:
191+
query_params = ""
192+
193+
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
194+
195+
return nexusrpc.Link(url=url, type=_LinkType.ACTIVITY.value)
196+
197+
168198
def nexus_link_to_workflow_event_link(
169199
link: nexusrpc.Link,
170200
) -> temporalio.api.common.v1.Link | None:
@@ -250,6 +280,38 @@ def nexus_link_to_nexus_operation_link(
250280
return temporalio.api.common.v1.Link(nexus_operation=nexus_op_link)
251281

252282

283+
def nexus_link_to_activity_link(
284+
nexus_link: nexusrpc.Link,
285+
) -> temporalio.api.common.v1.Link | None:
286+
"""Convert a nexus link into a Temporal Activity link."""
287+
url = urllib.parse.urlparse(nexus_link.url)
288+
match = _ACTIVITY_LINK_URL_PATH_REGEX.match(url.path)
289+
if not match:
290+
logger.warning(
291+
f"Invalid Nexus link: {nexus_link}. Expected path to match {_ACTIVITY_LINK_URL_PATH_REGEX.pattern}"
292+
)
293+
return None
294+
295+
query_params = urllib.parse.parse_qs(url.query, keep_blank_values=True)
296+
297+
match query_params.get(LINK_RUN_ID_PARAM_NAME):
298+
case [run_id_param]:
299+
run_id = run_id_param
300+
case _:
301+
logger.warning(
302+
f"Invalid Nexus link: {nexus_link}. Expected {LINK_RUN_ID_PARAM_NAME} to have exactly 1 value"
303+
)
304+
return None
305+
306+
groups = match.groupdict()
307+
activity_link = temporalio.api.common.v1.Link.Activity(
308+
namespace=urllib.parse.unquote(groups["namespace"]),
309+
activity_id=urllib.parse.unquote(groups["activity_id"]),
310+
run_id=run_id,
311+
)
312+
return temporalio.api.common.v1.Link(activity=activity_link)
313+
314+
253315
def _event_reference_to_query_params(
254316
event_ref: temporalio.api.common.v1.Link.WorkflowEvent.EventReference,
255317
) -> str:

temporalio/nexus/_operation_context.py

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
)
4444

4545
from ._link_conversion import (
46+
activity_link_to_nexus_link,
4647
nexus_link_to_temporal_link,
4748
workflow_event_to_nexus_link,
4849
workflow_execution_started_event_link_from_workflow_handle,
@@ -64,12 +65,12 @@
6465
ContextVar("temporal-cancel-operation-context")
6566
)
6667

67-
# A Nexus start handler might start zero or more workflows as usual using a Temporal client. In
68-
# addition, it may start one "nexus-backing" workflow, using
69-
# WorkflowRunOperationContext.start_workflow. This context is active while the latter is being done.
68+
# A Nexus start handler might start zero or async Temporal actions as usual using a Temporal client. In
69+
# addition, it may start one "nexus-backing" async Temporal action, using
70+
# WorkflowRunOperationContext.start_workflow or methods from TemporalNexusClient. This context is active while the latter is being done.
7071
# It is thus a narrower context than _temporal_start_operation_context.
71-
_temporal_nexus_backing_workflow_start_context: ContextVar[bool] = ContextVar(
72-
"temporal-nexus-backing-workflow-start-context"
72+
_temporal_nexus_backing_start_context: ContextVar[bool] = ContextVar(
73+
"temporal-nexus-backing-start-context"
7374
)
7475

7576

@@ -168,16 +169,16 @@ def _try_temporal_context() -> (
168169

169170

170171
@contextmanager
171-
def _nexus_backing_workflow_start_context() -> Generator[None]:
172-
token = _temporal_nexus_backing_workflow_start_context.set(True)
172+
def _nexus_backing_start_context() -> Generator[None]:
173+
token = _temporal_nexus_backing_start_context.set(True)
173174
try:
174175
yield
175176
finally:
176-
_temporal_nexus_backing_workflow_start_context.reset(token)
177+
_temporal_nexus_backing_start_context.reset(token)
177178

178179

179-
def _in_nexus_backing_workflow_start_context() -> bool: # type:ignore[reportUnusedClass]
180-
return _temporal_nexus_backing_workflow_start_context.get(False)
180+
def _in_nexus_backing_start_context() -> bool: # type:ignore[reportUnusedClass]
181+
return _temporal_nexus_backing_start_context.get(False)
181182

182183

183184
_OperationCtxT = TypeVar("_OperationCtxT", bound=OperationContext)
@@ -243,13 +244,13 @@ def _get_callbacks(
243244
def _get_links(
244245
self,
245246
) -> list[temporalio.api.common.v1.Link]:
246-
event_links: list[temporalio.api.common.v1.Link] = []
247+
links: list[temporalio.api.common.v1.Link] = []
247248
for inbound_link in self.nexus_context.inbound_links:
248249
if link := nexus_link_to_temporal_link(inbound_link):
249-
event_links.append(link)
250-
return event_links
250+
links.append(link)
251+
return links
251252

252-
def _add_outbound_links(
253+
def _add_outbound_workflow_links(
253254
self, workflow_handle: temporalio.client.WorkflowHandle[Any, Any]
254255
):
255256
# If links were not sent in StartWorkflowExecutionResponse then construct them.
@@ -279,6 +280,45 @@ def _add_outbound_links(
279280
)
280281
return workflow_handle
281282

283+
def _add_outbound_activity_links(
284+
self, activity_handle: temporalio.client.ActivityHandle[Any]
285+
):
286+
activity_links: list[temporalio.api.common.v1.Link.Activity] = []
287+
try:
288+
if isinstance(
289+
activity_handle._start_activity_response,
290+
temporalio.api.workflowservice.v1.StartActivityExecutionResponse,
291+
):
292+
if activity_handle._start_activity_response.HasField("link"):
293+
if activity_handle._start_activity_response.link.HasField(
294+
"activity"
295+
):
296+
activity_links.append(
297+
activity_handle._start_activity_response.link.activity
298+
)
299+
if not activity_links:
300+
activity_run_id = activity_handle.run_id
301+
if activity_run_id is None:
302+
raise ValueError(
303+
f"Activity handle {activity_handle} has no run ID. "
304+
"Cannot create Activity link."
305+
)
306+
activity_links.append(
307+
temporalio.api.common.v1.Link.Activity(
308+
namespace=activity_handle._client.namespace,
309+
activity_id=activity_handle.id,
310+
run_id=activity_run_id,
311+
)
312+
)
313+
self.nexus_context.outbound_links.extend(
314+
activity_link_to_nexus_link(link) for link in activity_links
315+
)
316+
except Exception as e:
317+
logger.warning(
318+
f"Failed to create Activity link for activity {activity_handle}: {e}"
319+
)
320+
return activity_handle
321+
282322

283323
class WorkflowRunOperationContext(StartOperationContext):
284324
"""Context received by a workflow run operation."""
@@ -642,7 +682,7 @@ async def _start_nexus_backing_workflow(
642682
# namespace to deliver the result to the caller namespace when the workflow reaches a
643683
# terminal state) and inbound links to the caller workflow (attached to history events of
644684
# the workflow started in the handler namespace, and displayed in the UI).
645-
with _nexus_backing_workflow_start_context():
685+
with _nexus_backing_start_context():
646686
wf_handle = await temporal_context.client.start_workflow( # type: ignore
647687
workflow=workflow,
648688
arg=arg,
@@ -674,6 +714,6 @@ async def _start_nexus_backing_workflow(
674714
request_id=temporal_context.nexus_context.request_id,
675715
)
676716

677-
temporal_context._add_outbound_links(wf_handle)
717+
temporal_context._add_outbound_workflow_links(wf_handle)
678718

679719
return WorkflowHandle[ReturnType]._unsafe_from_client_workflow_handle(wf_handle)

0 commit comments

Comments
 (0)