Skip to content

Commit eba8b97

Browse files
rustyconoverclaude
andcommitted
Wire external storage resolution into HTTP stream client
HttpStreamSession now resolves external pointer batches in stream responses, completing the client-side wiring. The externalConfig is passed from httpConnect through to the stream session. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0f97f2b commit eba8b97

2 files changed

Lines changed: 21 additions & 7 deletions

File tree

src/client/connect.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ export function httpConnect(baseUrl: string, options?: HttpConnectOptions): RpcC
309309
compressFn,
310310
decompressFn,
311311
authorization,
312+
externalConfig,
312313
});
313314
},
314315

src/client/stream.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import { Field, makeData, RecordBatch, Schema, Struct, vectorFromArray } from "@query-farm/apache-arrow";
55
import { STATE_KEY } from "../constants.js";
66
import { RpcError } from "../errors.js";
7+
import { type ExternalLocationConfig, isExternalLocationBatch, resolveExternalLocation } from "../external.js";
78
import { ARROW_CONTENT_TYPE, serializeIpcStream } from "../http/common.js";
89
import { dispatchLogOrError, extractBatchRows, inferArrowType, readResponseBatches } from "./ipc.js";
910
import type { LogMessage, StreamSession } from "./types.js";
@@ -26,6 +27,7 @@ export class HttpStreamSession implements StreamSession {
2627
private _compressFn?: CompressFn;
2728
private _decompressFn?: DecompressFn;
2829
private _authorization?: string;
30+
private _externalConfig?: ExternalLocationConfig;
2931

3032
constructor(opts: {
3133
baseUrl: string;
@@ -42,6 +44,7 @@ export class HttpStreamSession implements StreamSession {
4244
compressFn?: CompressFn;
4345
decompressFn?: DecompressFn;
4446
authorization?: string;
47+
externalConfig?: ExternalLocationConfig;
4548
}) {
4649
this._baseUrl = opts.baseUrl;
4750
this._prefix = opts.prefix;
@@ -57,6 +60,7 @@ export class HttpStreamSession implements StreamSession {
5760
this._compressFn = opts.compressFn;
5861
this._decompressFn = opts.decompressFn;
5962
this._authorization = opts.authorization;
63+
this._externalConfig = opts.externalConfig;
6064
}
6165

6266
get header(): Record<string, any> | null {
@@ -211,10 +215,14 @@ export class HttpStreamSession implements StreamSession {
211215
*/
212216
async *[Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]> {
213217
// Yield pre-loaded batches from init
214-
for (const batch of this._pendingBatches) {
218+
for (let batch of this._pendingBatches) {
215219
if (batch.numRows === 0) {
216-
dispatchLogOrError(batch, this._onLog);
217-
continue;
220+
if (isExternalLocationBatch(batch)) {
221+
batch = await resolveExternalLocation(batch, this._externalConfig);
222+
} else {
223+
dispatchLogOrError(batch, this._onLog);
224+
continue;
225+
}
218226
}
219227
yield extractBatchRows(batch);
220228
}
@@ -229,7 +237,7 @@ export class HttpStreamSession implements StreamSession {
229237
const { batches } = await readResponseBatches(responseBody);
230238

231239
let gotContinuation = false;
232-
for (const batch of batches) {
240+
for (let batch of batches) {
233241
if (batch.numRows === 0) {
234242
// Check for continuation token
235243
const token = batch.metadata?.get(STATE_KEY);
@@ -238,9 +246,14 @@ export class HttpStreamSession implements StreamSession {
238246
gotContinuation = true;
239247
continue;
240248
}
241-
// Log/error batch
242-
dispatchLogOrError(batch, this._onLog);
243-
continue;
249+
// Check for external location pointer
250+
if (isExternalLocationBatch(batch)) {
251+
batch = await resolveExternalLocation(batch, this._externalConfig);
252+
} else {
253+
// Log/error batch
254+
dispatchLogOrError(batch, this._onLog);
255+
continue;
256+
}
244257
}
245258

246259
yield extractBatchRows(batch);

0 commit comments

Comments
 (0)