Skip to content

Fix large XCom payload causing task heartbeat timeout#64743

Open
skymensch wants to merge 3 commits intoapache:mainfrom
skymensch:fix/xcom-heartbeat-blocking-64628
Open

Fix large XCom payload causing task heartbeat timeout#64743
skymensch wants to merge 3 commits intoapache:mainfrom
skymensch:fix/xcom-heartbeat-blocking-64628

Conversation

@skymensch
Copy link
Copy Markdown

When a task pushes a large XCom payload (e.g. 300 MB), the supervisor's single-threaded event loop is blocked on the synchronous HTTP POST to the API server. During that time no heartbeats can be sent, and the scheduler eventually marks the task instance as failed with a heartbeat timeout — even though the task itself is still running successfully.

Root cause: ActivitySubprocess._handle_request() calls self.client.xcoms.set() synchronously. Because the supervisor uses a selectors-based event loop, any blocking call inside a handler stalls the entire loop, including _send_heartbeat_if_needed().

Fix: Offload the SetXCom API call to a single-worker ThreadPoolExecutor. The handler submits the future and returns immediately, so the event loop keeps ticking and heartbeats continue uninterrupted. A new _drain_pending_requests() helper is called on every loop iteration; it inspects completed futures and sends the appropriate response (or error) back to the task process.

  • max_workers=1 preserves ordering of concurrent XCom writes from the same task.
  • httpx.Client is thread-safe, so sharing the existing client with the worker thread is safe.
  • On process cleanup shutdown(wait=False) discards any in-flight upload because the task process is already gone.

closes: #64628


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (claude-sonnet-4-6)

Generated-by: Claude Code (claude-sonnet-4-6) following the guidelines

@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Apr 5, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@skymensch skymensch force-pushed the fix/xcom-heartbeat-blocking-64628 branch from a1b4a37 to 3566a8a Compare April 5, 2026 14:23
@kaxil kaxil requested a review from Copilot April 10, 2026 19:55
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

This PR prevents supervisor heartbeat timeouts when a task pushes very large XCom payloads by moving the blocking SetXCom HTTP call off the supervisor’s single-threaded event loop and into a single-worker thread pool, then draining completed futures to send responses back to the task process.

Changes:

  • Offload SetXCom handling to a ThreadPoolExecutor(max_workers=1) and track request futures.
  • Add _drain_pending_requests() and call it each event-loop iteration (and in the in-process supervisor comms path).
  • Update unit test harness to wait for threaded SetXCom completion before reading the response.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

File Description
task-sdk/src/airflow/sdk/execution_time/supervisor.py Adds threaded offload for SetXCom and drains pending futures to keep the event loop responsive for heartbeats.
task-sdk/tests/task_sdk/execution_time/test_supervisor.py Ensures tests wait for offloaded SetXCom work to finish and are drained before asserting/reading responses.
Comments suppressed due to low confidence (1)

task-sdk/src/airflow/sdk/execution_time/supervisor.py:1

  • The new threaded request path introduces new, important behavior that should be covered by tests: (1) a successful offloaded SetXCom results in exactly one response being sent once the future completes, and (2) failures in the future (both ServerResponseError and a generic exception) are turned into an ErrorResponse rather than causing the supervisor to stall or crash. Adding focused tests around _drain_pending_requests() and the offloaded SetXCom path would prevent regressions.
#

Comment on lines +717 to +740
exc = future.exception()
if exc is not None:
if isinstance(exc, ServerResponseError):
error_details = exc.response.json() if exc.response else None
log.error(
"API server error",
status_code=exc.response.status_code,
detail=error_details,
message=str(exc),
)
self.send_msg(
msg=None,
error=ErrorResponse(
error=ErrorType.API_SERVER_ERROR,
detail={
"status_code": exc.response.status_code,
"message": str(exc),
"detail": error_details,
},
),
request_id=req_id,
)
else:
self.send_msg(msg=None, request_id=req_id)
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two concrete issues in the exception path: (1) when ServerResponseError.response is None, the code dereferences exc.response.status_code (lines 723/732), which will raise and prevent any response being sent back to the task; (2) for exceptions that are not ServerResponseError, no response is sent at all, which can leave the task process waiting indefinitely for a reply. Fix by handling response is None safely (e.g., omit status_code or set it to None) and by adding a fallback that sends an appropriate ErrorResponse for any non-ServerResponseError exception.

Copilot uses AI. Check for mistakes.
Comment on lines +719 to +720
if isinstance(exc, ServerResponseError):
error_details = exc.response.json() if exc.response else None
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exc.response.json() can raise (e.g., non-JSON error body / decode error). If that happens inside _drain_pending_requests(), it can bubble out and disrupt the supervisor loop, defeating the goal of keeping heartbeats flowing. Wrap JSON parsing in a try/except and fall back to exc.response.text (or None) for detail if parsing fails.

Copilot uses AI. Check for mistakes.
Comment on lines +2623 to +2625
if isinstance(message, SetXCom):
watched_subprocess._request_thread_pool.shutdown(wait=True)
watched_subprocess._drain_pending_requests()
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling shutdown(wait=True) inside the request loop makes the executor unusable for any subsequent SetXCom submissions in the same test run, which can cause order-dependent failures if the generator sends multiple SetXCom messages. Prefer waiting on the currently pending futures (similar to the in-process path’s concurrent.futures.wait) and then calling _drain_pending_requests(), without shutting down the executor in the middle of the test.

Copilot uses AI. Check for mistakes.
Offload SetXCom API calls to a background thread in the supervisor
so the event loop can continue sending heartbeats while large payloads
upload. Previously, a synchronous HTTP POST for a large XCom blocked
the single-threaded supervisor loop, preventing heartbeats and causing
the scheduler to mark the task as failed.

closes: apache#64628

Signed-off-by: Haneul Yeom <haneul0826@gmail.com>
When SetXCom is offloaded to the thread pool, _handle_request() returns
immediately without calling send_msg(). The InProcessSupervisorComms.send()
then calls _get_response() on an empty deque, causing IndexError.

Wait for any pending futures and drain them before popping the response.

Signed-off-by: Haneul Yeom <nyeom@nyeom.dev>
- Guard against ServerResponseError.response being None before
  accessing .status_code or .json()
- Wrap response.json() in try/except to handle non-JSON error bodies
- Add fallback ErrorResponse for non-ServerResponseError exceptions so
  the task process is never left waiting indefinitely for a reply
- Replace shutdown(wait=True) in test with concurrent.futures.wait() so
  the ThreadPoolExecutor remains usable for subsequent SetXCom calls

Signed-off-by: Haneul Yeom <nyeom@nyeom.dev>
@skymensch skymensch force-pushed the fix/xcom-heartbeat-blocking-64628 branch from 226a26d to 38a9435 Compare April 15, 2026 00:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Large XCom Payload Causes Task Heartbeat Timeout

2 participants