Fix large XCom payload causing task heartbeat timeout#64743
Fix large XCom payload causing task heartbeat timeout#64743skymensch wants to merge 3 commits intoapache:mainfrom
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
|
a1b4a37 to
3566a8a
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR prevents supervisor heartbeat timeouts when a task pushes very large XCom payloads by moving the blocking SetXCom HTTP call off the supervisor’s single-threaded event loop and into a single-worker thread pool, then draining completed futures to send responses back to the task process.
Changes:
- Offload
SetXComhandling to aThreadPoolExecutor(max_workers=1)and track request futures. - Add
_drain_pending_requests()and call it each event-loop iteration (and in the in-process supervisor comms path). - Update unit test harness to wait for threaded SetXCom completion before reading the response.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| task-sdk/src/airflow/sdk/execution_time/supervisor.py | Adds threaded offload for SetXCom and drains pending futures to keep the event loop responsive for heartbeats. |
| task-sdk/tests/task_sdk/execution_time/test_supervisor.py | Ensures tests wait for offloaded SetXCom work to finish and are drained before asserting/reading responses. |
Comments suppressed due to low confidence (1)
task-sdk/src/airflow/sdk/execution_time/supervisor.py:1
- The new threaded request path introduces new, important behavior that should be covered by tests: (1) a successful offloaded
SetXComresults in exactly one response being sent once the future completes, and (2) failures in the future (bothServerResponseErrorand a generic exception) are turned into anErrorResponserather than causing the supervisor to stall or crash. Adding focused tests around_drain_pending_requests()and the offloadedSetXCompath would prevent regressions.
#
| exc = future.exception() | ||
| if exc is not None: | ||
| if isinstance(exc, ServerResponseError): | ||
| error_details = exc.response.json() if exc.response else None | ||
| log.error( | ||
| "API server error", | ||
| status_code=exc.response.status_code, | ||
| detail=error_details, | ||
| message=str(exc), | ||
| ) | ||
| self.send_msg( | ||
| msg=None, | ||
| error=ErrorResponse( | ||
| error=ErrorType.API_SERVER_ERROR, | ||
| detail={ | ||
| "status_code": exc.response.status_code, | ||
| "message": str(exc), | ||
| "detail": error_details, | ||
| }, | ||
| ), | ||
| request_id=req_id, | ||
| ) | ||
| else: | ||
| self.send_msg(msg=None, request_id=req_id) |
There was a problem hiding this comment.
Two concrete issues in the exception path: (1) when ServerResponseError.response is None, the code dereferences exc.response.status_code (lines 723/732), which will raise and prevent any response being sent back to the task; (2) for exceptions that are not ServerResponseError, no response is sent at all, which can leave the task process waiting indefinitely for a reply. Fix by handling response is None safely (e.g., omit status_code or set it to None) and by adding a fallback that sends an appropriate ErrorResponse for any non-ServerResponseError exception.
| if isinstance(exc, ServerResponseError): | ||
| error_details = exc.response.json() if exc.response else None |
There was a problem hiding this comment.
exc.response.json() can raise (e.g., non-JSON error body / decode error). If that happens inside _drain_pending_requests(), it can bubble out and disrupt the supervisor loop, defeating the goal of keeping heartbeats flowing. Wrap JSON parsing in a try/except and fall back to exc.response.text (or None) for detail if parsing fails.
| if isinstance(message, SetXCom): | ||
| watched_subprocess._request_thread_pool.shutdown(wait=True) | ||
| watched_subprocess._drain_pending_requests() |
There was a problem hiding this comment.
Calling shutdown(wait=True) inside the request loop makes the executor unusable for any subsequent SetXCom submissions in the same test run, which can cause order-dependent failures if the generator sends multiple SetXCom messages. Prefer waiting on the currently pending futures (similar to the in-process path’s concurrent.futures.wait) and then calling _drain_pending_requests(), without shutting down the executor in the middle of the test.
Offload SetXCom API calls to a background thread in the supervisor so the event loop can continue sending heartbeats while large payloads upload. Previously, a synchronous HTTP POST for a large XCom blocked the single-threaded supervisor loop, preventing heartbeats and causing the scheduler to mark the task as failed. closes: apache#64628 Signed-off-by: Haneul Yeom <haneul0826@gmail.com>
When SetXCom is offloaded to the thread pool, _handle_request() returns immediately without calling send_msg(). The InProcessSupervisorComms.send() then calls _get_response() on an empty deque, causing IndexError. Wait for any pending futures and drain them before popping the response. Signed-off-by: Haneul Yeom <nyeom@nyeom.dev>
- Guard against ServerResponseError.response being None before accessing .status_code or .json() - Wrap response.json() in try/except to handle non-JSON error bodies - Add fallback ErrorResponse for non-ServerResponseError exceptions so the task process is never left waiting indefinitely for a reply - Replace shutdown(wait=True) in test with concurrent.futures.wait() so the ThreadPoolExecutor remains usable for subsequent SetXCom calls Signed-off-by: Haneul Yeom <nyeom@nyeom.dev>
226a26d to
38a9435
Compare
When a task pushes a large XCom payload (e.g. 300 MB), the supervisor's single-threaded event loop is blocked on the synchronous HTTP POST to the API server. During that time no heartbeats can be sent, and the scheduler eventually marks the task instance as failed with a heartbeat timeout — even though the task itself is still running successfully.
Root cause:
ActivitySubprocess._handle_request()callsself.client.xcoms.set()synchronously. Because the supervisor uses aselectors-based event loop, any blocking call inside a handler stalls the entire loop, including_send_heartbeat_if_needed().Fix: Offload the
SetXComAPI call to a single-workerThreadPoolExecutor. The handler submits the future and returns immediately, so the event loop keeps ticking and heartbeats continue uninterrupted. A new_drain_pending_requests()helper is called on every loop iteration; it inspects completed futures and sends the appropriate response (or error) back to the task process.max_workers=1preserves ordering of concurrent XCom writes from the same task.httpx.Clientis thread-safe, so sharing the existing client with the worker thread is safe.shutdown(wait=False)discards any in-flight upload because the task process is already gone.closes: #64628
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (claude-sonnet-4-6) following the guidelines