Skip to content

Commit f48c897

Browse files
matt-aitkenclaude
andauthored
perf(webapp): parallelize streaming batch-item ingest (#3777)
## Problem The item-streaming endpoint of the two-phase batch API (`POST /api/v3/batches/:batchId/items`) processed streamed items strictly sequentially. For a batch of many large payloads, each offloaded to object storage inline, this serialized N object-store round-trips inside a single request and could exceed Node's default `server.requestTimeout` (300s). The webapp then returned `408`, which the SDK reads as `408 terminated` and retries up to 5 times, turning a slow ingest into a failure that takes tens of minutes to surface. ## Fix Ingest now runs through `p-map` over the NDJSON async iterable with bounded concurrency (`STREAMING_BATCH_INGEST_CONCURRENCY`, default 10): - `p-map` pulls lazily from the stream, so at most `concurrency` items are read and in-flight at once. Peak memory stays bounded to roughly `concurrency × STREAMING_BATCH_ITEM_MAXIMUM_SIZE` and request-body backpressure is preserved. - Set the env to `1` for fully sequential ingestion (escape hatch). ## Why this is safe (ordering and idempotency unchanged) - Ordering derives from each item's index (enqueue `timestamp = batch.createdAt + index`), not enqueue order. - Dedup is atomic per index in `enqueueBatchItem`. - The NDJSON parser now stamps oversized-item markers with their emit position, removing the consumer's sequential `lastIndex` assumption (the only order-dependent bit). - The count-check and conditional-seal path is untouched. ## Scope This speeds up every batch ingested through the streaming endpoint, not just large-payload batches. Each item does a per-item Redis enqueue regardless of size, and those now overlap. Large payloads benefit most because they add an object-store offload round-trip on top of the enqueue. ## Verification Added an integration test (`streamBatchItems.test.ts`) that drives the real service against Postgres + Redis + RunEngine and times a 150-item batch at increasing concurrency. Object-store offload is modelled as a fixed per-item latency (local round-trips are too small to compare meaningfully): ``` runCount=150 large payloads (10ms/item offload): concurrency=1 1739ms concurrency=10 192ms (9.1x faster) concurrency=50 57ms (30.7x faster) small payloads (Redis enqueue only, no offload): concurrency=1 90ms concurrency=10 24ms (3.7x faster) ``` The test asserts correctness at every concurrency (all items accepted, sealed, enqueued exactly once), that parallel ingest beats the sequential floor, and that the small-payload case is strictly faster than sequential, so the win is not specific to large payloads. Also exercised end-to-end over real HTTP against a local server: a 20-item batch (12MB body) ingests and seals, a re-stream of the sealed batch returns `sealed: true` with zero re-accepted items (idempotent retry), and an oversized item still seals at its correct index. Existing coverage stays green: concurrent ingest of a 100-item batch, in-flight processing never exceeding the configured concurrency, concurrent dedup on streaming retry, and emit-position marker indexing. ## Follow-ups (not in this PR) - SDK pre-offload of large item payloads (send `application/store` refs instead of raw blobs) to remove object-store work from the request hot path and shrink the request body. - Optional `server.requestTimeout` bump as a safety net. ## CI fix Added `.github/workflows/codeql.yml` to replace GitHub's automatic ("dynamic") CodeQL scanning. The dynamic setup was failing to upload SARIF results because the auto-generated `GITHUB_TOKEN` lacked the `security-events: write` permission. The explicit workflow grants that permission at the job level and pins all actions to commit SHAs, consistent with the repo's security conventions. ## ✅ Checklist - [ ] I have followed every step in the [contributing guide](https://github.com/triggerdotdev/trigger.dev/blob/main/CONTRIBUTING.md) - [ ] The PR title follows the convention. - [ ] I ran and tested the code works --- ## Testing - Integration test (`streamBatchItems.test.ts`) validates correctness and performance at concurrency 1, 10, and 50 for both large and small payloads. - End-to-end verified over real HTTP: 20-item/12MB batch ingests and seals, idempotent retry returns `sealed: true`, oversized item seals at correct index. --- ## Changelog Streaming batch ingest now processes items with bounded concurrency instead of one at a time, so batches of many large payloads ingest far faster and no longer time out. Concurrency is configurable via `STREAMING_BATCH_INGEST_CONCURRENCY` (default 10); set it to 1 for fully sequential ingestion. --- ## Screenshots _[Screenshots]_ 💯 --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 5d6ea33 commit f48c897

6 files changed

Lines changed: 593 additions & 94 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Streaming batch ingest now processes items with bounded concurrency instead of one at a time, so batches of many large payloads ingest far faster and no longer time out. Concurrency is configurable via `STREAMING_BATCH_INGEST_CONCURRENCY` (default 10); set it to 1 for fully sequential ingestion.

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,10 @@ const EnvironmentSchema = z
768768
// 2-phase batch API settings
769769
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
770770
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
771+
// Number of streamed batch items ingested concurrently in Phase 2. Peak
772+
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
773+
// so raise with care. Set to 1 for fully sequential ingestion.
774+
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
771775
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
772776
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
773777
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),

