Skip to content

Commit f1bbb63

Browse files
authored
Fix deadlock in AsyncWrapper reset_state() (#38427)
* Add a test to reproduce hanging. * Fix deadlock between shutdown in main thread and done callback in worker threads. * Address review comments. * Fix format * Modify the test to cover reset_state() hanging in asyncio mode. * Fix the deadlock when asyncio is used. * Fix formatting. * Increase timeout to reduce false-positives. * Revise test function names and some comments.
1 parent 13bbd5c commit f1bbb63

2 files changed

Lines changed: 59 additions & 5 deletions

File tree

sdks/python/apache_beam/transforms/async_dofn.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,21 +153,39 @@ def _run_event_loop():
153153

154154
@staticmethod
155155
def reset_state():
156+
event_loop_thread_to_join = None
156157
with AsyncWrapper._lock:
157158
if AsyncWrapper._event_loop:
158159
AsyncWrapper._event_loop.call_soon_threadsafe(
159160
AsyncWrapper._event_loop.stop)
160161
if AsyncWrapper._event_loop_thread:
161-
AsyncWrapper._event_loop_thread.join()
162+
event_loop_thread_to_join = AsyncWrapper._event_loop_thread
162163

163164
AsyncWrapper._event_loop = None
164165
AsyncWrapper._event_loop_thread = None
165166
if AsyncWrapper._loop_started is not None:
166167
AsyncWrapper._loop_started.clear()
167168

168-
for pool in AsyncWrapper._pool.values():
169-
pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
170-
wait=True, cancel_futures=True)
169+
pools = list(AsyncWrapper._pool.values())
170+
171+
# We must join the asyncio event loop thread outside of the lock block.
172+
# If joined inside the lock, the waiting thread holds the lock while blocking,
173+
# preventing active coroutines' done callbacks from acquiring the lock on the
174+
# event loop thread, resulting in a deadlock.
175+
if event_loop_thread_to_join:
176+
event_loop_thread_to_join.join()
177+
178+
# We must acquire and shut down the thread pools outside of the lock block.
179+
# If shutdown(wait=True) is called inside the lock, the caller blocks holding
180+
# the lock, preventing active worker threads from acquiring the lock to run
181+
# their done callbacks, resulting in a deadlock.
182+
pools_to_shutdown = [
183+
pool.acquire(AsyncWrapper.initialize_pool(1)) for pool in pools
184+
]
185+
186+
for pool in pools_to_shutdown:
187+
pool.shutdown(wait=True, cancel_futures=True)
188+
171189
with AsyncWrapper._lock:
172190
AsyncWrapper._pool = {}
173191
AsyncWrapper._processing_elements = {}
@@ -268,7 +286,8 @@ async def _collect(result):
268286

269287
def decrement_items_in_buffer(self, future):
270288
with AsyncWrapper._lock:
271-
AsyncWrapper._items_in_buffer[self._uuid] -= 1
289+
if self._uuid in AsyncWrapper._items_in_buffer:
290+
AsyncWrapper._items_in_buffer[self._uuid] -= 1
272291

273292
def schedule_if_room(self, element, ignore_buffer=False, *args, **kwargs):
274293
"""Schedules an item to be processed asynchronously if there is room.

sdks/python/apache_beam/transforms/async_dofn_test.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717

1818
import logging
19+
import multiprocessing
1920
import random
2021
import time
2122
import unittest
@@ -487,6 +488,40 @@ def add_item(i):
487488
self.check_output(results[i], expected_outputs['key' + str(i)])
488489
self.assertEqual(bag_states['key' + str(i)].items, [])
489490

491+
@staticmethod
492+
def _run_reset_state_concurrent_teardown(use_asyncio):
493+
dofn = BasicDofn(sleep_time=0.5)
494+
async_dofn = async_lib.AsyncWrapper(dofn, use_asyncio=use_asyncio)
495+
async_dofn.setup()
496+
fake_bag_state = FakeBagState([])
497+
fake_timer = FakeTimer(0)
498+
499+
# Start processing an item. This starts a worker thread/coroutine sleeping for 0.5s.
500+
async_dofn.process(('key1', 1), to_process=fake_bag_state, timer=fake_timer)
501+
time.sleep(0.05)
502+
503+
# Verify that calling reset_state() while background tasks are actively running
504+
# completes cleanly without causing lock-ordering deadlocks.
505+
async_lib.AsyncWrapper.reset_state()
506+
507+
def test_reset_state_concurrent_teardown(self):
508+
# Verify concurrent teardown safety in a separate process to prevent any potential
509+
# regressions from freezing the main pytest process at exit.
510+
p = multiprocessing.Process(
511+
target=AsyncTest._run_reset_state_concurrent_teardown,
512+
args=(self.use_asyncio, ))
513+
p.start()
514+
p.join(timeout=10.0)
515+
516+
if p.is_alive():
517+
p.terminate()
518+
p.join()
519+
self.fail(
520+
"reset_state() deadlocked/hung waiting for active threads/tasks to finish"
521+
)
522+
else:
523+
self.assertEqual(p.exitcode, 0)
524+
490525

491526
if __name__ == '__main__':
492527
unittest.main()

0 commit comments

Comments
 (0)