Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion KERNEL_REV
Original file line number Diff line number Diff line change
@@ -1 +1 @@
80b68e1eef3b613910183a50dfa4dace854d50dd
9c2e2378f9a0bcee7d2750371392c07cac38fc3d
7 changes: 7 additions & 0 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I

useLZ4Compression: true,

preserveBigNumericPrecision: false,

// Telemetry defaults are sourced from DEFAULT_TELEMETRY_CONFIG so
// every component reads from the same single frozen const. Mapping the
// unprefixed TelemetryConfiguration keys to the `telemetry`-prefixed
Expand Down Expand Up @@ -604,6 +606,11 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.config.enableMetricViewMetadata = options.enableMetricViewMetadata;
}

// Opt-in: preserve DECIMAL (string) / BIGINT (bigint) precision in results.
if (options.preserveBigNumericPrecision !== undefined) {
this.config.preserveBigNumericPrecision = options.preserveBigNumericPrecision;
}

// Override telemetry config if provided in options. Per-key narrowed copy
// preserves the structural type system: `ConnectionOptions` and
// `ClientConfig` declare identical types for these knobs, so a user
Expand Down
51 changes: 49 additions & 2 deletions lib/DBSQLParameter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,37 @@ export enum DBSQLParameterType {
INTERVALDAY = 'INTERVAL DAY',
}

// 32-bit signed integer bounds — the range of the Spark `INT` type.
const INT32_MIN = -2147483648;
const INT32_MAX = 2147483647;

/**
 * Chooses the Spark parameter type for a JS `number` that was bound without
 * an explicit type.
 *
 * Every JS `number` is an IEEE-754 double, so "is a whole number" does not
 * imply "fits in a Spark `INT`": `1e30` is whole, yet labelling it `INTEGER`
 * makes the server reject it (`invalid INT literal "1e+30"`). The narrowest
 * type that can hold the value is selected instead:
 *
 *   - whole number inside the signed 32-bit range        → `INTEGER`
 *   - any other whole number inside the safe-integer range → `BIGINT`
 *   - everything else (fractions, `NaN`, `±Infinity`, and whole numbers past
 *     the safe-integer range)                            → `DOUBLE`
 *
 * Whole numbers beyond the safe-integer range cannot be represented exactly
 * by a double anyway; callers that need exact 64-bit integers should pass a
 * `bigint` rather than a `number`.
 *
 * @param value the numeric parameter value to classify
 * @returns the inferred Spark parameter type
 */
function inferNumberType(value: number): DBSQLParameterType {
  // `Number.isInteger` is false for NaN/±Infinity, so those skip this branch.
  const fitsInt32 = Number.isInteger(value) && INT32_MIN <= value && value <= INT32_MAX;
  if (fitsInt32) {
    return DBSQLParameterType.INTEGER;
  }

  // `Number.isSafeInteger` already implies "is an integer", so a single check
  // separates exactly-representable wide integers from everything else.
  return Number.isSafeInteger(value) ? DBSQLParameterType.BIGINT : DBSQLParameterType.DOUBLE;
}

