Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 72 additions & 29 deletions lib/DBSQLOperation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ export default class DBSQLOperation implements IOperation {

private sessionId?: string;

// Serialises all result consumption on THIS operation. `fetchAll` holds it
// across its entire drain loop; `fetchChunk` / `hasMoreRows` hold it per
// call. The result pipeline behind every backend is a single stateful cursor
// (the kernel path in particular threads shared, non-atomic prefetch state
// through `KernelResultsProvider` → `ArrowResultConverter` → `ResultSlicer`),
// so concurrent consumers on one operation must be serialised or they corrupt
// that cursor and silently drop rows. Holding the lock across the WHOLE
// `fetchAll` drain (not per chunk) is what makes two concurrent `fetchAll()`
// calls behave like the Thrift backend: the first drains the complete result
// set, the second observes an exhausted cursor and returns `[]` — rather than
// splitting the rows between them. Uncontended on the normal single-consumer
// path (the chain is an already-resolved promise).
private fetchChain: Promise<unknown> = Promise.resolve();

constructor(options: DBSQLOperationConstructorOptions) {
this.context = options.context;
this.backend = options.backend;
Expand Down Expand Up @@ -115,21 +129,30 @@ export default class DBSQLOperation implements IOperation {
* const result = await queryOperation.fetchAll();
*/
public async fetchAll(options?: FetchOptions): Promise<Array<object>> {
const data: Array<Array<object>> = [];

const fetchChunkOptions = {
...options,
disableBuffering: true,
};

do {
// eslint-disable-next-line no-await-in-loop
const chunk = await this.fetchChunk(fetchChunkOptions);
data.push(chunk);
} while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);

return data.flat();
// Hold the fetch lock across the ENTIRE drain (see `fetchChain`): a
// concurrent fetchAll()/fetchChunk() on the same operation queues behind
// this loop instead of interleaving with it. The loop calls the
// *Internal (non-locking) primitives to avoid self-deadlock on the lock we
// already hold. Error telemetry wraps the whole drain.
return this.runFetchExclusive(() =>
this.withErrorTelemetry(async () => {
const data: Array<Array<object>> = [];

const fetchChunkOptions = {
...options,
disableBuffering: true,
};

do {
// eslint-disable-next-line no-await-in-loop
const chunk = await this.fetchChunkInternal(fetchChunkOptions);
data.push(chunk);
} while (await this.hasMoreRowsInternal()); // eslint-disable-line no-await-in-loop
this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);

return data.flat();
}),
);
}

/**
Expand All @@ -142,7 +165,7 @@ export default class DBSQLOperation implements IOperation {
* const result = await queryOperation.fetchChunk({maxRows: 1000});
*/
public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
return this.withErrorTelemetry(() => this.fetchChunkInternal(options));
return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.fetchChunkInternal(options)));
}

private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> {
Expand Down Expand Up @@ -241,21 +264,26 @@ export default class DBSQLOperation implements IOperation {
}

public async hasMoreRows(): Promise<boolean> {
return this.withErrorTelemetry(async () => {
// If operation is closed or cancelled - we should not try to get data from it
if (this.closed || this.cancelled) {
return false;
}
return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.hasMoreRowsInternal()));
}

// Wait for operation to finish before checking for more rows
// This ensures metadata can be fetched successfully
if (this.backend.hasResultSet()) {
await this.waitUntilReadyThroughBackend();
}
// Non-locking body of `hasMoreRows`. Called directly by `fetchAll`'s drain
// loop (which already holds `fetchChain`) and by the public `hasMoreRows`
// wrapper (which acquires it). Must never acquire the lock itself.
private async hasMoreRowsInternal(): Promise<boolean> {
// If operation is closed or cancelled - we should not try to get data from it
if (this.closed || this.cancelled) {
return false;
}

// If we fetched all the data from server - check if there's anything buffered in result handler
return this.backend.hasMore();
});
// Wait for operation to finish before checking for more rows
// This ensures metadata can be fetched successfully
if (this.backend.hasResultSet()) {
await this.waitUntilReadyThroughBackend();
}

// If we fetched all the data from server - check if there's anything buffered in result handler
return this.backend.hasMore();
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
Expand Down Expand Up @@ -338,6 +366,21 @@ export default class DBSQLOperation implements IOperation {
}
}

// Serialises `fn` behind every task previously queued on `fetchChain`, so
// only one consumer at a time is driving this operation's fetch pipeline.
//
// `fn` starts once the current tail of the chain has settled. The promise
// returned to the caller carries `fn`'s own outcome (value or rejection).
// The new tail stored in `fetchChain` maps both outcomes to `undefined`, so
// a failure in one task is never seen by the tasks queued after it.
private runFetchExclusive<T>(fn: () => Promise<T>): Promise<T> {
  const discardOutcome = (): undefined => undefined;

  // Both handlers are `fn`: it runs no matter how the predecessor settled.
  const task = this.fetchChain.then(fn, fn);

  // Advance the tail; it must always fulfil so the next task is not poisoned.
  this.fetchChain = task.then(discardOutcome, discardOutcome);

  return task;
}

private async failIfClosed(): Promise<void> {
if (this.closed) {
throw new OperationStateError(OperationStateErrorCode.Closed);
Expand Down
24 changes: 15 additions & 9 deletions tests/unit/DBSQLOperation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1008,20 +1008,26 @@ describe('DBSQLOperation', () => {
const originalData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

const tempData = [...originalData];
const fetchChunkStub = sinon.stub(operation, 'fetchChunk').callsFake(async (): Promise<Array<any>> => {
return tempData.splice(0, 3);
});
const hasMoreRowsStub = sinon.stub(operation, 'hasMoreRows').callsFake(async () => {
// Warning: this check is implementation-specific.
// `fetchAll` holds the per-operation fetch lock across the entire drain, so
// it calls the *non-locking* internal primitives (`fetchChunkInternal` /
// `hasMoreRowsInternal`) rather than the public `fetchChunk` / `hasMoreRows`
// (which re-acquire the same lock and would self-deadlock). Stub the
// internals that the drain loop actually invokes.
const fetchChunkStub = sinon
.stub(operation as any, 'fetchChunkInternal')
.callsFake(async (): Promise<Array<any>> => {
return tempData.splice(0, 3);
});
const hasMoreRowsStub = sinon.stub(operation as any, 'hasMoreRowsInternal').callsFake(async () => {
return tempData.length > 0;
});

const fetchedData = await operation.fetchAll();

// Warning: this check is implementation-specific
// `fetchAll` should wait for operation to complete. In current implementation
// it does so by calling `fetchChunk` at least once, which internally does
// all the job. But since here we stub `fetchChunk` it won't really wait,
// therefore here we ensure it was called at least once
// `fetchAll` should wait for the operation to complete; in the current
// implementation it does so by draining via `fetchChunkInternal` at least
// once, which internally does all the work.
expect(fetchChunkStub.callCount).to.be.gte(1);

expect(fetchChunkStub.called).to.be.true;
Expand Down
Loading