Skip to content

Commit 6b72ea6

Browse files
William Ma
authored and committed
Make workflow deadlock detection timeout configurable
The workflow-task deadlock detector timeout was hardcoded to 2 seconds, with `debug_mode` / `TEMPORAL_DEBUG` as the only escape hatch — and that disables detection entirely (and may relax the sandbox). There was no way to keep detection on but grant more headroom to workflows that legitimately do heavy CPU-bound work on the workflow thread before yielding. Add a `deadlock_detection_timeout: timedelta = timedelta(seconds=2)` option to `Worker` and `Replayer`, plumbed through to `_WorkflowWorker`. The default preserves the current 2s behavior; `debug_mode` still disables detection. This matches the Go SDK's `WorkerOptions.DeadlockDetectionTimeout`.
1 parent c37fa7e commit 6b72ea6

5 files changed

Lines changed: 64 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,13 @@ to include examples, links to docs, or any other relevant information.
2020

2121
### Added
2222

23+
- `Worker` and `Replayer` now accept a `deadlock_detection_timeout`
24+
(`timedelta`, default 2 seconds) to configure how long a workflow task may run
25+
without yielding before it is failed as a deadlock (`[TMPRL1101]`). This brings
26+
the Python SDK in line with the Go SDK's `WorkerOptions.DeadlockDetectionTimeout`.
27+
Raise it only for workflows that legitimately perform heavy CPU-bound work on
28+
the workflow thread; `debug_mode` still disables detection entirely.
29+
2330
### Changed
2431

2532
- AWS Lambda worker `configure` parameter supports sync, async, and async

temporalio/worker/_replayer.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from collections.abc import AsyncIterator, Mapping, Sequence
99
from contextlib import AbstractAsyncContextManager, asynccontextmanager
1010
from dataclasses import dataclass
11+
from datetime import timedelta
1112

1213
from typing_extensions import TypedDict
1314