interface DBSQLParameterOptions {
type?: DBSQLParameterType;
value: DBSQLParameterValue;
Expand Down Expand Up @@ -78,7 +109,7 @@ export class DBSQLParameter {
if (typeof this.value === 'number') {
return new TSparkParameter({
name,
type: wireType ?? (Number.isInteger(this.value) ? DBSQLParameterType.INTEGER : DBSQLParameterType.DOUBLE),
type: wireType ?? inferNumberType(this.value),
value: new TSparkParameterValue({
stringValue: Number(this.value).toString(),
}),
Expand All @@ -96,11 +127,27 @@ export class DBSQLParameter {
}

if (this.value instanceof Date) {
// A `Date` bound as `DATE` must project a calendar date (`yyyy-mm-dd`),
// not a full ISO-8601 timestamp: the SEA wire rejects
// `2024-03-14T00:00:00.000Z` as a DATE literal ("trailing input"), and
// Thrift accepts the date-only form just as well. Without an explicit
// DATE type the value still binds as a TIMESTAMP from the full ISO string.
const isDateType = wireType === DBSQLParameterType.DATE;
return new TSparkParameter({
name,
type: wireType ?? DBSQLParameterType.TIMESTAMP,
value: new TSparkParameterValue({
stringValue: this.value.toISOString(),
// For DATE, project the *calendar* date using local-time accessors
// rather than `toISOString().slice(0, 10)`. `toISOString()` first
// converts to UTC, so a `new Date(2024, 2, 14)` constructed in a
// positive-offset zone (e.g. UTC+10, internal `2024-03-13T14:00Z`)
// would yield "2024-03-13" — off by one. Users reason about a DATE
// as the wall-calendar date they constructed, so extract that.
stringValue: isDateType
? `${this.value.getFullYear()}-${String(this.value.getMonth() + 1).padStart(2, '0')}-${String(
this.value.getDate(),
).padStart(2, '0')}`
: this.value.toISOString(),
}),
});
}
Expand Down
164 changes: 136 additions & 28 deletions lib/connection/connections/HttpRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@ function delay(milliseconds: number): Promise<void> {
});
}

// Transient network error codes worth retrying. Aligned with the OS-level errno set
// surfaced by Node's `http`/`https` (and `node-fetch` via `system` FetchError type)
// when an in-flight request fails before/while delivering a response. Matches the
// classes of errors that the Python (urllib3) and JDBC (Apache HttpClient) drivers
// retry by default at the connection layer.
//
// Looked up by exact string match against an error's `code` property in
// `isRetryableNetworkError`.
const RETRYABLE_NETWORK_ERROR_CODES = new Set([
// Connection-level failures: reset by peer, refused, timed out.
'ECONNRESET',
'ECONNREFUSED',
'ETIMEDOUT',
// Routing failures: no route to host / network unreachable.
'EHOSTUNREACH',
'ENETUNREACH',
// Write on a connection whose remote end has already closed.
'EPIPE',
// DNS resolution failures (lookup failed / temporary resolver failure).
// NOTE(review): ENOTFOUND can also indicate a permanently wrong hostname; such
// a case is presumably bounded only by the retry attempt/timeout budget — confirm.
'ENOTFOUND',
'EAI_AGAIN',
]);

// Fallback message patterns for errors that don't carry an errno. node-fetch surfaces
// "socket hang up" as a generic FetchError, and "Premature close" when the response
// body stream closes before all data is received — both occur regularly when a
// keep-alive TCP connection is silently dropped by an intermediate load balancer.
//
// The pattern is case-insensitive and unanchored, so it matches anywhere inside an
// error's `message` (tested in `isRetryableNetworkError`). Besides the two messages
// described above it also matches any message containing "aborted".
// NOTE(review): "aborted" presumably targets connections aborted mid-response, but
// it would also match a caller-initiated abort message (e.g. node-fetch's
// "The user aborted a request.") — confirm deliberate cancellations can never reach
// the retry policy, otherwise they would be retried instead of propagated.
const RETRYABLE_NETWORK_ERROR_MESSAGE_RE = /socket hang up|premature close|aborted/i;

export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
private context: IClientContext;

Expand All @@ -24,45 +46,131 @@ export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDeta

public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
if (this.isRetryable(details)) {
const clientConfig = this.context.getConfig();
return this.computeRetry(details);
}

// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}
return { shouldRetry: false };
}

this.attempt += 1;
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
// Capture either the resolved response or the thrown error so the
// retry-decision logic below can flow without an early `continue` and
// share one backoff site between both paths.
let outcome: { ok: true; details: HttpTransactionDetails } | { ok: false; error: unknown };
try {
// eslint-disable-next-line no-await-in-loop
const details = await operation();
outcome = { ok: true, details };
} catch (error) {
outcome = { ok: false, error };
}

// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
if (outcome.ok) {
// eslint-disable-next-line no-await-in-loop
const status = await this.shouldRetry(outcome.details);
if (!status.shouldRetry) {
return outcome.details;
}
// eslint-disable-next-line no-await-in-loop
await delay(status.retryAfter);
} else {
// The operation threw before producing a response. This is typically a
// transient network failure (stale keep-alive socket reset by a load
// balancer, DNS hiccup, truncated response body, etc.). The status-code-
// driven `shouldRetry` path can't see these because there's no `Response`
// to inspect, so we have a separate decision point here. Non-network
// errors (programmer errors, config errors, RetryError raised by our
// own attempts/timeout budget) are re-thrown unchanged.
if (!this.isRetryableNetworkError(outcome.error)) {
throw outcome.error;
}
// eslint-disable-next-line no-await-in-loop
const status = await this.computeNetworkErrorRetry(outcome.error);
if (!status.shouldRetry) {
throw outcome.error;
}
// eslint-disable-next-line no-await-in-loop
await delay(status.retryAfter);
}
}
}

