Skip to content

Commit 2f8eece

Browse files
committed
fix(sea): address #410 review — error fidelity, fetch perf, validation, tests
Addresses the 8 review threads on PR #410. Validated against a live warehouse (results-e2e parity gate + interval-duration-e2e + execution-e2e all pass) plus new/updated unit tests. - SeaOperationLifecycle: rethrowKernelError now delegates to the canonical decodeNapiKernelError, so cancel/close errors get the same fidelity as fetch errors — the sqlState remap (envelope field is `sqlState`, the old code read `sqlstate` and dropped it), the kernelMetadata namespace, and the strict `startsWith` sentinel match (was a loose `indexOf >= 0`). - SeaArrowIpc: replace decodeIpcBatch (full RecordBatchReader materialization just to sum row counts) with countRowsInIpc, which reads RecordBatch header `length` via MessageReader and skips bodies — no vector decode. Removes ~2x Arrow decode CPU + transient allocation on the fetch hot path (the converter still re-decodes for values). SeaResultsProvider switched to it. - SeaArrowIpc: hermetic unit tests (tests/unit/sea/SeaArrowIpc.test.ts) for the framing walk, no-op/garbage rewrite paths, the row-count path, and the empty-schema guard. (The Duration-positive rewrite stays covered by the live e2e — apache-arrow@13 can't construct a Duration column hermetically.) - SeaOperationBackend: on a fetch-error cleanup close() that also fails, log the failure at warn (statement may leak) instead of fully swallowing it — the original fetch error is still surfaced. - SeaSessionBackend: reject queryTags / useLZ4Compression / stagingAllowedLocalPath with M0-style errors instead of silently ignoring them (+ unit tests). Silent no-ops are the worst failure mode for callers. - SeaArrowIpc: throw a typed HiveDriverError (not a raw TypeError) when an IPC payload carries no schema. - SeaArrowIpcDurationFix.readMessageAt: fail-closed guards for negative metadataLength / bodyLength (was relying on subarray clamping). - Fix stale `tests/integration/sea/...` doc refs → `tests/e2e/sea/`. Co-authored-by: Isaac Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent bebf88f commit 2f8eece

8 files changed

Lines changed: 238 additions & 67 deletions

lib/sea/SeaArrowIpc.ts

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
import { RecordBatchReader, Schema, Field, DataType, TypeMap } from 'apache-arrow';
15+
import { RecordBatchReader, MessageReader, MessageHeader, Schema, Field, DataType, TypeMap } from 'apache-arrow';
1616
import { TTableSchema, TTypeId, TPrimitiveTypeEntry } from '../../thrift/TCLIService_types';
1717
import { rewriteDurationToInt64, DURATION_UNIT_METADATA_KEY } from './SeaArrowIpcDurationFix';
18+
import HiveDriverError from '../errors/HiveDriverError';
1819

