Skip to content

Commit 4593ef8

Browse files
committed
fix(webapp): make session append idempotency atomic
Claim the part id with SET NX before appending instead of a read then write, so two concurrent or retried POSTs with the same X-Part-Id can never both write a record. The claim is released when the append fails so a genuine retry still proceeds.
1 parent 22680b7 commit 4593ef8

2 files changed

Lines changed: 56 additions & 49 deletions

File tree

apps/webapp/app/routes/realtime.v1.sessions.$session.$io.append.ts

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import {
1212
} from "~/services/realtime/sessions.server";
1313
import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server";
1414
import {
15+
claimSessionStreamPart,
1516
drainSessionStreamWaitpoints,
16-
markSessionStreamPartAppended,
17-
wasSessionStreamPartAppended,
17+
releaseSessionStreamPart,
1818
} from "~/services/sessionStreamWaitpointCache.server";
1919
import {
2020
anyResource,
@@ -150,25 +150,35 @@ const { action, loader } = createActionApiRoute(
150150
const clientPartId = request.headers.get("X-Part-Id");
151151
const partId = clientPartId ?? nanoid(7);
152152

153-
// Idempotency on client-supplied part ids: a retried POST whose first
154-
// attempt committed skips the second append (which would duplicate the
155-
// record), but still falls through to the drain below — so a retry whose
156-
// first attempt died before waking the waitpoint can still recover it.
157-
const alreadyAppended =
158-
!!clientPartId &&
159-
(await wasSessionStreamPartAppended(
160-
authentication.environment.id,
161-
addressingKey,
162-
params.io,
163-
clientPartId
164-
));
153+
// Idempotency on client-supplied part ids: atomically claim the id before
154+
// appending. A concurrent or retried POST that loses the claim skips the
155+
// append (no duplicate record) but still falls through to the drain below,
156+
// so a retry whose first attempt died before waking the waitpoint can still
157+
// recover it. The claim is released on append failure so a genuine retry
158+
// can re-claim and proceed.
159+
const wonClaim = clientPartId
160+
? await claimSessionStreamPart(
161+
authentication.environment.id,
162+
addressingKey,
163+
params.io,
164+
clientPartId
165+
)
166+
: true;
165167

166-
if (!alreadyAppended) {
168+
if (wonClaim) {
167169
const [appendError] = await tryCatch(
168170
realtimeStream.appendPartToSessionStream(part, partId, addressingKey, params.io)
169171
);
170172

171173
if (appendError) {
174+
if (clientPartId) {
175+
await releaseSessionStreamPart(
176+
authentication.environment.id,
177+
addressingKey,
178+
params.io,
179+
clientPartId
180+
);
181+
}
172182
if (appendError instanceof ServiceValidationError) {
173183
return json(
174184
{ ok: false, error: appendError.message },
@@ -185,15 +195,6 @@ const { action, loader } = createActionApiRoute(
185195
{ status: 500 }
186196
);
187197
}
188-
189-
if (clientPartId) {
190-
await markSessionStreamPartAppended(
191-
authentication.environment.id,
192-
addressingKey,
193-
params.io,
194-
clientPartId
195-
);
196-
}
197198
}
198199

199200
// Fire any run-scoped waitpoints registered against this channel. Best

apps/webapp/app/services/sessionStreamWaitpointCache.server.ts

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,14 @@ export async function drainSessionStreamWaitpoints(
151151
* Remove a single waitpoint from the pending set. Called after a race
152152
* where `.wait()` completed the waitpoint from pre-arrived data.
153153
*/
154-
// "ssa" — session-stream-append. Best-effort idempotency marker for the
155-
// append route: when a caller supplies an `X-Part-Id`, a retried POST
156-
// whose first attempt actually committed is skipped instead of producing
157-
// a duplicate record (and double-firing the waitpoint drain). The marker
158-
// is only written AFTER a successful S2 append, so a retry of a genuinely
159-
// failed append still goes through. 5-minute window — this covers HTTP
160-
// retry storms, not a permanent idempotency store.
154+
// "ssa" — session-stream-append. Idempotency claim for the append route:
155+
// when a caller supplies an `X-Part-Id`, the first request atomically claims
156+
// the key (SET NX) before appending; a concurrent or retried POST with the
157+
// same id fails the claim and skips the append, so it never produces a
158+
// duplicate record (or double-fires the waitpoint drain). The claim is
159+
// released if the append fails, so a retry of a genuinely failed append
160+
// still goes through. 5-minute window — covers retry storms, not a
161+
// permanent idempotency store.
161162
const APPEND_DEDUPE_PREFIX = "ssa:";
162163
const APPEND_DEDUPE_TTL_SECONDS = 5 * 60;
163164

@@ -176,35 +177,45 @@ function buildAppendDedupeKey(
176177
}
177178

178179
/**
179-
* True if a part with this id was already successfully appended to the
180-
* channel within the dedupe window. Fails open (returns false) when Redis
181-
* is unavailable — appends degrade to at-least-once, never to dropped.
180+
* Atomically claim a part id before appending. Returns true if this caller
181+
* won the claim (first to see this id) and should perform the append, false
182+
* if the id was already claimed (a concurrent or retried POST) and the append
183+
* should be skipped. Fails open (returns true) when Redis is unavailable —
184+
* appends degrade to at-least-once, never to dropped.
182185
*/
183-
export async function wasSessionStreamPartAppended(
186+
export async function claimSessionStreamPart(
184187
environmentId: string,
185188
addressingKey: string,
186189
io: "out" | "in",
187190
partId: string
188191
): Promise<boolean> {
189-
if (!redis) return false;
192+
if (!redis) return true;
190193

191194
try {
192-
const value = await redis.get(buildAppendDedupeKey(environmentId, addressingKey, io, partId));
193-
return value !== null;
195+
// SET NX is the atomic claim: "OK" when set (we won), null when the key
196+
// already exists (someone else owns this id).
197+
const result = await redis.set(
198+
buildAppendDedupeKey(environmentId, addressingKey, io, partId),
199+
"1",
200+
"EX",
201+
APPEND_DEDUPE_TTL_SECONDS,
202+
"NX"
203+
);
204+
return result === "OK";
194205
} catch (error) {
195-
logger.error("Failed to read session stream append dedupe marker", {
206+
logger.error("Failed to claim session stream append part", {
196207
environmentId,
197208
addressingKey,
198209
io,
199210
partId,
200211
error,
201212
});
202-
return false;
213+
return true;
203214
}
204215
}
205216

206-
/** Record a successful append so a retried POST with the same part id is skipped. */
207-
export async function markSessionStreamPartAppended(
217+
/** Release a claim so a retry can proceed — called when the append itself failed. */
218+
export async function releaseSessionStreamPart(
208219
environmentId: string,
209220
addressingKey: string,
210221
io: "out" | "in",
@@ -213,14 +224,9 @@ export async function markSessionStreamPartAppended(
213224
if (!redis) return;
214225

215226
try {
216-
await redis.set(
217-
buildAppendDedupeKey(environmentId, addressingKey, io, partId),
218-
"1",
219-
"EX",
220-
APPEND_DEDUPE_TTL_SECONDS
221-
);
227+
await redis.del(buildAppendDedupeKey(environmentId, addressingKey, io, partId));
222228
} catch (error) {
223-
logger.error("Failed to write session stream append dedupe marker", {
229+
logger.error("Failed to release session stream append part", {
224230
environmentId,
225231
addressingKey,
226232
io,

0 commit comments

Comments
 (0)