Skip to content

Commit 589a95f

Browse files
committed
http/dispatch: mirror subprocess stream fix (effectiveProducer + __inputSchema)
The subprocess path (src/dispatch/stream.ts) got this fix months ago (commit 61025e1): respect state.__inputSchema override for dynamic-input exchange methods, gate conformBatchToSchema on effectiveProducer, and let a conformance failure fall through rather than throw so exchange handlers with dummy tick inputs can still run. The HTTP path (src/http/dispatch.ts::httpDispatchStreamExchange) had never received the matching fix — it called conformBatchToSchema unconditionally, which blew up on every VGI init exchange that wasn't column-shape-compatible with method.inputSchema (i.e. every scalar function and most table_in_out functions under the HTTP transport). Port the same three changes here: - inputSchema now reads state.__inputSchema ?? method.inputSchema - conform is gated on !effectiveProducer && schema mismatch - conform failure logs and falls through with the original batch Unlocks ~20 HTTP test passes in vgi-typescript (all the integration/scalar/* + most integration/table_in_out/* tests).
1 parent 61025e1 commit 589a95f

1 file changed

Lines changed: 20 additions & 3 deletions

File tree

src/http/dispatch.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,11 @@ export async function httpDispatchStreamExchange(
223223
if (unpacked.inputSchemaBytes.length > 0) {
224224
inputSchema = await deserializeSchema(unpacked.inputSchemaBytes);
225225
} else {
226-
inputSchema = method.inputSchema ?? EMPTY_SCHEMA;
226+
// state.__inputSchema mirrors the __outputSchema pattern — set by
227+
// dynamic-input exchange methods (e.g. VGI's init, which binds to a
228+
// user-supplied input shape per invocation). Matches the fix already
229+
// applied in src/dispatch/stream.ts for the subprocess path.
230+
inputSchema = state?.__inputSchema ?? method.inputSchema ?? EMPTY_SCHEMA;
227231
}
228232
const effectiveProducer = state?.__isProducer ?? isProducer;
229233
if (process.env.VGI_DISPATCH_DEBUG)
@@ -255,8 +259,21 @@ export async function httpDispatchStreamExchange(
255259
// when effectiveProducer so finish() is allowed.
256260
const out = new OutputCollector(outputSchema, effectiveProducer, ctx.serverId, null, ctx.authContext, ctx.cookies);
257261

258-
// Cast compatible input types (e.g., decimal→double, int32→int64)
259-
const conformedBatch = conformBatchToSchema(reqBatch, inputSchema);
262+
// Cast compatible input types (e.g., decimal→double, int32→int64).
263+
// Gated on effectiveProducer (not isProducer) so methods that flip to
264+
// producer mode via state.__isProducer skip the conform entirely — the
265+
// tick batches they receive have a dummy shape that shouldn't be
266+
// checked against the declared input schema. Any conformance failure
267+
// falls through with the original batch; the handler owns input-shape
268+
// validation if it cares. Mirrors dispatch/stream.ts.
269+
let conformedBatch = reqBatch;
270+
if (!effectiveProducer && inputSchema !== EMPTY_SCHEMA && reqBatch.schema !== inputSchema) {
271+
try {
272+
conformedBatch = conformBatchToSchema(reqBatch, inputSchema);
273+
} catch (e) {
274+
console.debug?.(`Schema conformance skipped: ${e instanceof Error ? e.message : e}`);
275+
}
276+
}
260277

261278
try {
262279
if (method.exchangeFn) {

0 commit comments

Comments
 (0)