Skip to content

Commit a4c3330

Browse files
d-v-bclaude
andcommitted
fix(core): cancel pending fetches on early exit and stop-after-miss
Two related correctness issues in coalesced_get's drain loop: 1. When the consumer breaks out of the async-for (early exit), the generator's finally block only awaited in-flight tasks rather than cancelling them. That wasted I/O. Cancel first, then gather. 2. The drain loop waited on completion_queue for ``total`` entries, but after a "missing" or "error" we cancel pending tasks -- and cancelled tasks never enqueue a completion. With max_concurrency > 1 this could hang. Rework the drain loop to break out immediately on the first miss/error; the finally block handles cleanup. The new structure also collapses the redundant miss/error branches and removes the now-unused ``total``/``drained``/``stopped`` bookkeeping. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent b2ec638 commit a4c3330

1 file changed

Lines changed: 16 additions & 20 deletions

File tree

src/zarr/core/_coalesce.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ async def coalesced_get(
117117
]
118118
work_items.extend([(idx, single)] for idx, single in uncoalescable)
119119

120-
total = len(work_items)
121-
122120
# Completion queue entries are either ("ok", payload), ("missing", None),
123121
# or ("error", exception). Kept as Any internally to avoid dragging
124122
# Sequence out of TYPE_CHECKING.
@@ -166,34 +164,32 @@ async def run_one(members: list[tuple[int, ByteRequest | None]]) -> None:
166164
tasks.add(asyncio.create_task(run_one(item)))
167165

168166
try:
169-
drained = 0
170-
stopped = False
171167
pending_error: BaseException | None = None
172-
while drained < total:
168+
for _ in range(len(work_items)):
173169
kind, payload = await completion_queue.get()
174-
drained += 1
175-
if stopped:
176-
continue # Discard remaining results after a miss or error.
177170
if kind == "ok":
178171
assert payload is not None
179172
assert not isinstance(payload, BaseException)
180173
yield payload
181-
elif kind == "missing":
182-
stopped = True
183-
# Cancel any still-pending tasks to avoid unnecessary I/O.
184-
for t in tasks:
185-
if not t.done():
186-
t.cancel()
187-
else: # "error"
174+
continue
175+
# "missing" or "error": stop scheduling and cancel pending work.
176+
# Late arrivals that raced to enqueue before cancellation took
177+
# effect sit in the completion queue and are discarded by the
178+
# finally block (the queue is local and will be garbage-collected).
179+
for t in tasks:
180+
if not t.done():
181+
t.cancel()
182+
if kind == "error":
188183
assert isinstance(payload, BaseException)
189-
stopped = True
190184
pending_error = payload
191-
for t in tasks:
192-
if not t.done():
193-
t.cancel()
185+
break
194186
if pending_error is not None:
195187
raise pending_error
196188
finally:
197-
# Ensure we wait for any cancelled tasks to finish so no task escapes.
189+
# Best-effort cancellation for in-flight tasks (covers the consumer
190+
# break / early-exit case where we did not proactively cancel).
191+
for t in tasks:
192+
if not t.done():
193+
t.cancel()
198194
if tasks:
199195
await asyncio.gather(*tasks, return_exceptions=True)

0 commit comments

Comments
 (0)