// Budgeting logic for the HTTP status-code path (`shouldRetry`) — enforces the
// overall retries timeout, bumps the attempt counter, enforces max attempts, and
// computes the next backoff. The network-error path (`computeNetworkErrorRetry`)
// does not call this method; it repeats the same checks against the same
// `attempt`/`startTime` fields, so both paths still draw from a single budget.
private computeRetry(details: HttpTransactionDetails): ShouldRetryResult {
const clientConfig = this.context.getConfig();

// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}

// If possible, use `Retry-After` header as a floor for a backoff algorithm
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
const retryAfter = this.getBackoffDelay(
this.attempt,
retryAfterHeader ?? clientConfig.retryDelayMin,
clientConfig.retryDelayMax,
);
this.attempt += 1;

return { shouldRetry: true, retryAfter };
// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
}

return { shouldRetry: false };
// If possible, use `Retry-After` header as a floor for a backoff algorithm
const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin);
const retryAfter = this.getBackoffDelay(
this.attempt,
retryAfterHeader ?? clientConfig.retryDelayMin,
clientConfig.retryDelayMax,
);

return { shouldRetry: true, retryAfter };
}

public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
const details = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
if (!status.shouldRetry) {
return details;
}
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
/**
 * Retry decision for an operation that threw instead of producing a response.
 *
 * Applies the timeout / attempt budget to `this.startTime` and `this.attempt`
 * and, if the budget allows another try, returns the backoff delay to wait.
 * There is no response here, so the backoff floor is always the configured
 * `retryDelayMin`.
 *
 * @param error the error thrown by the operation; attached to the
 *   `RetryError` raised when the budget is exhausted
 * @returns `{ shouldRetry: true, retryAfter }` when another attempt is allowed
 * @throws RetryError with `TimeoutExceeded` once `retriesTimeout` has elapsed
 *   since `startTime`, or with `AttemptsExceeded` once the incremented attempt
 *   counter reaches `retryMaxAttempts`
 */
private async computeNetworkErrorRetry(error: unknown): Promise<ShouldRetryResult> {
  const config = this.context.getConfig();

  // Overall time budget is checked first, before this attempt is counted.
  const elapsed = Date.now() - this.startTime;
  if (elapsed >= config.retriesTimeout) {
    throw new RetryError(RetryErrorCode.TimeoutExceeded, error);
  }

  // Count this failure, then check the attempt budget.
  this.attempt += 1;
  if (this.attempt >= config.retryMaxAttempts) {
    throw new RetryError(RetryErrorCode.AttemptsExceeded, error);
  }

  return {
    shouldRetry: true,
    retryAfter: this.getBackoffDelay(this.attempt, config.retryDelayMin, config.retryDelayMax),
  };
}

