[BUG] AsyncLogSender.publish is not thread-safe and can wedge the worker event loop #3805

@hobbyhack

Environment

  • hatchet-sdk 1.33.2
  • Python 3.13.13 (CPython, Linux x86_64)

Summary

hatchet_sdk.worker.runner.utils.capture_logs.AsyncLogSender.publish calls self.q.put_nowait(record) on an asyncio.Queue, but publish is invoked from any thread Python's logging module dispatches into — including worker-pool threads created by asyncio.to_thread(...) inside a step.

The queue is bound to one event loop. A cross-thread put_nowait calls _wakeup_next, which ends up in call_soon on the queue's loop from the wrong thread. Under loop.set_debug(True) this raises RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one; with debug off (the default) it silently corrupts that loop's internal call queue. The corruption manifests as a wedged Hatchet worker that still heartbeats and receives start step run messages but never executes them ("Waiting Steps N blocked_for=Xs", "THE TIME TO START THE TASK RUN IS TOO LONG, THE EVENT LOOP MAY BE BLOCKED").
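The debug-mode thread check can be observed in isolation, without Hatchet or a queue. The following is a minimal sketch using only the standard library; foreign_call_soon is a hypothetical helper name, not part of the SDK. It calls loop.call_soon from a pool thread, which is the same call that put_nowait reaches in the trace below.

```python
import asyncio


def foreign_call_soon(loop):
    """Runs in a pool thread and calls call_soon on a loop owned by another thread."""
    try:
        loop.call_soon(lambda: None)
    except RuntimeError as exc:
        # Debug mode detects that the caller is not the loop's own thread.
        return str(exc)
    return None


async def main():
    loop = asyncio.get_running_loop()
    loop.set_debug(True)  # enables the _check_thread() guard in call_soon
    return await asyncio.to_thread(foreign_call_soon, loop)


error = asyncio.run(main())
print(error)
```

With debug off, the same call_soon would return without raising, which is why the problem is silent in production.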


Stack trace (under loop.set_debug(True))

File "hatchet_sdk/worker/runner/utils/capture_logs.py", line 154, in publish
    self.q.put_nowait(record)
File "asyncio/queues.py", line 170, in put_nowait
    self._wakeup_next(self._getters)
File "asyncio/queues.py", line 76, in _wakeup_next
    waiter.set_result(None)
File "asyncio/base_events.py", line 835, in call_soon
    self._check_thread()
File "asyncio/base_events.py", line 872, in _check_thread
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Root cause

  1. The SDK installs a LogForwardingHandler (a logging.StreamHandler subclass) on the root logger. Its emit() checks ctx_step_run_id.get() and, if set, calls self.log_sender.publish(...).
  2. ctx_step_run_id is a contextvars.ContextVar. asyncio.to_thread(fn, ...) copies the current Context into the worker-pool thread, so any sync helper invoked via to_thread from within a step run inherits a populated ctx_step_run_id.
  3. Common sync helpers do logging — e.g. googleapiclient calls, requests, internal logger.info(...) for observability. Each logging.info(...) from the pool thread triggers LogForwardingHandler.emit → self.log_sender.publish(record) → self.q.put_nowait(record) on a thread that is not the queue's owning loop's thread.
  4. asyncio queues are documented as not thread-safe; put_nowait is only safe to call from the thread running the queue's loop. It calls _wakeup_next, which resolves a waiting getter via waiter.set_result(None), which in turn calls loop.call_soon(...). In debug mode that raises; with debug off it silently mutates the loop's _ready deque from a foreign thread. The race eventually leaves the loop's internal scheduling state inconsistent, and the SDK runner stops launching new step coroutines.
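Steps 1 to 3 can be reproduced with the standard library alone. This is a sketch, not SDK code: ctx_demo_run_id and RecordingHandler are hypothetical stand-ins for ctx_step_run_id and LogForwardingHandler. It shows that a handler's emit() runs on the pool thread and still sees the context variable set by the coroutine.

```python
import asyncio
import contextvars
import logging
import threading

# Hypothetical stand-in for the SDK's ctx_step_run_id.
ctx_demo_run_id = contextvars.ContextVar("ctx_demo_run_id", default=None)


class RecordingHandler(logging.Handler):
    """Hypothetical stand-in for LogForwardingHandler: records where emit() ran."""

    def __init__(self):
        super().__init__()
        self.seen = []

    def emit(self, record):
        # emit() runs on whichever thread made the logging call.
        self.seen.append((ctx_demo_run_id.get(), threading.get_ident()))


handler = RecordingHandler()
demo_logger = logging.getLogger("ctx_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo isolated from the root logger


def sync_helper():
    demo_logger.info("hello from pool thread")


async def main():
    ctx_demo_run_id.set("run-123")
    # to_thread copies the current Context into the pool thread.
    await asyncio.to_thread(sync_helper)
    return threading.get_ident()


loop_thread_id = asyncio.run(main())
run_id, emit_thread_id = handler.seen[0]
print(run_id, emit_thread_id != loop_thread_id)
```

Because the handler sees a populated context variable, a guard that only checks "is a step run active?" cannot distinguish the loop thread from a pool thread.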

Reproduction

Any Hatchet workflow whose step delegates, via asyncio.to_thread, to a sync helper that logs.

Minimal repro:

import asyncio, logging
from hatchet_sdk import Hatchet, Context

hatchet = Hatchet()
wf = hatchet.workflow(name="repro", on_events=["repro:fire"])

@wf.task()
async def step(input, ctx: Context):
    def sync_helper():
        logging.getLogger("repro").info("hello from pool thread")
    # contextvars (incl. ctx_step_run_id) propagate into to_thread
    await asyncio.to_thread(sync_helper)
    return {}

Run with loop.set_debug(True) on the worker thread loop and fire repro:fire. The RuntimeError shown above fires immediately. Without debug mode, the same race silently corrupts loop state — fire enough events to surface the wedge (in practice, ~3 consecutive failing runs whose on-failure handlers also log from to_thread reliably wedged the worker within seconds).


Symptoms in production (no debug mode)

  • Worker keeps heartbeating; sibling async tasks on a different loop keep running.
  • Engine sends rx: start step run: <id>/<step> messages and the runner logs them, but no run: start step body or finished step run follows.
  • Hatchet engine eventually emits "THE TIME TO START THE TASK RUN IS TOO LONG, THE EVENT LOOP MAY BE BLOCKED" and "Waiting Steps N" with monotonically growing blocked_for.
  • Sometimes self-clears after ~15 min; often requires a worker restart.

Suggested fix

Capture the queue's owning loop in consume() and route publish through call_soon_threadsafe (minimal change, no new dependency):

class AsyncLogSender:
    def __init__(self, event_client):
        self.event_client = event_client
        self.q = asyncio.Queue[...](maxsize=...)
        self._owner_loop: asyncio.AbstractEventLoop | None = None

    async def consume(self) -> None:
        self._owner_loop = asyncio.get_running_loop()
        while True:
            ...

    def publish(self, record) -> None:
        loop = self._owner_loop
        if loop is None:
            return  # consume() not started yet — drop or buffer
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:
            running = None
        if running is loop:
            try:
                self.q.put_nowait(record)
            except asyncio.QueueFull:
                logger.warning("log queue is full, dropping log message")
            return
        try:
            loop.call_soon_threadsafe(self._enqueue_or_drop, record)
        except RuntimeError:
            pass  # loop closed

    def _enqueue_or_drop(self, record) -> None:
        try:
            self.q.put_nowait(record)
        except asyncio.QueueFull:
            logger.warning("log queue is full, dropping log message")

Workaround

Monkey-patch AsyncLogSender.publish in user code along the lines of the fix above (capture the loop in consume, route foreign-thread publish calls through call_soon_threadsafe).
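The routing itself can be checked without the SDK. The sketch below is a self-contained model, not the real AsyncLogSender: DemoLogSender, its received list, and the None sentinel used to stop the consumer are all assumptions for illustration. For brevity it always goes through call_soon_threadsafe, which is also safe from the loop's own thread, instead of taking the same-thread fast path from the suggested fix.

```python
import asyncio


class DemoLogSender:
    """Hypothetical model of the fix: publish() is safe from any thread."""

    def __init__(self, maxsize=100):
        self.q = asyncio.Queue(maxsize=maxsize)
        self._owner_loop = None
        self.received = []

    async def consume(self):
        # Remember which loop owns the queue.
        self._owner_loop = asyncio.get_running_loop()
        while True:
            record = await self.q.get()
            if record is None:  # sentinel used only by this demo
                break
            self.received.append(record)

    def _enqueue_or_drop(self, record):
        # Always runs on the owning loop's thread.
        try:
            self.q.put_nowait(record)
        except asyncio.QueueFull:
            pass  # drop when full

    def publish(self, record):
        loop = self._owner_loop
        if loop is None:
            return  # consume() not started yet
        try:
            loop.call_soon_threadsafe(self._enqueue_or_drop, record)
        except RuntimeError:
            pass  # loop already closed


async def main():
    asyncio.get_running_loop().set_debug(True)  # would raise on an unsafe call
    sender = DemoLogSender()
    consumer = asyncio.create_task(sender.consume())
    await asyncio.sleep(0)  # let consume() start and record its loop
    await asyncio.to_thread(sender.publish, "from pool thread")
    sender.publish(None)  # stop the consumer
    await consumer
    return sender.received


received = asyncio.run(main())
print(received)
```

Even with debug mode on, the pool-thread publish completes and the record reaches the consumer, because the only call made from the foreign thread is call_soon_threadsafe.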


Notes

Anyone who logs from inside asyncio.to_thread while a step's contextvars are set is exposed — and that's a very common pattern (HTTP clients, blocking SDKs like googleapiclient, file I/O wrapped via to_thread). The bug is silent without debug mode, so it surfaces as flaky workers in production with no obvious cause.
