Skip to content

Commit 44d852b

Browse files
committed
feat(webapp): share realtime replay cursors across instances
A load balancer hop previously made a connection's inter-poll gap unprovable, forcing a cold resolve and a full-window replay on the new instance. Per-connection replay cursors (one timestamp each) now live in Redis behind REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS (default on), so any instance can read the true gap. Store reads have a bounded deadline and degrade to the old cold-probe behavior on any Redis trouble.
1 parent c7e7b0a commit 44d852b

5 files changed

Lines changed: 333 additions & 10 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ const EnvironmentSchema = z
318318
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
319319
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
320320
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(250),
321+
// "1" shares per-connection replay cursors fleet-wide via Redis, so a load-balancer hop reads the connection's true inter-poll gap instead of cold-resolving.
322+
REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS: z.string().default("1"),
321323
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
322324
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
323325
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.

apps/webapp/app/services/realtime/nativeRealtimeClient.server.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
} from "./envChangeRouter.server";
3434
import { type RunHydrator, type RunListResolver } from "./runReader.server";
3535
import { type RealtimeConcurrencyLimiter } from "./realtimeConcurrencyLimiter.server";
36+
import { InMemoryReplayCursorStore, type ReplayCursorStore } from "./replayCursorStore.server";
3637

3738
/** Widened with projectId so the tag-list feed can resolve ids via ClickHouse (needs org + project + env). */
3839
export type RealtimeListEnvironment = RealtimeEnvironment & { projectId: string };
@@ -106,6 +107,10 @@ export type NativeRealtimeClientOptions = {
106107
holdOnEmpty?: boolean;
107108
/** Max concurrent fresh ClickHouse resolves (cache misses) per instance, bounding a distinct-filter stampede. Defaults to 16; 0 disables. */
108109
resolveAdmissionLimit?: number;
110+
/** Per-connection replay-cursor store. Inject a fleet-shared (Redis) store so an instance
111+
* hop reads the connection's true inter-poll gap instead of cold-probing; defaults to a
112+
* per-instance in-memory cache. */
113+
replayCursorStore?: ReplayCursorStore;
109114
/** Observability hook: why a live request woke (notify vs timeout vs abort). */
110115
onWakeup?: (reason: WakeupReason) => void;
111116
/** Observability hook: how a live poll resolved (fast path vs full resolve). */
@@ -206,18 +211,21 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
206211
/** Bounds concurrent fresh CH resolves (undefined => unbounded). */
207212
readonly #admissionGate?: ResolveAdmissionGate;
208213
/** Per-connection: when this connection's last response was sent, so the router's
209-
* replay covers exactly the inter-poll gap instead of rewinding a full window. */
210-
readonly #replayCursorCache: BoundedTtlCache<number>;
214+
* replay covers exactly the inter-poll gap instead of rewinding a full window.
215+
* Fleet-shared when a store is injected (hops stop looking like unknown gaps). */
216+
readonly #replayCursors: ReplayCursorStore;
211217

212218
constructor(private readonly options: NativeRealtimeClientOptions) {
213219
this.#workingSetCache = new BoundedTtlCache(
214220
options.workingSetCacheTtlMs ?? LIST_CACHE_TTL_MS,
215221
options.listCacheMaxEntries ?? LIST_CACHE_MAX_ENTRIES
216222
);
217-
this.#replayCursorCache = new BoundedTtlCache(
218-
options.workingSetCacheTtlMs ?? LIST_CACHE_TTL_MS,
219-
options.listCacheMaxEntries ?? LIST_CACHE_MAX_ENTRIES
220-
);
223+
this.#replayCursors =
224+
options.replayCursorStore ??
225+
new InMemoryReplayCursorStore(
226+
options.workingSetCacheTtlMs ?? LIST_CACHE_TTL_MS,
227+
options.listCacheMaxEntries ?? LIST_CACHE_MAX_ENTRIES
228+
);
221229
this.#runSetCache = new BoundedTtlCache(
222230
options.runSetResolveCacheTtlMs ?? DEFAULT_RUNSET_CACHE_TTL_MS,
223231
options.runSetResolveCacheMaxEntries ?? DEFAULT_RUNSET_CACHE_MAX_ENTRIES
@@ -528,7 +536,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
528536
maxUpdatedAt = Math.max(maxUpdatedAt, updatedAtMs);
529537
}
530538
this.#workingSetCache.set(this.#workingSetKey(environment.id, handle), seen);
531-
this.#replayCursorCache.set(this.#workingSetKey(environment.id, handle), Date.now());
539+
this.#replayCursors.set(this.#workingSetKey(environment.id, handle), Date.now());
532540

