Skip to content

Commit a8da676

Browse files
authored
fix: durable interrupts - use SkipInterruptValue to avoid conditional interrupts (#629)
1 parent 26d5083 commit a8da676

File tree

14 files changed

+180
-77
lines changed

14 files changed

+180
-77
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-langchain"
3-
version = "0.7.5"
3+
version = "0.7.6"
44
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.11"
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""Durable interrupt package for side-effect-safe interrupt/resume in LangGraph."""
2+
3+
from .decorator import _durable_state, durable_interrupt
4+
from .skip_interrupt import SkipInterruptValue
5+
6+
__all__ = [
7+
"durable_interrupt",
8+
"SkipInterruptValue",
9+
"_durable_state",
10+
]

src/uipath_langchain/agent/tools/durable_interrupt.py renamed to src/uipath_langchain/agent/tools/durable_interrupt/decorator.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ async def start_job():
3939
from langgraph.config import get_config
4040
from langgraph.types import interrupt
4141

42+
from .skip_interrupt import SkipInterruptValue
43+
4244
F = TypeVar("F", bound=Callable[..., Any])
4345

4446
# Tracks (scratchpad identity, call index) per node execution.
@@ -77,6 +79,21 @@ def _is_resumed(scratchpad: Any, idx: int) -> bool:
7779
return scratchpad is not None and scratchpad.resume and idx < len(scratchpad.resume)
7880

7981

82+
def _inject_resume(scratchpad: Any, value: Any) -> Any:
83+
"""Inject a value into the scratchpad resume list and return it via interrupt(None).
84+
85+
This keeps LangGraph's interrupt_counter in sync (interrupt(None) increments it)
86+
while avoiding a real suspend — interrupt(None) finds the injected value and
87+
returns it immediately without raising GraphInterrupt.
88+
"""
89+
if scratchpad is not None:
90+
if scratchpad.resume is None:
91+
scratchpad.resume = []
92+
scratchpad.resume.append(value)
93+
return interrupt(None)
94+
return value
95+
96+
8097
def durable_interrupt(fn: F) -> F:
8198
"""Decorator that executes a side-effecting function exactly once and interrupts.
8299
@@ -85,6 +102,10 @@ def durable_interrupt(fn: F) -> F:
85102
is skipped and ``interrupt(None)`` returns the resume value from the
86103
runtime.
87104
105+
If the body returns a ``SkipInterruptValue``, the resolved value is
106+
injected into the scratchpad resume list and ``interrupt(None)`` returns
107+
it immediately — no real suspend/resume cycle occurs.
108+
88109
Replaces the ``@task`` + ``interrupt()`` two-step pattern with a single
89110
decorator that enforces the pairing contract. Works correctly in both
90111
parent graphs and subgraphs.
@@ -112,7 +133,10 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
112133
scratchpad, idx = _next_durable_index()
113134
if _is_resumed(scratchpad, idx):
114135
return interrupt(None)
115-
return interrupt(await fn(*args, **kwargs))
136+
result = await fn(*args, **kwargs)
137+
if isinstance(result, SkipInterruptValue):
138+
return _inject_resume(scratchpad, result.resume_value)
139+
return interrupt(result)
116140

117141
return async_wrapper # type: ignore[return-value]
118142

@@ -121,6 +145,9 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
121145
scratchpad, idx = _next_durable_index()
122146
if _is_resumed(scratchpad, idx):
123147
return interrupt(None)
124-
return interrupt(fn(*args, **kwargs))
148+
result = fn(*args, **kwargs)
149+
if isinstance(result, SkipInterruptValue):
150+
return _inject_resume(scratchpad, result.resume_value)
151+
return interrupt(result)
125152

126153
return sync_wrapper # type: ignore[return-value]
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
"""Skip-interrupt value types for @durable_interrupt.
2+
3+
SkipInterruptValue — base class
4+
================================
5+
6+
When a node has **multiple sequential @durable_interrupt calls**, the
7+
decorator's internal counter must produce the same sequence of indices on
8+
every execution (first run *and* all resume runs). A conditional interrupt
9+
— one that only fires under certain conditions — breaks this assumption and
10+
causes index drift on resume.
11+
12+
``SkipInterruptValue`` solves this by letting a @durable_interrupt-decorated
13+
function signal "the result is already available, skip the real interrupt"
14+
while still keeping LangGraph's interrupt counter in sync. The decorator
15+
injects the resolved value into ``scratchpad.resume`` and calls
16+
``interrupt(None)``, which returns immediately without raising ``GraphInterrupt``.
17+
18+
Usage example::
19+
20+
@durable_interrupt
21+
async def create_index():
22+
index = await client.create_index_async(...)
23+
if index.in_progress():
24+
return WaitIndex(index=index) # real interrupt
25+
return ReadyIndex(index=index) # instant resume
26+
27+
@durable_interrupt
28+
async def start_processing():
29+
return StartProcessing(index_id=index.id) # real interrupt
30+
31+
# Both @durable_interrupt calls always execute — the counter always
32+
# increments by 2. When the index is ready, ReadyIndex (a
33+
# SkipInterruptValue subclass) injects the result into the scratchpad
34+
# so the graph continues without suspending.
35+
"""
36+
37+
from typing import Any
38+
39+
40+
class SkipInterruptValue:
41+
"""Base class for values that skip the interrupt in @durable_interrupt.
42+
43+
Subclasses must implement the ``resume_value`` property, returning the
44+
value to inject into the scratchpad resume list.
45+
"""
46+
47+
@property
48+
def resume_value(self) -> Any:
49+
"""The value to inject into the resume list and return to the caller."""
50+
raise NotImplementedError

src/uipath_langchain/agent/tools/internal_tools/batch_transform_tool.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
from uipath_langchain.agent.exceptions import AgentStartupError, AgentStartupErrorCode
3030
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
3131
from uipath_langchain.agent.react.types import AgentGraphState
32-
from uipath_langchain.agent.tools.durable_interrupt import durable_interrupt
32+
from uipath_langchain.agent.tools.durable_interrupt import (
33+
SkipInterruptValue,
34+
durable_interrupt,
35+
)
3336
from uipath_langchain.agent.tools.internal_tools.schema_utils import (
3437
BATCH_TRANSFORM_OUTPUT_SCHEMA,
3538
add_query_field_to_schema,
@@ -42,6 +45,17 @@
4245
from uipath_langchain.agent.tools.utils import sanitize_tool_name
4346

4447

48+
class ReadyEphemeralIndex(SkipInterruptValue):
49+
"""An ephemeral index that is already ready (no wait needed)."""
50+
51+
def __init__(self, index: ContextGroundingIndex):
52+
self.index = index
53+
54+
@property
55+
def resume_value(self) -> Any:
56+
return self.index.model_dump()
57+
58+
4559
def create_batch_transform_tool(
4660
resource: AgentInternalToolResourceConfig, llm: BaseChatModel
4761
) -> StructuredTool:
@@ -131,7 +145,7 @@ async def create_ephemeral_index():
131145
)
132146
if ephemeral_index.in_progress_ingestion():
133147
return WaitEphemeralIndex(index=ephemeral_index)
134-
return ephemeral_index
148+
return ReadyEphemeralIndex(index=ephemeral_index)
135149

136150
index_result = await create_ephemeral_index()
137151
if isinstance(index_result, dict):

src/uipath_langchain/agent/tools/internal_tools/deeprag_tool.py

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
from uipath_langchain.agent.exceptions import AgentStartupError, AgentStartupErrorCode
2626
from uipath_langchain.agent.react.jsonschema_pydantic_converter import create_model
2727
from uipath_langchain.agent.react.types import AgentGraphState
28-
from uipath_langchain.agent.tools.durable_interrupt import durable_interrupt
28+
from uipath_langchain.agent.tools.durable_interrupt import (
29+
SkipInterruptValue,
30+
durable_interrupt,
31+
)
2932
from uipath_langchain.agent.tools.internal_tools.schema_utils import (
3033
add_query_field_to_schema,
3134
)
@@ -37,6 +40,17 @@
3740
from uipath_langchain.agent.tools.utils import sanitize_tool_name
3841

3942

43+
class ReadyEphemeralIndex(SkipInterruptValue):
44+
"""An ephemeral index that is already ready (no wait needed)."""
45+
46+
def __init__(self, index: ContextGroundingIndex):
47+
self.index = index
48+
49+
@property
50+
def resume_value(self) -> Any:
51+
return self.index.model_dump()
52+
53+
4054
def create_deeprag_tool(
4155
resource: AgentInternalToolResourceConfig, llm: BaseChatModel
4256
) -> StructuredTool:
@@ -101,6 +115,7 @@ async def deeprag_tool_fn(**kwargs: Any) -> dict[str, Any]:
101115
example_calls=[], # Examples cannot be provided for internal tools
102116
)
103117
async def invoke_deeprag(**_tool_kwargs: Any):
118+
@durable_interrupt
104119
async def create_ephemeral_index():
105120
uipath = UiPath()
106121
ephemeral_index = (
@@ -109,21 +124,9 @@ async def create_ephemeral_index():
109124
attachments=[attachment_id],
110125
)
111126
)
112-
113-
# TODO this will not resume on concurrent runs for the same attachment
114127
if ephemeral_index.in_progress_ingestion():
115-
116-
@durable_interrupt
117-
async def wait_for_ephemeral_index():
118-
return WaitEphemeralIndex(index=ephemeral_index)
119-
120-
index_result = await wait_for_ephemeral_index()
121-
if isinstance(index_result, dict):
122-
ephemeral_index = ContextGroundingIndex(**index_result)
123-
else:
124-
ephemeral_index = index_result
125-
126-
return ephemeral_index
128+
return WaitEphemeralIndex(index=ephemeral_index)
129+
return ReadyEphemeralIndex(index=ephemeral_index)
127130

