|
48 | 48 | # Set to true to log all activations and completions |
49 | 49 | LOG_PROTOS = False |
50 | 50 |
|
| 51 | +# Name prefix for threads in the workflow task ThreadPoolExecutor. Used by the |
| 52 | +# breakpoint hook below to detect calls coming from workflow code. |
| 53 | +_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_" |
| 54 | + |
| 55 | +_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook |
| 56 | + |
| 57 | + |
| 58 | +def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object: |
| 59 | + if threading.current_thread().name.startswith(_WORKFLOW_THREAD_NAME_PREFIX): |
| 60 | + raise RuntimeError( |
| 61 | + "breakpoint() / pdb.set_trace() inside workflow code requires " |
| 62 | + "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on " |
| 63 | + "the Worker. Without it the workflow runs on a thread pool and " |
| 64 | + "pdb's interactive REPL cannot read stdin." |
| 65 | + ) |
| 66 | + return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs) |
| 67 | + |
| 68 | + |
| 69 | +def _install_workflow_breakpoint_hook() -> None: |
| 70 | + if sys.breakpointhook is not _temporal_workflow_breakpoint_hook: |
| 71 | + sys.breakpointhook = _temporal_workflow_breakpoint_hook |
| 72 | + |
| 73 | + |
| 74 | +def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner: |
| 75 | + """Remove the sandbox restrictions on `breakpoint` and `input` so pdb / |
| 76 | + breakpoint() can be used inside sandboxed workflow code under debug_mode. |
| 77 | +
|
| 78 | + The sandbox flags both as non-deterministic builtins by default. Users |
| 79 | + who opt into debug_mode have explicitly accepted non-determinism for a |
| 80 | + debugging session, so we relax these specific restrictions instead of |
| 81 | + forcing users to switch to `UnsandboxedWorkflowRunner` (which also |
| 82 | + disables every other sandbox check, a larger blast radius). |
| 83 | +
|
| 84 | + Returns the runner unchanged if it isn't a SandboxedWorkflowRunner. |
| 85 | + """ |
| 86 | + # Import lazily so users without the sandbox module aren't penalized. |
| 87 | + from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner |
| 88 | + |
| 89 | + if not isinstance(workflow_runner, SandboxedWorkflowRunner): |
| 90 | + return workflow_runner |
| 91 | + |
| 92 | + restrictions = workflow_runner.restrictions |
| 93 | + invalid = restrictions.invalid_module_members |
| 94 | + builtins_matcher = invalid.children.get("__builtins__") |
| 95 | + if builtins_matcher is None or not ( |
| 96 | + "breakpoint" in builtins_matcher.use or "input" in builtins_matcher.use |
| 97 | + ): |
| 98 | + return workflow_runner |
| 99 | + |
| 100 | + new_use = set(builtins_matcher.use) - {"breakpoint", "input"} |
| 101 | + new_builtins = dataclasses.replace(builtins_matcher, use=new_use) |
| 102 | + new_invalid = dataclasses.replace( |
| 103 | + invalid, children={**invalid.children, "__builtins__": new_builtins} |
| 104 | + ) |
| 105 | + new_restrictions = dataclasses.replace( |
| 106 | + restrictions, invalid_module_members=new_invalid |
| 107 | + ) |
| 108 | + return dataclasses.replace(workflow_runner, restrictions=new_restrictions) |
| 109 | + |
| 110 | + |
51 | 111 | # Value was chosen abitrarily as a small number that allows some concurrency and prevents |
52 | 112 | # large numbers of concurrent external storage operations causing resource contention. |
53 | 113 | # This default limit is per workflow task activation and does not limit the total number |
@@ -96,6 +156,15 @@ def __init__( |
96 | 156 | ) |
97 | 157 | ) |
98 | 158 | self._workflow_task_executor_user_provided = workflow_task_executor is not None |
| 159 | + |
| 160 | + # Debug mode also enabled by the TEMPORAL_DEBUG env var. In debug mode, |
| 161 | + # deadlock detection is disabled, workflow activations run inline on |
| 162 | + # the asyncio main thread so interactive debuggers (pdb, breakpoint(), |
| 163 | + # IDE debuggers) can read stdin, and the sandbox restriction on |
| 164 | + # `breakpoint`/`input` is lifted so calls reach the debugger. |
| 165 | + self._debug_mode = bool(debug_mode or os.environ.get("TEMPORAL_DEBUG")) |
| 166 | + if self._debug_mode: |
| 167 | + workflow_runner = _relax_sandbox_for_debugger(workflow_runner) |
99 | 168 | self._workflow_runner = workflow_runner |
100 | 169 | self._unsandboxed_workflow_runner = unsandboxed_workflow_runner |
101 | 170 | self._data_converter = data_converter |
@@ -127,11 +196,15 @@ def __init__( |
127 | 196 | ) |
128 | 197 | self._throw_after_activation: Exception | None = None |
129 | 198 |
|
130 | | - # If there's a debug mode or a truthy TEMPORAL_DEBUG env var, disable |
131 | | - # deadlock detection, otherwise set to 2 seconds |
132 | | - self._deadlock_timeout_seconds = ( |
133 | | - None if debug_mode or os.environ.get("TEMPORAL_DEBUG") else 2 |
134 | | - ) |
| 199 | + # self._debug_mode is set earlier (before workflow_runner is assigned) |
| 200 | + # so the sandbox relaxation can take effect on the runner. |
| 201 | + self._deadlock_timeout_seconds = None if self._debug_mode else 2 |
| 202 | + |
| 203 | + # Install a process-wide breakpoint hook that fails loudly when |
| 204 | + # breakpoint() is called from a workflow worker thread without debug |
| 205 | + # mode. Without this guard the call silently hangs because pdb's |
| 206 | + # input() runs off the main thread. |
| 207 | + _install_workflow_breakpoint_hook() |
135 | 208 |
|
136 | 209 | # Keep track of workflows that could not be evicted |
137 | 210 | self._could_not_evict_count = 0 |
@@ -241,6 +314,37 @@ async def drain_poll_queue(self) -> None: |
241 | 314 | except PollShutdownError: |
242 | 315 | return |
243 | 316 |
|
| 317 | + async def _activate_inline_for_debug( |
| 318 | + self, |
| 319 | + loop: asyncio.AbstractEventLoop, |
| 320 | + workflow: _RunningWorkflow, |
| 321 | + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, |
| 322 | + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: |
| 323 | + # Run `workflow.activate(act)` synchronously, but indirected through |
| 324 | + # `loop.call_soon` so it executes outside the dispatch task's __step() |
| 325 | + # context. Python 3.14's asyncio refuses to enter a task while another |
| 326 | + # task on the same thread is mid-step; awaiting the future below |
| 327 | + # suspends this dispatch task so the workflow's own task stepping does |
| 328 | + # not collide with it. |
| 329 | + future: asyncio.Future = loop.create_future() |
| 330 | + |
| 331 | + def run_inline() -> None: |
| 332 | + # The workflow's _run_once clears the running-loop registration to |
| 333 | + # None when it exits. Save the main loop here and restore it after |
| 334 | + # the activation so any code that runs subsequently still sees the |
| 335 | + # right loop. |
| 336 | + main_loop = asyncio._get_running_loop() |
| 337 | + try: |
| 338 | + completion = workflow.activate(act) |
| 339 | + future.set_result(completion) |
| 340 | + except BaseException as e: |
| 341 | + future.set_exception(e) |
| 342 | + finally: |
| 343 | + asyncio._set_running_loop(main_loop) |
| 344 | + |
| 345 | + loop.call_soon(run_inline) |
| 346 | + return await future |
| 347 | + |
244 | 348 | async def _handle_activation( |
245 | 349 | self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation |
246 | 350 | ) -> None: |
@@ -330,35 +434,52 @@ async def _handle_activation( |
330 | 434 | ) |
331 | 435 | self._running_workflows[act.run_id] = workflow |
332 | 436 |
|
333 | | - # Run activation in separate thread so we can check if it's |
334 | | - # deadlocked |
335 | | - activate_task = asyncio.get_running_loop().run_in_executor( |
336 | | - self._workflow_task_executor, |
337 | | - workflow.activate, |
338 | | - act, |
339 | | - ) |
340 | | - |
341 | | - # Run activation task with deadlock timeout |
342 | | - try: |
343 | | - completion = await asyncio.wait_for( |
344 | | - activate_task, self._deadlock_timeout_seconds |
| 437 | + if self._debug_mode: |
| 438 | + # Run activation inline on the asyncio main thread so |
| 439 | + # interactive debuggers (pdb, breakpoint(), IDE debuggers) |
| 440 | + # can read stdin. We schedule the synchronous activation as |
| 441 | + # a `call_soon` callback rather than calling it directly, |
| 442 | + # so the dispatch task suspends at the `await` below and is |
| 443 | + # no longer mid-`__step()` when the workflow's own task |
| 444 | + # gets stepped inside `workflow.activate`. Python 3.14's |
| 445 | + # asyncio refuses to enter a task while another task on |
| 446 | + # the same thread is currently being executed; the |
| 447 | + # call_soon detour avoids that nesting. The main loop is |
| 448 | + # still blocked while the activation runs, which is the |
| 449 | + # intended behavior when debugging. |
| 450 | + completion = await self._activate_inline_for_debug( |
| 451 | + asyncio.get_running_loop(), workflow, act |
345 | 452 | ) |
346 | | - except asyncio.TimeoutError: |
347 | | - # Need to create the deadlock exception up here so it |
348 | | - # captures the trace now instead of later after we may have |
349 | | - # interrupted it |
350 | | - deadlock_exc = _DeadlockError.from_deadlocked_workflow( |
351 | | - workflow.instance, self._deadlock_timeout_seconds |
| 453 | + else: |
| 454 | + # Run activation in separate thread so we can check if it's |
| 455 | + # deadlocked |
| 456 | + activate_task = asyncio.get_running_loop().run_in_executor( |
| 457 | + self._workflow_task_executor, |
| 458 | + workflow.activate, |
| 459 | + act, |
352 | 460 | ) |
353 | | - # When we deadlock, we will raise an exception to fail |
354 | | - # the task. But before we do that, we want to try to |
355 | | - # interrupt the thread and put this activation task on |
356 | | - # the workflow so that the successive eviction can wait |
357 | | - # on it before trying to evict. |
358 | | - workflow.attempt_deadlock_interruption() |
359 | | - # Set the task and raise |
360 | | - workflow.deadlocked_activation_task = activate_task |
361 | | - raise deadlock_exc from None |
| 461 | + |
| 462 | + # Run activation task with deadlock timeout |
| 463 | + try: |
| 464 | + completion = await asyncio.wait_for( |
| 465 | + activate_task, self._deadlock_timeout_seconds |
| 466 | + ) |
| 467 | + except asyncio.TimeoutError: |
| 468 | + # Need to create the deadlock exception up here so it |
| 469 | + # captures the trace now instead of later after we may have |
| 470 | + # interrupted it |
| 471 | + deadlock_exc = _DeadlockError.from_deadlocked_workflow( |
| 472 | + workflow.instance, self._deadlock_timeout_seconds |
| 473 | + ) |
| 474 | + # When we deadlock, we will raise an exception to fail |
| 475 | + # the task. But before we do that, we want to try to |
| 476 | + # interrupt the thread and put this activation task on |
| 477 | + # the workflow so that the successive eviction can wait |
| 478 | + # on it before trying to evict. |
| 479 | + workflow.attempt_deadlock_interruption() |
| 480 | + # Set the task and raise |
| 481 | + workflow.deadlocked_activation_task = activate_task |
| 482 | + raise deadlock_exc from None |
362 | 483 |
|
363 | 484 | except Exception as err: |
364 | 485 | if isinstance(err, _DeadlockError): |
@@ -576,22 +697,30 @@ async def _handle_cache_eviction( |
576 | 697 | handle_eviction_task: asyncio.Future | None = None |
577 | 698 | while True: |
578 | 699 | try: |
579 | | - # We only create the eviction task if we haven't already or |
580 | | - # it is done. This is because if it already is running and |
581 | | - # timed out, it's still running (and holding on to a |
582 | | - # thread). But if did complete running but failed with |
583 | | - # another error, we want to re-create the task. |
584 | | - if not handle_eviction_task or handle_eviction_task.done(): |
585 | | - handle_eviction_task = ( |
586 | | - asyncio.get_running_loop().run_in_executor( |
587 | | - self._workflow_task_executor, |
588 | | - workflow.activate, |
589 | | - act, |
| 700 | + if self._debug_mode: |
| 701 | + # Eviction activation runs inline on the main thread |
| 702 | + # too. See `_activate_inline_for_debug` for why the |
| 703 | + # `call_soon` detour is needed on Python 3.14+. |
| 704 | + await self._activate_inline_for_debug( |
| 705 | + asyncio.get_running_loop(), workflow, act |
| 706 | + ) |
| 707 | + else: |
| 708 | + # We only create the eviction task if we haven't already or |
| 709 | + # it is done. This is because if it already is running and |
| 710 | + # timed out, it's still running (and holding on to a |
| 711 | + # thread). But if did complete running but failed with |
| 712 | + # another error, we want to re-create the task. |
| 713 | + if not handle_eviction_task or handle_eviction_task.done(): |
| 714 | + handle_eviction_task = ( |
| 715 | + asyncio.get_running_loop().run_in_executor( |
| 716 | + self._workflow_task_executor, |
| 717 | + workflow.activate, |
| 718 | + act, |
| 719 | + ) |
590 | 720 | ) |
| 721 | + await asyncio.wait_for( |
| 722 | + handle_eviction_task, self._deadlock_timeout_seconds |
591 | 723 | ) |
592 | | - await asyncio.wait_for( |
593 | | - handle_eviction_task, self._deadlock_timeout_seconds |
594 | | - ) |
595 | 724 | # Break if it succeeds |
596 | 725 | break |
597 | 726 | except BaseException as err: |
|
0 commit comments