fix(fetch): serialize result consumption per operation to prevent concurrent-fetch row loss #432

Open
msrathore-db wants to merge 2 commits into main from fix/kernel-concurrent-fetch-rowloss

@msrathore-db

Contributor

Problem

Two concurrent fetchAll() / fetchChunk() calls on a single operation could silently drop rows — the worst failure class (data loss, not an error).

The result pipeline behind every backend is a single stateful cursor. The kernel (SEA) path in particular threads shared, non-atomic prefetch state through KernelResultsProvider → ArrowResultConverter → ResultSlicer: each layer parks a pulled batch in a single look-ahead slot and mutates remainingRows and a shared recordBatchReader across await points with no guard. Two overlapping consumers interleave inside those non-atomic sections, so a batch pulled off the cursor can be overwritten before it is delivered.
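The overwrite window described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the connector's code: `ToyCursor`, `ToyPrefetcher`, `drain`, and `countRows` are hypothetical names, and the "batches" are plain integers. It only shows how an unguarded check-then-write across an `await` lets a second consumer clobber a single look-ahead slot.

```typescript
// Toy model only: ToyCursor and ToyPrefetcher are hypothetical stand-ins,
// not the connector's KernelResultsProvider / ResultSlicer classes.

class ToyCursor {
  private next = 0;

  constructor(private readonly total: number) {}

  // Hands out 0, 1, 2, ... exactly once each, then undefined when exhausted.
  async pull(): Promise<number | undefined> {
    await Promise.resolve(); // simulated I/O: other callers may run here
    return this.next < this.total ? this.next++ : undefined;
  }
}

class ToyPrefetcher {
  private slot: number | undefined = undefined; // single look-ahead slot

  constructor(private readonly cursor: ToyCursor) {}

  async hasMore(): Promise<boolean> {
    if (this.slot === undefined) {
      // Unguarded check-then-write across an await: two callers can both see
      // an empty slot, both pull, and the second write clobbers the first.
      this.slot = await this.cursor.pull();
    }
    return this.slot !== undefined;
  }

  // Synchronous read-and-clear, so a row is never delivered twice.
  take(): number | undefined {
    const row = this.slot;
    this.slot = undefined;
    return row;
  }
}

// One consumer: loop until the prefetcher reports no more rows.
async function drain(p: ToyPrefetcher): Promise<number[]> {
  const rows: number[] = [];
  while (await p.hasMore()) {
    const row = p.take();
    if (row !== undefined) rows.push(row);
  }
  return rows;
}

// Runs `consumers` overlapping drains on ONE prefetcher and returns the
// combined number of rows delivered.
async function countRows(total: number, consumers: number): Promise<number> {
  const p = new ToyPrefetcher(new ToyCursor(total));
  const results = await Promise.all(
    Array.from({ length: consumers }, () => drain(p)),
  );
  return results.reduce((sum, rows) => sum + rows.length, 0);
}
```

In this model `countRows(1000, 1)` delivers all 1000 rows, while `countRows(1000, 2)` delivers fewer: both consumers see an empty slot, both pull, and the first pulled row is overwritten before anyone takes it. The exact number lost depends on scheduling, which is also why the real failure is silent rather than an error.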

Reproduced on a 100k-row SELECT id FROM range(0, 100000) with two concurrent fetchAll():

| Path | Before |
| --- | --- |
| SEA (kernel) | combined 99788 / 100000 (212 rows lost) |
| Thrift | passes, but only because it delivers this result set in one drainable unit, so the two loops never interleave |

(For reference, the Python connector's pyo3 binding wraps the same kernel stream in a RefCell, so its second concurrent fetch fails loud with Already borrowed rather than losing data. Node's napi binding waits on a tokio::Mutex instead, so both calls proceed — which is what fed the JS overwrite window.)

Fix

Serialize all result consumption on an operation via a per-operation fetch lock in DBSQLOperation (the shared facade — so both backends behave identically by construction):

  • fetchAll holds the lock across its entire drain loop (calling non-locking *Internal primitives to avoid self-deadlock).
  • fetchChunk / hasMoreRows hold it per call.
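The self-deadlock that the non-locking *Internal primitives avoid can be sketched as follows. This is a minimal illustration, assuming the lock is a promise chain; `FetchLock`, `settlesWithin`, and `demoDeadlock` are hypothetical names, and the connector's actual lock may be implemented differently.

```typescript
// Hypothetical minimal lock: each task starts only after the previous one settles.
class FetchLock {
  private tail: Promise<void> = Promise.resolve();

  run<T>(task: () => Promise<T>): Promise<T> {
    const result = this.tail.then(task);
    // Keep the chain usable whether the task fulfils or rejects.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    );
    return result;
  }
}

// Resolves true if `p` settles within `ms` milliseconds, false otherwise.
function settlesWithin(p: Promise<unknown>, ms: number): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(false), ms);
    const done = () => {
      clearTimeout(timer);
      resolve(true);
    };
    p.then(done, done);
  });
}

async function demoDeadlock(): Promise<{ reentrant: boolean; internal: boolean }> {
  // Stand-in for a non-locking primitive.
  const fetchChunkInternal = async (): Promise<number[]> => [1, 2, 3];

  // Case 1: a task that already holds the lock calls a public, locking method.
  // The inner call queues behind the outer task, which is waiting on the inner
  // call, so neither ever settles.
  const lockA = new FetchLock();
  const fetchChunk = () => lockA.run(fetchChunkInternal);
  const reentrant = lockA.run(() => fetchChunk());

  // Case 2: the task holding the lock calls the non-locking primitive directly.
  const lockB = new FetchLock();
  const internal = lockB.run(() => fetchChunkInternal());

  return {
    reentrant: await settlesWithin(reentrant, 50),
    internal: await settlesWithin(internal, 50),
  };
}
```

Under these assumptions `demoDeadlock()` resolves to `{ reentrant: false, internal: true }`: the re-entrant call never settles, while the call through the internal primitive completes normally.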

Holding the lock across the whole drain (not per chunk) is what produces true Thrift parity: the first fetchAll() drains the complete result set; a concurrent second one observes an exhausted cursor and returns []. No kernel change required.
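A minimal sketch of this locking scheme, assuming a promise-chain lock. `FetchLock` and `ToyOperation` are hypothetical stand-ins rather than the actual `DBSQLOperation` code, and rows are plain integers; only the method names `fetchAll`, `fetchChunk`, `hasMoreRows`, `fetchChunkInternal`, and `hasMoreRowsInternal` come from this PR.

```typescript
// Hypothetical promise-chain lock: tasks run strictly one after another.
class FetchLock {
  private tail: Promise<void> = Promise.resolve();

  run<T>(task: () => Promise<T>): Promise<T> {
    const result = this.tail.then(task);
    // Swallow the outcome so a failed task does not poison later ones.
    this.tail = result.then(
      () => undefined,
      () => undefined,
    );
    return result;
  }
}

// Toy operation over the integers 0..total-1, served in fixed-size chunks.
class ToyOperation {
  private readonly lock = new FetchLock();
  private next = 0;

  constructor(
    private readonly total: number,
    private readonly chunkSize: number,
  ) {}

  // Non-locking primitives: only ever called while the lock is held.
  private async hasMoreRowsInternal(): Promise<boolean> {
    await Promise.resolve(); // simulated I/O
    return this.next < this.total;
  }

  private async fetchChunkInternal(): Promise<number[]> {
    await Promise.resolve(); // simulated I/O
    const end = Math.min(this.next + this.chunkSize, this.total);
    const rows: number[] = [];
    for (let i = this.next; i < end; i++) rows.push(i);
    this.next = end;
    return rows;
  }

  // Public per-call operations take the lock for a single call.
  fetchChunk(): Promise<number[]> {
    return this.lock.run(() => this.fetchChunkInternal());
  }

  hasMoreRows(): Promise<boolean> {
    return this.lock.run(() => this.hasMoreRowsInternal());
  }

  // fetchAll takes the lock once and keeps it for the whole drain loop.
  fetchAll(): Promise<number[]> {
    return this.lock.run(async () => {
      const all: number[] = [];
      while (await this.hasMoreRowsInternal()) {
        all.push(...(await this.fetchChunkInternal()));
      }
      return all;
    });
  }
}
```

With this sketch, `Promise.all([op.fetchAll(), op.fetchAll()])` on a 10-row `ToyOperation` yields all 10 rows from the first call and `[]` from the second, and a `fetchChunk()` followed by `fetchAll()` returns the first chunk and then the remainder. When nothing else is queued, `tail` is an already-resolved promise, so the task simply starts on the next microtask.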

Behavior change & Thrift safety

This is a behavior change only for the (ill-defined) pattern of concurrent consumers on one operation. The everyday single-consumer path is uncontended — the lock chain is an already-resolved promise.

Verified live on both backends:

THRIFT: a=100000, b=0   combined=100000
SEA   : a=100000, b=0   combined=100000   (was 99788 before)
  • overlapped fetch concurrency test: passes on both SEA and Thrift.
  • Thrift fetch-path regression suite (FetchAll, FetchChunk, HasMoreRows, Iterator, ResultSlicer, LimitExact, SchemaAndDrain): 39 passing, including fetchAll after a partial fetchChunk(20) returning the remaining 80 rows (confirms fetchChunk → fetchAll sequencing still works through the lock).
  • SEA equivalent: 38 passing.
  • SEA full concurrency suite: 18 passing.

This pull request and its description were written by Isaac.

@github-actions

Thanks for your contribution! To satisfy the DCO policy in our contributing guide every commit message must include a sign-off message. One or more of your commits is missing this message. You can reword previous commit messages with an interactive rebase (git rebase -i main).

@msrathore-db enabled auto-merge June 11, 2026 07:09
…current-fetch row loss

Two concurrent fetchAll()/fetchChunk() calls on a single operation could
silently drop rows. The result pipeline behind every backend is a single
stateful cursor; the kernel (SEA) path in particular threads shared,
non-atomic prefetch state through KernelResultsProvider →
ArrowResultConverter → ResultSlicer, so overlapping consumers corrupt the
cursor and lose batches (observed ~99788/100000 rows on a 100k-row SELECT).
Thrift avoided visible loss only because it delivers this result set in one
drainable unit.

Serialize all result consumption on an operation via a per-operation fetch
lock. fetchAll holds it across its entire drain loop; fetchChunk/hasMoreRows
hold it per call. Holding it across the whole drain makes two concurrent
fetchAll() calls behave identically on both backends: the first drains the
complete result set, the second observes an exhausted cursor and returns []
— Thrift parity by construction, no kernel change required. The
single-consumer hot path is uncontended (the chain is an already-resolved
promise).

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
The fetchAll() drain holds the per-operation fetch lock across the whole loop
and therefore calls the non-locking `fetchChunkInternal`/`hasMoreRowsInternal`
primitives (calling the public `fetchChunk`/`hasMoreRows`, which re-acquire the
same lock, would self-deadlock). The existing unit test stubbed the *public*
methods, which the refactored drain no longer calls — so the stubs were bypassed,
the real internals ran with no data source, and the test timed out (2000ms).

Stub the internal primitives the drain actually invokes. Behavior asserted is
unchanged (fetchAll drains all chunks and returns all rows). Test is explicitly
implementation-specific (see its comment).

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
@msrathore-db force-pushed the fix/kernel-concurrent-fetch-rowloss branch from bbb22df to df3e568 June 11, 2026 07:10