1920
/**
2021
* Field metadata key used by the kernel to attach the original Databricks
@@ -23,43 +24,46 @@ import { rewriteDurationToInt64, DURATION_UNIT_METADATA_KEY } from './SeaArrowIp
2324
const DATABRICKS_TYPE_NAME = 'databricks.type_name';
2425

2526
/**
26-
* Decode an Arrow IPC stream payload (schema header + zero-or-more
27-
* record-batch messages) into its row count.
27+
* Sum the row counts of every RecordBatch message in an Arrow IPC
28+
* stream, WITHOUT materializing the Arrow vector tree.
2829
*
29-
* Returns `{ schema, rowCount }`. The schema is left intact as the
30-
* apache-arrow Schema object so callers can reuse it; the rowCount is
31-
* the sum of `RecordBatch.numRows` across every record-batch message
32-
* in the stream.
30+
* Why this exists: `ArrowResultConverter` consumes `ArrowBatch` objects
31+
* that carry an explicit `rowCount`, but the kernel's IPC payload only
32+
* carries per-RecordBatch `length` (no separate total). `SeaResultsProvider`
33+
* needs that count to build the `ArrowBatch` it hands to the converter —
34+
* which then re-decodes the same bytes for the actual values.
3335
*
34-
* Why we parse upfront: `ArrowResultConverter` consumes `ArrowBatch`
35-
* objects which carry an explicit `rowCount`. The kernel's IPC payload
36-
* does not carry a separate count — only per-RecordBatch numRows. We
37-
* walk the messages once to sum them so the converter sees the same
38-
* shape as the thrift path (`ArrowResultHandler.fetchNext` at
39-
* `lib/result/ArrowResultHandler.ts:55`).
36+
* The previous implementation used `RecordBatchReader` and iterated the
37+
* batches, which calls `_loadVectors` and materializes the full vector
38+
* tree for every batch just to read `numRows` and discard everything
39+
* else — ~2x Arrow decode CPU + transient allocation on the fetch hot
40+
* path. `MessageReader` instead reads only each message's FlatBuffer
41+
* metadata header (where `RecordBatch.length` lives) and skips the body
42+
* bytes, so no vectors are decoded here. The converter's later re-decode
43+
* is the only real materialization.
4044
*
41-
* Re-parsing inside the converter is unavoidable because `RecordBatch`
42-
* instances created here cannot be passed across the converter's
43-
* `Buffer[]` boundary without rewriting the converter. Callers that already
44-
* patched the IPC bytes can set `alreadyPatched` to avoid running the
45-
* FlatBuffer rewrite twice on the hot fetch path.
45+
* `ipcBytes` must already be Duration-patched (row count is unaffected
46+
* by the Duration→Int64 rewrite, and the framing is unchanged). Returns
47+
* 0 for an empty / schema-only stream.
4648
*/
47-
export function decodeIpcBatch(
48-
ipcBytes: Buffer,
49-
options: { alreadyPatched?: boolean } = {},
50-
): { schema: Schema<TypeMap>; rowCount: number } {
51-
const patched = options.alreadyPatched ? ipcBytes : rewriteDurationToInt64(ipcBytes);
52-
const reader = RecordBatchReader.from<TypeMap>(patched);
53-
// Eagerly open so `schema` is populated.
54-
reader.open();
55-
const { schema } = reader;
56-
49+
export function countRowsInIpc(ipcBytes: Buffer): number {
50+
const reader = new MessageReader(ipcBytes);
5751
let rowCount = 0;
58-
// Iterate all record batches in the stream and sum row counts.
59-
for (const batch of reader) {
60-
rowCount += batch.numRows;
52+
for (const message of reader) {
53+
if (message.headerType === MessageHeader.RecordBatch) {
54+
// header() for a RecordBatch message carries `length` (the row
55+
// count) in the FlatBuffer metadata — no body decode needed.
56+
rowCount += Number((message.header() as { length: number | bigint }).length);
57+
}
58+
// Advance past the (undecoded) body so the next message reads at the
59+
// correct offset. readMessageBody returns a view; it does not decode
60+
// the body into Arrow vectors.
61+
const bodyLength = Number(message.bodyLength);
62+
if (bodyLength > 0) {
63+
reader.readMessageBody(bodyLength);
64+
}
6165
}
62-
return { schema, rowCount };
66+
return rowCount;
6367
}
6468

6569
/**
@@ -70,6 +74,14 @@ export function decodeIpcSchema(ipcBytes: Buffer): Schema<TypeMap> {
7074
const patched = rewriteDurationToInt64(ipcBytes);
7175
const reader = RecordBatchReader.from<TypeMap>(patched);
7276
reader.open();
77+
// `RecordBatchReader.from(emptyBuffer).open()` does not throw — it
78+
// leaves `schema` undefined. Without this guard a downstream
79+
// `arrowSchemaToThriftSchema(undefined)` would hit `undefined.fields`
80+
// and surface a raw TypeError instead of a typed driver error. The
81+
// real kernel always materialises a schema, so this is defensive.
82+
if (!reader.schema) {
83+
throw new HiveDriverError('SEA result: Arrow IPC stream carried no schema (empty or truncated payload)');
84+
}
7385
return reader.schema;
7486
}
7587

lib/sea/SeaArrowIpcDurationFix.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,12 @@ function readMessageAt(
196196
cursor += 4;
197197
}
198198

199-
if (metadataLength === 0) {
199+
// A zero or negative length (other than the -1 continuation sentinel
200+
// handled above) is malformed. Reject by intent rather than relying on
201+
// `subarray` clamping: a negative `metadataLength` would make
202+
// `metadataEnd < cursor`, which silently passes the `> byteLength`
203+
// upper-bound check below.
204+
if (metadataLength <= 0) {
200205
return null;
201206
}
202207

@@ -211,6 +216,12 @@ function readMessageAt(
211216
const message = Message.getRootAsMessage(bb);
212217

213218
const bodyLength = Number(message.bodyLength());
219+
// Same fail-closed intent as metadataLength: a negative body length
220+
// (server-controlled) would make `bodyEnd < bodyStart` and slip past
221+
// the upper-bound check.
222+
if (bodyLength < 0) {
223+
return null;
224+
}
214225
const bodyStart = metadataEnd;
215226
const bodyEnd = bodyStart + bodyLength;
216227
if (bodyEnd > view.byteLength) {

lib/sea/SeaOperationBackend.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
* `ArrowResultConverter` (Phase 1 + Phase 2; reused unchanged) →
2222
* `ResultSlicer` (chunk-size normalisation; reused unchanged). The M0
2323
* row shape is byte-identical to the thrift path for every M0
24-
* datatype (parity gate exercised by `tests/integration/sea/results-e2e.test.ts`).
24+
* datatype (parity gate exercised by `tests/e2e/sea/results-e2e.test.ts`).
2525
*
2626
* - **Lifecycle (from sea-operation):** `cancel()` / `close()` /
2727
* `finished()` (alias of `waitUntilReady`) delegate to the helpers
@@ -42,6 +42,7 @@ import IOperationBackend, { IOperationBackendWaitOptions } from '../contracts/IO
4242
import { OperationStatus, OperationState } from '../contracts/OperationStatus';
4343
import { ResultMetadata, ResultFormat } from '../contracts/ResultMetadata';
4444
import IClientContext from '../contracts/IClientContext';
45+
import { LogLevel } from '../contracts/IDBSQLLogger';
4546
import Status from '../dto/Status';
4647
import HiveDriverError from '../errors/HiveDriverError';
4748
import ArrowResultConverter from '../result/ArrowResultConverter';
@@ -155,7 +156,22 @@ export default class SeaOperationBackend implements IOperationBackend {
155156
// error ("call close() and discard"). Close the statement so the server
156157
// reclaims it promptly — best-effort, so a close failure never masks the
157158
// original fetch error — then surface a typed kernel error.
158-
await seaClose(this.lifecycle, this.statement, this.context, this._id).catch(() => undefined);
159+
//
160+
// If close() ALSO fails, seaClose has reset isClosed back to false and
161+
// the kernel-side statement handle is now leaked (the stream is already
162+
// wedged, so nothing downstream forces another close). We still don't
163+
// mask the original fetch error, but log the close failure at warn so
164+
// the leak is diagnosable rather than completely invisible.
165+
await seaClose(this.lifecycle, this.statement, this.context, this._id).catch((closeErr) => {
166+
const cause = closeErr instanceof Error ? closeErr.message : String(closeErr);
167+
this.context
168+
.getLogger()
169+
.log(
170+
LogLevel.warn,
171+
`SEA fetch-error cleanup: close() failed for operation ${this._id}; the server-side ` +
172+
`statement may leak until the session is closed. Cause: ${cause}`,
173+
);
174+
});
159175
throw decodeNapiKernelError(err);
160176
}
161177
}

lib/sea/SeaOperationLifecycle.ts

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import Status from '../dto/Status';
4747
import { OperationStatus, OperationState } from '../contracts/OperationStatus';
4848
import { LogLevel } from '../contracts/IDBSQLLogger';
4949
import IClientContext from '../contracts/IClientContext';
50-
import { mapKernelErrorToJsError, KernelErrorShape } from './SeaErrorMapping';
50+
import { decodeNapiKernelError } from './SeaErrorMapping';
5151
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
5252

5353
/**
@@ -89,33 +89,18 @@ export function createLifecycleState(): SeaOperationLifecycleState {
8989

9090
/**
9191
* Normalise an error thrown by the napi `Statement` into one of the
92-
* driver's typed error classes. The binding surfaces kernel errors as
93-
* a JSON envelope on `napi::Error.reason` with the sentinel prefix
94-
* `__databricks_error__:` (see the napi-binding round 2 findings,
95-
* section "JSON-envelope error reason"). If we can parse out a kernel
96-
* payload, we route it through `mapKernelErrorToJsError`; otherwise
97-
* the original error is rethrown unchanged.
92+
* driver's typed error classes, then throw it.
93+
*
94+
* Delegates to the canonical {@link decodeNapiKernelError} so cancel /
95+
* close errors get exactly the same fidelity as fetch errors: the
96+
* `sqlState` remap (the envelope field is `sqlState`, not `sqlstate`),
97+
* the `kernelMetadata` namespace (vendorCode / httpStatus / retryable /
98+
* queryId), and the strict `startsWith` sentinel match. The previous
99+
* hand-rolled reimplementation here dropped SQLSTATE and metadata and
100+
* used a looser substring match.
98101
*/
99102
function rethrowKernelError(err: unknown): never {
100-
if (err instanceof Error && typeof err.message === 'string') {
101-
const sentinel = '__databricks_error__:';
102-
const idx = err.message.indexOf(sentinel);
103-
if (idx >= 0) {
104-
const json = err.message.slice(idx + sentinel.length);
105-
let parsed: KernelErrorShape | undefined;
106-
try {
107-
parsed = JSON.parse(json) as KernelErrorShape;
108-
} catch {
109-
// Malformed envelope — fall through and rethrow the original
110-
// below; we never silently drop a kernel error.
111-
parsed = undefined;
112-
}
113-
if (parsed) {
114-
throw mapKernelErrorToJsError(parsed);
115-
}
116-
}
117-
}
118-
throw err;
103+
throw decodeNapiKernelError(err);
119104
}
120105

121106
/**

lib/sea/SeaResultsProvider.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import IResultsProvider, { ResultsProviderFetchNextOptions } from '../result/IResultsProvider';
1616
import { ArrowBatch } from '../result/utils';
17-
import { decodeIpcBatch, patchIpcBytes } from './SeaArrowIpc';
17+
import { countRowsInIpc, patchIpcBytes } from './SeaArrowIpc';
1818

1919
/**
2020
* The minimal slice of the napi-binding `Statement` class that we
@@ -102,11 +102,15 @@ export default class SeaResultsProvider implements IResultsProvider<ArrowBatch>
102102
// Patch the raw bytes once: rewrite any Arrow `Duration` field to
103103
// `Int64` with a `databricks.arrow.duration_unit` marker, so that
104104
// apache-arrow@13 (which predates Duration support) can decode the
105-
// stream. `decodeIpcBatch` is told these bytes are already patched;
106-
// the downstream `RecordBatchReader.from` inside `ArrowResultConverter`
107-
// sees the same patched buffer. See `SeaArrowIpcDurationFix.ts`.
105+
// stream. The downstream `RecordBatchReader.from` inside
106+
// `ArrowResultConverter` sees the same patched buffer. See
107+
// `SeaArrowIpcDurationFix.ts`.
108108
const ipcBytes = patchIpcBytes(next.ipcBytes);
109-
const { rowCount } = decodeIpcBatch(ipcBytes, { alreadyPatched: true });
109+
// Row count only — `countRowsInIpc` reads the RecordBatch metadata
110+
// headers without materializing vectors (the converter re-decodes
111+
// the bytes for the actual values). Avoids a full second Arrow
112+
// decode on the fetch hot path.
113+
const rowCount = countRowsInIpc(ipcBytes);
110114
// Skip empty batches — the converter handles them but pre-filtering here
111115
// avoids a round-trip through the converter's prefetch loop. Continue to
112116
// find a non-empty batch or hit exhaustion.

lib/sea/SeaSessionBackend.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,25 @@ export default class SeaSessionBackend implements ISessionBackend {
116116
'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
117117
);
118118
}
119+
// Reject — rather than silently ignore — the remaining Thrift-path
120+
// options the SEA M0 backend does not honor. Silently dropping them
121+
// is the worst failure mode for an agent/caller: passing e.g.
122+
// `queryTags` or `useLZ4Compression` would no-op with zero signal.
123+
// (`maxRows` is intentionally NOT here — the facade applies it at
124+
// fetch time.)
125+
if (options.queryTags !== undefined) {
126+
throw new HiveDriverError('SEA executeStatement: queryTags is not supported in M0 (deferred to M1)');
127+
}
128+
if (options.useLZ4Compression !== undefined) {
129+
throw new HiveDriverError(
130+
'SEA executeStatement: useLZ4Compression is not supported on SEA (result compression is governed by the kernel)',
131+
);
132+
}
133+
if (options.stagingAllowedLocalPath !== undefined) {
134+
throw new HiveDriverError(
135+
'SEA executeStatement: stagingAllowedLocalPath (volume operations) is not supported in M0 (deferred to M1)',
136+
);
137+
}
119138

120139
let nativeStatement;
121140
try {

tests/unit/sea/SeaArrowIpc.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright (c) 2026 Databricks, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import { expect } from 'chai';
16+
import { tableFromArrays, RecordBatchStreamWriter } from 'apache-arrow';
17+
import { countRowsInIpc, decodeIpcSchema, patchIpcBytes } from '../../../lib/sea/SeaArrowIpc';
18+
import { rewriteDurationToInt64 } from '../../../lib/sea/SeaArrowIpcDurationFix';
19+
import HiveDriverError from '../../../lib/errors/HiveDriverError';
20+
21+
// Hermetic coverage for the SEA Arrow-IPC layer. apache-arrow@13 cannot
22+
// construct a `Duration` column, so the Duration-positive rewrite path is
23+
// covered by the live e2e (tests/e2e/sea/interval-duration-e2e.test.ts).
24+
// These tests pin everything reachable without a warehouse: the IPC
25+
// framing walk, the no-op / malformed-input handling, the cheap
26+
// row-count path, and the schema-decode guard.
27+
28+
/** Build a multi-message Arrow IPC stream from arrays of per-batch row data. */
29+
function makeIpcStream(batchRows: number[][]): Buffer {
30+
const writer = new RecordBatchStreamWriter();
31+
for (const rows of batchRows) {
32+
const table = tableFromArrays({ a: Int32Array.from(rows) });
33+
writer.write(table.batches[0]);
34+
}
35+
writer.finish();
36+
return Buffer.from(writer.toUint8Array(true));
37+
}
38+
39+
/** Schema-only IPC stream (no record batches). */
40+
function makeSchemaOnlyStream(): Buffer {
41+
const writer = new RecordBatchStreamWriter();
42+
const table = tableFromArrays({ a: Int32Array.from([1]) });
43+
// Start the stream (emits the schema) then finish without writing a batch.
44+
writer.reset(undefined, table.schema);
45+
writer.finish();
46+
return Buffer.from(writer.toUint8Array(true));
47+
}
48+
49+
describe('SeaArrowIpc.countRowsInIpc', () => {
50+
it('sums RecordBatch row counts across messages', () => {
51+
const ipc = makeIpcStream([
52+
[1, 2, 3],
53+
[4, 5],
54+
]);
55+
expect(countRowsInIpc(ipc)).to.equal(5);
56+
});
57+
58+
it('returns 0 for a schema-only stream (no record batches)', () => {
59+
expect(countRowsInIpc(makeSchemaOnlyStream())).to.equal(0);
60+
});
61+
62+
it('counts a single-batch stream', () => {
63+
expect(countRowsInIpc(makeIpcStream([[10, 20, 30, 40]]))).to.equal(4);
64+
});
65+
});
66+
67+
describe('SeaArrowIpc.rewriteDurationToInt64 (no-Duration / malformed paths)', () => {
68+
it('is a no-op for a stream with no Duration field (returns input unchanged)', () => {
69+
const ipc = makeIpcStream([[1, 2, 3]]);
70+
const out = rewriteDurationToInt64(ipc);
71+
expect(out.equals(ipc)).to.equal(true);
72+
});
73+
74+
it('returns the input unchanged for an empty buffer (no throw)', () => {
75+
const empty = Buffer.alloc(0);
76+
const out = rewriteDurationToInt64(empty);
77+
expect(out.byteLength).to.equal(0);
78+
});
79+
80+
it('does not throw on garbage / truncated bytes (fail-closed framing)', () => {
81+
// Random bytes are not a valid IPC stream; readMessageAt must reject
82+
// them (the negative-length and bounds guards) rather than crash.
83+
const garbage = Buffer.from([0x01, 0x02, 0x03, 0x04, 0xff, 0xff, 0xff, 0xff]);
84+
expect(() => rewriteDurationToInt64(garbage)).to.not.throw();
85+
});
86+
87+
it('patchIpcBytes is byte-identical to the input when no Duration is present', () => {
88+
const ipc = makeIpcStream([[7, 8]]);
89+
expect(patchIpcBytes(ipc).equals(ipc)).to.equal(true);
90+
});
91+
});
92+
93+
describe('SeaArrowIpc.decodeIpcSchema', () => {
94+
it('decodes the schema of a normal stream', () => {
95+
const schema = decodeIpcSchema(makeIpcStream([[1]]));
96+
expect(schema.fields.map((f) => f.name)).to.deep.equal(['a']);
97+
});
98+
99+
it('throws a typed HiveDriverError (not a raw TypeError) on an empty payload', () => {
100+
expect(() => decodeIpcSchema(Buffer.alloc(0))).to.throw(HiveDriverError);
101+
});
102+
});

0 commit comments

Comments
 (0)