533541
return this.#buildResponse(buildRowsBody(changes, skipColumns), apiVersion, clientVersion, {
534542
offset: encodeOffset(maxUpdatedAt, this.#nextSeq()),
@@ -559,7 +567,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
559567
const workingSetKey = this.#workingSetKey(environment.id, handle);
560568
let prevSeen = this.#workingSetCache.get(workingSetKey);
561569

562-
const markPollEnd = () => this.#replayCursorCache.set(workingSetKey, Date.now());
570+
const markPollEnd = () => this.#replayCursors.set(workingSetKey, Date.now());
563571
const emitFromSerialized = (changes: SerializedRowChange[], maxUpdatedAt: number): Response => {
564572
const seq = this.#nextSeq();
565573
markPollEnd();
@@ -588,12 +596,14 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
588596
});
589597
};
590598

599+
// When this connection last received data, so replay covers exactly its gap. A store
600+
// error degrades to undefined (cold probe), never a failed poll.
601+
const replaySinceMs = await this.#replayCursors.get(workingSetKey);
591602
const registration = this.options.router.register(
592603
environment.id,
593604
this.#feedFilter(filter),
594605
skipColumns,
595-
// When this connection last received data, so replay covers exactly its gap.
596-
{ replaySinceMs: this.#replayCursorCache.get(workingSetKey) }
606+
{ replaySinceMs }
597607
);
598608

599609
// Cold start (fresh env subscription, e.g. an instance hop): resolve once up front

apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { EnvChangeRouter } from "./envChangeRouter.server";
99
import { NativeRealtimeClient } from "./nativeRealtimeClient.server";
1010
import { RealtimeConcurrencyLimiter } from "./realtimeConcurrencyLimiter.server";
1111
import { getRunChangeNotifier } from "./runChangeNotifierInstance.server";
12+
import { RedisReplayCursorStore } from "./replayCursorStore.server";
1213
import { RunHydrator } from "./runReader.server";
1314

1415
// Process-singleton wiring for the native realtime client; only constructed when a
@@ -77,6 +78,11 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
7778
description: "Polls rejected (429) by the per-env concurrency limiter.",
7879
});
7980

81+
const replayCursorOps = meter.createCounter("realtime_native.replay_cursor_ops", {
82+
description:
83+
"Shared replay-cursor store operations by outcome. Errors degrade hops to cold resolves (watch live_polls{path='cold-resolve'} rise with them), never failed polls.",
84+
});
85+
8086
const limiter = new RealtimeConcurrencyLimiter({
8187
keyPrefix: "tr:realtime:native:concurrency",
8288
redis: {
@@ -89,6 +95,24 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
8995
},
9096
});
9197

