Skip to content

Commit 7240322

Browse files
committed
feat: implement caching for user admission check
Fixes #447
1 parent 12688ec commit 7240322

5 files changed

Lines changed: 94 additions & 16 deletions

File tree

src/@types/adapters.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export interface ICacheAdapter {
2222
getKey(key: string): Promise<string>
2323
hasKey(key: string): Promise<boolean>
2424
setKey(key: string, value: string): Promise<boolean>
25+
setKeyWithExpiry(key: string, value: string, ttl: number): Promise<boolean>
2526
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
2627
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
2728
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>

src/adapters/redis-adapter.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,16 @@ export class RedisAdapter implements ICacheAdapter {
6060

6161
public async setKey(key: string, value: string): Promise<boolean> {
6262
await this.connection
63-
debug('get %s key', key)
63+
debug('set %s key', key)
6464
return 'OK' === await this.client.set(key, value)
6565
}
6666

67+
public async setKeyWithExpiry(key: string, value: string, ttl: number): Promise<boolean> {
68+
await this.connection
69+
debug('set %s key with expiry %d', key, ttl)
70+
return 'OK' === await this.client.set(key, value, { EX: ttl })
71+
}
72+
6773
public async removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number> {
6874
await this.connection
6975
debug('remove %d..%d range from sorted set %s', min, max, key)

src/factories/message-handler-factory.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1+
import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters'
12
import { IEventRepository, IUserRepository } from '../@types/repositories'
23
import { IncomingMessage, MessageType } from '../@types/messages'
34
import { createSettings } from './settings-factory'
45
import { EventMessageHandler } from '../handlers/event-message-handler'
56
import { eventStrategyFactory } from './event-strategy-factory'
6-
import { IWebSocketAdapter } from '../@types/adapters'
7+
import { getCacheClient } from '../cache/client'
8+
import { RedisAdapter } from '../adapters/redis-adapter'
79
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
810
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
911
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'
1012

13+
let cacheAdapter: ICacheAdapter | undefined = undefined
14+
const getCache = (): ICacheAdapter => {
15+
if (!cacheAdapter) {
16+
cacheAdapter = new RedisAdapter(getCacheClient())
17+
}
18+
return cacheAdapter
19+
}
20+
1121
export const messageHandlerFactory = (
1222
eventRepository: IEventRepository,
1323
userRepository: IUserRepository,
@@ -22,6 +32,7 @@ export const messageHandlerFactory = (
2232
userRepository,
2333
createSettings,
2434
slidingWindowRateLimiterFactory,
35+
getCache(),
2536
)
2637
}
2738
case MessageType.REQ:

src/handlers/event-message-handler.ts

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,20 @@ import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
1818
import { createCommandResult } from '../utils/messages'
1919
import { createLogger } from '../factories/logger-factory'
2020
import { Factory } from '../@types/base'
21+
import { ICacheAdapter } from '../@types/adapters'
2122
import { IncomingEventMessage } from '../@types/messages'
2223
import { IRateLimiter } from '../@types/utils'
2324
import { IWebSocketAdapter } from '../@types/adapters'
2425
import { WebSocketAdapterEvent } from '../constants/adapter'
2526

2627
const debug = createLogger('event-message-handler')
2728

29+
export enum CacheAdmissionState {
30+
ADMITTED = 'admitted',
31+
NOT_ADMITTED = 'not-admitted',
32+
INSUFFICIENT_BALANCE = 'insufficient-balance',
33+
}
34+
2835
export class EventMessageHandler implements IMessageHandler {
2936
public constructor(
3037
protected readonly webSocket: IWebSocketAdapter,
@@ -33,6 +40,7 @@ export class EventMessageHandler implements IMessageHandler {
3340
protected readonly userRepository: IUserRepository,
3441
private readonly settings: () => Settings,
3542
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
43+
private readonly cache: ICacheAdapter,
3644
) {}
3745

3846
public async handleMessage(message: IncomingEventMessage): Promise<void> {
@@ -313,17 +321,45 @@ export class EventMessageHandler implements IMessageHandler {
313321
return
314322
}
315323

316-
// const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`)
317-
// TODO: use cache
324+
const cacheKey = `${event.pubkey}:is-admitted`
325+
326+
try {
327+
const cachedAdmission = await this.cache.getKey(cacheKey)
328+
switch (cachedAdmission) {
329+
case CacheAdmissionState.ADMITTED:
330+
debug('cache hit for %s admission: admitted', event.pubkey)
331+
return
332+
case CacheAdmissionState.NOT_ADMITTED:
333+
debug('cache hit for %s admission: blocked', event.pubkey)
334+
return 'blocked: pubkey not admitted'
335+
case CacheAdmissionState.INSUFFICIENT_BALANCE:
336+
debug('cache hit for %s admission: blocked (insufficient balance)', event.pubkey)
337+
return 'blocked: insufficient balance'
338+
default:
339+
break
340+
}
341+
} catch (error) {
342+
debug('cache error for %s: %o', event.pubkey, error)
343+
}
344+
318345
const user = await this.userRepository.findByPubkey(event.pubkey)
319346
if (!user || !user.isAdmitted) {
347+
this.cacheSet(cacheKey, CacheAdmissionState.NOT_ADMITTED, 60)
320348
return 'blocked: pubkey not admitted'
321349
}
322350

323351
const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n
324352
if (minBalance > 0n && user.balance < minBalance) {
353+
this.cacheSet(cacheKey, CacheAdmissionState.INSUFFICIENT_BALANCE, 60)
325354
return 'blocked: insufficient balance'
326355
}
356+
357+
this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300)
358+
}
359+
360+
private cacheSet(key: string, value: string, ttl: number): void {
361+
this.cache.setKeyWithExpiry(key, value, ttl)
362+
.catch((error) => debug('unable to cache %s: %o', key, error))
327363
}
328364

329365
protected addExpirationMetadata(event: Event): Event | ExpiringEvent {

test/unit/handlers/event-message-handler.spec.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import sinonChai from 'sinon-chai'
88
chai.use(sinonChai)
99
chai.use(chaiAsPromised)
1010

11+
import { CacheAdmissionState, EventMessageHandler } from '../../../src/handlers/event-message-handler'
1112
import { EventLimits, Settings } from '../../../src/@types/settings'
1213
import { IncomingEventMessage, MessageType } from '../../../src/@types/messages'
1314
import { Event } from '../../../src/@types/event'
1415
import { EventKinds } from '../../../src/constants/base'
15-
import { EventMessageHandler } from '../../../src/handlers/event-message-handler'
1616
import { IUserRepository } from '../../../src/@types/repositories'
1717
import { IWebSocketAdapter } from '../../../src/@types/adapters'
1818
import { WebSocketAdapterEvent } from '../../../src/constants/adapter'
@@ -88,7 +88,8 @@ describe('EventMessageHandler', () => {
8888
() => ({
8989
info: { relay_url: 'relay_url' },
9090
}) as any,
91-
() => ({ hit: async () => false })
91+
() => ({ hit: async () => false }),
92+
{ hasKey: async () => false, setKey: async () => true } as any,
9293
)
9394
})
9495

@@ -262,7 +263,8 @@ describe('EventMessageHandler', () => {
262263
{ hasActiveRequestToVanish: async () => false } as any,
263264
userRepository,
264265
() => settings,
265-
() => ({ hit: async () => false })
266+
() => ({ hit: async () => false }),
267+
{ hasKey: async () => false, setKey: async () => true } as any,
266268
)
267269
})
268270

@@ -738,7 +740,8 @@ describe('EventMessageHandler', () => {
738740
{ hasActiveRequestToVanish: async () => false } as any,
739741
userRepository,
740742
() => settings,
741-
() => ({ hit: rateLimiterHitStub })
743+
() => ({ hit: rateLimiterHitStub }),
744+
{ hasKey: async () => false, setKey: async () => true, setKeyWithExpiry: async () => true } as any,
742745
)
743746
})
744747

@@ -953,6 +956,7 @@ describe('EventMessageHandler', () => {
953956
let webSocket: IWebSocketAdapter
954957
let getRelayPublicKeyStub: SinonStub
955958
let userRepositoryFindByPubkeyStub: SinonStub
959+
let cacheGetKeyStub: SinonStub
956960

957961
beforeEach(() => {
958962
settings = {
@@ -994,6 +998,7 @@ describe('EventMessageHandler', () => {
994998
getRelayPublicKeyStub = sandbox.stub(EventMessageHandler.prototype, 'getRelayPublicKey' as any)
995999
getClientAddressStub = sandbox.stub()
9961000
userRepositoryFindByPubkeyStub = sandbox.stub()
1001+
cacheGetKeyStub = sandbox.stub().resolves(null)
9971002
webSocket = {
9981003
getClientAddress: getClientAddressStub,
9991004
} as any
@@ -1006,11 +1011,18 @@ describe('EventMessageHandler', () => {
10061011
{ hasActiveRequestToVanish: async () => false } as any,
10071012
userRepository,
10081013
() => settings,
1009-
() => ({ hit: async () => false })
1014+
() => ({ hit: async () => false }),
1015+
{
1016+
hasKey: async () => false,
1017+
getKey: cacheGetKeyStub,
1018+
setKey: async () => true,
1019+
setKeyWithExpiry: async () => true,
1020+
setKeyExpiry: async () => undefined,
1021+
} as any,
10101022
)
10111023
})
10121024

1013-
it ('fulfills with undefined if payments are disabled', async () => {
1025+
it('fulfills with undefined if payments are disabled', async () => {
10141026
settings.payments.enabled = false
10151027

10161028
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
@@ -1089,12 +1101,6 @@ describe('EventMessageHandler', () => {
10891101
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
10901102
})
10911103

1092-
it('fulfills with reason if user is not admitted', async () => {
1093-
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: false })
1094-
1095-
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
1096-
})
1097-
10981104
it('fulfills with reason if user does not meet minimum balance', async () => {
10991105
settings.limits.event.pubkey.minBalance = 1000n
11001106
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 999n })
@@ -1108,5 +1114,23 @@ describe('EventMessageHandler', () => {
11081114

11091115
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
11101116
})
1117+
1118+
it('fulfills with undefined if user is admitted in cache', async () => {
1119+
cacheGetKeyStub.resolves(CacheAdmissionState.ADMITTED)
1120+
1121+
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
1122+
})
1123+
1124+
it('fulfills with reason if user is blocked in cache', async () => {
1125+
cacheGetKeyStub.resolves(CacheAdmissionState.NOT_ADMITTED)
1126+
1127+
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
1128+
})
1129+
1130+
it('fulfills with reason if user has insufficient balance in cache', async () => {
1131+
cacheGetKeyStub.resolves(CacheAdmissionState.INSUFFICIENT_BALANCE)
1132+
1133+
return expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: insufficient balance')
1134+
})
11111135
})
11121136
})

0 commit comments

Comments
 (0)