Skip to content

Commit 6f80c11

Browse files
committed
fix(sea): cascade operation review fixes
1 parent c213241 commit 6f80c11

12 files changed

Lines changed: 260 additions & 116 deletions

lib/result/ArrowResultConverter.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
3232
* thrift-path which has no SEA awareness.
3333
*/
3434
const DURATION_UNIT_METADATA_KEY = 'databricks.arrow.duration_unit';
35+
const ZERO_BIGINT = BigInt(0);
36+
const NS_PER_MICRO = BigInt(1_000);
37+
const NS_PER_MILLI = BigInt(1_000_000);
38+
const NS_PER_SEC = BigInt(1_000_000_000);
39+
const MS_PER_DAY = BigInt(86_400_000);
40+
const NS_PER_MIN = NS_PER_SEC * BigInt(60);
41+
const NS_PER_HOUR = NS_PER_MIN * BigInt(60);
42+
const NS_PER_DAY = NS_PER_HOUR * BigInt(24);
3543

3644
/**
3745
* Format an Arrow `Interval[YearMonth]` or `Interval[DayTime]` value
@@ -63,8 +71,8 @@ function formatArrowInterval(value: any, valueType: any): string {
6371
// We re-normalise: total milliseconds = a * 86_400_000 + b, then split into
6472
// days, hours, minutes, seconds, nanoseconds (nanoseconds is always 0
6573
// because the legacy IntervalDayTime carries only millisecond precision).
66-
const totalMs = BigInt(a) * BigInt(86_400_000) + BigInt(b);
67-
return formatDayTimeFromTotal(totalMs * BigInt(1_000_000) /* → ns */, 'NANOSECOND');
74+
const totalMs = BigInt(a) * MS_PER_DAY + BigInt(b);
75+
return formatDayTimeFromTotal(totalMs * NS_PER_MILLI /* → ns */, 'NANOSECOND');
6876
}
6977