98+
// Fleet-shared replay cursors (one timestamp per connection) on the same Redis as the
99+
// change channel, so a load-balancer hop reads the connection's true inter-poll gap.
100+
const replayCursorStore =
101+
env.REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS === "1"
102+
? new RedisReplayCursorStore({
103+
redis: {
104+
host: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST,
105+
port: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT,
106+
username: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME,
107+
password: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD,
108+
tlsDisabled: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED === "true",
109+
clusterMode: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
110+
},
111+
ttlMs: env.REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS,
112+
onResult: (op, ok) => replayCursorOps.add(1, { op, result: ok ? "ok" : "error" }),
113+
})
114+
: undefined;
115+
92116
// One RunHydrator shared by the router and the client, so its single-flight + short-TTL cache covers both.
93117
const runReader = new RunHydrator({
94118
replica: $replica,
@@ -138,6 +162,7 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
138162
runSetCreatedAtBucketMs: env.REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS,
139163
holdOnEmpty: env.REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY === "1",
140164
resolveAdmissionLimit: env.REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT,
165+
replayCursorStore,
141166
onWakeup: (reason) => wakeups.add(1, { reason }),
142167
onLivePollPath: (path) => livePollPaths.add(1, { path }),
143168
onRunSetResolve: (result) => runSetResolves.add(1, { result }),
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import { createRedisClient, type RedisClient, type RedisWithClusterOptions } from "~/redis.server";
2+
import { logger } from "../logger.server";
3+
import { BoundedTtlCache } from "./boundedTtlCache";
4+
5+
/**
6+
* Per-connection replay cursors ("when did this connection last receive data"), keyed by the
7+
* env-prefixed working-set key. Sharing them fleet-wide makes an instance hop look like a normal
8+
* inter-poll gap instead of an unknown one, so hops stop triggering cold resolves and full-window
9+
* replays. Values are single timestamps, so the shared store stays cheap.
10+
*/
11+
export interface ReplayCursorStore {
12+
/** The connection's last-response timestamp; undefined on miss OR error (the caller
13+
* degrades to a cold probe / full-window replay, never blocks the poll). */
14+
get(key: string): Promise<number | undefined>;
15+
/** Fire-and-forget stamp; must never throw. */
16+
set(key: string, ms: number): void;
17+
}
18+
19+
/** Per-instance fallback with the same shape (used when the shared store is disabled, and in tests). */
20+
export class InMemoryReplayCursorStore implements ReplayCursorStore {
21+
readonly #cache: BoundedTtlCache<number>;
22+
23+
constructor(ttlMs: number, maxEntries: number) {
24+
this.#cache = new BoundedTtlCache<number>(ttlMs, maxEntries);
25+
}
26+
27+
async get(key: string): Promise<number | undefined> {
28+
return this.#cache.get(key);
29+
}
30+
31+
set(key: string, ms: number): void {
32+
this.#cache.set(key, ms);
33+
}
34+
}
35+
36+
export type RedisReplayCursorStoreOptions = {
37+
redis: RedisWithClusterOptions;
38+
/** Entry TTL (ms); matches the working-set TTL so both views of a connection age out together. */
39+
ttlMs: number;
40+
/** Read deadline (ms): a slow or down Redis degrades the poll to a cold probe instead of stalling it. */
41+
getTimeoutMs?: number;
42+
keyPrefix?: string;
43+
connectionName?: string;
44+
/** Observability hook: a store op settled (errors are the degradation signal, not failures). */
45+
onResult?: (op: "get" | "set", ok: boolean) => void;
46+
};
47+
48+
const DEFAULT_KEY_PREFIX = "realtime:replay-cursor:";
49+
const DEFAULT_GET_TIMEOUT_MS = 250;
50+
const TIMED_OUT = Symbol("replay-cursor-get-timeout");
51+
52+
export class RedisReplayCursorStore implements ReplayCursorStore {
53+
#client: RedisClient | undefined;
54+
55+
constructor(private readonly options: RedisReplayCursorStoreOptions) {}
56+
57+
async get(key: string): Promise<number | undefined> {
58+
try {
59+
const raw = await this.#getWithDeadline(this.#key(key));
60+
if (raw === TIMED_OUT) {
61+
this.options.onResult?.("get", false);
62+
logger.warn("[replayCursorStore] replay-cursor read timed out", { key });
63+
return undefined;
64+
}
65+
this.options.onResult?.("get", true);
66+
if (raw === null) {
67+
return undefined;
68+
}
69+
const ms = Number(raw);
70+
return Number.isFinite(ms) && ms > 0 ? ms : undefined;
71+
} catch (error) {
72+
this.options.onResult?.("get", false);
73+
logger.error("[replayCursorStore] failed to read a replay cursor", { error, key });
74+
return undefined;
75+
}
76+
}
77+
78+
/** GET raced against the read deadline (ioredis queues commands while disconnected, which
79+
* would otherwise stall every poll start through an outage). */
80+
#getWithDeadline(key: string): Promise<string | null | typeof TIMED_OUT> {
81+
return new Promise((resolve, reject) => {
82+
const timer = setTimeout(
83+
() => resolve(TIMED_OUT),
84+
this.options.getTimeoutMs ?? DEFAULT_GET_TIMEOUT_MS
85+
);
86+
timer.unref?.();
87+
this.#ensureClient()
88+
.get(key)
89+
.then(
90+
(value) => {
91+
clearTimeout(timer);
92+
resolve(value);
93+
},
94+
(error) => {
95+
clearTimeout(timer);
96+
reject(error);
97+
}
98+
);
99+
});
100+
}
101+
102+
set(key: string, ms: number): void {
103+
try {
104+
this.#ensureClient()
105+
.set(this.#key(key), String(ms), "PX", this.options.ttlMs)
106+
.then(
107+
() => this.options.onResult?.("set", true),
108+
(error) => {
109+
this.options.onResult?.("set", false);
110+
logger.error("[replayCursorStore] failed to write a replay cursor", { error, key });
111+
}
112+
);
113+
} catch (error) {
114+
this.options.onResult?.("set", false);
115+
logger.error("[replayCursorStore] failed to write a replay cursor", { error, key });
116+
}
117+
}
118+
119+
async quit(): Promise<void> {
120+
const client = this.#client;
121+
this.#client = undefined;
122+
if (!client) return;
123+
try {
124+
// Bounded graceful QUIT; cursor writes are best-effort, so force-close beyond it.
125+
await Promise.race([client.quit(), new Promise((resolve) => setTimeout(resolve, 500))]);
126+
} catch {
127+
// force-close below
128+
}
129+
client.disconnect();
130+
}
131+
132+
#key(key: string): string {
133+
return `${this.options.keyPrefix ?? DEFAULT_KEY_PREFIX}${key}`;
134+
}
135+
136+
#ensureClient(): RedisClient {
137+
if (!this.#client) {
138+
this.#client = createRedisClient(
139+
this.options.connectionName ?? "trigger:realtime:replay-cursors",
140+
this.options.redis
141+
);
142+
}
143+
return this.#client;
144+
}
145+
}

0 commit comments

Comments
 (0)