Skip to content

Commit d5be198

Browse files
committed
Deal with new Nexus link type
1 parent 57f5506 commit d5be198

2 files changed

Lines changed: 191 additions & 38 deletions

File tree

temporalio/nexus/_link_conversion.py

Lines changed: 136 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
r"^/namespaces/(?P<namespace>[^/]+)/nexus-operations/(?P<operation_id>[^/]+)$"
2424
)
2525

26-
_WORFKLOW_LINK_URL_PATH_REGEX = re.compile(
27-
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)/history$"
26+
_WORKFLOW_LINK_URL_PATH_REGEX = re.compile(
27+
r"^/namespaces/(?P<namespace>[^/]+)/workflows/(?P<workflow_id>[^/]+)/(?P<run_id>[^/]+)(?P<history>/history)?$"
2828
)
2929

3030

3131
class _LinkType(str, Enum):
32-
WORKFLOW = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name
32+
WORKFLOW_EVENT = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name
33+
WORKFLOW = temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name
3334
NEXUS_OPERATION = temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name
3435

3536

@@ -38,6 +39,7 @@ class _LinkType(str, Enum):
3839
LINK_REQUEST_ID_PARAM_NAME = "requestID"
3940
LINK_REFERENCE_TYPE_PARAM_NAME = "referenceType"
4041
LINK_RUN_ID_PARAM_NAME = "runID"
42+
LINK_REASON_PARAM_NAME = "reason"
4143

4244
EVENT_REFERENCE_TYPE = "EventReference"
4345
REQUEST_ID_REFERENCE_TYPE = "RequestIdReference"
@@ -78,9 +80,12 @@ def nexus_link_to_temporal_link(
7880
return None
7981

8082
match link_type:
81-
case _LinkType.WORKFLOW:
83+
case _LinkType.WORKFLOW_EVENT:
8284
return nexus_link_to_workflow_event_link(nexus_link)
8385

86+
case _LinkType.WORKFLOW:
87+
return nexus_link_to_workflow_link(nexus_link)
88+
8489
case _LinkType.NEXUS_OPERATION:
8590
return nexus_link_to_nexus_operation_link(nexus_link)
8691

@@ -96,6 +101,9 @@ def temporal_link_to_nexus_link(
96101
case "workflow_event":
97102
return workflow_event_to_nexus_link(temporal_link.workflow_event)
98103

104+
case "workflow":
105+
return workflow_to_nexus_link(temporal_link.workflow)
106+
99107
case "nexus_operation":
100108
return nexus_operation_to_nexus_link(temporal_link.nexus_operation)
101109

@@ -115,12 +123,6 @@ def workflow_event_to_nexus_link(
115123
Used when propagating links from a StartWorkflow response to a Nexus start operation
116124
response.
117125
"""
118-
scheme = "temporal"
119-
namespace = urllib.parse.quote(workflow_event.namespace, safe="")
120-
workflow_id = urllib.parse.quote(workflow_event.workflow_id, safe="")
121-
run_id = urllib.parse.quote(workflow_event.run_id, safe="")
122-
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history"
123-
124126
query_params = None
125127
match workflow_event.WhichOneof("reference"):
126128
case "event_ref":
@@ -132,10 +134,40 @@ def workflow_event_to_nexus_link(
132134
case _:
133135
pass
134136

135-
# urllib will omit '//' from the url if netloc is empty so we add the scheme manually
136-
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
137+
return nexusrpc.Link(
138+
url=_workflow_nexus_url(
139+
workflow_event.namespace,
140+
workflow_event.workflow_id,
141+
workflow_event.run_id,
142+
history=True,
143+
query_params=query_params,
144+
),
145+
type=_LinkType.WORKFLOW_EVENT.value,
146+
)
147+
148+
149+
def workflow_to_nexus_link(
150+
workflow: temporalio.api.common.v1.Link.Workflow,
151+
) -> nexusrpc.Link:
152+
"""Convert a Workflow link into a nexusrpc link."""
153+
query_params = ""
154+
if workflow.reason:
155+
query_params = urllib.parse.urlencode(
156+
{
157+
LINK_REASON_PARAM_NAME: workflow.reason,
158+
},
159+
)
137160

138-
return nexusrpc.Link(url=url, type=_LinkType.WORKFLOW.value)
161+
return nexusrpc.Link(
162+
url=_workflow_nexus_url(
163+
workflow.namespace,
164+
workflow.workflow_id,
165+
workflow.run_id,
166+
history=False,
167+
query_params=query_params,
168+
),
169+
type=_LinkType.WORKFLOW.value,
170+
)
139171

140172

141173
def nexus_operation_to_nexus_link(
@@ -146,7 +178,6 @@ def nexus_operation_to_nexus_link(
146178
Used when propagating links from a StartNexusOperation response to a Nexus start operation
147179
response.
148180
"""
149-
scheme = "temporal"
150181
namespace = urllib.parse.quote(op_link.namespace, safe="")
151182
operation_id = urllib.parse.quote(op_link.operation_id, safe="")
152183
path = f"/namespaces/{namespace}/nexus-operations/{operation_id}"
@@ -159,10 +190,65 @@ def nexus_operation_to_nexus_link(
159190
},
160191
)
161192

193+
return nexusrpc.Link(
194+
url=_temporal_nexus_url(path, query_params=query_params),
195+
type=_LinkType.NEXUS_OPERATION.value,
196+
)
197+
198+
199+
def _workflow_nexus_url(
200+
namespace: str,
201+
workflow_id: str,
202+
run_id: str,
203+
*,
204+
history: bool,
205+
query_params: str | None = "",
206+
) -> str:
207+
namespace = urllib.parse.quote(namespace, safe="")
208+
workflow_id = urllib.parse.quote(workflow_id, safe="")
209+
run_id = urllib.parse.quote(run_id, safe="")
210+
path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}"
211+
if history:
212+
path += "/history"
213+
return _temporal_nexus_url(path, query_params=query_params)
214+
215+
216+
def _temporal_nexus_url(path: str, *, query_params: str | None = "") -> str:
162217
# urllib will omit '//' from the url if netloc is empty so we add the scheme manually
163-
url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}"
218+
return f"temporal://{urllib.parse.urlunparse(('', '', path, '', query_params or '', ''))}"
219+
220+
221+
def _parse_workflow_nexus_url(
222+
link: nexusrpc.Link, *, history: bool
223+
) -> tuple[dict[str, str], dict[str, list[str]]] | None:
224+
url = urllib.parse.urlparse(link.url)
225+
match = _WORKFLOW_LINK_URL_PATH_REGEX.match(url.path)
226+
if not match or bool(match.group("history")) != history:
227+
expected_suffix = "/history" if history else ""
228+
logger.warning(
229+
f"Invalid Nexus link: {link}. Expected path to match "
230+
f"/namespaces/{{namespace}}/workflows/{{workflow_id}}/{{run_id}}{expected_suffix}"
231+
)
232+
return None
164233

