Commit 954ee5c

fix(webapp): deliver realtime changes with current content when the read replica lags (#3910)
## Summary

When the realtime runs feed (the backend behind the `realtimeBackend` feature flag) hydrates a change from a Postgres read replica, the read can race the replica's apply of the very write that triggered it. The delivered row then carries the previous change's content, and an isolated final change (for example a last `metadata.set` before a run goes quiet) is not corrected until the roughly 20 second backstop poll. Measured against a replica with deliberate apply delay, every delivery trailed exactly one change behind and a final change stranded for the full backstop interval.

## Fix

Publishers stamp each change record with the committed row's `updatedAt`, taken from writes they already perform, so the stamp costs no extra queries.

The router delays its wake hydrate until the replica's measured lag has passed, anchored to that timestamp: a record that has already spent longer than the lag in transit is hydrated immediately, so only the racing leading edge ever waits.

After hydrating, a tripwire compares each row against its record's watermark. Still-stale rows are withheld and retried briefly, and each detection feeds the lag estimate. If retries run out, the rows are delivered anyway (liveness over freshness) and follow-up re-hydrates emit the fresh version through the normal working-set diff once the replica catches up, with the backstop as the terminal net.

Replica lag is sampled reader-side only, and only while feeds are active. Aurora reports live lag via `aurora_replica_status()`; vanilla Postgres can only report "caught up or not" (mid-apply lag is not honestly measurable from a replica), so tripwire observations floor the estimate there. Deployments without a replica resolve to zero lag and skip the gate entirely. Tunables live under `REALTIME_BACKEND_NATIVE_REPLICA_LAG_*`, and `realtime_native.stale_hydrates` plus `realtime_native.replica_lag_estimate_ms` make replica health observable.

Two adjacent fixes: a metadata update that writes nothing no longer publishes a change record, and buffered parent and root metadata operations now publish when the flusher writes them, so those changes wake live feeds instead of waiting for the backstop.

For local testing, `docker-compose` gains an opt-in `database-replica` service (compose profile `replica`) with a configurable `recovery_min_apply_delay`, which reproduces replica-lag behavior deterministically. With the gate disabled this rig reproduces the one-change-behind delivery exactly; with it enabled, deliveries arrive with current content at roughly the true replica lag, across write rates faster and slower than the lag itself.
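The delay rule described in the Fix section can be sketched as below. This is a minimal illustration, not the router's code from this commit: `gateDelayMs`, `GateConfig`, and the exact way the margin and cap combine are assumptions. Only the margin and max-delay defaults are taken from the env schema added in this commit.

```typescript
// Hypothetical sketch of the read-your-writes gate delay; all names are illustrative.
interface GateConfig {
  // Assumed to mirror REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS (default 10).
  marginMs: number;
  // Assumed to mirror REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS (default 1000).
  maxDelayMs: number;
}

function gateDelayMs(
  updatedAtMs: number, // watermark stamped on the change record by the publisher
  nowMs: number, // when the router is about to hydrate
  lagEstimateMs: number, // current replica lag estimate; 0 means "no replica"
  cfg: GateConfig
): number {
  // No replica (zero lag): skip the gate entirely.
  if (lagEstimateMs <= 0) return 0;

  // Time the record has already spent in transit counts against the wait.
  const inTransitMs = Math.max(0, nowMs - updatedAtMs);
  const remainingMs = lagEstimateMs + cfg.marginMs - inTransitMs;

  // Never negative, and never longer than the hard cap.
  return Math.min(cfg.maxDelayMs, Math.max(0, remainingMs));
}

const cfg: GateConfig = { marginMs: 10, maxDelayMs: 1_000 };
const leadingEdge = gateDelayMs(1_000, 1_005, 30, cfg); // 30 + 10 - 5 = 35
const alreadyAged = gateDelayMs(1_000, 1_100, 30, cfg); // 0: older than the lag
const sickReplica = gateDelayMs(1_000, 1_000, 5_000, cfg); // 1000: capped
```

Anchoring to `updatedAtMs` rather than to the moment the record arrives is what lets a record that has aged past the lag hydrate immediately.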
1 parent 8dc77c0 commit 954ee5c

13 files changed

Lines changed: 1037 additions & 31 deletions
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Realtime feed reads now wait out measured read-replica lag and retry stale reads, so subscribers receive each change's current content instead of trailing one change behind when a read replica races the write.

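The "retry stale reads" behavior can be pictured with the sketch below. It is an assumption-laden illustration rather than the commit's tripwire: `HydratedRow`, `partitionByWatermark`, `hydrateWithTripwire`, and the fixed retry delay are all hypothetical. What it takes from the commit message is the rule itself: rows older than their record's watermark are withheld and re-read a bounded number of times, then delivered anyway.

```typescript
// Hypothetical sketch of the post-hydrate tripwire; names and shapes are illustrative.
interface HydratedRow {
  runId: string;
  updatedAtMs: number;
}

// A row is stale when the replica returned a version older than the record's watermark.
function partitionByWatermark(rows: HydratedRow[], watermarks: Map<string, number>) {
  const fresh: HydratedRow[] = [];
  const stale: HydratedRow[] = [];
  for (const row of rows) {
    const watermark = watermarks.get(row.runId);
    if (watermark === undefined || row.updatedAtMs >= watermark) fresh.push(row);
    else stale.push(row);
  }
  return { fresh, stale };
}

async function hydrateWithTripwire(
  watermarks: Map<string, number>,
  hydrate: (runIds: string[]) => Promise<HydratedRow[]>, // injected replica read
  opts: { retries: number; retryDelayMs: number; onStale?: (runId: string) => void }
): Promise<HydratedRow[]> {
  const delivered: HydratedRow[] = [];
  let pending = Array.from(watermarks.keys());
  let stale: HydratedRow[] = [];

  for (let attempt = 0; attempt <= opts.retries && pending.length > 0; attempt++) {
    if (attempt > 0) {
      await new Promise<void>((resolve) => setTimeout(resolve, opts.retryDelayMs));
    }
    const result = partitionByWatermark(await hydrate(pending), watermarks);
    delivered.push(...result.fresh);
    stale = result.stale;
    // Each detection is reported so the caller can feed its lag estimate.
    for (const row of stale) opts.onStale?.(row.runId);
    pending = stale.map((row) => row.runId);
  }

  // Retries exhausted: deliver what we have (liveness over freshness).
  return [...delivered, ...stale];
}
```

In this sketch a run that the hydrate call does not return at all is simply dropped from the retry set; the real router may treat that case differently.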
apps/webapp/app/env.server.ts

Lines changed: 22 additions & 0 deletions
@@ -338,6 +338,28 @@ const EnvironmentSchema = z
     // TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
     REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
     REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
+    // "1" enables the read-your-writes gate: wake hydrates wait out the measured replica lag
+    // (anchored to the change record's updatedAtMs) and stale reads are retried.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED: z.string().default("1"),
+    // Reader-side lag probe cadence while the router is active; probing pauses when idle.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS: z.coerce.number().int().default(250),
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS: z.coerce.number().int().default(30_000),
+    // The lag estimate is the max sample inside this window (spikes widen it immediately).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS: z.coerce.number().int().default(5_000),
+    // Estimate before the first sample lands (and the floor when probing is unavailable).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS: z.coerce.number().int().default(30),
+    // Safety margin (clock skew + scheduling) added on top of the lag estimate.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS: z.coerce.number().int().default(10),
+    // Hard cap on any single gate delay — a sick replica degrades freshness, never liveness.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS: z.coerce.number().int().default(1_000),
+    // Re-hydrate attempts for rows the tripwire still finds stale after the delay.
+    REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES: z.coerce.number().int().default(3),
+    // How long a tripwire-observed staleness floors the lag estimate (vanilla-PG replicas
+    // can't measure mid-apply lag, so observations carry the estimate between races).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS: z.coerce
+      .number()
+      .int()
+      .default(60_000),
 
     PUBSUB_REDIS_HOST: z
       .string()

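To make the window, default, and observed-floor tunables above concrete, here is a small sketch of how such an estimate could be combined. It is a guess at the shape, not the commit's estimator: `createLagEstimator` and its methods are hypothetical, and how a tripwire detection turns into a millisecond figure (`observedLagMs`) is an assumption. The option defaults in the usage lines match the schema above.

```typescript
// Hypothetical sketch of a windowed-max lag estimate with an observed floor.
interface LagEstimatorOptions {
  windowMs: number; // cf. ..._REPLICA_LAG_WINDOW_MS
  defaultMs: number; // cf. ..._REPLICA_LAG_DEFAULT_MS
  observedFloorTtlMs: number; // cf. ..._REPLICA_LAG_OBSERVED_FLOOR_TTL_MS
}

function createLagEstimator(opts: LagEstimatorOptions) {
  let samples: { atMs: number; lagMs: number }[] = [];
  let floor: { lagMs: number; expiresAtMs: number } | undefined;

  return {
    // Record one probe result (for example from a reader-side lag query).
    addSample(lagMs: number, nowMs: number): void {
      samples.push({ atMs: nowMs, lagMs });
    },
    // Record a tripwire detection; it floors the estimate until the TTL expires.
    observeStale(observedLagMs: number, nowMs: number): void {
      const current = floor && floor.expiresAtMs > nowMs ? floor.lagMs : 0;
      floor = {
        lagMs: Math.max(current, observedLagMs),
        expiresAtMs: nowMs + opts.observedFloorTtlMs,
      };
    },
    // Max sample inside the window, else the default; never below a live floor.
    estimate(nowMs: number): number {
      samples = samples.filter((s) => s.atMs >= nowMs - opts.windowMs);
      const windowMax =
        samples.length > 0 ? Math.max(...samples.map((s) => s.lagMs)) : opts.defaultMs;
      const floorMs = floor && floor.expiresAtMs > nowMs ? floor.lagMs : 0;
      return Math.max(windowMax, floorMs);
    },
  };
}

const estimator = createLagEstimator({
  windowMs: 5_000,
  defaultMs: 30,
  observedFloorTtlMs: 60_000,
});
estimator.addSample(12, 1_000);
estimator.addSample(80, 2_000);
const duringSpike = estimator.estimate(2_500); // 80: a spike widens the estimate at once
```

Taking the maximum rather than an average means a single slow sample lengthens the gate immediately and only ages out once it leaves the window.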
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 26 additions & 8 deletions
@@ -100,7 +100,20 @@ export async function routeOperationsToRun(
   const [error, result] = await tryCatch(
     updateMetadataService.call(targetRunId, { operations }, env)
   );
-  if (!error && result !== undefined) return;
+  if (!error && result !== undefined) {
+    // The parent/root run changed too — wake its live feeds (only when something was
+    // actually written here; buffered writes publish from the flusher).
+    if (result.updatedAtMs !== undefined) {
+      publishChangeRecord({
+        runId: result.runId,
+        envId: env.id,
+        tags: result.runTags,
+        batchId: result.batchId,
+        updatedAtMs: result.updatedAtMs,
+      });
+    }
+    return;
+  }
 
   if (error) {
     // PG threw — auxiliary op, stay best-effort and don't surface this
@@ -186,13 +199,18 @@ const { action } = createActionApiRoute(
       }
       if (pgResult) {
         // Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
-        // internal id (the router keys single-run feeds by it, not the friendly id from the URL).
-        publishChangeRecord({
-          runId: pgResult.runId,
-          envId: env.id,
-          tags: pgResult.runTags,
-          batchId: pgResult.batchId,
-        });
+        // internal id (the router keys single-run feeds by it, not the friendly id from the
+        // URL) with the committed updatedAt as the read-your-writes watermark. No write
+        // (no-op body, or ops buffered for the flusher) means nothing to announce here.
+        if (pgResult.updatedAtMs !== undefined) {
+          publishChangeRecord({
+            runId: pgResult.runId,
+            envId: env.id,
+            tags: pgResult.runTags,
+            batchId: pgResult.batchId,
+            updatedAtMs: pgResult.updatedAtMs,
+          });
+        }
         return json({ metadata: pgResult.metadata }, { status: 200 });
       }

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 4 additions & 2 deletions
@@ -84,20 +84,22 @@ export async function action({ request, params }: ActionFunctionArgs) {
     if (newTags.length === 0) {
       return json({ message: "No new tags to add" }, { status: 200 });
     }
-    await prisma.taskRun.update({
+    const updated = await prisma.taskRun.update({
       where: {
         id: taskRun.id,
         runtimeEnvironmentId: env.id,
       },
       data: { runTags: { push: newTags } },
+      select: { updatedAt: true },
     });
     // Publish a run-changed record with the NEW tag set so tag feeds reindex
-    // (no-op unless enabled).
+    // (no-op unless enabled). updatedAt is the read-your-writes watermark.
     publishChangeRecord({
       runId: taskRun.id,
       envId: env.id,
       tags: existing.concat(newTags),
       batchId: taskRun.batchId,
+      updatedAtMs: updated.updatedAt.getTime(),
     });
     return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
   },

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 65 additions & 15 deletions
@@ -30,6 +30,16 @@ export type UpdateMetadataServiceOptions = {
   maximumSize?: number;
   logger?: Logger;
   logLevel?: LogLevel;
+  /** Called after the batched flusher writes a run's buffered operations, with everything
+   * a realtime change record needs — buffered (parent/root) updates otherwise never wake
+   * live feeds. */
+  onRunFlushed?: (run: {
+    runId: string;
+    environmentId: string;
+    tags: string[];
+    batchId: string | null;
+    updatedAtMs: number;
+  }) => void;
   // Testing hooks
   onBeforeUpdate?: (runId: string) => Promise<void>;
   onAfterRead?: (runId: string, metadataVersion: number) => Promise<void>;
@@ -172,12 +182,20 @@ export class UpdateMetadataService {
     operations: BufferedRunMetadataChangeOperation[]
   ) => {
     return Effect.gen(this, function* (_) {
-      // Fetch current run
+      // Fetch current run (+ the realtime membership keys, so a flush can publish)
       const run = yield* _(
         Effect.tryPromise(() =>
           this._prisma.taskRun.findFirst({
             where: { id: runId },
-            select: { id: true, metadata: true, metadataType: true, metadataVersion: true },
+            select: {
+              id: true,
+              metadata: true,
+              metadataType: true,
+              metadataVersion: true,
+              runtimeEnvironmentId: true,
+              runTags: true,
+              batchId: true,
+            },
           })
         )
       );
@@ -237,6 +255,9 @@ export class UpdateMetadataService {
         yield* _(Effect.tryPromise(() => this.options.onBeforeUpdate!(runId)));
       }
 
+      // Stamp updatedAt explicitly so the realtime publish can carry the exact committed
+      // value without a follow-up read (updateMany can't RETURNING).
+      const writeTime = new Date();
       const result = yield* _(
         Effect.tryPromise(() =>
           this._prisma.taskRun.updateMany({
@@ -247,6 +268,7 @@ export class UpdateMetadataService {
             data: {
               metadata: newMetadataPacket.data,
               metadataVersion: { increment: 1 },
+              updatedAt: writeTime,
             },
           })
         )
@@ -262,6 +284,16 @@ export class UpdateMetadataService {
         return yield* _(Effect.fail(new Error("Optimistic lock failed")));
       }
 
+      yield* Effect.sync(() => {
+        this.options.onRunFlushed?.({
+          runId,
+          environmentId: run.runtimeEnvironmentId,
+          tags: run.runTags,
+          batchId: run.batchId,
+          updatedAtMs: writeTime.getTime(),
+        });
+      });
+
       return result;
     });
   };
@@ -346,7 +378,7 @@ export class UpdateMetadataService {
       this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations);
     }
 
-    const newMetadata = await this.#updateRunMetadata({
+    const result = await this.#updateRunMetadata({
       runId: taskRun.id,
       body,
       existingMetadata: {
@@ -356,11 +388,14 @@ export class UpdateMetadataService {
     });
 
     return {
-      metadata: newMetadata,
+      metadata: result?.metadata,
       // Internal id + membership keys, so callers can publish full realtime records the router routes by index.
       runId: taskRun.id,
       batchId: taskRun.batchId,
       runTags: taskRun.runTags,
+      // The committed row's updatedAt — the realtime watermark. Undefined when nothing was
+      // written here (no-op, or buffered for the flusher, which publishes itself).
+      updatedAtMs: result?.updatedAtMs,
     };
   }
 
@@ -372,15 +407,18 @@ export class UpdateMetadataService {
     runId: string;
     body: UpdateMetadataRequestBody;
     existingMetadata: IOPacket;
-  }) {
+  }): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     if (Array.isArray(body.operations)) {
       return this.#updateRunMetadataWithOperations(runId, body.operations);
     } else {
       return this.#updateRunMetadataDirectly(runId, body, existingMetadata);
     }
   }
 
-  async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) {
+  async #updateRunMetadataWithOperations(
+    runId: string,
+    operations: RunMetadataChangeOperation[]
+  ): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     const MAX_RETRIES = 3;
     let attempts = 0;
 
@@ -408,9 +446,9 @@ export class UpdateMetadataService {
       // Apply operations to the current metadata
       const applyResults = applyMetadataOperations(currentMetadata, operations);
 
-      // If no operations were applied, return the current metadata
+      // If no operations were applied, return the current metadata (nothing written)
       if (applyResults.unappliedOperations.length === operations.length) {
-        return currentMetadata;
+        return { metadata: currentMetadata };
       }
 
       const newMetadataPacket = handleMetadataPacket(
@@ -428,7 +466,9 @@ export class UpdateMetadataService {
         await this.options.onBeforeUpdate(runId);
       }
 
-      // Update with optimistic locking
+      // Update with optimistic locking; updatedAt stamped explicitly so the caller can
+      // publish the exact committed watermark without a follow-up read.
+      const writeTime = new Date();
       const result = await this._prisma.taskRun.updateMany({
         where: {
           id: runId,
@@ -440,6 +480,7 @@ export class UpdateMetadataService {
           metadataVersion: {
             increment: 1,
           },
+          updatedAt: writeTime,
         },
       });
 
@@ -454,9 +495,10 @@ export class UpdateMetadataService {
         }
 
         // If this was our last attempt, buffer the operations and return optimistically
+        // (no watermark — the flusher writes later and publishes itself).
         if (attempts === MAX_RETRIES) {
           this.#ingestRunOperations(runId, operations);
-          return applyResults.newMetadata;
+          return { metadata: applyResults.newMetadata };
         }
 
         // Otherwise sleep and try again
@@ -474,8 +516,10 @@ export class UpdateMetadataService {
       }
 
       // Success! Return the new metadata
-      return applyResults.newMetadata;
+      return { metadata: applyResults.newMetadata, updatedAtMs: writeTime.getTime() };
     }
+
+    return { metadata: undefined };
   }
 
   // Checks to see if a run is updatable
@@ -493,17 +537,19 @@ export class UpdateMetadataService {
     runId: string,
     body: UpdateMetadataRequestBody,
     existingMetadata: IOPacket
-  ) {
+  ): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     const metadataPacket = handleMetadataPacket(
       body.metadata,
       "application/json",
       this.maximumSize
     );
 
     if (!metadataPacket) {
-      return {};
+      return { metadata: {} };
     }
 
+    let updatedAtMs: number | undefined;
+
     if (
       metadataPacket.data !== "{}" ||
       (existingMetadata.data && metadataPacket.data !== existingMetadata.data)
@@ -515,7 +561,9 @@ export class UpdateMetadataService {
         });
       }
 
-      // Update the metadata without version check
+      // Update the metadata without version check; updatedAt stamped explicitly so the
+      // caller can publish the exact committed watermark.
+      const writeTime = new Date();
       await this._prisma.taskRun.update({
         where: {
           id: runId,
@@ -526,12 +574,14 @@ export class UpdateMetadataService {
           metadataVersion: {
             increment: 1,
           },
+          updatedAt: writeTime,
         },
       });
+      updatedAtMs = writeTime.getTime();
     }
 
     const newMetadata = await parsePacket(metadataPacket);
-    return newMetadata;
+    return { metadata: newMetadata, updatedAtMs };
   }
 
   #ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) {

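The pattern this file applies (choose `updatedAt` before the write so the caller already holds the committed value, and report no watermark when the optimistic lock matches nothing) can be shown against an in-memory table. This is a sketch under assumptions: `RunRow`, `updateManyWhereVersion`, and `writeMetadata` are hypothetical stand-ins, with the helper mimicking only the "returns a count, not the row" behavior that the diff comment attributes to `updateMany`.

```typescript
// Hypothetical in-memory stand-in for the taskRun table; not Prisma.
interface RunRow {
  metadata: string;
  metadataVersion: number;
  updatedAt: Date;
}

// Mimics an updateMany-style call: it reports how many rows matched, never the row itself.
function updateManyWhereVersion(
  table: Map<string, RunRow>,
  runId: string,
  expectedVersion: number,
  data: { metadata: string; updatedAt: Date }
): { count: number } {
  const row = table.get(runId);
  if (!row || row.metadataVersion !== expectedVersion) return { count: 0 };
  table.set(runId, {
    metadata: data.metadata,
    metadataVersion: row.metadataVersion + 1,
    updatedAt: data.updatedAt,
  });
  return { count: 1 };
}

// Stamp the timestamp up front so a successful write needs no follow-up read.
function writeMetadata(
  table: Map<string, RunRow>,
  runId: string,
  expectedVersion: number,
  metadata: string,
  now: Date = new Date()
): { written: boolean; updatedAtMs?: number } {
  const writeTime = now;
  const result = updateManyWhereVersion(table, runId, expectedVersion, {
    metadata,
    updatedAt: writeTime,
  });
  // Lock lost: nothing was committed, so there is no watermark to publish.
  if (result.count === 0) return { written: false };
  return { written: true, updatedAtMs: writeTime.getTime() };
}

const table = new Map<string, RunRow>([
  ["run_1", { metadata: "{}", metadataVersion: 1, updatedAt: new Date(0) }],
]);
const first = writeMetadata(table, "run_1", 1, '{"step":1}', new Date(5_000));
const lostRace = writeMetadata(table, "run_1", 1, '{"step":2}', new Date(6_000));
```

A caller would publish a change record only when `updatedAtMs` is defined, which is the same check the route handlers in this commit perform.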
apps/webapp/app/services/metadata/updateMetadataInstance.server.ts

Lines changed: 12 additions & 0 deletions
@@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton";
 import { env } from "~/env.server";
 import { UpdateMetadataService } from "./updateMetadata.server";
 import { prisma } from "~/db.server";
+import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
 
 export const updateMetadataService = singleton(
   "update-metadata-service",
@@ -13,5 +14,16 @@ export const updateMetadataService = singleton(
       flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
       maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,
       logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info",
+      // Buffered (parent/root) operations land via the flusher, not the caller's request —
+      // publish here so those changes wake live feeds too (no-op when the backend is off).
+      onRunFlushed: (run) => {
+        publishChangeRecord({
+          runId: run.runId,
+          envId: run.environmentId,
+          tags: run.tags,
+          batchId: run.batchId,
+          updatedAtMs: run.updatedAtMs,
+        });
+      },
     })
 );

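The hook wiring above can be read as the following toy flusher, offered only as an illustration. `createFlusher`, `WriteBuffered`, and the string-typed operations are hypothetical; the two record shapes copy the field names from the diff (`environmentId` on the hook payload, `envId` on the published record).

```typescript
// Hypothetical sketch of a flusher that announces each run it actually writes.
interface FlushedRun {
  runId: string;
  environmentId: string;
  tags: string[];
  batchId: string | null;
  updatedAtMs: number;
}

interface ChangeRecord {
  runId: string;
  envId: string;
  tags: string[];
  batchId: string | null;
  updatedAtMs: number;
}

// Injected write: returns the run's membership keys, or undefined if nothing was written.
type WriteBuffered = (
  runId: string,
  ops: string[],
  writeTime: Date
) => { environmentId: string; tags: string[]; batchId: string | null } | undefined;

function createFlusher(write: WriteBuffered, onRunFlushed?: (run: FlushedRun) => void) {
  const buffer = new Map<string, string[]>();
  return {
    ingest(runId: string, op: string): void {
      const ops = buffer.get(runId) ?? [];
      ops.push(op);
      buffer.set(runId, ops);
    },
    flush(now: Date = new Date()): void {
      buffer.forEach((ops, runId) => {
        const written = write(runId, ops, now);
        // Only a committed write is announced, stamped with the time that was written.
        if (written) onRunFlushed?.({ runId, ...written, updatedAtMs: now.getTime() });
      });
      buffer.clear();
    },
  };
}

const published: ChangeRecord[] = [];
const flusher = createFlusher(
  (runId) =>
    runId === "run_missing" ? undefined : { environmentId: "env_1", tags: ["a"], batchId: null },
  (run) =>
    published.push({
      runId: run.runId,
      envId: run.environmentId,
      tags: run.tags,
      batchId: run.batchId,
      updatedAtMs: run.updatedAtMs,
    })
);
```

Passing the publisher in as a callback keeps the metadata service free of a direct dependency on the realtime layer, which appears to be the reason the commit wires it in the singleton file rather than in the service.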