|
2 | 2 | import logging |
3 | 3 | import os |
4 | 4 | import pickle |
| 5 | +from cgitb import handler  # FIXME(review): accidental IDE auto-import — cgitb is deprecated (removed in Python 3.13) and `handler` is immediately shadowed by local variables below; remove this line |
5 | 6 | from contextlib import suppress |
6 | 7 | from typing import Optional, cast |
7 | 8 |
|
8 | 9 | from llama_index.core.workflow import ( |
9 | 10 | Context, |
10 | 11 | HumanResponseEvent, |
11 | 12 | InputRequiredEvent, |
12 | | - JsonPickleSerializer, |
| 13 | + JsonPickleSerializer, StartEvent, |
13 | 14 | ) |
14 | 15 | from llama_index.core.workflow.errors import WorkflowTimeoutError |
15 | 16 | from llama_index.core.workflow.handler import WorkflowHandler |
@@ -87,30 +88,12 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: |
87 | 88 | if self.context.workflow_context is None: |
88 | 89 | return None |
89 | 90 |
|
90 | | - handler: WorkflowHandler = self.context.workflow.run( |
91 | | - start_event=ev if self.context.resume else None, |
92 | | - ctx=self.context.workflow_context, |
93 | | - **(self.context.input_json or {}), |
94 | | - ) |
95 | | - |
96 | 91 | resume_trigger: Optional[UiPathResumeTrigger] = None |
97 | 92 |
|
98 | | - response_applied = False |
99 | | - async for event in handler.stream_events(): |
100 | | - # log the received event on trace level |
101 | | - if isinstance(event, InputRequiredEvent): |
102 | | - # for api trigger hitl scenarios only pass the str input for processing |
103 | | - hitl_processor = HitlProcessor(value=event.prefix) |
104 | | - if self.context.resume and not response_applied: |
105 | | - # If we are resuming, we need to apply the response to the event stream. |
106 | | - response_applied = True |
107 | | - response_event = await self.get_response_event() |
108 | | - if response_event: |
109 | | - # If we have a response event, send it to the workflow context. |
110 | | - self.context.workflow_context.send_event(response_event) |
111 | | - else: |
112 | | - resume_trigger = await hitl_processor.create_resume_trigger() |
113 | | - break |
| 93 | + if self.context.debug and os.getenv("VSCODE_EXPLORER_EXECUTION") == "True": |
| 94 | + handler = await self.debug_workflow() |
| 95 | + else: |
| 96 | + handler, resume_trigger = await self.run_workflow(ev) |
114 | 97 |
|
115 | 98 | if resume_trigger is None: |
116 | 99 | try: |
@@ -179,6 +162,67 @@ async def execute(self) -> Optional[UiPathRuntimeResult]: |
179 | 162 | finally: |
180 | 163 | self.trace_provider.shutdown() |
181 | 164 |
|
async def debug_workflow(self) -> WorkflowHandler:
    """Run the workflow step-by-step under debugpy, surfacing every event.

    For each event produced by a step: optionally dump debug state to the
    temp file requested by the VS Code extension, trigger a debugpy
    breakpoint so the debugger can inspect it, then re-inject the event so
    the workflow continues.

    Returns:
        The WorkflowHandler of the stepwise run, after all steps complete.

    Raises:
        Exception: If debugpy is not installed.
    """
    try:
        import debugpy
    except ImportError as exc:
        raise Exception("DebugPy not available") from exc

    # Local import: json is only needed for the debug-file dump below and
    # is not part of this module's top-level imports.
    import json

    handler: WorkflowHandler = self.context.workflow.run(
        stepwise=True,
        **(self.context.input_json or {}),
    )
    # if we are in debug mode, emit all events
    while produced_events := await handler.run_step():
        for ev in produced_events:
            debug_file_path = os.environ.get('VSCODE_EXTENSION_TEMP_DEBUG_FILE')
            if debug_file_path:
                # Write the debug data. Events are arbitrary objects, so
                # fall back to str() for anything not JSON-serializable
                # (a plain json.dump would raise TypeError here).
                debug_data = {
                    'event': ev,
                    'variables': {
                        'current_event': ev,
                        'handler_state': handler.ctx.state if hasattr(handler.ctx, 'state') else None,
                    }
                }
                with open(debug_file_path, 'w') as f:
                    json.dump(debug_data, f, default=str)

            # Trigger breakpoint so the attached debugger pauses per event
            debugpy.breakpoint()

            # Send the event back so the workflow can make progress
            handler.ctx.send_event(ev)
    return handler
| 197 | + |
async def run_workflow(
    self, start_event: Optional[StartEvent]
) -> tuple[WorkflowHandler, Optional[UiPathResumeTrigger]]:
    """Start (or resume) the workflow and watch for HITL interrupts.

    Args:
        start_event: Event instance to resume from; forwarded only when
            ``self.context.resume`` is set, otherwise the workflow starts
            fresh from ``self.context.input_json``.

    Returns:
        The running WorkflowHandler and, if the stream was interrupted by
        an unanswered ``InputRequiredEvent``, the resume trigger to
        persist (``None`` when no interrupt occurred).
    """
    resume_trigger: Optional[UiPathResumeTrigger] = None

    handler: WorkflowHandler = self.context.workflow.run(
        start_event=start_event if self.context.resume else None,
        ctx=self.context.workflow_context,
        **(self.context.input_json or {}),
    )

    response_applied = False
    async for event in handler.stream_events():
        # log the received event on trace level
        if isinstance(event, InputRequiredEvent):
            # for api trigger hitl scenarios only pass the str input for processing
            hitl_processor = HitlProcessor(value=event.prefix)
            if self.context.resume and not response_applied:
                # If we are resuming, we need to apply the response to the event stream.
                response_applied = True
                response_event = await self.get_response_event()
                if response_event:
                    # If we have a response event, send it to the workflow context.
                    self.context.workflow_context.send_event(response_event)
            else:
                resume_trigger = await hitl_processor.create_resume_trigger()
                break

    return handler, resume_trigger
| 225 | + |
182 | 226 | async def validate(self) -> None: |
183 | 227 | """Validate runtime inputs and load Llama agent configuration.""" |
184 | 228 | try: |
|
0 commit comments