apps/webapp/app/routes/api.v3.batches.$batchId.items.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
8484
const service = new StreamBatchItemsService();
8585
const result = await service.call(authResult.environment, batchId, itemsIterator, {
8686
maxItemBytes: env.STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
87+
concurrency: env.STREAMING_BATCH_INGEST_CONCURRENCY,
8788
});
8889

8990
return json(result, { status: 200 });

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 156 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
} from "@trigger.dev/core/v3";
55
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
66
import type { BatchItem, RunEngine } from "@internal/run-engine";
7+
import pMap from "p-map";
78
import type { BatchTaskRunStatus } from "@trigger.dev/database";
89
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
910
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -55,6 +56,8 @@ export function isIdempotentRetrySuccess(
5556

5657
export type StreamBatchItemsServiceOptions = {
5758
maxItemBytes: number;
59+
/** Max items processed concurrently. The route wires this to STREAMING_BATCH_INGEST_CONCURRENCY. */
60+
concurrency: number;
5861
};
5962

6063
export type OversizedItemMarker = {
@@ -68,6 +71,8 @@ export type OversizedItemMarker = {
6871
export type StreamBatchItemsServiceConstructorOptions = {
6972
prisma?: PrismaClientOrTransaction;
7073
engine?: RunEngine;
74+
/** Override the payload processor (used in tests to observe ingest concurrency). */
75+
payloadProcessor?: BatchPayloadProcessor;
7176
};
7277

7378
/**
@@ -88,7 +93,7 @@ export class StreamBatchItemsService extends WithRunEngine {
8893

8994
constructor(opts: StreamBatchItemsServiceConstructorOptions = {}) {
9095
super({ prisma: opts.prisma ?? prisma, engine: opts.engine });
91-
this.payloadProcessor = new BatchPayloadProcessor();
96+
this.payloadProcessor = opts.payloadProcessor ?? new BatchPayloadProcessor();
9297
}
9398

9499
/**
@@ -170,94 +175,28 @@ export class StreamBatchItemsService extends WithRunEngine {
170175
);
171176
}
172177

178+
// Process items from the stream with bounded concurrency.
179+
//
180+
// Ordering and idempotency do NOT depend on processing order:
181+
// - The BatchQueue derives run order from each item's index
182+
// (enqueue timestamp = batch.createdAt + itemIndex), not enqueue order.
183+
// - enqueueBatchItem() dedups atomically per index.
184+
// We cap concurrency to bound peak in-flight memory (≈ concurrency ×
185+
// maxItemBytes) and to keep backpressure on the request body stream.
186+
// p-map pulls lazily from the async iterator — at most `concurrency`
187+
// items are read and in flight at once. stopOnError aborts ingestion on
188+
// the first failure (the batch is left unsealed; the SDK's retry
189+
// re-streams and dedups already-enqueued items).
190+
const outcomes = await pMap(
191+
itemsIterator,
192+
(rawItem) => this.#processItem(rawItem, batchId, environment, batch.runCount),
193+
{ concurrency: options.concurrency, stopOnError: true }
194+
);
195+
173196
let itemsAccepted = 0;
174197
let itemsDeduplicated = 0;
175-
let lastIndex = -1;
176-
177-
// Process items from the stream
178-
for await (const rawItem of itemsIterator) {
179-
// Check for oversized item markers from the NDJSON parser
180-
if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) {
181-
const marker = rawItem as OversizedItemMarker;
182-
const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;
183-
184-
const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;
185-
186-
// Enqueue with __error metadata - processItemCallback will detect this
187-
// and use TriggerFailedTaskService to create a pre-failed run
188-
const batchItem: BatchItem = {
189-
task: marker.task,
190-
payload: "{}",
191-
payloadType: "application/json",
192-
options: {
193-
__error: errorMessage,
194-
__errorCode: "PAYLOAD_TOO_LARGE",
195-
},
196-
};
197-
198-
const result = await this._engine.enqueueBatchItem(
199-
batchId,
200-
environment.id,
201-
itemIndex,
202-
batchItem
203-
);
204-
205-
if (result.enqueued) {
206-
itemsAccepted++;
207-
} else {
208-
itemsDeduplicated++;
209-
}
210-
lastIndex = itemIndex;
211-
continue;
212-
}
213-
214-
// Parse and validate the item
215-
const parseResult = BatchItemNDJSONSchema.safeParse(rawItem);
216-
if (!parseResult.success) {
217-
throw new ServiceValidationError(
218-
`Invalid item at index ${lastIndex + 1}: ${parseResult.error.message}`
219-
);
220-
}
221-
222-
const item = parseResult.data;
223-
lastIndex = item.index;
224-
225-
// Validate index is within expected range
226-
if (item.index >= batch.runCount) {
227-
throw new ServiceValidationError(
228-
`Item index ${item.index} exceeds batch runCount ${batch.runCount}`
229-
);
230-
}
231-
232-
// Get the original payload type
233-
const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";
234-
235-
// Process payload - offload to R2 if it exceeds threshold
236-
const processedPayload = await this.payloadProcessor.process(
237-
item.payload,
238-
originalPayloadType,
239-
batchId,
240-
item.index,
241-
environment
242-
);
243-
244-
// Convert to BatchItem format with potentially offloaded payload
245-
const batchItem: BatchItem = {
246-
task: item.task,
247-
payload: processedPayload.payload,
248-
payloadType: processedPayload.payloadType,
249-
options: item.options,
250-
};
251-
252-
// Enqueue the item
253-
const result = await this._engine.enqueueBatchItem(
254-
batchId,
255-
environment.id,
256-
item.index,
257-
batchItem
258-
);
259-
260-
if (result.enqueued) {
198+
for (const outcome of outcomes) {
199+
if (outcome === "accepted") {
261200
itemsAccepted++;
262201
} else {
263202
itemsDeduplicated++;
@@ -446,6 +385,112 @@ export class StreamBatchItemsService extends WithRunEngine {
446385
}
447386
);
448387
}
388+
389+
/**
390+
* Process a single streamed batch item: validate it, offload its payload to
391+
* object storage if oversized, and enqueue it. Returns whether the item was
392+
* newly enqueued ("accepted") or was a duplicate ("deduplicated"). Throws
393+
* ServiceValidationError for invalid items, which aborts the stream.
394+
*
395+
* Safe to run concurrently: enqueueBatchItem() is atomic and order-independent
396+
* per item index, and each item carries its own index (real items from the
397+
* SDK; oversized markers are stamped by the NDJSON parser).
398+
*/
399+
async #processItem(
400+
rawItem: unknown,
401+
batchId: string,
402+
environment: AuthenticatedEnvironment,
403+
runCount: number
404+
): Promise<"accepted" | "deduplicated"> {
405+
// Oversized item marker emitted by the NDJSON parser
406+
if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) {
407+
const marker = rawItem as OversizedItemMarker;
408+
409+
// Same out-of-range guard as normal items: an oversized item with an
410+
// out-of-range index must 4xx rather than create a stray pre-failed run.
411+
if (marker.index >= runCount) {
412+
throw new ServiceValidationError(
413+
`Item index ${marker.index} exceeds batch runCount ${runCount}`
414+
);
415+
}
416+
417+
const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(
418+
1
419+
)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(
420+
1
421+
)} KB. Reduce the payload size or offload large data to external storage.`;
422+
423+
// Enqueue with __error metadata - processItemCallback will detect this
424+
// and use TriggerFailedTaskService to create a pre-failed run
425+
const batchItem: BatchItem = {
426+
task: marker.task,
427+
payload: "{}",
428+
payloadType: "application/json",
429+
options: {
430+
__error: errorMessage,
431+
__errorCode: "PAYLOAD_TOO_LARGE",
432+
},
433+
};
434+
435+
const result = await this._engine.enqueueBatchItem(
436+
batchId,
437+
environment.id,
438+
marker.index,
439+
batchItem
440+
);
441+
442+
return result.enqueued ? "accepted" : "deduplicated";
443+
}
444+
445+
// Parse and validate the item
446+
const parseResult = BatchItemNDJSONSchema.safeParse(rawItem);
447+
if (!parseResult.success) {
448+
const rawIndex = (rawItem as { index?: unknown } | null)?.index;
449+
const where = typeof rawIndex === "number" ? `index ${rawIndex}` : "unknown index";
450+
throw new ServiceValidationError(
451+
`Invalid item at ${where}: ${parseResult.error.message}`
452+
);
453+
}
454+
455+
const item = parseResult.data;
456+
457+
// Validate index is within expected range
458+
if (item.index >= runCount) {
459+
throw new ServiceValidationError(
460+
`Item index ${item.index} exceeds batch runCount ${runCount}`
461+
);
462+
}
463+
464+
// Get the original payload type
465+
const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";
466+
467+
// Process payload - offload to object storage if it exceeds threshold
468+
const processedPayload = await this.payloadProcessor.process(
469+
item.payload,
470+
originalPayloadType,
471+
batchId,
472+
item.index,
473+
environment
474+
);
475+
476+
// Convert to BatchItem format with potentially offloaded payload
477+
const batchItem: BatchItem = {
478+
task: item.task,
479+
payload: processedPayload.payload,
480+
payloadType: processedPayload.payloadType,
481+
options: item.options,
482+
};
483+
484+
// Enqueue the item
485+
const result = await this._engine.enqueueBatchItem(
486+
batchId,
487+
environment.id,
488+
item.index,
489+
batchItem
490+
);
491+
492+
return result.enqueued ? "accepted" : "deduplicated";
493+
}
449494
}
450495

451496
/**
@@ -587,12 +632,29 @@ export function createNdjsonParserStream(
587632
let chunks: Uint8Array[] = [];
588633
let totalBytes = 0;
589634
let lineNumber = 0;
635+
// 0-based position of the next object we emit (parsed item or oversized
636+
// marker). The parser is the single sequential point in the pipeline, so this
637+
// is the authoritative source of item ordering — downstream consumers can
638+
// process items concurrently and must not rely on processing order to derive
639+
// an item's index. Used to back-fill an oversized marker's index when it
640+
// couldn't be extracted from the (truncated) raw bytes.
641+
let emittedCount = 0;
590642
// When an oversized incomplete line is detected (Case 2), we must discard
591643
// all remaining bytes of that line until the next newline delimiter.
592644
let skipUntilNewline = false;
593645

594646
const NEWLINE_BYTE = 0x0a; // '\n'
595647

648+
/**
649+
* Emit a parsed object or marker downstream and advance the emit position.
650+
* Every emitted object MUST go through here so `emittedCount` stays aligned
651+
* with item position (empty/skipped lines never emit, so they don't count).
652+
*/
653+
function emit(controller: TransformStreamDefaultController<unknown>, obj: unknown): void {
654+
controller.enqueue(obj);
655+
emittedCount++;
656+
}
657+
596658
/**
597659
* Concatenate all chunks into a single Uint8Array
598660
*/
@@ -675,7 +737,7 @@ export function createNdjsonParserStream(
675737

676738
try {
677739
const obj = JSON.parse(trimmed);
678-
controller.enqueue(obj);
740+
emit(controller, obj);
679741
} catch (err) {
680742
throw new Error(`Invalid JSON at line ${lineNumber}: ${(err as Error).message}`);
681743
}
@@ -715,12 +777,12 @@ export function createNdjsonParserStream(
715777
const extracted = extractIndexAndTask(lineBytes);
716778
const marker: OversizedItemMarker = {
717779
__batchItemError: "OVERSIZED",
718-
index: extracted.index,
780+
index: extracted.index >= 0 ? extracted.index : emittedCount,
719781
task: extracted.task,
720782
actualSize: newlineIndex,
721783
maxSize: maxItemBytes,
722784
};
723-
controller.enqueue(marker);
785+
emit(controller, marker);
724786
lineNumber++;
725787
continue;
726788
}
@@ -736,12 +798,12 @@ export function createNdjsonParserStream(
736798
const extracted = extractIndexAndTask(concatenateChunks());
737799
const marker: OversizedItemMarker = {
738800
__batchItemError: "OVERSIZED",
739-
index: extracted.index,
801+
index: extracted.index >= 0 ? extracted.index : emittedCount,
740802
task: extracted.task,
741803
actualSize: totalBytes,
742804
maxSize: maxItemBytes,
743805
};
744-
controller.enqueue(marker);
806+
emit(controller, marker);
745807
lineNumber++;
746808
// Clear buffer and skip remaining bytes of this oversized line
747809
// until the next newline delimiter is found in a subsequent chunk
@@ -768,12 +830,12 @@ export function createNdjsonParserStream(
768830
const extracted = extractIndexAndTask(concatenateChunks());
769831
const marker: OversizedItemMarker = {
770832
__batchItemError: "OVERSIZED",
771-
index: extracted.index,
833+
index: extracted.index >= 0 ? extracted.index : emittedCount,
772834
task: extracted.task,
773835
actualSize: totalBytes,
774836
maxSize: maxItemBytes,
775837
};
776-
controller.enqueue(marker);
838+
emit(controller, marker);
777839
return;
778840
}
779841

0 commit comments

Comments
 (0)