Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/realtime-append-cap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Session `.in/append` returns readable 413s on oversize bodies (was failing browser fetches as opaque `TypeError: Failed to fetch`) and now rejects only records that would actually exceed S2's per-record ceiling, instead of guessing at a conservative pre-encoding cap.
18 changes: 12 additions & 6 deletions apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ const ParamsSchema = z.object({
// POST: server-side append of a single record to a session channel. Mirrors
// the existing /realtime/v1/streams/:runId/:target/:streamId/append route,
// scoped to a Session primitive.
// S2 enforces a 1 MiB per-record limit (metered as
// `8 + 2*H + Σ(header name+value) + body`). We cap the raw HTTP body at
// 512 KiB so the JSON wrapper (`{"data":"...","id":"..."}`), string
// escaping, and any future per-record header additions all stay comfortably
// below S2's ceiling. See https://s2.dev/docs/limits.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
//
// The HTTP body cap here is just a DoS pre-guard — set generously at
// 1 MiB so we don't buffer arbitrarily large inputs before we can
// compute the wrapped size. The actual S2 per-record limit (verified
// empirically against cloud S2) is enforced precisely inside
// `S2RealtimeStreams.#appendPartByName` — it throws
// `S2RecordTooLargeError` (a `ServiceValidationError` with status
// 413) when the metered record size would exceed S2's 1 MiB ceiling
// after JSON wrapping. That lets legitimate bodies up to ~1023 KiB
// raw through (ASCII or low-escape content) while still rejecting
// pathological all-quote content that would double on wrap.
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

const { action, loader } = createActionApiRoute(
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ const ParamsSchema = z.object({
io: z.enum(["out", "in"]),
});

// S2 record body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — keep it well under S2's 1 MiB per-record limit so JSON wrapping,
// string escaping, and any future per-record headers stay safe.
const MAX_APPEND_BODY_BYTES = 1024 * 512;
// HTTP body cap. Mirrors the public /realtime/v1/sessions/:s/:io/append
// route — DoS pre-guard only. The actual S2 per-record limit is
// enforced precisely by `S2RealtimeStreams.#appendPartByName`
// (throws `S2RecordTooLargeError` with status 413 when the metered
// record size would exceed S2's 1 MiB ceiling after JSON wrapping).
const MAX_APPEND_BODY_BYTES = 1024 * 1024;

// POST: Append a single record to a Session channel from the dashboard
// playground. Mirrors the public `POST /realtime/v1/sessions/:session/:io/append`
Expand Down
38 changes: 37 additions & 1 deletion apps/webapp/app/services/realtime/s2realtimeStreams.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,36 @@ import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions }
import { Logger, LogLevel } from "@trigger.dev/core/logger";
import { headerValue } from "@trigger.dev/core/v3";
import { randomUUID } from "node:crypto";
import { ServiceValidationError } from "~/v3/services/common.server";

// S2's per-record metered-size limit. Verified empirically against
// cloud S2: append succeeds at metered=1048576 and 422s at 1048577
// with `"record must have metered size less than 1 MiB"` (the "less
// than" wording is slightly off — the boundary is inclusive).
//
// Metered size formula:
// metered = 8 (record overhead) + 2*H + Σ(header name + value) + body
// where `body` is the unescaped record body length in UTF-8 bytes and
// `H` is the number of S2 record headers.
//
// We attach no record headers (H=0), so the budget reduces to:
// 8 + body ≤ 1048576 → body ≤ 1048568
export const S2_MAX_METERED_BYTES = 1024 * 1024; // 1 MiB
export const S2_RECORD_BASE_OVERHEAD_BYTES = 8;

/**
* Thrown when a record's metered size would exceed S2's hard per-record
* limit. Caught by the route handler and surfaced as 413.
*/
export class S2RecordTooLargeError extends ServiceValidationError {
constructor(public readonly meteredBytes: number) {
super(
`Record metered size ${meteredBytes} bytes exceeds the S2 per-record limit of ${S2_MAX_METERED_BYTES} bytes. Reduce tool-output size or split into smaller parts.`,
413
);
this.name = "S2RecordTooLargeError";
}
}

export type S2RealtimeStreamsOptions = {
// S2
Expand Down Expand Up @@ -181,8 +211,14 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor {
async #appendPartByName(part: string, partId: string, s2Stream: string): Promise<void> {
this.logger.debug(`S2 appending to stream`, { part, stream: s2Stream });

const recordBody = JSON.stringify({ data: part, id: partId });
const meteredBytes = Buffer.byteLength(recordBody, "utf8") + S2_RECORD_BASE_OVERHEAD_BYTES;
if (meteredBytes > S2_MAX_METERED_BYTES) {
throw new S2RecordTooLargeError(meteredBytes);
}
Comment thread
ericallam marked this conversation as resolved.

const result = await this.s2Append(s2Stream, {
records: [{ body: JSON.stringify({ data: part, id: partId }) }],
records: [{ body: recordBody }],
Comment thread
ericallam marked this conversation as resolved.
});

this.logger.debug(`S2 append result`, { result });
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,11 @@ export function createActionApiRoute<
const contentLength = request.headers.get("content-length");

if (!contentLength || parseInt(contentLength) > maxContentLength) {
return json({ error: "Request body too large" }, { status: 413 });
return await wrapResponse(
request,
json({ error: "Request body too large" }, { status: 413 }),
corsStrategy !== "none"
);
}
}

Expand Down
Loading