Skip to content

Commit 3fc6d4d

Browse files
committed
fix(webapp): dual-read the legacy waitpoint cache key during rollout
Drain both the environment-scoped session-stream waitpoint key and the previous unscoped key, so a waitpoint registered by the prior deploy still wakes its run across the deploy boundary. The legacy read can be dropped a release later once no pre-deploy waitpoints remain.
1 parent 03c6290 commit 3fc6d4d

1 file changed

Lines changed: 19 additions & 5 deletions

File tree

apps/webapp/app/services/sessionStreamWaitpointCache.server.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ function buildKey(environmentId: string, addressingKey: string, io: "out" | "in"
1919
return `${KEY_PREFIX}${environmentId}:${addressingKey}:${io}`;
2020
}
2121

22+
// Pre-env-scoping key format, drained for one release so waitpoints from the
23+
// previous deploy still wake. Removable once this has been live > turn timeout.
24+
function buildLegacyKey(addressingKey: string, io: "out" | "in"): string {
25+
return `${KEY_PREFIX}${addressingKey}:${io}`;
26+
}
27+
2228
function initializeRedis(): Redis | undefined {
2329
const host = env.CACHE_REDIS_HOST;
2430
if (!host) {
@@ -112,16 +118,24 @@ export async function drainSessionStreamWaitpoints(
112118

113119
try {
114120
const key = buildKey(environmentId, addressingKey, io);
121+
const legacyKey = buildLegacyKey(addressingKey, io);
115122
const pipeline = redis.multi();
116123
pipeline.smembers(key);
117124
pipeline.del(key);
125+
pipeline.smembers(legacyKey);
126+
pipeline.del(legacyKey);
118127
const results = await pipeline.exec();
119128
if (!results) return [];
120-
const [smembersResult] = results;
121-
if (!smembersResult) return [];
122-
const [err, members] = smembersResult;
123-
if (err) return [];
124-
return Array.isArray(members) ? (members as string[]) : [];
129+
// Union members from the env-scoped key and the legacy key (dual-read).
130+
const ids = new Set<string>();
131+
for (const idx of [0, 2]) {
132+
const entry = results[idx];
133+
if (!entry) continue;
134+
const [err, members] = entry;
135+
if (err || !Array.isArray(members)) continue;
136+
for (const m of members as string[]) ids.add(m);
137+
}
138+
return [...ids];
125139
} catch (error) {
126140
logger.error("Failed to drain session stream waitpoint cache", {
127141
environmentId,

0 commit comments

Comments
 (0)