Skip to content

Commit 226a26d

Browse files
committed
Fix InProcessSupervisorComms not awaiting deferred SetXCom response
When SetXCom is offloaded to the thread pool, _handle_request() returns immediately without calling send_msg(). The InProcessSupervisorComms.send() then calls _get_response() on an empty deque, causing IndexError. Wait for any pending futures and drain them before popping the response. Signed-off-by: Haneul Yeom <nyeom@nyeom.dev>
1 parent 3566a8a commit 226a26d

1 file changed

Lines changed: 10 additions & 0 deletions

File tree

task-sdk/src/airflow/sdk/execution_time/supervisor.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1611,6 +1611,16 @@ def send(self, msg: BaseModel):
16111611
with set_supervisor_comms(None):
16121612
self.supervisor._handle_request(msg, log, 0) # type: ignore[arg-type]
16131613

1614+
# Some requests (e.g. SetXCom) are offloaded to the supervisor's thread pool to
1615+
# avoid blocking its event loop. In the in-process path there is no event loop
1616+
# calling _drain_pending_requests(), so we wait for any in-flight futures here
1617+
# and drain them so that the response is available before we try to pop it.
1618+
if self.supervisor._pending_requests:
1619+
from concurrent.futures import wait as futures_wait
1620+
1621+
futures_wait([f for f, _ in self.supervisor._pending_requests])
1622+
self.supervisor._drain_pending_requests()
1623+
16141624
return self._get_response()
16151625

16161626

0 commit comments

Comments
 (0)