Skip to content

Commit 294b7e2

Browse files
committed
Deal with new Nexus link type
1 parent 0b00bcf commit 294b7e2

2 files changed

Lines changed: 191 additions & 38 deletions

File tree

temporalio/nexus/_link_conversion.py

Lines changed: 136 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
r"^/namespaces/(?P<namespace>[^/]+)/nexus-operations/(?P<operation_id>[^/]+)$"
2424
)
2525

26-
_WORFKLOW_LINK_URL_PATH_REGEX = re.compile(
27-
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
26+
_WORKFLOW_LINK_URL_PATH_REGEX = re.compile(
27+
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)(?P<history>/history)?$"
2828
)
2929

3030

3131
class _LinkType(str, Enum):
32-
WORKFLOW = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name
32+
WORKFLOW_EVENT = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name
33+
WORKFLOW = temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name
3334
NEXUS_OPERATION = temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name
3435

3536

@@ -38,6 +39,7 @@ class _LinkType(str, Enum):
3839
LINK_REQUEST_ID_PARAM_NAME = "requestID"
3940
LINK_REFERENCE_TYPE_PARAM_NAME = "referenceType"
4041
LINK_RUN_ID_PARAM_NAME = "runID"
42+
LINK_REASON_PARAM_NAME = "reason"
4143

