Skip to content

Commit e1653d9

Browse files
committed
feat: implement caching for user admission check
1 parent 842b78e commit e1653d9

10 files changed

Lines changed: 573 additions & 44 deletions

File tree

package-lock.json

Lines changed: 3 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@
9595
"@types/express": "4.17.21",
9696
"@types/js-yaml": "4.0.5",
9797
"@types/mocha": "^9.1.1",
98-
"@types/node": "^24.0.0",
98+
"@types/node": "^24.12.2",
9999
"@types/pg": "^8.6.5",
100100
"@types/ramda": "^0.28.13",
101101
"@types/sinon": "^10.0.11",

src/@types/adapters.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export type IWebSocketAdapter = EventEmitter & {
2121
export interface ICacheAdapter {
2222
getKey(key: string): Promise<string>
2323
hasKey(key: string): Promise<boolean>
24-
setKey(key: string, value: string): Promise<boolean>
24+
setKey(key: string, value: string, expirySeconds?: number): Promise<boolean>
2525
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
2626
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
2727
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>

src/adapters/redis-adapter.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export class RedisAdapter implements ICacheAdapter {
4242
}
4343

4444
private onClientError(error: Error) {
45-
console.error('Unable to connect to Redis.', error)
45+
debug('Unable to connect to Redis.', error)
4646
// throw error
4747
}
4848

@@ -58,9 +58,12 @@ export class RedisAdapter implements ICacheAdapter {
5858
return this.client.get(key)
5959
}
6060

61-
public async setKey(key: string, value: string): Promise<boolean> {
61+
public async setKey(key: string, value: string, expirySeconds?: number): Promise<boolean> {
6262
await this.connection
63-
debug('get %s key', key)
63+
debug('set %s key', key)
64+
if (typeof expirySeconds === 'number') {
65+
return 'OK' === await this.client.set(key, value, { EX: expirySeconds })
66+
}
6467
return 'OK' === await this.client.set(key, value)
6568
}
6669

src/constants/caching.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export enum CacheAdmissionState {
2+
ADMITTED = 'admitted',
3+
BLOCKED_NOT_ADMITTED = 'blocked_not_admitted',
4+
BLOCKED_INSUFFICIENT_BALANCE = 'blocked_insufficient_balance',
5+
}

src/factories/message-handler-factory.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1+
import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters'
12
import { IEventRepository, INip05VerificationRepository, IUserRepository } from '../@types/repositories'
23
import { IncomingMessage, MessageType } from '../@types/messages'
34
import { createSettings } from './settings-factory'
45
import { EventMessageHandler } from '../handlers/event-message-handler'
56
import { eventStrategyFactory } from './event-strategy-factory'
6-
import { IWebSocketAdapter } from '../@types/adapters'
7+
import { getCacheClient } from '../cache/client'
8+
import { RedisAdapter } from '../adapters/redis-adapter'
79
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
810
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
911
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'
1012

13+
let cacheAdapter: ICacheAdapter | undefined = undefined
14+
const getCache = (): ICacheAdapter => {
15+
if (!cacheAdapter) {
16+
cacheAdapter = new RedisAdapter(getCacheClient())
17+
}
18+
return cacheAdapter
19+
}
20+
1121
export const messageHandlerFactory = (
1222
eventRepository: IEventRepository,
1323
userRepository: IUserRepository,
@@ -24,6 +34,7 @@ export const messageHandlerFactory = (
2434
createSettings,
2535
slidingWindowRateLimiterFactory,
2636
nip05VerificationRepository,
37+
getCache(),
2738
)
2839
}
2940
case MessageType.REQ:

src/handlers/event-message-handler.ts

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ import {
2626
} from '../utils/event'
2727
import { IEventRepository, INip05VerificationRepository, IUserRepository } from '../@types/repositories'
2828
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
29+
import { CacheAdmissionState } from '../constants/caching'
2930
import { createCommandResult } from '../utils/messages'
3031
import { createLogger } from '../factories/logger-factory'
3132
import { Factory } from '../@types/base'
33+
import { ICacheAdapter } from '../@types/adapters'
3234
import { IncomingEventMessage } from '../@types/messages'
3335
import { IRateLimiter } from '../@types/utils'
3436
import { IWebSocketAdapter } from '../@types/adapters'
@@ -46,6 +48,7 @@ export class EventMessageHandler implements IMessageHandler {
4648
private readonly settings: () => Settings,
4749
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
4850
private readonly nip05VerificationRepository: INip05VerificationRepository,
51+
private readonly cache: ICacheAdapter,
4952
) {}
5053

5154
public async handleMessage(message: IncomingEventMessage): Promise<void> {
@@ -112,8 +115,7 @@ export class EventMessageHandler implements IMessageHandler {
112115
try {
113116
await strategy.execute(event)
114117
this.processNip05Metadata(event)
115-
} catch (error) {
116-
console.error('error handling message', message, error)
118+
} catch (_error) {
117119
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
118120
}
119121
}
@@ -341,17 +343,44 @@ export class EventMessageHandler implements IMessageHandler {
341343
return
342344
}
343345

344-
// const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`)
345-
// TODO: use cache
346+
const cacheKey = `${event.pubkey}:is-admitted`
347+
348+
try {
349+
const cachedValue = await this.cache.getKey(cacheKey)
350+
if (cachedValue === CacheAdmissionState.ADMITTED) {
351+
debug('cache hit for %s admission: admitted', event.pubkey)
352+
return
353+
}
354+
if (cachedValue === CacheAdmissionState.BLOCKED_NOT_ADMITTED) {
355+
debug('cache hit for %s admission: blocked', event.pubkey)
356+
return 'blocked: pubkey not admitted'
357+
}
358+
if (cachedValue === CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE) {
359+
debug('cache hit for %s admission: insufficient balance', event.pubkey)
360+
return 'blocked: insufficient balance'
361+
}
362+
} catch (error) {
363+
debug('cache error for %s: %o', event.pubkey, error)
364+
}
365+
346366
const user = await this.userRepository.findByPubkey(event.pubkey)
347367
if (!user || !user.isAdmitted) {
368+
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60)
348369
return 'blocked: pubkey not admitted'
349370
}
350371

351372
const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n
352373
if (minBalance > 0n && user.balance < minBalance) {
374+
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60)
353375
return 'blocked: insufficient balance'
354376
}
377+
378+
this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300)
379+
}
380+
381+
private cacheSet(key: string, value: string, ttl: number): void {
382+
this.cache.setKey(key, value, ttl)
383+
.catch((error) => debug('unable to cache %s: %o', key, error))
355384
}
356385

357386
protected addExpirationMetadata(event: Event): Event | ExpiringEvent {

0 commit comments

Comments
 (0)