Skip to content

Commit bebf88f

Browse files
committed
fix(sea): address #410 review — fetch cleanup, cooperative cancel, parity, docs
Validated each finding against a live pecotesting warehouse first; the headline INTERVAL story turned out to be split-artifact, not breakage. - F7: getResultMetadata stored the *unpatched* Duration IPC bytes in meta.arrowSchema while advertising ArrowBased — store the patched bytes so an ArrowBased consumer doesn't hit `Unrecognized type "Duration" (18)`. - F3: fetchChunk now honors the `isClosed` cooperative-cancel probe (parity with ThriftOperationBackend) at its yield points. - F6: on a fetch error, best-effort close the statement (napi contract: stream is unspecified after Err) and surface a typed kernel error via decodeNapiKernelError. - F9: cancel-after-fetch now throws the canonical OperationStateError(Canceled) ("The operation was canceled by a client") — byte-matches the Thrift message. - F10: typed HiveDriverError (not raw Error) in the schema/fetchNextBatch guards. - F1: corrected SeaArrowIpcDurationFix docs — on this layer the rewriter only makes Duration *decodable* (raw Int64); the duration_unit formatter lands in #411 (verified live: byte-identical to Thrift). - F5: documented that nested Duration is a SHARED apache-arrow@13 limitation — verified the Thrift backend throws the identical error, so SEA matches parity. - F2: added a live e2e that drives a real Arrow Duration column through the rewriter (asserts no "Duration (18)" crash + raw-Int64 on this layer). - F8: pinned the no-`Failed` invariant in status() (failures reject at submit). - F12: renamed SeaResultsProvider's SeaStatementHandle → SeaFetchHandle (was a name collision with the lifecycle interface of a different shape). - F13: dropped the no-op await on the synchronous statement.schema(). - F14: fixed the Float-precision comment (Precision enum, not bit-width). - F15: SeaResultsProvider.prime loops instead of self-recursing on empty batches. Deferred (noted on the PR): F4 (per-batch triple-decode perf) and F11 (hasResultSet() hard-coded true for M0). Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent ec6c741 commit bebf88f

6 files changed

Lines changed: 207 additions & 52 deletions

File tree

lib/sea/SeaArrowIpc.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,9 @@ function arrowTypeToTTypeId(field: Field<DataType>): TTypeId {
205205
}
206206
}
207207
if (DataType.isFloat(arrowType)) {
208-
// arrow Float precision: 16=HALF, 32=SINGLE, 64=DOUBLE
208+
// `precision` is the Arrow `Precision` enum (HALF=0, SINGLE=1, DOUBLE=2),
209+
// NOT a bit-width — `=== 2` is DOUBLE. Everything else (HALF/SINGLE) maps
210+
// to the thrift FLOAT type.
209211
return arrowType.precision === 2 ? TTypeId.DOUBLE_TYPE : TTypeId.FLOAT_TYPE;
210212
}
211213
if (DataType.isDecimal(arrowType)) return TTypeId.DECIMAL_TYPE;

