1- import { Cluster , ClusterOptions } from "ioredis " ;
1+ import { Redis } from "@upstash/redis " ;
22import { logger } from "./logger" ;
33import { db , folders } from "../db" ;
44import { eq } from "drizzle-orm" ;
55import * as Sentry from "@sentry/node" ;
66
7- let client : Cluster | null = null ;
7+ let client : Redis | null = null ;
88
9- export function getCacheClient ( ) : Cluster | null {
10- if ( ! process . env . VALKEY_HOST ) {
11- logger . warn ( "VALKEY_HOST not configured, caching disabled" ) ;
9+ export function getCacheClient ( ) : Redis | null {
10+ if ( ! process . env . UPSTASH_REDIS_REST_URL || ! process . env . UPSTASH_REDIS_REST_TOKEN ) {
11+ logger . warn ( "Upstash Redis not configured, caching disabled" ) ;
1212 return null ;
1313 }
1414
1515 if ( ! client ) {
1616 try {
17- const clusterOptions : ClusterOptions = {
18- dnsLookup : ( address , callback ) => callback ( null , address ) ,
19- redisOptions : {
20- tls : process . env . NODE_ENV === "production" ? { } : undefined ,
21- connectTimeout : 5000 ,
22- } ,
23- clusterRetryStrategy : ( times ) => {
24- if ( times > 3 ) {
25- logger . error ( "Valkey connection failed after 3 retries" ) ;
26- return null ;
27- }
28- return Math . min ( times * 200 , 2000 ) ;
29- } ,
30- } ;
31-
32- client = new Cluster (
33- [
34- {
35- host : process . env . VALKEY_HOST ,
36- port : parseInt ( process . env . VALKEY_PORT || "6379" ) ,
37- } ,
38- ] ,
39- clusterOptions
40- ) ;
41-
42- client . on ( "error" , ( err ) => {
43- logger . error ( "Valkey client error" , { error : err . message } , err ) ;
17+ client = new Redis ( {
18+ url : process . env . UPSTASH_REDIS_REST_URL ,
19+ token : process . env . UPSTASH_REDIS_REST_TOKEN ,
4420 } ) ;
4521
46- client . on ( "connect" , ( ) => {
47- logger . info ( "Connected to Valkey cluster" ) ;
48- } ) ;
22+ logger . info ( "Connected to Upstash Redis" ) ;
4923 } catch ( error ) {
5024 logger . error (
51- "Failed to initialize Valkey client" ,
25+ "Failed to initialize Upstash Redis client" ,
5226 {
5327 error : error instanceof Error ? error . message : String ( error ) ,
5428 } ,
@@ -63,9 +37,9 @@ export function getCacheClient(): Cluster | null {
6337
6438export async function closeCache ( ) : Promise < void > {
6539 if ( client ) {
66- await client . disconnect ( ) ;
40+ // Upstash REST client doesn't need explicit disconnection
6741 client = null ;
68- logger . info ( "Valkey connection closed" ) ;
42+ logger . info ( "Upstash Redis connection closed" ) ;
6943 }
7044}
7145
@@ -85,20 +59,20 @@ export async function getCache<T>(key: string): Promise<T | null> {
8559 async ( span ) => {
8660 const startTime = Date . now ( ) ;
8761 try {
88- const data = await cache . get ( key ) ;
62+ const data = await cache . get < string > ( key ) ;
8963 const duration = Date . now ( ) - startTime ;
9064 const hit = data !== null ;
9165
9266 // Set Sentry span attributes
9367 span . setAttribute ( "cache.hit" , hit ) ;
9468 if ( data ) {
95- span . setAttribute ( "cache.item_size" , data . length ) ;
69+ span . setAttribute ( "cache.item_size" , JSON . stringify ( data ) . length ) ;
9670 }
9771
9872 // Log cache operation with metrics
9973 logger . cacheOperation ( "get" , key , hit , duration ) ;
10074
101- return data ? JSON . parse ( data ) : null ;
75+ return data ? ( JSON . parse ( data ) as T ) : null ;
10276 } catch ( error ) {
10377 span . setStatus ( { code : 2 , message : "error" } ) ; // SPAN_STATUS_ERROR
10478 logger . cacheError ( "get" , key , error instanceof Error ? error : new Error ( String ( error ) ) ) ;
@@ -164,16 +138,12 @@ export async function deleteCache(...keys: string[]): Promise<void> {
164138 async ( span ) => {
165139 const startTime = Date . now ( ) ;
166140 try {
167- // In cluster mode, keys may hash to different slots
168- // Use pipeline to delete individually (more efficient than separate awaits)
141+ // Delete keys individually (Upstash REST API)
169142 if ( keys . length === 1 ) {
170143 await cache . del ( keys [ 0 ] ) ;
171144 } else {
172- const pipeline = cache . pipeline ( ) ;
173- for ( const key of keys ) {
174- pipeline . del ( key ) ;
175- }
176- await pipeline . exec ( ) ;
145+ // Use Promise.all for parallel deletion
146+ await Promise . all ( keys . map ( ( key ) => cache . del ( key ) ) ) ;
177147 }
178148 const duration = Date . now ( ) - startTime ;
179149
@@ -197,36 +167,31 @@ export async function deleteCachePattern(pattern: string): Promise<void> {
197167
198168 try {
199169 const keys : string [ ] = [ ] ;
170+ let cursor = "0" ;
171+
172+ // Use SCAN to find keys matching pattern
173+ do {
174+ // Upstash REST API supports SCAN
175+ const result = await cache . scan ( Number ( cursor ) , {
176+ match : pattern ,
177+ count : 100 ,
178+ } ) ;
200179
201- // In cluster mode, we need to scan all master nodes
202- const nodes = cache . nodes ( "master" ) ;
203-
204- for ( const node of nodes ) {
205- let cursor = "0" ;
206- do {
207- // Scan each master node individually
208- const result = await node . scan ( cursor , "MATCH" , pattern , "COUNT" , 100 ) ;
209- cursor = result [ 0 ] ;
210- keys . push ( ...result [ 1 ] ) ;
211- } while ( cursor !== "0" ) ;
212- }
180+ cursor = String ( result [ 0 ] ) ;
181+ keys . push ( ...result [ 1 ] ) ;
182+ } while ( cursor !== "0" ) ;
213183
214184 if ( keys . length > 0 ) {
215- // Delete in batches using pipeline (cluster mode compatible)
185+ // Delete in batches of 100
216186 const batchSize = 100 ;
217187 for ( let i = 0 ; i < keys . length ; i += batchSize ) {
218188 const batch = keys . slice ( i , i + batchSize ) ;
219- const pipeline = cache . pipeline ( ) ;
220- for ( const key of batch ) {
221- pipeline . del ( key ) ;
222- }
223- await pipeline . exec ( ) ;
189+ await Promise . all ( batch . map ( ( key ) => cache . del ( key ) ) ) ;
224190 }
225191
226192 logger . info ( `Deleted cache keys matching pattern` , {
227193 pattern,
228194 keyCount : keys . length ,
229- nodeCount : nodes . length ,
230195 } ) ;
231196 }
232197 } catch ( error ) {
@@ -295,7 +260,7 @@ export async function invalidateNoteCounts(userId: string, folderId: string | nu
295260 }
296261 }
297262
298- // Delete all cache keys using pipeline for cluster compatibility
263+ // Delete all cache keys
299264 if ( cacheKeys . length > 0 ) {
300265 await deleteCache ( ...cacheKeys ) ;
301266 logger . debug ( "Invalidated note counts cache" , {
0 commit comments