Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
"@types/js-cookie": "3.0.6",
"@types/js-yaml": "4.0.9",
"@types/mdx": "2.0.13",
"@upstash/redis": "^1.38.0",
"@vercel/functions": "3.4.6",
"@vercel/otel": "2.1.2",
"@workos-inc/node": "catalog:",
Expand Down
6 changes: 5 additions & 1 deletion apps/web/src/lib/ai-gateway/experiments/pick-variant.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ const upstreamB = {
internal_id: 'partner-checkpoint-b',
base_url: 'https://partner.example.com/v1',
};
const redisIt = process.env.REDIS_URL ? it : it.skip;
const redisIt =
(process.env.UPSTASH_REDIS_REST_URL && process.env.UPSTASH_REDIS_REST_TOKEN) ||
(process.env.KV_REST_API_URL && process.env.KV_REST_API_TOKEN)
? it
: it.skip;

beforeEach(async () => {
await cleanupDbForTest();
Expand Down
146 changes: 69 additions & 77 deletions apps/web/src/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,23 @@
import { createClient } from 'redis';
import { Redis } from '@upstash/redis';
import { captureException } from '@sentry/nextjs';
import type { RedisKey } from '@/lib/redis-keys';

type RedisClient = ReturnType<typeof createClient>;
type RedisOperation = 'get' | 'getdel' | 'set' | 'del';
type RedisTimeoutPhase = 'connect' | 'command';
type RedisConfig = {
url: string;
token: string;
source: 'upstash' | 'vercel-kv';
};

// TCP handshake + TLS negotiation can take a moment on a cold connection.
// Redis official docs recommend 1-3s for connect (redis.io/docs/latest/develop/clients).
const CONNECT_TIMEOUT_MS = 1_500;

// Simple GET/SET commands complete in sub-millisecond; anything over 200ms
// means Redis is overloaded or unreachable and we should fail open.
// Simple GET/SET commands should still be fast over REST; anything over 200ms
// means Redis is overloaded or unreachable and callers should fail open.
const COMMAND_TIMEOUT_MS = 200;

let client: RedisClient | null = null;
let connectPromise: Promise<unknown> | null = null;
let client: Redis | null = null;

class RedisTimeoutError extends Error {
constructor(
readonly redisTimeoutPhase: RedisTimeoutPhase,
readonly redisTimeoutMs: number
) {
super(`Redis timeout (${redisTimeoutPhase})`);
constructor(readonly redisTimeoutMs: number) {
super('Redis timeout (command)');
this.name = 'RedisTimeoutError';
}
}
Expand All @@ -31,127 +26,124 @@ function captureRedisOperationException(
err: unknown,
operation: RedisOperation,
key: RedisKey,
c: RedisClient
config: RedisConfig
) {
const timeoutPhase = err instanceof RedisTimeoutError ? err.redisTimeoutPhase : undefined;
const timeoutPhase = err instanceof RedisTimeoutError ? 'command' : undefined;
captureException(err, {
tags: {
service: 'redis',
redis_transport: 'rest',
redis_config_source: config.source,
operation,
...(timeoutPhase ? { redis_timeout_phase: timeoutPhase } : {}),
},
extra: {
key,
client_is_open: c.isOpen,
client_is_ready: c.isReady,
redis_timeout_ms: err instanceof RedisTimeoutError ? err.redisTimeoutMs : undefined,
},
});
}

function getOrCreateClient(): RedisClient | null {
if (!process.env.REDIS_URL) {
return null;
function getRedisConfig(): RedisConfig | null {
if (process.env.UPSTASH_REDIS_REST_URL && process.env.UPSTASH_REDIS_REST_TOKEN) {
return {
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
source: 'upstash',
};
}
if (!client) {
client = createClient({
url: process.env.REDIS_URL,
socket: { connectTimeout: CONNECT_TIMEOUT_MS },
});
client.on('error', err => {
captureException(err, { tags: { service: 'redis' } });
});

if (process.env.KV_REST_API_URL && process.env.KV_REST_API_TOKEN) {
return {
url: process.env.KV_REST_API_URL,
token: process.env.KV_REST_API_TOKEN,
source: 'vercel-kv',
};
}
return client;

return null;
}

async function ensureConnected(c: RedisClient): Promise<RedisClient> {
if (c.isReady) return c;
if (!connectPromise) {
connectPromise = c
.connect()
.catch(err => {
captureException(err, { tags: { service: 'redis', operation: 'connect' } });
throw err;
})
.finally(() => {
connectPromise = null;
});
function getOrCreateClient(): { client: Redis; config: RedisConfig } | null {
const config = getRedisConfig();
if (!config) {
return null;
}
if (!client) {
client = new Redis({
url: config.url,
token: config.token,
automaticDeserialization: false,
retry: false,
});
}
await connectPromise;
return c;
return { client, config };
}

function withTimeout<T>(
promise: Promise<T>,
ms: number,
redisTimeoutPhase: RedisTimeoutPhase
): Promise<T> {
let timer: ReturnType<typeof setTimeout>;
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
let timer: ReturnType<typeof setTimeout> | undefined;
return Promise.race([
promise.finally(() => clearTimeout(timer)),
promise.finally(() => {
if (timer) clearTimeout(timer);
}),
new Promise<never>((_, reject) => {
timer = setTimeout(() => reject(new RedisTimeoutError(redisTimeoutPhase, ms)), ms);
timer = setTimeout(() => reject(new RedisTimeoutError(ms)), ms);
}),
]);
}

export async function redisGet(key: RedisKey): Promise<string | null> {
const c = getOrCreateClient();
if (!c) return null;
const redis = getOrCreateClient();
if (!redis) return null;
try {
await withTimeout(ensureConnected(c), CONNECT_TIMEOUT_MS, 'connect');
return await withTimeout(c.get(key), COMMAND_TIMEOUT_MS, 'command');
return await withTimeout(redis.client.get<string>(key), COMMAND_TIMEOUT_MS);
} catch (err) {
captureRedisOperationException(err, 'get', key, c);
captureRedisOperationException(err, 'get', key, redis.config);
throw err;
}
}

export async function redisGetDel(key: RedisKey): Promise<string | null> {
const c = getOrCreateClient();
if (!c) return null;
const redis = getOrCreateClient();
if (!redis) return null;
try {
await withTimeout(ensureConnected(c), CONNECT_TIMEOUT_MS, 'connect');
return await withTimeout(c.getDel(key), COMMAND_TIMEOUT_MS, 'command');
return await withTimeout(redis.client.getdel<string>(key), COMMAND_TIMEOUT_MS);
} catch (err) {
captureRedisOperationException(err, 'getdel', key, c);
captureRedisOperationException(err, 'getdel', key, redis.config);
throw err;
}
}

/** Returns false if Redis is not configured (REDIS_URL unset). */
/** Returns false if Redis REST env vars are not configured. */
export async function redisSet(
key: RedisKey,
value: string,
ttlSeconds?: number
): Promise<boolean> {
const c = getOrCreateClient();
if (!c) return false;
const redis = getOrCreateClient();
if (!redis) return false;
try {
await withTimeout(ensureConnected(c), CONNECT_TIMEOUT_MS, 'connect');
if (ttlSeconds) {
await withTimeout(c.set(key, value, { EX: ttlSeconds }), COMMAND_TIMEOUT_MS, 'command');
await withTimeout(redis.client.set(key, value, { ex: ttlSeconds }), COMMAND_TIMEOUT_MS);
} else {
await withTimeout(c.set(key, value), COMMAND_TIMEOUT_MS, 'command');
await withTimeout(redis.client.set(key, value), COMMAND_TIMEOUT_MS);
}
return true;
} catch (err) {
captureRedisOperationException(err, 'set', key, c);
captureRedisOperationException(err, 'set', key, redis.config);
throw err;
}
}

/** Returns false if Redis is not configured (REDIS_URL unset). */
/** Returns false if Redis REST env vars are not configured. */
export async function redisDel(key: RedisKey): Promise<boolean> {
const c = getOrCreateClient();
if (!c) return false;
const redis = getOrCreateClient();
if (!redis) return false;
try {
await withTimeout(ensureConnected(c), CONNECT_TIMEOUT_MS, 'connect');
await withTimeout(c.del(key), COMMAND_TIMEOUT_MS, 'command');
await withTimeout(redis.client.del(key), COMMAND_TIMEOUT_MS);
return true;
} catch (err) {
captureRedisOperationException(err, 'del', key, c);
captureRedisOperationException(err, 'del', key, redis.config);
throw err;
}
}
Loading