165-
return nexusrpc.Link(url=url, type=_LinkType.NEXUS_OPERATION.value)
234+
groups = {
235+
name: urllib.parse.unquote(value)
236+
for name, value in match.groupdict().items()
237+
if name != "history" and value is not None
238+
}
239+
return groups, urllib.parse.parse_qs(url.query)
240+
241+
242+
def _optional_single_query_param(
243+
query_params: dict[str, list[str]], param_name: str
244+
) -> str:
245+
match query_params.get(param_name):
246+
case [param]:
247+
return param
248+
case [] | None:
249+
return ""
250+
case _:
251+
raise ValueError(f"Expected {param_name} to have at most 1 value")
166252

167253

168254
def nexus_link_to_workflow_event_link(
@@ -173,16 +259,11 @@ def nexus_link_to_workflow_event_link(
173259
This is used when propagating links from a Nexus start operation request to a
174260
StartWorklow request.
175261
"""
176-
url = urllib.parse.urlparse(link.url)
177-
match = _WORFKLOW_LINK_URL_PATH_REGEX.match(url.path)
178-
if not match:
179-
logger.warning(
180-
f"Invalid Nexus link: {link}. Expected path to match {_WORFKLOW_LINK_URL_PATH_REGEX.pattern}"
181-
)
262+
parsed = _parse_workflow_nexus_url(link, history=True)
263+
if parsed is None:
182264
return None
265+
groups, query_params = parsed
183266
try:
184-
query_params = urllib.parse.parse_qs(url.query)
185-
186267
request_id_ref = None
187268
event_ref = None
188269
match query_params.get(LINK_REFERENCE_TYPE_PARAM_NAME):
@@ -201,17 +282,39 @@ def nexus_link_to_workflow_event_link(
201282
)
202283
return None
203284

204-
groups = match.groupdict()
205285
workflow_event_link = temporalio.api.common.v1.Link.WorkflowEvent(
206-
namespace=urllib.parse.unquote(groups["namespace"]),
207-
workflow_id=urllib.parse.unquote(groups["workflow_id"]),
208-
run_id=urllib.parse.unquote(groups["run_id"]),
286+
namespace=groups["namespace"],
287+
workflow_id=groups["workflow_id"],
288+
run_id=groups["run_id"],
209289
event_ref=event_ref,
210290
request_id_ref=request_id_ref,
211291
)
212292
return temporalio.api.common.v1.Link(workflow_event=workflow_event_link)
213293

214294

295+
def nexus_link_to_workflow_link(
296+
link: nexusrpc.Link,
297+
) -> temporalio.api.common.v1.Link | None:
298+
"""Convert a nexus link into a Temporal Workflow link."""
299+
parsed = _parse_workflow_nexus_url(link, history=False)
300+
if parsed is None:
301+
return None
302+
groups, query_params = parsed
303+
try:
304+
reason = _optional_single_query_param(query_params, LINK_REASON_PARAM_NAME)
305+
except ValueError as err:
306+
logger.warning(f"Invalid Nexus link: {link}. {err}")
307+
return None
308+
309+
workflow_link = temporalio.api.common.v1.Link.Workflow(
310+
namespace=groups["namespace"],
311+
workflow_id=groups["workflow_id"],
312+
run_id=groups["run_id"],
313+
reason=reason,
314+
)
315+
return temporalio.api.common.v1.Link(workflow=workflow_link)
316+
317+
215318
def nexus_link_to_nexus_operation_link(
216319
nexus_link: nexusrpc.Link,
217320
) -> temporalio.api.common.v1.Link | None:
@@ -230,16 +333,11 @@ def nexus_link_to_nexus_operation_link(
230333

231334
query_params = urllib.parse.parse_qs(url.query)
232335

233-
match query_params.get(LINK_RUN_ID_PARAM_NAME):
234-
case [run_id_param]:
235-
run_id = run_id_param
236-
case [] | None:
237-
run_id = ""
238-
case _:
239-
logger.warning(
240-
f"Invalid Nexus link: {nexus_link}. Expected {LINK_RUN_ID_PARAM_NAME} to have at most 1 value"
241-
)
242-
return None
336+
try:
337+
run_id = _optional_single_query_param(query_params, LINK_RUN_ID_PARAM_NAME)
338+
except ValueError as err:
339+
logger.warning(f"Invalid Nexus link: {nexus_link}. {err}")
340+
return None
243341

244342
groups = match.groupdict()
245343
nexus_op_link = temporalio.api.common.v1.Link.NexusOperation(

tests/nexus/test_link_conversion.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,61 @@ def test_link_conversion_workflow_event_to_link_and_back(
209209
assert wf_event_link == actual_event
210210

211211

212+
@pytest.mark.parametrize(
213+
["workflow_link", "expected_link"],
214+
[
215+
(
216+
temporalio.api.common.v1.Link(
217+
workflow=temporalio.api.common.v1.Link.Workflow(
218+
namespace="ns",
219+
workflow_id="wid",
220+
run_id="rid",
221+
reason="query",
222+
)
223+
),
224+
nexusrpc.Link(
225+
type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name,
226+
url="temporal:///namespaces/ns/workflows/wid/rid?reason=query",
227+
),
228+
),
229+
(
230+
temporalio.api.common.v1.Link(
231+
workflow=temporalio.api.common.v1.Link.Workflow(
232+
namespace="ns2",
233+
workflow_id="wid/2",
234+
run_id="rid2",
235+
)
236+
),
237+
nexusrpc.Link(
238+
type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name,
239+
url="temporal:///namespaces/ns2/workflows/wid%2F2/rid2",
240+
),
241+
),
242+
],
243+
)
244+
def test_link_conversion_workflow_to_link_and_back(
245+
workflow_link: temporalio.api.common.v1.Link, expected_link: nexusrpc.Link
246+
):
247+
actual_link = temporalio.nexus._link_conversion.workflow_to_nexus_link(
248+
workflow_link.workflow
249+
)
250+
assert expected_link == actual_link
251+
252+
actual_workflow = temporalio.nexus._link_conversion.nexus_link_to_workflow_link(
253+
actual_link
254+
)
255+
assert workflow_link == actual_workflow
256+
257+
assert (
258+
expected_link
259+
== temporalio.nexus._link_conversion.temporal_link_to_nexus_link(workflow_link)
260+
)
261+
assert (
262+
workflow_link
263+
== temporalio.nexus._link_conversion.nexus_link_to_temporal_link(expected_link)
264+
)
265+
266+
212267
@pytest.mark.parametrize(
213268
["operation_link", "expected_link"],
214269
[

0 commit comments

Comments
 (0)