@@ -48,6 +49,7 @@ def __init__(
4849
identity: str | None = None,
4950
workflow_failure_exception_types: Sequence[type[BaseException]] = [],
5051
debug_mode: bool = False,
52+
deadlock_detection_timeout: timedelta = timedelta(seconds=2),
5153
runtime: temporalio.runtime.Runtime | None = None,
5254
disable_safe_workflow_eviction: bool = False,
5355
header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
@@ -78,6 +80,7 @@ def __init__(
7880
identity=identity,
7981
workflow_failure_exception_types=workflow_failure_exception_types,
8082
debug_mode=debug_mode,
83+
deadlock_detection_timeout=deadlock_detection_timeout,
8184
runtime=runtime,
8285
disable_safe_workflow_eviction=disable_safe_workflow_eviction,
8386
header_codec_behavior=header_codec_behavior,
@@ -256,6 +259,9 @@ def on_eviction_hook(
256259
"workflow_failure_exception_types", []
257260
),
258261
debug_mode=self._config.get("debug_mode", False),
262+
deadlock_detection_timeout=self._config.get(
263+
"deadlock_detection_timeout", timedelta(seconds=2)
264+
),
259265
metric_meter=runtime.metric_meter,
260266
on_eviction_hook=on_eviction_hook,
261267
disable_eager_activity_execution=False,
@@ -410,6 +416,7 @@ class ReplayerConfig(TypedDict, total=False):
410416
identity: str | None
411417
workflow_failure_exception_types: Sequence[type[BaseException]]
412418
debug_mode: bool
419+
deadlock_detection_timeout: timedelta
413420
runtime: temporalio.runtime.Runtime | None
414421
disable_safe_workflow_eviction: bool
415422
header_codec_behavior: HeaderCodecBehavior

temporalio/worker/_worker.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ def __init__(
130130
workflow_failure_exception_types: Sequence[type[BaseException]] = [],
131131
shared_state_manager: SharedStateManager | None = None,
132132
debug_mode: bool = False,
133+
deadlock_detection_timeout: timedelta = timedelta(seconds=2),
133134
disable_eager_activity_execution: bool = False,
134135
on_fatal_error: Callable[[BaseException], Awaitable[None]] | None = None,
135136
use_worker_versioning: bool = False,
@@ -282,6 +283,14 @@ def __init__(
282283
sandboxing in order to make using a debugger easier. If false
283284
but the environment variable ``TEMPORAL_DEBUG`` is truthy, this
284285
will be set to true.
286+
deadlock_detection_timeout: Maximum amount of time a workflow task
287+
is allowed to run without yielding control back to the event
288+
loop before it is considered a deadlock and the task fails with
289+
``[TMPRL1101]``. Defaults to 2 seconds. Increase this only for
290+
workflows that legitimately perform heavy CPU-bound work on the
291+
workflow thread; raising it weakens the deadlock detector's
292+
ability to catch genuinely stuck workflows. Ignored when
293+
``debug_mode`` is enabled (detection is fully disabled then).
285294
disable_eager_activity_execution: If true, will disable eager
286295
activity execution. Eager activity execution is an optimization
287296
on some servers that sends activities back to the same worker as
@@ -363,6 +372,7 @@ def __init__(
363372
workflow_failure_exception_types=workflow_failure_exception_types,
364373
shared_state_manager=shared_state_manager,
365374
debug_mode=debug_mode,
375+
deadlock_detection_timeout=deadlock_detection_timeout,
366376
disable_eager_activity_execution=disable_eager_activity_execution,
367377
on_fatal_error=on_fatal_error,
368378
use_worker_versioning=use_worker_versioning,
@@ -529,6 +539,7 @@ def check_activity(activity: str):
529539
"workflow_failure_exception_types"
530540
], # type: ignore[reportTypedDictNotRequiredAccess]
531541
debug_mode=config["debug_mode"], # type: ignore[reportTypedDictNotRequiredAccess]
542+
deadlock_detection_timeout=config["deadlock_detection_timeout"], # type: ignore[reportTypedDictNotRequiredAccess]
532543
disable_eager_activity_execution=config[
533544
"disable_eager_activity_execution"
534545
], # type: ignore[reportTypedDictNotRequiredAccess]
@@ -977,6 +988,7 @@ class WorkerConfig(TypedDict, total=False):
977988
workflow_failure_exception_types: Sequence[type[BaseException]]
978989
shared_state_manager: SharedStateManager | None
979990
debug_mode: bool
991+
deadlock_detection_timeout: timedelta
980992
disable_eager_activity_execution: bool
981993
on_fatal_error: Callable[[BaseException], Awaitable[None]] | None
982994
use_worker_versioning: bool

temporalio/worker/_workflow.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ def __init__(
7979
interceptors: Sequence[Interceptor],
8080
workflow_failure_exception_types: Sequence[type[BaseException]],
8181
debug_mode: bool,
82+
deadlock_detection_timeout: timedelta,
8283
disable_eager_activity_execution: bool,
8384
metric_meter: temporalio.common.MetricMeter,
8485
on_eviction_hook: Callable[
@@ -155,9 +156,11 @@ def __init__(
155156
)
156157
self._throw_after_activation: Exception | None = None
157158

158-
# If debug mode is enabled, disable deadlock detection
159-
# otherwise set to 2 seconds
160-
self._deadlock_timeout_seconds = None if self._debug_mode else 2
159+
# If debug mode is enabled, disable deadlock detection entirely;
160+
# otherwise use the configured timeout (defaults to 2 seconds).
161+
self._deadlock_timeout_seconds = (
162+
None if self._debug_mode else deadlock_detection_timeout.total_seconds()
163+
)
161164

162165
# Keep track of workflows that could not be evicted
163166
self._could_not_evict_count = 0

tests/worker/test_worker.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1707,3 +1707,35 @@ def debug_envvar():
17071707
"_pydevd_bundle"
17081708
in worker._workflow_worker._workflow_runner.restrictions.passthrough_modules
17091709
)
1710+
1711+
1712+
async def test_worker_deadlock_detection_timeout(client: Client):
1713+
# Default is 2 seconds.
1714+
worker = Worker(
1715+
client,
1716+
workflows=[SimpleWorkflow],
1717+
task_queue=f"task-queue-{uuid.uuid4()}",
1718+
)
1719+
assert worker._workflow_worker
1720+
assert worker._workflow_worker._deadlock_timeout_seconds == 2
1721+
1722+
# A custom timeout is honored.
1723+
worker = Worker(
1724+
client,
1725+
workflows=[SimpleWorkflow],
1726+
task_queue=f"task-queue-{uuid.uuid4()}",
1727+
deadlock_detection_timeout=timedelta(seconds=10),
1728+
)
1729+
assert worker._workflow_worker
1730+
assert worker._workflow_worker._deadlock_timeout_seconds == 10
1731+
1732+
# debug_mode disables detection regardless of the configured timeout.
1733+
worker = Worker(
1734+
client,
1735+
workflows=[SimpleWorkflow],
1736+
task_queue=f"task-queue-{uuid.uuid4()}",
1737+
deadlock_detection_timeout=timedelta(seconds=10),
1738+
debug_mode=True,
1739+
)
1740+
assert worker._workflow_worker
1741+
assert worker._workflow_worker._deadlock_timeout_seconds is None

0 commit comments

Comments
 (0)