Skip to content

Commit a882dd2

Browse files
sumedhsakdeo and claude authored and committed
fix: drain until sentinel to prevent deadlock on early generator close
When concurrent_files > 1 and max_buffered_batches is small, multiple workers can be blocked on batch_queue.put() at the moment the consumer closes early (e.g. due to a limit). The previous drain loop used get_nowait() + empty() which had a race: empty() could return True before a just-notified worker had a chance to put, leaving remaining workers stuck on put() forever while executor.shutdown(wait=True) hung. Fix: replace the racy drain loop with a blocking drain-until-sentinel loop. Each get() naturally wakes one blocked worker via not_full.notify(); that worker checks cancel and returns, eventually allowing the last worker to put the sentinel. Stopping only on the sentinel guarantees all workers have finished before we exit. Also move batch_queue.put(_QUEUE_SENTINEL) outside remaining_lock to avoid holding a lock during a potentially blocking call. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent caa079e commit a882dd2

File tree

2 files changed

+62
-8
lines changed

2 files changed

+62
-8
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,18 +1727,21 @@ def worker(task: FileScanTask) -> None:
17271727
finally:
17281728
with remaining_lock:
17291729
remaining -= 1
1730-
if remaining == 0:
1731-
batch_queue.put(_QUEUE_SENTINEL)
1730+
is_last = remaining == 0
1731+
if is_last:
1732+
batch_queue.put(_QUEUE_SENTINEL)
17321733

17331734
with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
17341735
for task in tasks:
17351736
executor.submit(worker, task)
17361737

1738+
saw_sentinel = False
17371739
try:
17381740
while True:
17391741
item = batch_queue.get()
17401742

17411743
if item is _QUEUE_SENTINEL:
1744+
saw_sentinel = True
17421745
break
17431746

17441747
if isinstance(item, BaseException):
@@ -1747,12 +1750,16 @@ def worker(task: FileScanTask) -> None:
17471750
yield item
17481751
finally:
17491752
cancel.set()
1750-
# Drain the queue to unblock any workers stuck on put()
1751-
while not batch_queue.empty():
1752-
try:
1753-
batch_queue.get_nowait()
1754-
except queue.Empty:
1755-
break
1753+
if not saw_sentinel:
1754+
# Drain the queue to unblock workers stuck on put().
1755+
# Each get() wakes one waiting producer; that producer checks
1756+
# cancel and returns, eventually allowing the last worker to
1757+
# put the sentinel. We stop only when we see the sentinel,
1758+
# which guarantees all workers have finished.
1759+
while True:
1760+
item = batch_queue.get()
1761+
if item is _QUEUE_SENTINEL:
1762+
break
17561763

17571764

17581765
_DEFAULT_SCAN_ORDER: ScanOrder = TaskOrder()

tests/io/test_bounded_concurrent_batches.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,53 @@ def batch_fn(t: FileScanTask) -> Iterator[pa.RecordBatch]:
142142
list(gen)
143143

144144

145+
def test_no_deadlock_on_early_termination_with_full_queue() -> None:
    """Regression test for deadlock when concurrent_files > 1 and max_buffered_batches=1.

    Sets up a full queue (maxsize=1) with three producer workers all blocked on
    put(), then closes the generator from another thread. With the buggy
    get_nowait()/empty() drain loop, the workers stayed blocked and
    executor.shutdown(wait=True) hung forever; the drain-until-sentinel fix
    must let gen.close() return promptly.
    """
    tasks = [_make_task() for _ in range(3)]

    # Number of workers currently between "about to put" and "put completed".
    pending_puts = 0
    counter_guard = threading.Lock()
    two_workers_waiting = threading.Event()

    def batch_fn(t: FileScanTask) -> Iterator[pa.RecordBatch]:
        nonlocal pending_puts
        for idx in range(100):
            with counter_guard:
                pending_puts += 1
                if pending_puts >= 2:
                    two_workers_waiting.set()
            yield pa.record_batch({"col": [idx]})
            with counter_guard:
                pending_puts -= 1

    gen = _bounded_concurrent_batches(tasks, batch_fn, concurrent_streams=3, max_buffered_batches=1)
    next(gen)  # consume 1 batch, workers immediately try to put the next

    # Wait until at least 2 workers are inside batch_fn trying to put — confirming
    # they will block on put() since the queue is full (maxsize=1).
    assert two_workers_waiting.wait(timeout=5.0), "Workers did not reach put() in time"

    generator_closed = threading.Event()

    def close_gen() -> None:
        gen.close()
        generator_closed.set()

    closer = threading.Thread(target=close_gen, daemon=True)
    closer.start()
    assert generator_closed.wait(timeout=10.0), "Deadlock: gen.close() did not complete — workers stuck on put()"
    closer.join(timeout=1.0)
191+
145192
def test_early_termination() -> None:
146193
"""Test that stopping consumption cancels workers."""
147194
tasks = [_make_task() for _ in range(5)]

0 commit comments

Comments
 (0)