|
2 | 2 | import logging |
3 | 3 | import os |
4 | 4 | import pickle |
| 5 | +from cgitb import handler |
5 | 6 | from contextlib import suppress |
6 | 7 | from typing import Optional, cast |
7 | 8 |
|
8 | 9 | from llama_index.core.workflow import ( |
9 | 10 | Context, |
10 | 11 | HumanResponseEvent, |
11 | 12 | InputRequiredEvent, |
12 | | - JsonPickleSerializer, |
| 13 | + JsonPickleSerializer, StartEvent, |
13 | 14 | ) |
14 | 15 | from llama_index.core.workflow.errors import WorkflowTimeoutError |
15 | 16 | from llama_index.core.workflow.handler import WorkflowHandler |
@@ -87,30 +88,12 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: |
87 | 88 | if self.context.workflow_context is None: |
88 | 89 | return None |
89 | 90 |
|
90 | | - handler: WorkflowHandler = self.context.workflow.run( |
91 | | - start_event=ev if self.context.resume else None, |
92 | | - ctx=self.context.workflow_context, |
93 | | - **(self.context.input_json or {}), |
94 | | - ) |
95 | | - |
96 | 91 | resume_trigger: Optional[UiPathResumeTrigger] = None |
97 | 92 |
|
98 | | - response_applied = False |
99 | | - async for event in handler.stream_events(): |
100 | | - # log the received event on trace level |
101 | | - if isinstance(event, InputRequiredEvent): |
102 | | - # for api trigger hitl scenarios only pass the str input for processing |
103 | | - hitl_processor = HitlProcessor(value=event.prefix) |
104 | | - if self.context.resume and not response_applied: |
105 | | - # If we are resuming, we need to apply the response to the event stream. |
106 | | - response_applied = True |
107 | | - response_event = await self.get_response_event() |
108 | | - if response_event: |
109 | | - # If we have a response event, send it to the workflow context. |
110 | | - self.context.workflow_context.send_event(response_event) |
111 | | - else: |
112 | | - resume_trigger = await hitl_processor.create_resume_trigger() |
113 | | - break |
| 93 | + if self.context.debug: |
| 94 | + handler = await self.debug_workflow() |
| 95 | + else: |
| 96 | + handler, resume_trigger = await self.run_workflow(ev) |
114 | 97 |
|
115 | 98 | if resume_trigger is None: |
116 | 99 | try: |
@@ -179,6 +162,51 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: |
179 | 162 | finally: |
180 | 163 | self.trace_provider.shutdown() |
181 | 164 |
|
| 165 | + async def debug_workflow(self): |
| 166 | + try: |
| 167 | + import debugpy |
| 168 | + except ImportError: |
| 169 | + raise Exception("DebugPy not available") |
| 170 | + |
| 171 | + handler: WorkflowHandler = self.context.workflow.run( |
| 172 | + stepwise=True, |
| 173 | + ctx=self.context.workflow_context, |
| 174 | + **(self.context.input_json or {}), |
| 175 | + ) |
| 176 | + # if we are in debug mode, emit all events |
| 177 | + while produced_events := await handler.run_step(): |
| 178 | + for ev in produced_events: |
| 179 | + debugpy.breakpoint() |
| 180 | + handler.ctx.send_event(ev) |
| 181 | + |
| 182 | + async def run_workflow(self, start_event :type[StartEvent]) -> (WorkflowHandler, Optional[UiPathResumeTrigger]): |
| 183 | + resume_trigger: Optional[UiPathResumeTrigger] = None |
| 184 | + |
| 185 | + handler: WorkflowHandler = self.context.workflow.run( |
| 186 | + start_event=start_event if self.context.resume else None, |
| 187 | + ctx=self.context.workflow_context, |
| 188 | + **(self.context.input_json or {}), |
| 189 | + ) |
| 190 | + |
| 191 | + response_applied = False |
| 192 | + async for event in handler.stream_events(): |
| 193 | + # log the received event on trace level |
| 194 | + if isinstance(event, InputRequiredEvent): |
| 195 | + # for api trigger hitl scenarios only pass the str input for processing |
| 196 | + hitl_processor = HitlProcessor(value=event.prefix) |
| 197 | + if self.context.resume and not response_applied: |
| 198 | + # If we are resuming, we need to apply the response to the event stream. |
| 199 | + response_applied = True |
| 200 | + response_event = await self.get_response_event() |
| 201 | + if response_event: |
| 202 | + # If we have a response event, send it to the workflow context. |
| 203 | + self.context.workflow_context.send_event(response_event) |
| 204 | + else: |
| 205 | + resume_trigger = await hitl_processor.create_resume_trigger() |
| 206 | + break |
| 207 | + |
| 208 | + return handler, resume_trigger |
| 209 | + |
182 | 210 | async def validate(self) -> None: |
183 | 211 | """Validate runtime inputs and load Llama agent configuration.""" |
184 | 212 | try: |
|
0 commit comments