Skip to content

Commit 53b0455

Browse files
committed
fix: use Context._create_internal to get InternalContext in breakpoint wrapper
the step config patching approach failed because the llama_index instrumentation dispatcher validates kwargs against the original function signature via inspect.signature().bind(), rejecting the injected param. instead, create an InternalContext directly via Context._create_internal(workflow). this works because the wrapper runs inside the step worker where StepWorkerStateContextVar is already set by the framework.
1 parent febce2e commit 53b0455

1 file changed

Lines changed: 18 additions & 33 deletions

File tree

  • packages/uipath-llamaindex/src/uipath_llamaindex/runtime

packages/uipath-llamaindex/src/uipath_llamaindex/runtime/breakpoints.py

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -64,55 +64,40 @@ def inject_breakpoints(workflow: Workflow) -> None:
6464
cls._step_functions[name] = wrapped
6565

6666

67-
_BP_CTX_PARAM = "__bp_ctx__"
68-
69-
7067
def make_wrapper(
7168
step_name: str,
7269
original: StepFunction[..., Any],
7370
) -> StepFunction[..., Any]:
7471
"""
7572
Return a wrapped step function that pauses on breakpoints.
7673
77-
The workflows framework only passes the internal Context to a step when
78-
the step's ``StepConfig.context_parameter`` is set. Many steps don't
79-
declare a ``ctx`` parameter, so we patch the config to request one under
80-
a private name (``__bp_ctx__``). The ``partial()`` helper inside the
81-
framework will then bind the per-step InternalContext into kwargs for us.
74+
The wrapper creates an InternalContext via ``Context._create_internal``
75+
to call ``wait_for_event``. This works because the wrapper executes
76+
inside the step worker where the framework has already set the
77+
``StepWorkerStateContextVar``.
8278
"""
8379

8480
@functools.wraps(original)
8581
async def wrapper(self, *args: Any, **kwargs: Any) -> Any:
86-
# Pop the injected internal context so it doesn't leak to the original step.
87-
ctx: Context | None = kwargs.pop(_BP_CTX_PARAM, None)
88-
89-
if isinstance(ctx, Context):
90-
bp_event = BreakpointEvent(
91-
breakpoint_node=step_name,
92-
prefix=f"Breakpoint at {step_name}",
93-
)
94-
# Suspend until debugger resumes
95-
await ctx.wait_for_event(
96-
BreakpointResumeEvent,
97-
waiter_event=bp_event,
98-
waiter_id=f"bp_{step_name}",
99-
timeout=None,
100-
)
82+
ctx = Context._create_internal(workflow=self)
83+
84+
bp_event = BreakpointEvent(
85+
breakpoint_node=step_name,
86+
prefix=f"Breakpoint at {step_name}",
87+
)
88+
# Suspend until debugger resumes
89+
await ctx.wait_for_event(
90+
BreakpointResumeEvent,
91+
waiter_event=bp_event,
92+
waiter_id=f"bp_{step_name}",
93+
timeout=None,
94+
)
10195

10296
# Continue original step logic
10397
return await original(self, *args, **kwargs)
10498

10599
wrapped = cast(StepFunction[..., Any], wrapper)
106-
107-
# Clone the step config and ensure context_parameter is set so the
108-
# framework passes the InternalContext to our wrapper.
109-
original_config = original._step_config
110-
if original_config and not original_config.context_parameter:
111-
wrapped._step_config = original_config.model_copy(
112-
update={"context_parameter": _BP_CTX_PARAM}
113-
)
114-
else:
115-
wrapped._step_config = original_config
100+
wrapped._step_config = original._step_config
116101

117102
return wrapped
118103

0 commit comments

Comments
 (0)