|
| 1 | +"""Workflow debugger support. |
| 2 | +
|
| 3 | +When ``debug_mode=True`` on the Worker (or the ``TEMPORAL_DEBUG`` env var |
| 4 | +is set), the worker uses helpers from this module to make ``breakpoint()`` |
| 5 | +inside workflow code open an interactive pdb prompt. The inline-dispatch |
| 6 | +piece lives on the worker itself; everything else (sandbox relaxation, |
| 7 | +breakpoint hook, custom Pdb subclass) lives here. |
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +import dataclasses |
| 13 | +import sys |
| 14 | +import threading |
| 15 | +from types import FrameType, TracebackType |
| 16 | + |
| 17 | +import temporalio.workflow |
| 18 | +from temporalio.worker.workflow_sandbox._runner import SandboxedWorkflowRunner |
| 19 | + |
| 20 | +from ._workflow_instance import WorkflowRunner |
| 21 | + |
# Names exported by ``from ... import *``; all of them are private helpers
# consumed by the worker.
__all__ = [
    "_install_workflow_breakpoint_hook",
    "_relax_sandbox_for_debugger",
    "_temporal_workflow_breakpoint_hook",
]

# Thread-name prefix that identifies threads belonging to the workflow task
# ThreadPoolExecutor.
_WORKFLOW_THREAD_NAME_PREFIX = "temporal_workflow_"

# Fallback hook the workflow breakpoint hook delegates to when it is called
# outside workflow code; initialised to whatever ``sys.breakpointhook`` is
# when this module is imported.
_ORIGINAL_BREAKPOINTHOOK = sys.breakpointhook
| 32 | + |
| 33 | + |
def _build_workflow_pdb_class() -> type:
    """Create the ``pdb.Pdb`` subclass used for workflow debugging sessions.

    The returned class differs from stock ``pdb.Pdb`` in two ways:

    * Each ``interaction`` (the point where pdb hands control to the user)
      runs with ``_sandbox_unrestricted.value`` set to ``True`` and puts the
      previous value back when the interaction ends. pdb's command loop
      touches ``readline.get_completer`` and other sandbox-restricted
      internals, so the restriction is lifted only while the prompt is
      active and stays in force for the rest of workflow execution.
    * ``q``/``quit``/``exit``/EOF resume the workflow instead of quitting.

    ``pdb`` is imported inside the function because it is a debug-only
    dependency (it pulls in ``cmd``/``bdb``/``linecache``) that should not be
    loaded at worker import time.

    Returns:
        The ``_WorkflowPdb`` class itself, not an instance.
    """
    import pdb

    from temporalio.workflow._sandbox import _sandbox_unrestricted

    class _WorkflowPdb(pdb.Pdb):
        # typeshed disagrees with itself about `interaction` across Python
        # versions: on 3.10-3.12 the second parameter is
        # `traceback: TracebackType | None`, on 3.13+ it is `tb_or_exc` and
        # also accepts `BaseException`. One signature cannot satisfy both
        # stubs, hence the suppressed override check.
        def interaction(  # type: ignore[override]
            self,
            frame: FrameType | None,
            tb_or_exc: TracebackType | BaseException | None,
        ) -> None:
            # The flag may not have been set yet; treat a missing attribute
            # as "restricted".
            saved_flag = getattr(_sandbox_unrestricted, "value", False)
            _sandbox_unrestricted.value = True
            try:
                super().interaction(frame, tb_or_exc)  # type: ignore[arg-type]
            finally:
                # Always restore, even if the prompt exits via an exception.
                _sandbox_unrestricted.value = saved_flag

        # Stock pdb raises `BdbQuit` on quit. That would escape workflow.run
        # as an uncaught exception, fail the workflow task, and trigger a
        # server retry storm during teardown. In a debug session "quit"
        # almost always means "stop debugging and let the workflow finish",
        # which is `continue`; a real abort is still available via Ctrl-C in
        # the outer shell.
        def do_quit(self, arg: str) -> bool | None:
            notice = (
                "[Temporal] 'q'/Ctrl-D continues the workflow. "
                "Ctrl-C the outer shell to abort."
            )
            self.message(notice)
            return self.do_continue(arg)

        # Route every quit spelling, including EOF (Ctrl-D), through the
        # override above.
        do_q = do_quit
        do_exit = do_quit
        do_EOF = do_quit

    return _WorkflowPdb
| 87 | + |
| 88 | + |
def _temporal_workflow_breakpoint_hook(*args: object, **kwargs: object) -> object:
    """Handle ``breakpoint()`` as the process-wide ``sys.breakpointhook``.

    Three cases, checked in this order:

    1. The current thread's name starts with the workflow thread-pool
       prefix: raise ``RuntimeError`` explaining that ``debug_mode`` is
       required (this replaces what used to be a silent hang).
    2. The call is not inside a workflow activation: forward ``args`` and
       ``kwargs`` to the saved fallback hook and return its result.
    3. The call is inside a workflow activation: start the custom workflow
       Pdb at the frame that called ``breakpoint()`` and return ``None``.

    Raises:
        RuntimeError: when called from a workflow thread-pool thread.
    """
    thread_name = threading.current_thread().name
    if thread_name.startswith(_WORKFLOW_THREAD_NAME_PREFIX):
        raise RuntimeError(
            "breakpoint() / pdb.set_trace() inside workflow code requires "
            "debug_mode=True (or the TEMPORAL_DEBUG environment variable) on "
            "the Worker. Without it the workflow runs on a thread pool and "
            "pdb's interactive REPL cannot read stdin."
        )

    if temporalio.workflow.in_workflow():
        # Stop at the caller's frame (the workflow code where breakpoint()
        # was written) instead of inside this hook. This must be read here,
        # in the hook's own frame, so that depth 1 is the caller.
        target_frame = sys._getframe(1)
        # Modules pdb must never stop in: this hook and the context-manager
        # plumbing. Without the list, pdb's first step lands on the `with`
        # teardown rather than on the user's next workflow line.
        skipped_modules = [
            "temporalio.worker._debugger",
            "temporalio.workflow._sandbox",
            "contextlib",
        ]
        # Going straight to our own Pdb (rather than the configured hook)
        # avoids pytest's pdb wrapper, which assumes a test-code context and
        # touches sandbox-restricted internals during its terminal-writer
        # setup. `sandbox_unrestricted()` lifts member checks so pdb's own
        # initialization (readline, etc.) isn't blocked.
        with temporalio.workflow.unsafe.sandbox_unrestricted():
            debugger_cls = _build_workflow_pdb_class()
            debugger = debugger_cls(skip=skipped_modules)
            debugger.set_trace(target_frame)
        return None

    # Not inside a workflow activation: let pytest's wrapper, ipdb, or
    # whatever else is configured handle it.
    return _ORIGINAL_BREAKPOINTHOOK(*args, **kwargs)
| 130 | + |
| 131 | + |
def _install_workflow_breakpoint_hook() -> None:
    """Install the workflow hook as ``sys.breakpointhook`` (idempotent).

    If the workflow hook is already active this is a no-op. Otherwise the
    hook that is about to be displaced is recorded in
    ``_ORIGINAL_BREAKPOINTHOOK`` before being replaced, so that
    ``breakpoint()`` calls made outside workflow code are forwarded to the
    hook that was actually in effect at install time. Relying only on the
    value captured when this module was imported would drop any hook set
    between import and installation (for example a debugger attaching, or
    user code assigning ``sys.breakpointhook``).
    """
    global _ORIGINAL_BREAKPOINTHOOK

    current_hook = sys.breakpointhook
    if current_hook is _temporal_workflow_breakpoint_hook:
        # Already installed; keep the recorded fallback untouched so the
        # workflow hook never ends up delegating to itself.
        return

    _ORIGINAL_BREAKPOINTHOOK = current_hook
    sys.breakpointhook = _temporal_workflow_breakpoint_hook
| 136 | + |
| 137 | + |
def _relax_sandbox_for_debugger(workflow_runner: WorkflowRunner) -> WorkflowRunner:
    """Return a runner whose sandbox no longer rejects ``breakpoint()``.

    The sandbox flags ``breakpoint`` as non-deterministic by default, so the
    call would raise before the worker's breakpoint hook gets a chance to
    run. Only that single builtin is unblocked here; the hook itself enters
    ``sandbox_unrestricted()`` for the duration of the debugger session, so
    the remaining sandbox checks stay active for the rest of workflow
    execution.

    Args:
        workflow_runner: The runner configured on the worker.

    Returns:
        ``workflow_runner`` unchanged when it is not a
        ``SandboxedWorkflowRunner`` or when ``breakpoint`` is not listed
        under its ``__builtins__`` restrictions; otherwise a copy built with
        ``dataclasses.replace`` in which ``breakpoint`` has been removed from
        that list. The runner passed in is never mutated.
    """
    if not isinstance(workflow_runner, SandboxedWorkflowRunner):
        return workflow_runner

    current_restrictions = workflow_runner.restrictions
    invalid_members = current_restrictions.invalid_module_members
    builtins_entry = invalid_members.children.get("__builtins__")

    if builtins_entry is not None and "breakpoint" in builtins_entry.use:
        # Rebuild each layer from the inside out instead of mutating the
        # existing restriction objects in place.
        allowed_use = set(builtins_entry.use)
        allowed_use.discard("breakpoint")
        patched_builtins = dataclasses.replace(builtins_entry, use=allowed_use)

        patched_children = {**invalid_members.children}
        patched_children["__builtins__"] = patched_builtins
        patched_invalid = dataclasses.replace(
            invalid_members, children=patched_children
        )

        patched_restrictions = dataclasses.replace(
            current_restrictions, invalid_module_members=patched_invalid
        )
        return dataclasses.replace(
            workflow_runner, restrictions=patched_restrictions
        )

    # Nothing to relax: no ``__builtins__`` entry, or ``breakpoint`` is
    # already permitted.
    return workflow_runner
0 commit comments