Environment
- hatchet-sdk 1.33.2
- Python 3.13.13 (CPython, Linux x86_64)
Summary
hatchet_sdk.worker.runner.utils.capture_logs.AsyncLogSender.publish calls self.q.put_nowait(record) on an asyncio.Queue, but publish is invoked from any thread Python's logging module dispatches into — including worker-pool threads created by asyncio.to_thread(...) inside a step.
The queue is bound to one event loop. Cross-thread put_nowait calls _wakeup_next → call_soon on the wrong loop, which under loop.set_debug(True) raises RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one, and under default (debug-off) silently corrupts that loop's internal call queue. The corruption manifests as a wedged Hatchet worker that still heartbeats and receives start step run messages but never executes them ("Waiting Steps N blocked_for=Xs", "THE TIME TO START THE TASK RUN IS TOO LONG, THE EVENT LOOP MAY BE BLOCKED").
Stack trace (under loop.set_debug(True))
  File "hatchet_sdk/worker/runner/utils/capture_logs.py", line 154, in publish
    self.q.put_nowait(record)
  File "asyncio/queues.py", line 170, in put_nowait
    self._wakeup_next(self._getters)
  File "asyncio/queues.py", line 76, in _wakeup_next
    waiter.set_result(None)
  File "asyncio/base_events.py", line 835, in call_soon
    self._check_thread()
  File "asyncio/base_events.py", line 872, in _check_thread
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
Root cause
- The SDK installs a LogForwardingHandler (a logging.StreamHandler subclass) on the root logger. Its emit() checks ctx_step_run_id.get() and, if set, calls self.log_sender.publish(...).
- ctx_step_run_id is a contextvars.ContextVar. asyncio.to_thread(fn, ...) copies the current Context into the worker-pool thread, so any sync helper invoked via to_thread from within a step run inherits a populated ctx_step_run_id.
- Common sync helpers log, e.g. googleapiclient calls, requests, internal logger.info(...) for observability. Each logging.info(...) from the pool thread triggers LogForwardingHandler.emit → self.log_sender.publish(record) → self.q.put_nowait(record) on a thread that is not the queue's owning loop's thread.
- asyncio.Queue is documented as not thread-safe, so put_nowait is only safe on the thread running the queue's loop. It calls _wakeup_next, which (via waiter.set_result) calls loop.call_soon(...). In debug mode that call raises; with debug off it silently mutates the loop's _ready deque from a foreign thread. The race eventually leaves the loop's internal scheduling state inconsistent, and the SDK runner stops launching new step coroutines.
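The context propagation described above can be checked in isolation, without Hatchet. The following is a minimal sketch under stated assumptions: step_run_id is a hypothetical stand-in for the SDK's ctx_step_run_id, and RecordingHandler is a hypothetical stand-in for LogForwardingHandler that only records where emit() ran.

```python
import asyncio
import contextvars
import logging
import threading

# Hypothetical stand-in for the SDK's ctx_step_run_id ContextVar.
step_run_id = contextvars.ContextVar("step_run_id", default=None)


class RecordingHandler(logging.Handler):
    """Hypothetical stand-in for LogForwardingHandler: records the thread
    emit() ran on and the ContextVar value visible there."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def emit(self, record):
        self.seen.append((threading.get_ident(), step_run_id.get()))


async def main():
    handler = RecordingHandler()
    logger = logging.getLogger("ctx_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)

    step_run_id.set("run-123")          # as if a step run were active
    loop_thread = threading.get_ident()  # thread running the event loop

    # to_thread copies the current Context into the pool thread.
    await asyncio.to_thread(logger.info, "hello from pool thread")

    logger.removeHandler(handler)
    emit_thread, seen_id = handler.seen[0]
    return loop_thread, emit_thread, seen_id


loop_thread, emit_thread, seen_id = asyncio.run(main())
print("emit ran off the loop thread:", emit_thread != loop_thread)
print("ContextVar seen in emit:", seen_id)
```

The handler runs on the pool thread yet still sees the step-run value, which is the combination that would lead a handler like the one described above to call publish from a foreign thread.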
Reproduction
Any Hatchet workflow whose step delegates to a sync helper that logs.
Minimal repro:
import asyncio
import logging

from hatchet_sdk import Hatchet, Context

hatchet = Hatchet()
wf = hatchet.workflow(name="repro", on_events=["repro:fire"])


@wf.task()
async def step(input, ctx: Context):
    def sync_helper():
        logging.getLogger("repro").info("hello from pool thread")

    # contextvars (incl. ctx_step_run_id) propagate into to_thread
    await asyncio.to_thread(sync_helper)
    return {}
Run with loop.set_debug(True) on the worker thread loop and fire repro:fire. The RuntimeError shown above fires immediately. Without debug mode, the same race silently corrupts loop state — fire enough events to surface the wedge (in practice, ~3 consecutive failing runs whose on-failure handlers also log from to_thread reliably wedged the worker within seconds).
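The debug-mode check itself can be observed without Hatchet or a queue. This is a small sketch, not the SDK's code path: it calls loop.call_soon directly from a pool thread (the same call the stack trace ends in) and contrasts it with call_soon_threadsafe. The names main, from_pool_thread, and outcome are illustrative only.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    outcome = {}

    def from_pool_thread():
        # Unsafe: call_soon from a thread that is not running the loop.
        # With debug mode on, the loop's thread check rejects it.
        try:
            loop.call_soon(lambda: None)
        except RuntimeError as exc:
            outcome["unsafe"] = str(exc)

        # Safe: hands the callback to the loop thread and wakes the loop.
        loop.call_soon_threadsafe(outcome.__setitem__, "safe", "ran on loop thread")

    await asyncio.to_thread(from_pool_thread)
    await asyncio.sleep(0)  # give the scheduled callback a chance to run
    return outcome


outcome = asyncio.run(main(), debug=True)
print(outcome)
```

With debug mode off, the unsafe call is not rejected, which is why the problem stays invisible until the worker wedges.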
Symptoms in production (no debug mode)
- Worker keeps heartbeating; sibling async tasks on a different loop keep running.
- Engine sends rx: start step run: <id>/<step> messages and the runner logs them, but no run: start step body or finished step run follows.
- Hatchet engine eventually emits "THE TIME TO START THE TASK RUN IS TOO LONG, THE EVENT LOOP MAY BE BLOCKED" and "Waiting Steps N" with monotonically growing blocked_for.
- Sometimes self-clears after ~15 min; often requires a worker restart.
Suggested fix
Capture the queue's owning loop in consume() and route publish through call_soon_threadsafe (minimal change, no new dependency):
class AsyncLogSender:
    def __init__(self, event_client):
        self.event_client = event_client
        self.q = asyncio.Queue[...](maxsize=...)
        self._owner_loop: asyncio.AbstractEventLoop | None = None

    async def consume(self) -> None:
        self._owner_loop = asyncio.get_running_loop()
        while True:
            ...

    def publish(self, record) -> None:
        loop = self._owner_loop
        if loop is None:
            return  # consume() not started yet: drop or buffer

        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None

        if running is loop:
            # Already on the owning loop's thread: enqueue directly.
            try:
                self.q.put_nowait(record)
            except asyncio.QueueFull:
                logger.warning("log queue is full, dropping log message")
            return

        # Foreign thread: hand the enqueue over to the owning loop.
        try:
            loop.call_soon_threadsafe(self._enqueue_or_drop, record)
        except RuntimeError:
            pass  # loop closed

    def _enqueue_or_drop(self, record) -> None:
        try:
            self.q.put_nowait(record)
        except asyncio.QueueFull:
            logger.warning("log queue is full, dropping log message")
Workaround
Monkey-patch AsyncLogSender.publish in user code along the lines of the fix above (capture the loop in consume, route foreign-thread publish calls through call_soon_threadsafe).
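One possible shape for such a patch is sketched below. It is an illustration, not tested against the SDK: patch_publish is a hypothetical helper, and FakeSender is a hypothetical stand-in that only mirrors the attributes this report describes (q, consume, publish). For simplicity this version routes every publish through call_soon_threadsafe, including calls made on the loop thread.

```python
import asyncio
import functools


def patch_publish(sender_cls):
    """Hypothetical helper: make sender_cls.publish safe from any thread."""
    original_consume = sender_cls.consume

    @functools.wraps(original_consume)
    async def consume(self):
        # Remember which loop owns the queue before consuming.
        self._owner_loop = asyncio.get_running_loop()
        await original_consume(self)

    def _enqueue_or_drop(self, record):
        try:
            self.q.put_nowait(record)
        except asyncio.QueueFull:
            pass  # drop when full

    def publish(self, record):
        loop = getattr(self, "_owner_loop", None)
        if loop is None:
            return  # consume() not started yet: drop
        try:
            loop.call_soon_threadsafe(_enqueue_or_drop, self, record)
        except RuntimeError:
            pass  # loop closed

    sender_cls.consume = consume
    sender_cls.publish = publish


class FakeSender:
    """Stand-in with the same shape as the sender described in this report."""

    def __init__(self):
        self.q = asyncio.Queue(maxsize=10)
        self.seen = []

    async def consume(self):
        while True:
            self.seen.append(await self.q.get())

    def publish(self, record):
        self.q.put_nowait(record)  # the unsafe original behaviour


async def demo():
    patch_publish(FakeSender)
    sender = FakeSender()
    consumer = asyncio.create_task(sender.consume())
    await asyncio.sleep(0)  # let consume() start and capture the loop
    await asyncio.to_thread(sender.publish, "patched")
    await asyncio.sleep(0.05)  # let the consumer drain the queue
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
    return sender.seen


seen = asyncio.run(demo(), debug=True)
print(seen)
```

With the real SDK, the class passed in would presumably be AsyncLogSender, and the patch would need to be applied before the worker starts consume(); both points are assumptions to verify against the installed version.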
Notes
Anyone who logs from inside asyncio.to_thread while a step's contextvars are set is exposed — and that's a very common pattern (HTTP clients, blocking SDKs like googleapiclient, file I/O wrapped via to_thread). The bug is silent without debug mode, so it surfaces as flaky workers in production with no obvious cause.