|
48 | 48 | # Set to true to log all activations and completions |
49 | 49 | LOG_PROTOS = False |
50 | 50 |
|
| 51 | +# Prefix used to detect threads in the workflow task ThreadPoolExecutor. |
| 52 | +_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_" |
| 53 | + |
| 54 | +_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook |
| 55 | + |
| 56 | + |
| 57 | +def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object: |
| 58 | + if threading.current_thread().name.startswith(_WORKFLOW_THREAD_NAME_PREFIX): |
| 59 | + raise RuntimeError( |
| 60 | + "breakpoint() / pdb.set_trace() inside workflow code requires " |
| 61 | + "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on " |
| 62 | + "the Worker. Without it the workflow runs on a thread pool and " |
| 63 | + "pdb's interactive REPL cannot read stdin." |
| 64 | + ) |
| 65 | + return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs) |
| 66 | + |
| 67 | + |
| 68 | +def _install_workflow_breakpoint_hook() -> None: |
| 69 | + if sys.breakpointhook is not _temporal_workflow_breakpoint_hook: |
| 70 | + sys.breakpointhook = _temporal_workflow_breakpoint_hook |
| 71 | + |
| 72 | + |
| 73 | +def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner: |
| 74 | + """Lift sandbox restrictions on `breakpoint` and `input` for debug_mode. |
| 75 | +
|
| 76 | + Both are flagged as non-deterministic by default. Users opting into |
| 77 | + debug_mode have accepted non-determinism for the session, so a targeted |
| 78 | + relaxation beats forcing them to swap to UnsandboxedWorkflowRunner. |
| 79 | + """ |
| 80 | + from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner |
| 81 | + |
| 82 | + if not isinstance(workflow_runner, SandboxedWorkflowRunner): |
| 83 | + return workflow_runner |
| 84 | + |
| 85 | + restrictions = workflow_runner.restrictions |
| 86 | + invalid = restrictions.invalid_module_members |
| 87 | + builtins_matcher = invalid.children.get("__builtins__") |
| 88 | + if builtins_matcher is None or not ( |
| 89 | + "breakpoint" in builtins_matcher.use or "input" in builtins_matcher.use |
| 90 | + ): |
| 91 | + return workflow_runner |
| 92 | + |
| 93 | + new_use = set(builtins_matcher.use) - {"breakpoint", "input"} |
| 94 | + new_builtins = dataclasses.replace(builtins_matcher, use=new_use) |
| 95 | + new_invalid = dataclasses.replace( |
| 96 | + invalid, children={**invalid.children, "__builtins__": new_builtins} |
| 97 | + ) |
| 98 | + new_restrictions = dataclasses.replace( |
| 99 | + restrictions, invalid_module_members=new_invalid |
| 100 | + ) |
| 101 | + return dataclasses.replace(workflow_runner, restrictions=new_restrictions) |
| 102 | + |
| 103 | + |
51 | 104 | # Value was chosen abitrarily as a small number that allows some concurrency and prevents |
52 | 105 | # large numbers of concurrent external storage operations causing resource contention. |
53 | 106 | # This default limit is per workflow task activation and does not limit the total number |
@@ -96,6 +149,13 @@ def __init__( |
96 | 149 | ) |
97 | 150 | ) |
98 | 151 | self._workflow_task_executor_user_provided = workflow_task_executor is not None |
| 152 | + |
| 153 | + # Debug mode (also enabled by TEMPORAL_DEBUG) disables deadlock |
| 154 | + # detection, runs activations inline on the main thread, and lifts |
| 155 | + # the sandbox restriction on breakpoint()/input() so pdb works. |
| 156 | + self._debug_mode = bool(debug_mode or os.environ.get("TEMPORAL_DEBUG")) |
| 157 | + if self._debug_mode: |
| 158 | + workflow_runner = _relax_sandbox_for_debugger(workflow_runner) |
99 | 159 | self._workflow_runner = workflow_runner |
100 | 160 | self._unsandboxed_workflow_runner = unsandboxed_workflow_runner |
101 | 161 | self._data_converter = data_converter |
@@ -127,11 +187,9 @@ def __init__( |
127 | 187 | ) |
128 | 188 | self._throw_after_activation: Exception | None = None |
129 | 189 |
|
130 | | - # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable |
131 | | - # deadlock detection, otherwise set to 2 seconds |
132 | | - self._deadlock_timeout_seconds = ( |
133 | | - None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2 |
134 | | - ) |
| 190 | + self._deadlock_timeout_seconds = None if self._debug_mode else 2 |
| 191 | + |
| 192 | + _install_workflow_breakpoint_hook() |
135 | 193 |
|
136 | 194 | # Keep track of workflows that could not be evicted |
137 | 195 | self._could_not_evict_count = 0 |
@@ -241,6 +299,34 @@ async def drain_poll_queue(self) -> None: |
241 | 299 | except PollShutdownError: |
242 | 300 | return |
243 | 301 |
|
| 302 | + async def _activate_inline_for_debug( |
| 303 | + self, |
| 304 | + loop: asyncio.AbstractEventLoop, |
| 305 | + workflow: _RunningWorkflow, |
| 306 | + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, |
| 307 | + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: |
| 308 | + # Indirect through call_soon + a future so the activation runs outside |
| 309 | + # the dispatch task's __step() context. Python 3.14 refuses to enter a |
| 310 | + # task while another on the same thread is mid-step; suspending at the |
| 311 | + # await below clears that state so workflow.activate can step its own |
| 312 | + # task without collision. |
| 313 | + future: asyncio.Future = loop.create_future() |
| 314 | + |
| 315 | + def run_inline() -> None: |
| 316 | + # _run_once clears the running-loop registration on exit; restore |
| 317 | + # the main loop so later code sees the right one. |
| 318 | + main_loop = asyncio._get_running_loop() |
| 319 | + try: |
| 320 | + completion = workflow.activate(act) |
| 321 | + future.set_result(completion) |
| 322 | + except BaseException as e: |
| 323 | + future.set_exception(e) |
| 324 | + finally: |
| 325 | + asyncio._set_running_loop(main_loop) |
| 326 | + |
| 327 | + loop.call_soon(run_inline) |
| 328 | + return await future |
| 329 | + |
244 | 330 | async def _handle_activation( |
245 | 331 | self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation |
246 | 332 | ) -> None: |
@@ -330,35 +416,43 @@ async def _handle_activation( |
330 | 416 | ) |
331 | 417 | self._running_workflows[act.run_id] = workflow |
332 | 418 |
|
333 | | - # Run activation in separate thread so we can check if it's |
334 | | - # deadlocked |
335 | | - activate_task = asyncio.get_running_loop().run_in_executor( |
336 | | - self._workflow_task_executor, |
337 | | - workflow.activate, |
338 | | - act, |
339 | | - ) |
340 | | - |
341 | | - # Run activation task with deadlock timeout |
342 | | - try: |
343 | | - completion = await asyncio.wait_for( |
344 | | - activate_task, self._deadlock_timeout_seconds |
| 419 | + if self._debug_mode: |
| 420 | + # Inline on the main thread so pdb / breakpoint() can read |
| 421 | + # stdin. The loop blocks during the activation — that's the |
| 422 | + # intended single-stepping semantic. |
| 423 | + completion = await self._activate_inline_for_debug( |
| 424 | + asyncio.get_running_loop(), workflow, act |
345 | 425 | ) |
346 | | - except asyncio.TimeoutError: |
347 | | - # Need to create the deadlock exception up here so it |
348 | | - # captures the trace now instead of later after we may have |
349 | | - # interrupted it |
350 | | - deadlock_exc = _DeadlockError.from_deadlocked_workflow( |
351 | | - workflow.instance, self._deadlock_timeout_seconds |
| 426 | + else: |
| 427 | + # Run activation in separate thread so we can check if it's |
| 428 | + # deadlocked |
| 429 | + activate_task = asyncio.get_running_loop().run_in_executor( |
| 430 | + self._workflow_task_executor, |
| 431 | + workflow.activate, |
| 432 | + act, |
352 | 433 | ) |
353 | | - # When we deadlock, we will raise an exception to fail |
354 | | - # the task. But before we do that, we want to try to |
355 | | - # interrupt the thread and put this activation task on |
356 | | - # the workflow so that the successive eviction can wait |
357 | | - # on it before trying to evict. |
358 | | - workflow.attempt_deadlock_interruption() |
359 | | - # Set the task and raise |
360 | | - workflow.deadlocked_activation_task = activate_task |
361 | | - raise deadlock_exc from None |
| 434 | + |
| 435 | + # Run activation task with deadlock timeout |
| 436 | + try: |
| 437 | + completion = await asyncio.wait_for( |
| 438 | + activate_task, self._deadlock_timeout_seconds |
| 439 | + ) |
| 440 | + except asyncio.TimeoutError: |
| 441 | + # Need to create the deadlock exception up here so it |
| 442 | + # captures the trace now instead of later after we may have |
| 443 | + # interrupted it |
| 444 | + deadlock_exc = _DeadlockError.from_deadlocked_workflow( |
| 445 | + workflow.instance, self._deadlock_timeout_seconds |
| 446 | + ) |
| 447 | + # When we deadlock, we will raise an exception to fail |
| 448 | + # the task. But before we do that, we want to try to |
| 449 | + # interrupt the thread and put this activation task on |
| 450 | + # the workflow so that the successive eviction can wait |
| 451 | + # on it before trying to evict. |
| 452 | + workflow.attempt_deadlock_interruption() |
| 453 | + # Set the task and raise |
| 454 | + workflow.deadlocked_activation_task = activate_task |
| 455 | + raise deadlock_exc from None |
362 | 456 |
|
363 | 457 | except Exception as err: |
364 | 458 | if isinstance(err, _DeadlockError): |
@@ -576,22 +670,27 @@ async def _handle_cache_eviction( |
576 | 670 | handle_eviction_task: asyncio.Future | None = None |
577 | 671 | while True: |
578 | 672 | try: |
579 | | - # We only create the eviction task if we haven't already or |
580 | | - # it is done. This is because if it already is running and |
581 | | - # timed out, it's still running (and holding on to a |
582 | | - # thread). But if did complete running but failed with |
583 | | - # another error, we want to re-create the task. |
584 | | - if not handle_eviction_task or handle_eviction_task.done(): |
585 | | - handle_eviction_task = ( |
586 | | - asyncio.get_running_loop().run_in_executor( |
587 | | - self._workflow_task_executor, |
588 | | - workflow.activate, |
589 | | - act, |
| 673 | + if self._debug_mode: |
| 674 | + await self._activate_inline_for_debug( |
| 675 | + asyncio.get_running_loop(), workflow, act |
| 676 | + ) |
| 677 | + else: |
| 678 | + # We only create the eviction task if we haven't already or |
| 679 | + # it is done. This is because if it already is running and |
| 680 | + # timed out, it's still running (and holding on to a |
| 681 | + # thread). But if did complete running but failed with |
| 682 | + # another error, we want to re-create the task. |
| 683 | + if not handle_eviction_task or handle_eviction_task.done(): |
| 684 | + handle_eviction_task = ( |
| 685 | + asyncio.get_running_loop().run_in_executor( |
| 686 | + self._workflow_task_executor, |
| 687 | + workflow.activate, |
| 688 | + act, |
| 689 | + ) |
590 | 690 | ) |
| 691 | + await asyncio.wait_for( |
| 692 | + handle_eviction_task, self._deadlock_timeout_seconds |
591 | 693 | ) |
592 | | - await asyncio.wait_for( |
593 | | - handle_eviction_task, self._deadlock_timeout_seconds |
594 | | - ) |
595 | 694 | # Break if it succeeds |
596 | 695 | break |
597 | 696 | except BaseException as err: |
|
0 commit comments