Skip to content

Commit 092b14e

Browse files
authored
Merge pull request #40 from onkernel/mason/redis-failure-handling
fix(redis): add reconnect logic for transient errors
2 parents 2bcf6cc + 84e212b commit 092b14e

1 file changed

Lines changed: 64 additions & 14 deletions

File tree

src/lib/redis.ts

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,49 @@
11
import { createClient } from "redis";
22
import { createHmac } from "crypto";
33

4+
// Connect on first use
5+
let isConnected = false;
6+
let connectPromise: Promise<void> | null = null;
7+
48
const client = createClient({
59
url: process.env.REDIS_URL,
10+
socket: {
11+
// Modest backoff to smooth over first-hit cold connections
12+
reconnectStrategy: (retries) => Math.min(500 + retries * 100, 2000),
13+
},
614
});
715

816
client.on("error", (err) => {
17+
// Reset connection state so the next command will re-connect
18+
isConnected = false;
919
console.error("Redis Client Error", err);
1020
});
11-
12-
// Connect on first use
13-
let isConnected = false;
21+
client.on("end", () => {
22+
isConnected = false;
23+
});
24+
client.on("ready", () => {
25+
isConnected = true;
26+
});
1427

1528
async function ensureConnected(): Promise<void> {
16-
if (!isConnected) {
17-
await client.connect();
18-
isConnected = true;
19-
}
29+
// Prefer the client's readiness state when available
30+
// @ts-ignore node-redis exposes isReady at runtime
31+
if ((client as any).isReady) return;
32+
if (client.isOpen && isConnected) return;
33+
if (connectPromise) return await connectPromise;
34+
connectPromise = client
35+
.connect()
36+
.then(() => {
37+
// 'ready' event will flip isConnected when the client can process commands
38+
})
39+
.catch((err) => {
40+
isConnected = false;
41+
throw err;
42+
})
43+
.finally(() => {
44+
connectPromise = null;
45+
});
46+
return await connectPromise;
2047
}
2148

2249
// Hash JWT using HMAC-SHA256 with CLERK_SECRET_KEY for secure Redis storage
@@ -50,7 +77,7 @@ export async function setOrgIdForClientId({
5077
}): Promise<void> {
5178
await ensureConnected();
5279
const key = `client:${clientId}`;
53-
await client.setEx(key, ttlSeconds, orgId);
80+
await withReconnect(() => client.setEx(key, ttlSeconds, orgId));
5481
}
5582

5683
export async function getOrgIdForClientId({
@@ -60,7 +87,7 @@ export async function getOrgIdForClientId({
6087
}): Promise<string | null> {
6188
await ensureConnected();
6289
const key = `client:${clientId}`;
63-
return await client.get(key);
90+
return await withReconnect(() => client.get(key));
6491
}
6592

6693
export async function setOrgIdForJwt({
@@ -75,7 +102,7 @@ export async function setOrgIdForJwt({
75102
await ensureConnected();
76103
const hashedJwt = hashJwt(jwt);
77104
const key = `jwt:${hashedJwt}`;
78-
await client.setEx(key, ttlSeconds, orgId);
105+
await withReconnect(() => client.setEx(key, ttlSeconds, orgId));
79106
}
80107

81108
export { client as redisClient };
@@ -92,7 +119,7 @@ export async function setOrgIdForRefreshToken({
92119
await ensureConnected();
93120
const hashed = hashOpaqueToken(refreshToken);
94121
const key = `refresh:${hashed}`;
95-
await client.setEx(key, ttlSeconds, orgId);
122+
await withReconnect(() => client.setEx(key, ttlSeconds, orgId));
96123
}
97124

98125
export async function getOrgIdForRefreshTokenSliding({
@@ -105,10 +132,10 @@ export async function getOrgIdForRefreshTokenSliding({
105132
await ensureConnected();
106133
const hashed = hashOpaqueToken(refreshToken);
107134
const key = `refresh:${hashed}`;
108-
const orgId = await client.get(key);
135+
const orgId = await withReconnect(() => client.get(key));
109136
if (orgId) {
110137
// Refresh TTL to implement sliding expiration on active tokens
111-
await client.expire(key, ttlSeconds);
138+
await withReconnect(() => client.expire(key, ttlSeconds));
112139
}
113140
return orgId;
114141
}
@@ -121,5 +148,28 @@ export async function deleteOrgIdForRefreshToken({
121148
await ensureConnected();
122149
const hashed = hashOpaqueToken(refreshToken);
123150
const key = `refresh:${hashed}`;
124-
await client.del(key);
151+
await withReconnect(() => client.del(key));
152+
}
153+
154+
function isTransientSocketError(error: unknown): boolean {
155+
const message = String((error as any)?.message ?? error ?? "");
156+
return (
157+
message.includes("Socket closed") ||
158+
message.includes("ECONNRESET") ||
159+
message.includes("EPIPE") ||
160+
message.includes("ENETUNREACH")
161+
);
162+
}
163+
164+
async function withReconnect<T>(operation: () => Promise<T>): Promise<T> {
165+
try {
166+
return await operation();
167+
} catch (err) {
168+
if (isTransientSocketError(err)) {
169+
isConnected = false;
170+
await ensureConnected();
171+
return await operation();
172+
}
173+
throw err;
174+
}
125175
}

0 commit comments

Comments
 (0)