128131
index_result = await create_ephemeral_index()
129132
if isinstance(index_result, dict):
@@ -142,7 +145,9 @@ async def create_deeprag():
142145
is_ephemeral_index=True,
143146
)
144147

145-
return await create_deeprag()
148+
result = await create_deeprag()
149+
150+
return result
146151

147152
return await invoke_deeprag(**kwargs)
148153

tests/agent/tools/internal_tools/test_batch_transform_tool.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def resource_config_dynamic(self, batch_transform_settings_dynamic_query):
150150
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
151151
)
152152
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
153-
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
153+
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
154154
@patch(
155155
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
156156
lambda **kwargs: lambda f: f,
@@ -180,9 +180,8 @@ async def test_create_batch_transform_tool_static_query_index_ready(
180180
return_value=mock_index
181181
)
182182

183-
# durable_interrupt always calls interrupt(); first for index, second for transform
183+
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
184184
mock_interrupt.side_effect = [
185-
mock_index,
186185
{"file_path": "/path/to/output.csv"},
187186
]
188187

@@ -226,8 +225,8 @@ async def test_create_batch_transform_tool_static_query_index_ready(
226225
assert call_kwargs["usage"] == "BatchRAG"
227226
assert mock_attachment.ID in call_kwargs["attachments"]
228227

229-
# Both durable_interrupts call interrupt()
230-
assert mock_interrupt.call_count == 2
228+
# Only create_batch_transform calls interrupt(); index was instant-resumed
229+
assert mock_interrupt.call_count == 1
231230

232231
# Verify attachment was uploaded
233232
mock_uipath.jobs.create_attachment_async.assert_called_once_with(
@@ -243,7 +242,7 @@ async def test_create_batch_transform_tool_static_query_index_ready(
243242
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
244243
)
245244
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
246-
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
245+
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
247246
@patch(
248247
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
249248
lambda **kwargs: lambda f: f,
@@ -324,7 +323,7 @@ async def test_create_batch_transform_tool_static_query_wait_for_ingestion(
324323
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
325324
)
326325
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
327-
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
326+
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
328327
@patch(
329328
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
330329
lambda **kwargs: lambda f: f,
@@ -354,9 +353,8 @@ async def test_create_batch_transform_tool_dynamic_query(
354353
return_value=mock_index
355354
)
356355

357-
# durable_interrupt always calls interrupt(); first for index, second for transform
356+
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
358357
mock_interrupt.side_effect = [
359-
mock_index,
360358
{"output": "Transformation complete"},
361359
]
362360

@@ -397,7 +395,7 @@ async def test_create_batch_transform_tool_dynamic_query(
397395
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
398396
)
399397
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
400-
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
398+
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
401399
@patch(
402400
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
403401
lambda **kwargs: lambda f: f,
@@ -427,9 +425,8 @@ async def test_create_batch_transform_tool_default_destination_path(
427425
return_value=mock_index
428426
)
429427

430-
# durable_interrupt always calls interrupt(); first for index, second for transform
428+
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
431429
mock_interrupt.side_effect = [
432-
mock_index,
433430
{"file_path": "output.csv"},
434431
]
435432

@@ -461,8 +458,8 @@ async def test_create_batch_transform_tool_default_destination_path(
461458
}
462459
}
463460

464-
# Both durable_interrupts call interrupt()
465-
assert mock_interrupt.call_count == 2
461+
# Only create_batch_transform calls interrupt(); index was instant-resumed
462+
assert mock_interrupt.call_count == 1
466463

467464
# Verify attachment was uploaded with default path
468465
mock_uipath.jobs.create_attachment_async.assert_called_once_with(
@@ -478,7 +475,7 @@ async def test_create_batch_transform_tool_default_destination_path(
478475
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPathConfig"
479476
)
480477
@patch("uipath_langchain.agent.tools.internal_tools.batch_transform_tool.UiPath")
481-
@patch("uipath_langchain.agent.tools.durable_interrupt.interrupt")
478+
@patch("uipath_langchain.agent.tools.durable_interrupt.decorator.interrupt")
482479
@patch(
483480
"uipath_langchain.agent.tools.internal_tools.batch_transform_tool.mockable",
484481
lambda **kwargs: lambda f: f,
@@ -508,9 +505,8 @@ async def test_create_batch_transform_tool_custom_destination_path(
508505
return_value=mock_index
509506
)
510507

511-
# durable_interrupt always calls interrupt(); first for index, second for transform
508+
# Index is ready → ReadyEphemeralIndex skips interrupt(). Only create_batch_transform fires.
512509
mock_interrupt.side_effect = [
513-
mock_index,
514510
{"file_path": "/custom/path/result.csv"},
515511
]
516512

0 commit comments

Comments
 (0)