7078
/**
@@ -113,11 +121,11 @@ function formatDurationToIntervalDayTime(value: bigint | number, unit: string):
113121
function toNanoseconds(value: bigint, unit: string): bigint {
114122
switch (unit) {
115123
case 'SECOND':
116-
return value * BigInt(1_000_000_000);
124+
return value * NS_PER_SEC;
117125
case 'MILLISECOND':
118-
return value * BigInt(1_000_000);
126+
return value * NS_PER_MILLI;
119127
case 'MICROSECOND':
120-
return value * BigInt(1_000);
128+
return value * NS_PER_MICRO;
121129
case 'NANOSECOND':
122130
default:
123131
return value;
@@ -136,14 +144,8 @@ function toNanoseconds(value: bigint, unit: string): bigint {
136144
* for future use if a unit-aware precision is ever needed.
137145
*/
138146
function formatDayTimeFromTotal(totalNanos: bigint, _unit: string): string {
139-
const ZERO = BigInt(0);
140-
const sign = totalNanos < ZERO ? '-' : '';
141-
const abs = totalNanos < ZERO ? -totalNanos : totalNanos;
142-
143-
const NS_PER_SEC = BigInt(1_000_000_000);
144-
const NS_PER_MIN = NS_PER_SEC * BigInt(60);
145-
const NS_PER_HOUR = NS_PER_MIN * BigInt(60);
146-
const NS_PER_DAY = NS_PER_HOUR * BigInt(24);
147+
const sign = totalNanos < ZERO_BIGINT ? '-' : '';
148+
const abs = totalNanos < ZERO_BIGINT ? -totalNanos : totalNanos;
147149

148150
const days = abs / NS_PER_DAY;
149151
let rem = abs % NS_PER_DAY;

lib/sea/SeaArrowIpc.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,15 @@ const DATABRICKS_TYPE_NAME = 'databricks.type_name';
4040
*
4141
* Re-parsing inside the converter is unavoidable because `RecordBatch`
4242
* instances created here cannot be passed across the converter's
43-
* `Buffer[]` boundary without rewriting the converter. The IPC bytes
44-
* themselves are small enough (one record batch per call) that the
45-
* double-parse cost is negligible for M0.
43+
* `Buffer[]` boundary without rewriting the converter. Callers that already
44+
* patched the IPC bytes can set `alreadyPatched` to avoid running the
45+
* FlatBuffer rewrite twice on the hot fetch path.
4646
*/
47-
export function decodeIpcBatch(ipcBytes: Buffer): { schema: Schema<TypeMap>; rowCount: number } {
48-
const patched = rewriteDurationToInt64(ipcBytes);
47+
export function decodeIpcBatch(
48+
ipcBytes: Buffer,
49+
options: { alreadyPatched?: boolean } = {},
50+
): { schema: Schema<TypeMap>; rowCount: number } {
51+
const patched = options.alreadyPatched ? ipcBytes : rewriteDurationToInt64(ipcBytes);
4952
const reader = RecordBatchReader.from<TypeMap>(patched);
5053
// Eagerly open so `schema` is populated.
5154
reader.open();

lib/sea/SeaArrowIpcDurationFix.ts

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -251,62 +251,8 @@ function maybeRewriteSchemaMessage(schemaMessageBytes: Buffer): Buffer | null {
251251
return null;
252252
}
253253

254-
// Snapshot the (name, originalTypeType, durationUnit, originalCustomMetadata)
255-
// for every field, then rebuild the schema using the flatbuffer builder.
256-
type FieldSnapshot = {
257-
name: string;
258-
nullable: boolean;
259-
isDuration: boolean;
260-
durationUnit?: number; // FbTimeUnit
261-
/** Preserved metadata key→value pairs (we add ours on top for Duration). */
262-
metadata: Array<[string, string]>;
263-
/** Raw bytes for the original field if no rewrite needed; we'll re-encode it. */
264-
typeType: number;
265-
/** Pre-decoded type sub-table bytes for non-Duration fields. */
266-
// For M0 we only rewrite Duration; other fields we re-create with the
267-
// same primitive type. To keep the rewriter narrow, we only support
268-
// schemas where non-Duration fields use type sub-tables that can be
269-
// round-tripped via Field.decode → re-encode through flatbuffers'
270-
// SizedByteArray serialization. That's complex, so instead we use
271-
// a different approach: copy the raw FlatBuffer field offset
272-
// directly when no rewrite is needed (handled by the
273-
// copy-field-by-reference path below).
274-
};
275-
// We can't simply "copy field by reference" across FlatBuffer
276-
// builders, so we have to re-encode every field. For non-Duration
277-
// fields, we re-encode using the apache-arrow `fb/*` accessors.
278-
// That requires touching every existing supported type.
279-
//
280-
// To keep this rewriter narrow and DRY, we take a different
281-
// approach: in-place patch. We do NOT rebuild the FlatBuffer.
282-
// Instead, we mutate the field's `type_type` byte from Duration(18)
283-
// to Int(2), and we point its `type` offset at a freshly-appended
284-
// Int sub-table that we splice into the message bytes. Then we
285-
// append a fresh `KeyValue` for `databricks.arrow.duration_unit`
286-
// into the field's `custom_metadata` vector. This avoids re-encoding
287-
// every other field.
288-
//
289-
// FlatBuffer in-place mutation is tricky because tables have vtables
290-
// and offsets are 32-bit relative pointers. The fields we need to
291-
// change are:
292-
// 1. Field.type_type (1-byte enum at vtable slot for field #2):
293-
// mutate the byte from 18 → 2. Same width, safe to overwrite.
294-
// 2. Field.type (4-byte relative offset to the type sub-table):
295-
// change the offset to point at our appended Int sub-table.
296-
// Same width, safe to overwrite.
297-
// 3. Field.custom_metadata (4-byte relative offset to vector):
298-
// either rewrite the existing vector to add our entry, or
299-
// append a new vector and update the offset.
300-
//
301-
// Because relative offsets are forward-only in FlatBuffers (offset is
302-
// distance from the storage location to the target), and our
303-
// appended sub-tables live AFTER the storage location, the math
304-
// works out. We append to a growing byte buffer and patch the
305-
// existing offset fields to point at the new tail.
306-
307-
// Bail back to the full rebuild approach; in-place patching of
308-
// arbitrary vtable layouts is fragile (vtables may share storage
309-
// across fields). Re-encode the whole schema.
254+
// Re-encode the whole schema. This is more verbose than an in-place
255+
// FlatBuffer patch, but it avoids relying on vtable layout details.
310256
return rebuildSchemaWithDurationRewritten(message, fbSchema);
311257
}
312258

lib/sea/SeaBackend.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export default class SeaBackend implements IBackend {
119119
return new SeaSessionBackend({
120120
connection: nativeConnection!,
121121
context: this.context,
122+
id: nativeConnection!.sessionId,
122123
});
123124
}
124125

lib/sea/SeaNativeLoader.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export interface SeaArrowSchema {
7070
* `await` them without `any` casts.
7171
*/
7272
export interface SeaNativeStatement {
73+
readonly statementId: string;
7374
fetchNextBatch(): Promise<SeaArrowBatch | null>;
7475
schema(): Promise<SeaArrowSchema>;
7576
cancel(): Promise<void>;
@@ -80,6 +81,7 @@ export interface SeaNativeStatement {
8081
* Typed surface for the opaque napi `Connection` handle.
8182
*/
8283
export interface SeaNativeConnection {
84+
readonly sessionId: string;
8385
/**
8486
* Execute a SQL statement. Catalog / schema / sessionConf are
8587
* session-level — set on `openSession`, applied to every statement

lib/sea/SeaOperationBackend.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,14 @@ import { v4 as uuidv4 } from 'uuid';
4040
import {
4141
TGetOperationStatusResp,
4242
TGetResultSetMetadataResp,
43-
TOperationState,
4443
TSparkRowSetType,
4544
TStatusCode,
4645
TTableSchema,
4746
} from '../../thrift/TCLIService_types';
4847
import IOperationBackend from '../contracts/IOperationBackend';
4948
import IClientContext from '../contracts/IClientContext';
49+
import { OperationState, OperationStatus } from '../contracts/OperationStatus';
50+
import { ResultFormat, ResultMetadata } from '../contracts/ResultMetadata';
5051
import Status from '../dto/Status';
5152
import ArrowResultConverter from '../result/ArrowResultConverter';
5253
import ResultSlicer from '../result/ResultSlicer';
@@ -148,7 +149,17 @@ export default class SeaOperationBackend implements IOperationBackend {
148149
return slicer.hasMore();
149150
}
150151

151-
public async getResultMetadata(): Promise<TGetResultSetMetadataResp> {
152+
public async getResultMetadata(): Promise<ResultMetadata> {
153+
const metadata = await this.thriftResultMetadataResponse();
154+
return {
155+
schema: metadata.schema,
156+
resultFormat: ResultFormat.ArrowBased,
157+
lz4Compressed: metadata.lz4Compressed,
158+
isStagingOperation: Boolean(metadata.isStagingOperation),
159+
};
160+
}
161+
162+
private async thriftResultMetadataResponse(): Promise<TGetResultSetMetadataResp> {
152163
failIfNotActive(this.lifecycle);
153164
if (this.metadata) {
154165
return this.metadata;
@@ -187,28 +198,25 @@ export default class SeaOperationBackend implements IOperationBackend {
187198
// Status / lifecycle (owned by the sea-operation lifecycle helpers).
188199
// ---------------------------------------------------------------------------
189200

190-
public async status(_progress: boolean): Promise<TGetOperationStatusResp> {
201+
public async status(_progress: boolean): Promise<OperationStatus> {
191202
// Synthesised — kernel only surfaces terminal-or-running statements
192203
// through its public API; we report CANCELED/CLOSED if the lifecycle
193204
// flag is set, else FINISHED. Matches the Thrift status shape so
194205
// facade-level callers see consistent telemetry across backends.
195206
if (this.lifecycle.isCancelled) {
196207
return {
197-
status: { statusCode: TStatusCode.SUCCESS_STATUS },
198-
operationState: TOperationState.CANCELED_STATE,
208+
state: OperationState.Cancelled,
199209
hasResultSet: true,
200210
};
201211
}
202212
if (this.lifecycle.isClosed) {
203213
return {
204-
status: { statusCode: TStatusCode.SUCCESS_STATUS },
205-
operationState: TOperationState.CLOSED_STATE,
214+
state: OperationState.Closed,
206215
hasResultSet: true,
207216
};
208217
}
209218
return {
210-
status: { statusCode: TStatusCode.SUCCESS_STATUS },
211-
operationState: TOperationState.FINISHED_STATE,
219+
state: OperationState.Succeeded,
212220
hasResultSet: true,
213221
};
214222
}
@@ -245,7 +253,7 @@ export default class SeaOperationBackend implements IOperationBackend {
245253
if (!this.statement.fetchNextBatch) {
246254
throw new Error('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle');
247255
}
248-
const metadata = await this.getResultMetadata();
256+
const metadata = await this.thriftResultMetadataResponse();
249257
// The lifecycle subset has cancel/close only; fetch methods exist on
250258
// the full napi Statement. Cast is safe here because we've just
251259
// verified `fetchNextBatch` is callable.

lib/sea/SeaOperationLifecycle.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import Status from '../dto/Status';
5252
import { LogLevel } from '../contracts/IDBSQLLogger';
5353
import IClientContext from '../contracts/IClientContext';
5454
import { mapKernelErrorToJsError, KernelErrorShape } from './SeaErrorMapping';
55+
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
5556

5657
/**
5758
* Minimal shape of the napi `Statement` that the lifecycle helpers
@@ -156,6 +157,7 @@ export async function seaCancel(
156157
try {
157158
await statement.cancel();
158159
} catch (err) {
160+
state.isCancelled = false;
159161
rethrowKernelError(err);
160162
}
161163

@@ -193,6 +195,7 @@ export async function seaClose(
193195
try {
194196
await statement.close();
195197
} catch (err) {
198+
state.isClosed = false;
196199
rethrowKernelError(err);
197200
}
198201

@@ -260,11 +263,9 @@ export async function seaFinished(
260263

261264
/**
262265
* Pre-flight check used by fetch* methods on `SeaOperationBackend`.
263-
* If the operation has been cancelled or closed, throws the same
264-
* `HiveDriverError`-shaped failure that `DBSQLOperation.failIfClosed`
265-
* raises today (`lib/DBSQLOperation.ts:328-335`), via the kernel
266-
* error mapping so the SQLSTATE / message conventions stay
267-
* consistent.
266+
* If the operation has been cancelled or closed, throw the same
267+
* `OperationStateError` classes the facade uses. Keeping these typed lets
268+
* callers branch on `OperationStateErrorCode` consistently for Thrift and SEA.
268269
*
269270
* Exported so impl-results can call it at the top of every fetch
270271
* call without duplicating the if/throw logic.
@@ -277,9 +278,6 @@ export function failIfNotActive(state: SeaOperationLifecycleState): void {
277278
});
278279
}
279280
if (state.isClosed) {
280-
throw mapKernelErrorToJsError({
281-
code: 'InvalidStatementHandle',
282-
message: 'The operation was closed.',
283-
});
281+
throw new OperationStateError(OperationStateErrorCode.Closed);
284282
}
285283
}

lib/sea/SeaResultsProvider.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,11 @@ export default class SeaResultsProvider implements IResultsProvider<ArrowBatch>
100100
// Patch the raw bytes once: rewrite any Arrow `Duration` field to
101101
// `Int64` with a `databricks.arrow.duration_unit` marker, so that
102102
// apache-arrow@13 (which predates Duration support) can decode the
103-
// stream. `decodeIpcBatch` and the downstream
104-
// `RecordBatchReader.from` inside `ArrowResultConverter` both see
105-
// the patched buffer. See `SeaArrowIpcDurationFix.ts`.
103+
// stream. `decodeIpcBatch` is told these bytes are already patched;
104+
// the downstream `RecordBatchReader.from` inside `ArrowResultConverter`
105+
// sees the same patched buffer. See `SeaArrowIpcDurationFix.ts`.
106106
const ipcBytes = patchIpcBytes(next.ipcBytes);
107-
const { rowCount } = decodeIpcBatch(ipcBytes);
107+
const { rowCount } = decodeIpcBatch(ipcBytes, { alreadyPatched: true });
108108
if (rowCount === 0) {
109109
// Skip empty batches — the converter handles them but pre-filtering
110110
// here avoids one round-trip through the converter's prefetch loop.

lib/sea/SeaSessionBackend.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ export default class SeaSessionBackend implements ISessionBackend {
115115
'SEA executeStatement: queryTimeout is not supported in M0 (deferred to M1)',
116116
);
117117
}
118+
if (options.useCloudFetch !== undefined) {
119+
throw new HiveDriverError(
120+
'SEA executeStatement: useCloudFetch is controlled by the kernel result configuration and is not a per-statement option on SEA',
121+
);
122+
}
118123

119124
let nativeStatement;
120125
try {
@@ -125,6 +130,7 @@ export default class SeaSessionBackend implements ISessionBackend {
125130
return new SeaOperationBackend({
126131
statement: nativeStatement!,
127132
context: this.context,
133+
id: nativeStatement!.statementId,
128134
});
129135
}
130136

0 commit comments

Comments
 (0)