Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/recover-stranded-batch-trigger-and-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Recover `batchTriggerAndWait` parents that previously hung forever when a batch's item stream never completed. Batches left unsealed past a timeout are now aborted and the waiting parent resumes with an error instead of waiting indefinitely.
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,13 @@ const EnvironmentSchema = z
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
// so raise with care. Set to 1 for fully sequential ingestion.
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
// Seal-timeout reaper: if a batch's Phase 2 item stream never seals the batch
// (rate-limit, request timeout, crash), abort it after this delay and resume any
// blocked parent (batchTriggerAndWait) with an error instead of hanging forever.
// Must exceed the worst-case legitimate time-to-seal: the SDK retries the stream
// up to maxAttempts (5) times, each attempt bounded by the server request timeout
// (~300s), so the floor is ~5 × requestTimeout. Default 30m leaves headroom.
BATCH_SEAL_TIMEOUT_MS: z.coerce.number().int().positive().default(1_800_000),
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/runEngine/services/createBatch.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BatchId, RunId } from "@trigger.dev/core/v3/isomorphic";
import { type BatchTaskRun, Prisma } from "@trigger.dev/database";
import { Evt } from "evt";
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
Expand Down Expand Up @@ -149,6 +150,14 @@ export class CreateBatchService extends WithRunEngine {

await this._engine.initializeBatch(initOptions);

// Guard the 2-phase gap: if Phase 2 never seals this batch, the reaper
// aborts it after the timeout and resumes any blocked parent with an
// error instead of leaving it suspended forever.
await this._engine.scheduleExpireBatch({
batchId: batch.id,
availableAt: new Date(Date.now() + env.BATCH_SEAL_TIMEOUT_MS),
});

Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
logger.info("Batch created", {
batchId: friendlyId,
runCount: body.runCount,
Expand Down
26 changes: 26 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ export class RunEngine {
// Worker job handler: forwards the job's batchId to the batch system's
// performCompleteBatch.
tryCompleteBatch: async ({ payload }) => {
await this.batchSystem.performCompleteBatch({ batchId: payload.batchId });
},
// Worker job handler for the "expireBatch" job: forwards the job's batchId to
// the batch system's expireBatch, which aborts the batch if it is still
// unsealed and PENDING.
expireBatch: async ({ payload }) => {
await this.batchSystem.expireBatch({ batchId: payload.batchId });
},
continueRunIfUnblocked: async ({ payload }) => {
await this.waitpointSystem.continueRunIfUnblocked({
runId: payload.runId,
Expand Down Expand Up @@ -1640,6 +1643,29 @@ export class RunEngine {
return this.batchSystem.scheduleCompleteBatch({ batchId });
}

/**
 * Expire a batch that was never sealed by its Phase 2 item stream.
 *
 * Thin pass-through to the batch system, which marks the batch as failed and
 * completes the parent's batchTriggerAndWait waitpoint with an error so the
 * parent run resumes rather than waiting indefinitely.
 *
 * @param batchId - Id of the batch to expire.
 */
async expireBatch({ batchId }: { batchId: string }): Promise<void> {
  await this.batchSystem.expireBatch({ batchId });
}

/**
 * Queue the seal-timeout reaper for a batch.
 *
 * Thin pass-through to the batch system. Once `availableAt` is reached,
 * {@link expireBatch} runs and fails the batch if it still has not sealed,
 * resuming the parent.
 *
 * @param options.batchId - Id of the batch to guard.
 * @param options.availableAt - Earliest time at which the reaper job may run.
 */
async scheduleExpireBatch(options: { batchId: string; availableAt: Date }): Promise<void> {
  const { batchId, availableAt } = options;
  await this.batchSystem.scheduleExpireBatch({ batchId, availableAt });
}

// ============================================================================
// BatchQueue methods (DRR-based batch processing)
// ============================================================================
Expand Down
93 changes: 93 additions & 0 deletions internal-packages/run-engine/src/engine/systems/batchSystem.ts
Comment thread
matt-aitken marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { startSpan } from "@internal/tracing";
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
import { isFinalRunStatus } from "../statuses.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
Expand Down Expand Up @@ -32,6 +33,98 @@ export class BatchSystem {
await this.#tryCompleteBatch({ batchId });
}

/**
 * Enqueue an "expireBatch" worker job for the given batch, runnable from
 * `availableAt` onwards.
 *
 * @param options.batchId - Id of the batch the job should expire.
 * @param options.availableAt - Earliest time at which the job may run.
 */
public async scheduleExpireBatch(options: {
  batchId: string;
  availableAt: Date;
}): Promise<void> {
  const { batchId, availableAt } = options;

  // The job id is derived only from the batch id, so scheduling the same batch
  // again reuses the same id (dedupes repeated schedules).
  const jobId = `expireBatch:${batchId}`;

  await this.$.worker.enqueue({
    id: jobId,
    job: "expireBatch",
    payload: { batchId },
    availableAt,
  });
}

/**
 * Terminally fail a batch whose Phase 2 item stream never sealed it, and resolve
 * the parent's batchTriggerAndWait waitpoint with an error so the parent resumes
 * with a failure instead of hanging forever.
 *
 * Steps:
 *  1. Load the batch; return early if it is missing, already sealed, or no
 *     longer PENDING.
 *  2. Conditionally flip it to ABORTED (only while still unsealed + PENDING), so
 *     a concurrent seal and this reaper cannot both win.
 *  3. Look up the waitpoint completed by this batch and, if one exists, complete
 *     it with a STRING_ERROR payload.
 *
 * @param batchId - Id of the BatchTaskRun row to expire.
 * @returns Resolves once the batch has been handled (or skipped as a no-op).
 */
public async expireBatch({ batchId }: { batchId: string }): Promise<void> {
  return startSpan(this.$.tracer, "expireBatch", async (span) => {
    span.setAttribute("batchId", batchId);

    const batch = await this.$.prisma.batchTaskRun.findFirst({
      select: { status: true, sealed: true },
      where: { id: batchId },
    });

    if (!batch) {
      this.$.logger.debug("expireBatch: batch doesn't exist", { batchId });
      return;
    }

    // The stream sealed the batch, or it already progressed — nothing to fail.
    if (batch.sealed || batch.status !== "PENDING") {
      this.$.logger.debug("expireBatch: batch already sealed or no longer PENDING", {
        batchId,
        status: batch.status,
        sealed: batch.sealed,
      });
      return;
    }

    // Use one timestamp for both completion columns so they can never disagree.
    const abortedAt = new Date();

    // Conditional update guards against racing a late seal — whichever loses no-ops.
    const aborted = await this.$.prisma.batchTaskRun.updateMany({
      where: { id: batchId, sealed: false, status: "PENDING" },
      data: {
        status: "ABORTED",
        completedAt: abortedAt,
        processingCompletedAt: abortedAt,
      },
    });

    if (aborted.count === 0) {
      this.$.logger.debug("expireBatch: lost race to seal, no-op", { batchId });
      return;
    }

    // Only batchTriggerAndWait blocks a parent, so only it has a waitpoint to resolve.
    // Only the id is needed below, so avoid loading the whole row.
    const waitpoint = await this.$.prisma.waitpoint.findFirst({
      select: { id: true },
      where: { completedByBatchId: batchId },
    });

    if (!waitpoint) {
      this.$.logger.debug("expireBatch: no waitpoint to resolve (fire-and-forget batch)", {
        batchId,
      });
      return;
    }

    const error: TaskRunError = {
      type: "STRING_ERROR",
      raw: "Batch items could not be streamed before the batch timed out",
    };

    // NOTE(review): the batch is already ABORTED at this point. If this call
    // throws and the job is re-run, the status check above returns early and the
    // waitpoint is never completed — confirm whether the worker retries this job
    // and whether that path needs to re-attempt the waitpoint resolution.
    await this.waitpointSystem.completeWaitpoint({
      id: waitpoint.id,
      output: { value: JSON.stringify(error), isError: true },
    });

    this.$.logger.warn("expireBatch: aborted unsealed batch and resumed parent with error", {
      batchId,
      waitpointId: waitpoint.id,
    });
  });
}

/**
* Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status.
* This isn't used operationally, but it's used for the Batches dashboard page.
Expand Down
Loading