lib/sea/SeaArrowIpcDurationFix.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,24 @@
3232
* bytes pass through unchanged. We embed the original `Duration` time
3333
* unit (`SECOND`/`MILLISECOND`/`MICROSECOND`/`NANOSECOND`) into the
3434
* rewritten field's `custom_metadata` under the key
35-
* `databricks.arrow.duration_unit` so the JS converter can format the
36-
* Int64 value back into a thrift-equivalent string (e.g.
37-
* `"1 02:03:04.000000000"`).
35+
* `databricks.arrow.duration_unit`.
36+
*
37+
* **Scope on this layer (SEA execution + results, PR 2/3):** the rewrite's
38+
* job here is purely to make the stream *decodable* by apache-arrow@13. The
39+
* resulting Int64 surfaces to callers as a raw microsecond/nanosecond number
40+
* on this layer. The consumer that reads the `duration_unit` marker and
41+
* formats it back into the thrift-equivalent string (e.g.
42+
* `"1 02:03:04.000000000"`) lands in PR 3/3 (#411) — verified against a live
43+
* warehouse to produce byte-identical output to the Thrift path. Until that
44+
* consumer merges, INTERVAL DAY-TIME columns are raw Int64 under SEA.
3845
*
3946
* Why this lives in its own file: the rewriter is the only place in the
4047
* codebase that needs to construct FlatBuffers by hand using the
4148
* `flatbuffers` library; isolating it keeps `SeaArrowIpc.ts` focused on
4249
* the high-level Arrow-decoded views.
4350
*
44-
* @see lib/result/ArrowResultConverter.ts — Phase-1 INTERVAL formatting
45-
* reads the metadata key written here.
51+
* @see lib/result/ArrowResultConverter.ts — the Phase-1 INTERVAL formatter
52+
* (PR 3/3) reads the metadata key written here.
4653
* @see findings/parity-mismatch/round5-implementation-2026-05-15.md —
4754
* original failure mode (`Unrecognized type: "Duration" (18)`).
4855
*/
@@ -234,10 +241,17 @@ function maybeRewriteSchemaMessage(schemaMessageBytes: Buffer): Buffer | null {
234241
return null;
235242
}
236243

237-
// Scan top-level fields and children for Duration. We rewrite only
238-
// top-level Duration fields for M0 (Spark INTERVAL DAY-TIME surfaces
239-
// as a top-level column — children of Struct/List/Map are out of
240-
// scope until we see a real-world payload with nested Duration).
244+
// We rewrite only TOP-LEVEL Duration fields for M0 (Spark INTERVAL
245+
// DAY-TIME surfaces as a top-level column). A Duration nested inside
246+
// STRUCT/ARRAY/MAP is left untouched and apache-arrow@13 then throws
247+
// `Unrecognized type: "Duration" (18)` when decoding the batch.
248+
//
249+
// This is a SHARED apache-arrow@13 limitation, NOT a SEA-specific gap:
250+
// verified against a live warehouse, the Thrift backend throws the
251+
// identical error for `array(INTERVAL '1' SECOND)` — so SEA matches Thrift
252+
// here rather than diverging. Lifting it (recurse the rewrite into
253+
// children) is deferred until there's a real-world nested-Duration payload
254+
// to validate against, and would ideally land on both backends together.
241255
let hasDuration = false;
242256
const fieldsLength = fbSchema.fieldsLength();
243257
for (let i = 0; i < fieldsLength; i += 1) {

lib/sea/SeaOperationBackend.ts

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ import { OperationStatus, OperationState } from '../contracts/OperationStatus';
4343
import { ResultMetadata, ResultFormat } from '../contracts/ResultMetadata';
4444
import IClientContext from '../contracts/IClientContext';
4545
import Status from '../dto/Status';
46+
import HiveDriverError from '../errors/HiveDriverError';
4647
import ArrowResultConverter from '../result/ArrowResultConverter';
4748
import ResultSlicer from '../result/ResultSlicer';
4849
import SeaResultsProvider from './SeaResultsProvider';
49-
import { arrowSchemaToThriftSchema, decodeIpcSchema } from './SeaArrowIpc';
50+
import { arrowSchemaToThriftSchema, decodeIpcSchema, patchIpcBytes } from './SeaArrowIpc';
51+
import { decodeNapiKernelError } from './SeaErrorMapping';
5052
import { SeaStatement } from './SeaNativeLoader';
5153
import {
5254
SeaStatementHandle,
@@ -126,15 +128,36 @@ export default class SeaOperationBackend implements IOperationBackend {
126128
public async fetchChunk({
127129
limit,
128130
disableBuffering,
131+
isClosed,
129132
}: {
130133
limit: number;
131134
disableBuffering?: boolean;
135+
isClosed?: () => boolean;
132136
}): Promise<Array<object>> {
133137
// Cancel-mid-fetch propagation: if cancel() has flipped the
134138
// lifecycle flag, fail locally without a wire round-trip.
135139
failIfNotActive(this.lifecycle);
136-
const slicer = await this.getResultSlicer();
137-
return slicer.fetchNext({ limit, disableBuffering });
140+
// Cooperative cancel (parity with ThriftOperationBackend): the facade
141+
// supplies `isClosed` and re-checks `failIfClosed()` after we return, so
142+
// bailing with `[]` at a yield point is the contract-correct way for a
143+
// concurrent cancel()/close() to interrupt the fetch.
144+
if (isClosed?.()) {
145+
return [];
146+
}
147+
try {
148+
const slicer = await this.getResultSlicer();
149+
if (isClosed?.()) {
150+
return [];
151+
}
152+
return await slicer.fetchNext({ limit, disableBuffering });
153+
} catch (err) {
154+
// The napi fetch contract leaves the stream in an unspecified state on
155+
// error ("call close() and discard"). Close the statement so the server
156+
// reclaims it promptly — best-effort, so a close failure never masks the
157+
// original fetch error — then surface a typed kernel error.
158+
await seaClose(this.lifecycle, this.statement, this.context, this._id).catch(() => undefined);
159+
throw decodeNapiKernelError(err);
160+
}
138161
}
139162

140163
public async hasMore(): Promise<boolean> {
@@ -153,9 +176,11 @@ export default class SeaOperationBackend implements IOperationBackend {
153176
}
154177
this.metadataPromise = (async () => {
155178
if (!this.statement.schema) {
156-
throw new Error('SeaOperationBackend: statement.schema() is not available on this handle');
179+
throw new HiveDriverError('SeaOperationBackend: statement.schema() is not available on this handle');
157180
}
158-
const arrowSchemaIpc = await this.statement.schema();
181+
// `schema()` is a synchronous napi getter (returns `ArrowSchema`, not a
182+
// Promise) — no `await` needed.
183+
const arrowSchemaIpc = this.statement.schema();
159184
const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes);
160185
// `ResultMetadata.schema` keeps the Thrift `TTableSchema` shape for
161186
// back-compat with the public `IOperation.getSchema()` surface.
@@ -166,8 +191,11 @@ export default class SeaOperationBackend implements IOperationBackend {
166191
// both flow through the same Arrow result converter.
167192
resultFormat: ResultFormat.ArrowBased,
168193
lz4Compressed: false,
169-
// Carry the raw Arrow IPC schema bytes for ARROW_BASED consumers.
170-
arrowSchema: arrowSchemaIpc.ipcBytes,
194+
// Carry the *patched* Arrow IPC schema bytes (Duration → Int64 with the
195+
// `duration_unit` marker) so an ARROW_BASED consumer decoding
196+
// `arrowSchema` doesn't hit apache-arrow@13's `Unrecognized type
197+
// "Duration" (18)`. Matches what the per-batch fetch path already does.
198+
arrowSchema: patchIpcBytes(arrowSchemaIpc.ipcBytes),
171199
isStagingOperation: false,
172200
};
173201
this.metadata = meta;
@@ -187,10 +215,15 @@ export default class SeaOperationBackend implements IOperationBackend {
187215
public async status(_progress: boolean): Promise<OperationStatus> {
188216
// Synthesised — the kernel resolves `Statement::execute().await` before
189217
// it hands back a Statement handle, so by the time a SeaOperationBackend
190-
// exists the statement is terminal. Report Cancelled/Closed if the
191-
// lifecycle flag is set, else Succeeded. Returns the backend-neutral
192-
// OperationStatus the IOperationBackend contract expects, so the
193-
// DBSQLOperation facade switches on `state` identically across backends.
218+
// exists the statement is terminal. Note there is intentionally no
219+
// `Failed` arm: a failed execution rejects inside `executeStatement`
220+
// (the kernel surfaces the error at submit), so a `Failed` statement
221+
// never becomes a SeaOperationBackend — `status()` only ever observes
222+
// Succeeded, or Cancelled/Closed from a client-side lifecycle call.
223+
// Report Cancelled/Closed if the lifecycle flag is set, else Succeeded.
224+
// Returns the backend-neutral OperationStatus the IOperationBackend
225+
// contract expects, so the DBSQLOperation facade switches on `state`
226+
// identically across backends.
194227
if (this.lifecycle.isCancelled) {
195228
return { state: OperationState.Cancelled, hasResultSet: true };
196229
}
@@ -227,7 +260,7 @@ export default class SeaOperationBackend implements IOperationBackend {
227260
return this.resultSlicer;
228261
}
229262
if (!this.statement.fetchNextBatch) {
230-
throw new Error('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
263+
throw new HiveDriverError('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
231264
}
232265
const metadata = await this.getResultMetadata();
233266
// The lifecycle subset has cancel/close only; fetch methods exist on

lib/sea/SeaOperationLifecycle.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,11 @@ export async function seaFinished(
261261
*/
262262
export function failIfNotActive(state: SeaOperationLifecycleState): void {
263263
if (state.isCancelled) {
264-
throw mapKernelErrorToJsError({
265-
code: 'Cancelled',
266-
message: 'The operation was cancelled.',
267-
});
264+
// Use the canonical `OperationStateError(Canceled)` (message "The operation
265+
// was canceled by a client") rather than a custom string, so the message
266+
// matches the Thrift path verbatim and the code branch stays consistent
267+
// with the Closed case below.
268+
throw new OperationStateError(OperationStateErrorCode.Canceled);
268269
}
269270
if (state.isClosed) {
270271
throw new OperationStateError(OperationStateErrorCode.Closed);

lib/sea/SeaResultsProvider.ts

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { decodeIpcBatch, patchIpcBytes } from './SeaArrowIpc';
2222
* d.ts) so the loader layer's loose `unknown` typing doesn't force
2323
* unsafe casts at every call site, and so unit tests can pass a stub.
2424
*/
25-
export interface SeaStatementHandle {
25+
export interface SeaFetchHandle {
2626
fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null>;
2727
}
2828

@@ -48,7 +48,7 @@ export interface SeaStatementHandle {
4848
* `SeaArrowIpc.ts:decodeIpcBatch` for the cost rationale.
4949
*/
5050
export default class SeaResultsProvider implements IResultsProvider<ArrowBatch> {
51-
private readonly statement: SeaStatementHandle;
51+
private readonly statement: SeaFetchHandle;
5252

5353
// Prefetched next batch so `hasMore()` can be answered without an
5454
// extra round-trip. Set by `prime()` (lazy) and by `fetchNext`.
@@ -57,7 +57,7 @@ export default class SeaResultsProvider implements IResultsProvider<ArrowBatch>
5757
// Set once the kernel returns `null` from `fetchNextBatch()`.
5858
private exhausted = false;
5959

60-
constructor(statement: SeaStatementHandle) {
60+
constructor(statement: SeaFetchHandle) {
6161
this.statement = statement;
6262
}
6363

@@ -89,29 +89,30 @@ export default class SeaResultsProvider implements IResultsProvider<ArrowBatch>
8989
// to keep one batch buffered ahead so `hasMore` is accurate without
9090
// re-asking the kernel.
9191
private async prime(): Promise<void> {
92-
if (this.exhausted || this.prefetched !== undefined) {
93-
return;
92+
// Loop rather than self-recurse: a long run of empty batches drains
93+
// iteratively toward the null sentinel without building a deep promise
94+
// chain.
95+
while (!this.exhausted && this.prefetched === undefined) {
96+
// eslint-disable-next-line no-await-in-loop
97+
const next = await this.statement.fetchNextBatch();
98+
if (next === null) {
99+
this.exhausted = true;
100+
return;
101+
}
102+
// Patch the raw bytes once: rewrite any Arrow `Duration` field to
103+
// `Int64` with a `databricks.arrow.duration_unit` marker, so that
104+
// apache-arrow@13 (which predates Duration support) can decode the
105+
// stream. `decodeIpcBatch` is told these bytes are already patched;
106+
// the downstream `RecordBatchReader.from` inside `ArrowResultConverter`
107+
// sees the same patched buffer. See `SeaArrowIpcDurationFix.ts`.
108+
const ipcBytes = patchIpcBytes(next.ipcBytes);
109+
const { rowCount } = decodeIpcBatch(ipcBytes, { alreadyPatched: true });
110+
// Skip empty batches — the converter handles them but pre-filtering here
111+
// avoids a round-trip through the converter's prefetch loop. Continue to
112+
// find a non-empty batch or hit exhaustion.
113+
if (rowCount > 0) {
114+
this.prefetched = { batches: [ipcBytes], rowCount };
115+
}
94116
}
95-
const next = await this.statement.fetchNextBatch();
96-
if (next === null) {
97-
this.exhausted = true;
98-
return;
99-
}
100-
// Patch the raw bytes once: rewrite any Arrow `Duration` field to
101-
// `Int64` with a `databricks.arrow.duration_unit` marker, so that
102-
// apache-arrow@13 (which predates Duration support) can decode the
103-
// stream. `decodeIpcBatch` is told these bytes are already patched;
104-
// the downstream `RecordBatchReader.from` inside `ArrowResultConverter`
105-
// sees the same patched buffer. See `SeaArrowIpcDurationFix.ts`.
106-
const ipcBytes = patchIpcBytes(next.ipcBytes);
107-
const { rowCount } = decodeIpcBatch(ipcBytes, { alreadyPatched: true });
108-
if (rowCount === 0) {
109-
// Skip empty batches — the converter handles them but pre-filtering
110-
// here avoids one round-trip through the converter's prefetch loop.
111-
// Re-prime to either find a non-empty batch or hit exhaustion.
112-
await this.prime();
113-
return;
114-
}
115-
this.prefetched = { batches: [ipcBytes], rowCount };
116117
}
117118
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) 2026 Databricks, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/* eslint-disable no-console */
16+
17+
import { expect } from 'chai';
18+
import { DBSQLClient } from '../../../lib';
19+
import { ConnectionOptions } from '../../../lib/contracts/IDBSQLClient';
20+
import { InternalConnectionOptions } from '../../../lib/contracts/InternalConnectionOptions';
21+
22+
// Exercises the SeaArrowIpcDurationFix rewriter against a REAL kernel-produced
23+
// Arrow Duration buffer (Spark INTERVAL DAY-TIME surfaces as Arrow Duration,
24+
// type id 18, which apache-arrow@13 can't decode). Without the rewriter this
25+
// query throws `Unrecognized type: "Duration" (18)`, so a passing fetch proves
26+
// the hand-rolled FlatBuffer schema re-encode + Int64 splice-back ran.
27+
//
28+
// On THIS layer (SEA execution + results, PR 2/3) the converter does not yet
29+
// consume the `duration_unit` marker, so the value surfaces as a raw Int64 —
30+
// asserted here. The Phase-1 formatter that turns it into the thrift string
31+
// "1 02:03:04.000000000" lands in PR 3/3 (#411), where SeaIntervalParity covers
32+
// the formatted output. Requires the pecotesting secrets; skips otherwise.
33+
34+
const DURATION_QUERY = "SELECT INTERVAL '1 02:03:04' DAY TO SECOND AS dt";
35+
36+
interface PecoSecrets {
37+
host: string;
38+
path: string;
39+
token: string;
40+
}
41+
42+
function readSecrets(): PecoSecrets | null {
43+
const host = process.env.DATABRICKS_PECOTESTING_SERVER_HOSTNAME;
44+
const path = process.env.DATABRICKS_PECOTESTING_HTTP_PATH;
45+
const token = process.env.DATABRICKS_PECOTESTING_TOKEN_PERSONAL;
46+
if (!host || !path || !token) return null;
47+
return { host, path, token };
48+
}
49+
50+
async function fetchOneRow(useSEA: boolean, secrets: PecoSecrets): Promise<Record<string, unknown>> {
51+
const client = new DBSQLClient();
52+
await client.connect({
53+
host: secrets.host,
54+
path: secrets.path,
55+
token: secrets.token,
56+
useSEA,
57+
} as ConnectionOptions & InternalConnectionOptions);
58+
try {
59+
const session = await client.openSession();
60+
try {
61+
const operation = await session.executeStatement(DURATION_QUERY);
62+
try {
63+
const rows = (await operation.fetchAll()) as Array<Record<string, unknown>>;
64+
return rows[0];
65+
} finally {
66+
await operation.close();
67+
}
68+
} finally {
69+
await session.close();
70+
}
71+
} finally {
72+
await client.close();
73+
}
74+
}
75+
76+
describe('SEA INTERVAL DAY-TIME (Arrow Duration rewriter) end-to-end', function suite() {
77+
this.timeout(120_000);
78+
79+
const secrets = readSecrets();
80+
81+
before(function gate() {
82+
if (!secrets) {
83+
// eslint-disable-next-line no-invalid-this
84+
this.skip();
85+
}
86+
});
87+
88+
it('decodes a real Arrow Duration column without the "Duration (18)" crash (rewriter ran)', async () => {
89+
const row = await fetchOneRow(true, secrets as PecoSecrets);
90+
// A returned row at all means the schema rewrite + Int64 splice-back ran;
91+
// an un-rewritten Duration would have thrown during RecordBatchReader.from.
92+
expect(row).to.have.property('dt');
93+
expect(row.dt, 'INTERVAL DAY-TIME value should be present').to.not.equal(undefined);
94+
});
95+
96+
it('surfaces the value as a raw Int64 on this layer (formatter is PR 3/3)', async () => {
97+
const row = await fetchOneRow(true, secrets as PecoSecrets);
98+
// Documents the M0/2-of-3 contract: the rewriter makes the column
99+
// *decodable* but the duration_unit formatter is not wired here yet, so the
100+
// value is the raw integer microsecond/nanosecond count, not the thrift
101+
// "1 02:03:04.000000000" string. (#411 flips this to the formatted string.)
102+
expect(['number', 'bigint']).to.include(typeof row.dt);
103+
});
104+
});

0 commit comments

Comments
 (0)