4244
EVENT_REFERENCE_TYPE = "EventReference"
4345
REQUEST_ID_REFERENCE_TYPE = "RequestIdReference"
@@ -78,9 +80,12 @@ def nexus_link_to_temporal_link(
7880
return None
7981

8082
match link_type:
81-
case _LinkType.WORKFLOW:
83+
case _LinkType.WORKFLOW_EVENT:
8284
return nexus_link_to_workflow_event_link(nexus_link)
8385

86+
case _LinkType.WORKFLOW:
87+
return nexus_link_to_workflow_link(nexus_link)
88+
8489
case _LinkType.NEXUS_OPERATION:
8590
return nexus_link_to_nexus_operation_link(nexus_link)
8691

@@ -96,6 +101,9 @@ def temporal_link_to_nexus_link(
96101
case "workflow_event":
97102
return workflow_event_to_nexus_link(temporal_link.workflow_event)
98103

104+
case "workflow":
105+
return workflow_to_nexus_link(temporal_link.workflow)
106+
99107
case "nexus_operation":
100108
return nexus_operation_to_nexus_link(temporal_link.nexus_operation)
101109

@@ -117,12 +125,6 @@ def workflow_event_to_nexus_link(
117125
Used when propagating links from a StartWorkflow response to a Nexus start operation
118126
response.
119127
"""
120-
scheme = "temporal"
121-
namespace = urllib.parse.quote(workflow_event.namespace, safe="")
122-
workflow_id = urllib.parse.quote(workflow_event.workflow_id, safe="")
123-
run_id = urllib.parse.quote(workflow_event.run_id, safe="")
124-
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
125-
126128
query_params = None
127129
match workflow_event.WhichOneof("reference"):
128130
case "event_ref":
@@ -134,10 +136,40 @@ def workflow_event_to_nexus_link(
134136
case _:
135137
pass
136138

137-
# urllib will omit '//' from the url if netloc is empty so we add the scheme manually
138-
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
139+
return nexusrpc.Link(
140+
url=_workflow_nexus_url(
141+
workflow_event.namespace,
142+
workflow_event.workflow_id,
143+
workflow_event.run_id,
144+
history=True,
145+
query_params=query_params,
146+
),
147+
type=_LinkType.WORKFLOW_EVENT.value,
148+
)
149+
150+
151+
def workflow_to_nexus_link(
152+
workflow: temporalio.api.common.v1.Link.Workflow,
153+
) -> nexusrpc.Link:
154+
"""Convert a Workflow link into a nexusrpc link."""
155+
query_params = ""
156+
if workflow.reason:
157+
query_params = urllib.parse.urlencode(
158+
{
159+
LINK_REASON_PARAM_NAME: workflow.reason,
160+
},
161+
)
139162

140-
return nexusrpc.Link(url=url, type=_LinkType.WORKFLOW.value)
163+
return nexusrpc.Link(
164+
url=_workflow_nexus_url(
165+
workflow.namespace,
166+
workflow.workflow_id,
167+
workflow.run_id,
168+
history=False,
169+
query_params=query_params,
170+
),
171+
type=_LinkType.WORKFLOW.value,
172+
)
141173

142174

143175
def nexus_operation_to_nexus_link(
@@ -148,7 +180,6 @@ def nexus_operation_to_nexus_link(
148180
Used when propagating links from a StartNexusOperation response to a Nexus start operation
149181
response.
150182
"""
151-
scheme = "temporal"
152183
namespace = urllib.parse.quote(op_link.namespace, safe="")
153184
operation_id = urllib.parse.quote(op_link.operation_id, safe="")
154185
path = f"/namespaces/{namespace}/nexus-operations/{operation_id}"
@@ -161,10 +192,65 @@ def nexus_operation_to_nexus_link(
161192
},
162193
)
163194

195+
return nexusrpc.Link(
196+
url=_temporal_nexus_url(path, query_params=query_params),
197+
type=_LinkType.NEXUS_OPERATION.value,
198+
)
199+
200+
201+
def _workflow_nexus_url(
202+
namespace: str,
203+
workflow_id: str,
204+
run_id: str,
205+
*,
206+
history: bool,
207+
query_params: str | None = "",
208+
) -> str:
209+
namespace = urllib.parse.quote(namespace, safe="")
210+
workflow_id = urllib.parse.quote(workflow_id, safe="")
211+
run_id = urllib.parse.quote(run_id, safe="")
212+
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}"
213+
if history:
214+
path += "/history"
215+
return _temporal_nexus_url(path, query_params=query_params)
216+
217+
218+
def _temporal_nexus_url(path: str, *, query_params: str | None = "") -> str:
164219
# urllib will omit '//' from the url if netloc is empty so we add the scheme manually
165-
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
220+
return f"temporal://{urllib.parse.urlunparse(('', '', path, '', query_params or '', ''))}"
221+
222+
223+
def _parse_workflow_nexus_url(
224+
link: nexusrpc.Link, *, history: bool
225+
) -> tuple[dict[str, str], dict[str, list[str]]] | None:
226+
url = urllib.parse.urlparse(link.url)
227+
match = _WORKFLOW_LINK_URL_PATH_REGEX.match(url.path)
228+
if not match or bool(match.group("history")) != history:
229+
expected_suffix = "/history" if history else ""
230+
logger.warning(
231+
f"Invalid Nexus link: {link}. Expected path to match "
232+
f"/namespaces/{{namespace}}/workflows/{{workflow_id}}/{{run_id}}{expected_suffix}"
233+
)
234+
return None
166235

167-
return nexusrpc.Link(url=url, type=_LinkType.NEXUS_OPERATION.value)
236+
groups = {
237+
name: urllib.parse.unquote(value)
238+
for name, value in match.groupdict().items()
239+
if name != "history" and value is not None
240+
}
241+
return groups, urllib.parse.parse_qs(url.query)
242+
243+
244+
def _optional_single_query_param(
245+
query_params: dict[str, list[str]], param_name: str
246+
) -> str:
247+
match query_params.get(param_name):
248+
case [param]:
249+
return param
250+
case [] | None:
251+
return ""
252+
case _:
253+
raise ValueError(f"Expected {param_name} to have at most 1 value")
168254

169255

170256
def nexus_link_to_workflow_event_link(
@@ -175,16 +261,11 @@ def nexus_link_to_workflow_event_link(
175261
This is used when propagating links from a Nexus start operation request to a
176262
StartWorklow request.
177263
"""
178-
url = urllib.parse.urlparse(link.url)
179-
match = _WORFKLOW_LINK_URL_PATH_REGEX.match(url.path)
180-
if not match:
181-
logger.warning(
182-
f"Invalid Nexus link: {link}. Expected path to match {_WORFKLOW_LINK_URL_PATH_REGEX.pattern}"
183-
)
264+
parsed = _parse_workflow_nexus_url(link, history=True)
265+
if parsed is None:
184266
return None
267+
groups, query_params = parsed
185268
try:
186-
query_params = urllib.parse.parse_qs(url.query)
187-
188269
request_id_ref = None
189270
event_ref = None
190271
match query_params.get(LINK_REFERENCE_TYPE_PARAM_NAME):
@@ -203,17 +284,39 @@ def nexus_link_to_workflow_event_link(
203284
)
204285
return None
205286

206-
groups = match.groupdict()
207287
workflow_event_link = temporalio.api.common.v1.Link.WorkflowEvent(
208-
namespace=urllib.parse.unquote(groups["namespace"]),
209-
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
210-
run_id=urllib.parse.unquote(groups["run_id"]),
288+
namespace=groups["namespace"],
289+
workflow_id=groups["workflow_id"],
290+
run_id=groups["run_id"],
211291
event_ref=event_ref,
212292
request_id_ref=request_id_ref,
213293
)
214294
return temporalio.api.common.v1.Link(workflow_event=workflow_event_link)
215295

216296

297+
def nexus_link_to_workflow_link(
298+
link: nexusrpc.Link,
299+
) -> temporalio.api.common.v1.Link | None:
300+
"""Convert a nexus link into a Temporal Workflow link."""
301+
parsed = _parse_workflow_nexus_url(link, history=False)
302+
if parsed is None:
303+
return None
304+
groups, query_params = parsed
305+
try:
306+
reason = _optional_single_query_param(query_params, LINK_REASON_PARAM_NAME)
307+
except ValueError as err:
308+
logger.warning(f"Invalid Nexus link: {link}. {err}")
309+
return None
310+
311+
workflow_link = temporalio.api.common.v1.Link.Workflow(
312+
namespace=groups["namespace"],
313+
workflow_id=groups["workflow_id"],
314+
run_id=groups["run_id"],
315+
reason=reason,
316+
)
317+
return temporalio.api.common.v1.Link(workflow=workflow_link)
318+
319+
217320
def nexus_link_to_nexus_operation_link(
218321
nexus_link: nexusrpc.Link,
219322
) -> temporalio.api.common.v1.Link | None:
@@ -232,16 +335,11 @@ def nexus_link_to_nexus_operation_link(
232335

233336
query_params = urllib.parse.parse_qs(url.query)
234337

235-
match query_params.get(LINK_RUN_ID_PARAM_NAME):
236-
case [run_id_param]:
237-
run_id = run_id_param
238-
case [] | None:
239-
run_id = ""
240-
case _:
241-
logger.warning(
242-
f"Invalid Nexus link: {nexus_link}. Expected {LINK_RUN_ID_PARAM_NAME} to have at most 1 value"
243-
)
244-
return None
338+
try:
339+
run_id = _optional_single_query_param(query_params, LINK_RUN_ID_PARAM_NAME)
340+
except ValueError as err:
341+
logger.warning(f"Invalid Nexus link: {nexus_link}. {err}")
342+
return None
245343

246344
groups = match.groupdict()
247345
nexus_op_link = temporalio.api.common.v1.Link.NexusOperation(

tests/nexus/test_link_conversion.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,61 @@ def test_link_conversion_workflow_event_to_link_and_back(
209209
assert wf_event_link == actual_event
210210

211211

212+
@pytest.mark.parametrize(
213+
["workflow_link", "expected_link"],
214+
[
215+
(
216+
temporalio.api.common.v1.Link(
217+
workflow=temporalio.api.common.v1.Link.Workflow(
218+
namespace="ns",
219+
workflow_id="wid",
220+
run_id="rid",
221+
reason="query",
222+
)
223+
),
224+
nexusrpc.Link(
225+
type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name,
226+
url="temporal:///namespaces/ns/workflows/wid/rid?reason=query",
227+
),
228+
),
229+
(
230+
temporalio.api.common.v1.Link(
231+
workflow=temporalio.api.common.v1.Link.Workflow(
232+
namespace="ns2",
233+
workflow_id="wid/2",
234+
run_id="rid2",
235+
)
236+
),
237+
nexusrpc.Link(
238+
type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name,
239+
url="temporal:///namespaces/ns2/workflows/wid%2F2/rid2",
240+
),
241+
),
242+
],
243+
)
244+
def test_link_conversion_workflow_to_link_and_back(
245+
workflow_link: temporalio.api.common.v1.Link, expected_link: nexusrpc.Link
246+
):
247+
actual_link = temporalio.nexus._link_conversion.workflow_to_nexus_link(
248+
workflow_link.workflow
249+
)
250+
assert expected_link == actual_link
251+
252+
actual_workflow = temporalio.nexus._link_conversion.nexus_link_to_workflow_link(
253+
actual_link
254+
)
255+
assert workflow_link == actual_workflow
256+
257+
assert (
258+
expected_link
259+
== temporalio.nexus._link_conversion.temporal_link_to_nexus_link(workflow_link)
260+
)
261+
assert (
262+
workflow_link
263+
== temporalio.nexus._link_conversion.nexus_link_to_temporal_link(expected_link)
264+
)
265+
266+
212267
@pytest.mark.parametrize(
213268
["operation_link", "expected_link"],
214269
[

0 commit comments

Comments
 (0)