/**
 * Decides whether a thrown value looks like a transient network failure that
 * is worth retrying.
 *
 * Anything that is not a non-null object is rejected outright. Otherwise the
 * value is retryable when any of the following holds, checked in this order:
 *   1. its `type` is `'system'` or `'body-timeout'`;
 *   2. its `code` is a string listed in `RETRYABLE_NETWORK_ERROR_CODES`;
 *   3. its `message` is a string matching `RETRYABLE_NETWORK_ERROR_MESSAGE_RE`.
 *
 * @param error the value thrown by the operation
 * @returns `true` if the error should go through the network-error retry path
 */
protected isRetryableNetworkError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) {
    return false;
  }
  const err = error as { code?: string; type?: string; message?: string };

  return (
    // node-fetch FetchError reports low-level network failures as
    // `type: 'system'` and a body-stream timeout as `type: 'body-timeout'`.
    // `request-timeout` is deliberately absent: it is converted to a Thrift
    // TApplicationException upstream, so it needs no retry here.
    err.type === 'system' ||
    err.type === 'body-timeout' ||
    // Errors carrying a known transient errno.
    (typeof err.code === 'string' && RETRYABLE_NETWORK_ERROR_CODES.has(err.code)) ||
    // Errors without an errno, recognised by their message text.
    (typeof err.message === 'string' && RETRYABLE_NETWORK_ERROR_MESSAGE_RE.test(err.message))
  );
}

protected isRetryable({ response }: HttpTransactionDetails): boolean {
Expand Down
23 changes: 19 additions & 4 deletions lib/connection/connections/ThriftHttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,36 @@ export default class ThriftHttpConnection extends EventEmitter {
body: data,
};

// Consume the response body inside the retried block. node-fetch surfaces
// late-stage failures (TCP RST after headers, "Premature close") as rejections
// from `response.buffer()`, not from `fetch()`. Reading the body here means
// the retry policy sees those failures and can retry them like any other
// transient network error — the body Buffer is captured via closure so the
// post-retry caller still gets it.
let responseBuffer: Buffer | undefined;

this.getThriftMethodName(data)
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
.then((retryPolicy) => {
const makeRequest = () => {
const makeRequest = async () => {
responseBuffer = undefined;
const request = new Request(this.url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
const response = await fetch(request);
if (response.status === 200) {
responseBuffer = await response.buffer();
}
return { request, response };
};
return retryPolicy.invokeWithRetry(makeRequest);
})
.then(({ response }) => {
if (response.status !== 200) {
throw new THTTPException(response);
}

return response.buffer();
// `responseBuffer` is always set when status is 200, since `makeRequest`
// assigns it before resolving in that branch and the retry loop only
// returns a fulfilled response (failures are thrown).
return responseBuffer as Buffer;
})
.then((buffer) => {
this.transport.receiver((transportWithData) => this.handleThriftResponse(transportWithData), seqId)(buffer);
Expand Down
6 changes: 6 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ export interface ClientConfig {
useLZ4Compression: boolean;
enableMetricViewMetadata?: boolean;

// When true, DECIMAL values are returned as exact strings and 64-bit
// integers as JS `bigint`, instead of being coerced to a lossy `number`.
// Off by default to preserve the long-standing representation on both the
// Thrift and SEA backends. See `ConnectionOptions.preserveBigNumericPrecision`.
preserveBigNumericPrecision?: boolean;

// Telemetry configuration
telemetryEnabled?: boolean;
telemetryBatchSize?: number;
Expand Down
9 changes: 9 additions & 0 deletions lib/contracts/IDBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ export type ConnectionOptions = {
proxy?: ProxyOptions;
enableMetricViewMetadata?: boolean;

/**
* Preserve full numeric precision in results. When `true`, DECIMAL columns
* are returned as exact strings and 64-bit integers (BIGINT) as JS `bigint`,
* instead of the default lossy coercion to a JS `number` (which silently
* rounds DECIMALs and integers beyond 2^53). Applies to both the Thrift and
* SEA backends. Defaults to `false` to preserve the existing representation.
*/
preserveBigNumericPrecision?: boolean;

/**
* Extra HTTP headers attached to driver-owned out-of-band requests
* (telemetry POSTs and feature-flag GETs). Not applied to the primary
Expand Down
Loading
Loading