Skip to content

Commit bec8a52

Browse files
committed
fix(fetch): serialize result consumption per operation to prevent concurrent-fetch row loss
Two concurrent fetchAll()/fetchChunk() calls on a single operation could silently drop rows. The result pipeline behind every backend is a single stateful cursor; the kernel (SEA) path in particular threads shared, non-atomic prefetch state through KernelResultsProvider → ArrowResultConverter → ResultSlicer, so overlapping consumers corrupt the cursor and lose batches (observed ~99788/100000 rows on a 100k-row SELECT). Thrift avoided visible loss only because it delivers this result set in one drainable unit. Serialize all result consumption on an operation via a per-operation fetch lock. fetchAll holds it across its entire drain loop; fetchChunk/hasMoreRows hold it per call. Holding it across the whole drain makes two concurrent fetchAll() calls behave identically on both backends: the first drains the complete result set, the second observes an exhausted cursor and returns [] — Thrift parity by construction, no kernel change required. The single-consumer hot path is uncontended (the chain is an already-resolved promise). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent 6110300 commit bec8a52

1 file changed

Lines changed: 72 additions & 29 deletions

File tree

lib/DBSQLOperation.ts

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,20 @@ export default class DBSQLOperation implements IOperation {
6565

6666
private sessionId?: string;
6767

68+
// Serialises all result consumption on THIS operation. `fetchAll` holds it
69+
// across its entire drain loop; `fetchChunk` / `hasMoreRows` hold it per
70+
// call. The result pipeline behind every backend is a single stateful cursor
71+
// (the kernel path in particular threads shared, non-atomic prefetch state
72+
// through `KernelResultsProvider` → `ArrowResultConverter` → `ResultSlicer`),
73+
// so concurrent consumers on one operation must be serialised or they corrupt
74+
// that cursor and silently drop rows. Holding the lock across the WHOLE
75+
// `fetchAll` drain (not per chunk) is what makes two concurrent `fetchAll()`
76+
// calls behave like the Thrift backend: the first drains the complete result
77+
// set, the second observes an exhausted cursor and returns `[]` — rather than
78+
// splitting the rows between them. Uncontended on the normal single-consumer
79+
// path (the chain is an already-resolved promise).
80+
private fetchChain: Promise<unknown> = Promise.resolve();
81+
6882
constructor(options: DBSQLOperationConstructorOptions) {
6983
this.context = options.context;
7084
this.backend = options.backend;
@@ -115,21 +129,30 @@ export default class DBSQLOperation implements IOperation {
115129
* const result = await queryOperation.fetchAll();
116130
*/
117131
public async fetchAll(options?: FetchOptions): Promise<Array<object>> {
118-
const data: Array<Array<object>> = [];
119-
120-
const fetchChunkOptions = {
121-
...options,
122-
disableBuffering: true,
123-
};
124-
125-
do {
126-
// eslint-disable-next-line no-await-in-loop
127-
const chunk = await this.fetchChunk(fetchChunkOptions);
128-
data.push(chunk);
129-
} while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
130-
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);
131-
132-
return data.flat();
132+
// Hold the fetch lock across the ENTIRE drain (see `fetchChain`): a
133+
// concurrent fetchAll()/fetchChunk() on the same operation queues behind
134+
// this loop instead of interleaving with it. The loop calls the
135+
// *Internal (non-locking) primitives to avoid self-deadlock on the lock we
136+
// already hold. Error telemetry wraps the whole drain.
137+
return this.runFetchExclusive(() =>
138+
this.withErrorTelemetry(async () => {
139+
const data: Array<Array<object>> = [];
140+
141+
const fetchChunkOptions = {
142+
...options,
143+
disableBuffering: true,
144+
};
145+
146+
do {
147+
// eslint-disable-next-line no-await-in-loop
148+
const chunk = await this.fetchChunkInternal(fetchChunkOptions);
149+
data.push(chunk);
150+
} while (await this.hasMoreRowsInternal()); // eslint-disable-line no-await-in-loop
151+
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);
152+
153+
return data.flat();
154+
}),
155+
);
133156
}
134157

135158
/**
@@ -142,7 +165,7 @@ export default class DBSQLOperation implements IOperation {
142165
* const result = await queryOperation.fetchChunk({maxRows: 1000});
143166
*/
144167
public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
145-
return this.withErrorTelemetry(() => this.fetchChunkInternal(options));
168+
return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.fetchChunkInternal(options)));
146169
}
147170

148171
private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> {
@@ -241,21 +264,26 @@ export default class DBSQLOperation implements IOperation {
241264
}
242265

243266
public async hasMoreRows(): Promise<boolean> {
244-
return this.withErrorTelemetry(async () => {
245-
// If operation is closed or cancelled - we should not try to get data from it
246-
if (this.closed || this.cancelled) {
247-
return false;
248-
}
267+
return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.hasMoreRowsInternal()));
268+
}
249269

250-
// Wait for operation to finish before checking for more rows
251-
// This ensures metadata can be fetched successfully
252-
if (this.backend.hasResultSet()) {
253-
await this.waitUntilReadyThroughBackend();
254-
}
270+
// Non-locking body of `hasMoreRows`. Called directly by `fetchAll`'s drain
271+
// loop (which already holds `fetchChain`) and by the public `hasMoreRows`
272+
// wrapper (which acquires it). Must never acquire the lock itself.
273+
private async hasMoreRowsInternal(): Promise<boolean> {
274+
// If operation is closed or cancelled - we should not try to get data from it
275+
if (this.closed || this.cancelled) {
276+
return false;
277+
}
255278

256-
// If we fetched all the data from server - check if there's anything buffered in result handler
257-
return this.backend.hasMore();
258-
});
279+
// Wait for operation to finish before checking for more rows
280+
// This ensures metadata can be fetched successfully
281+
if (this.backend.hasResultSet()) {
282+
await this.waitUntilReadyThroughBackend();
283+
}
284+
285+
// If we fetched all the data from server - check if there's anything buffered in result handler
286+
return this.backend.hasMore();
259287
}
260288

261289
public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
@@ -338,6 +366,21 @@ export default class DBSQLOperation implements IOperation {
338366
}
339367
}
340368

369+
// Run `fn` with exclusive access to this operation's result cursor by
370+
// chaining it onto `fetchChain`. The next caller waits for this one to settle
371+
// (success OR failure) before starting, so the single stateful fetch pipeline
372+
// is only ever driven by one in-flight consumer. A rejection is delivered to
373+
// THIS caller but not propagated to the next waiter (the chain swallows it),
374+
// so one failed fetch never poisons subsequent fetches.
375+
private runFetchExclusive<T>(fn: () => Promise<T>): Promise<T> {
376+
const run = this.fetchChain.then(fn, fn);
377+
this.fetchChain = run.then(
378+
() => undefined,
379+
() => undefined,
380+
);
381+
return run;
382+
}
383+
341384
private async failIfClosed(): Promise<void> {
342385
if (this.closed) {
343386
throw new OperationStateError(OperationStateErrorCode.Closed);

0 commit comments

Comments
 (0)