Skip to content

Commit f55ce74

Browse files
committed
feat: add opt-in event retention purge (#359)
1 parent 9cf04a1 commit f55ce74

9 files changed

Lines changed: 202 additions & 1 deletion

File tree

CONFIGURATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ Running `nostream` for the first time creates the settings file in `<project_roo
109109
| limits.event.rateLimits[].rate | Maximum number of events during period. |
110110
| limits.event.whitelists.pubkeys | List of public keys to ignore rate limits. |
111111
| limits.event.whitelists.ipAddresses | List of IPs (IPv4 or IPv6) to ignore rate limits. |
112+
| limits.event.retentionDays | Number of days to retain events (based on created_at). Older events are purged. Defaults to disabled. |
112113
| limits.client.subscription.maxSubscriptions | Maximum number of subscriptions per connected client. Defaults to 10. Disabled when set to zero. |
113114
| limits.client.subscription.maxFilters | Maximum number of filters per subscription. Defaults to 10. Disabled when set to zero. |
114115
| limits.message.rateLimits[].period | Rate limit period in milliseconds. |

resources/default-settings.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ limits:
7676
- "10.10.10.1"
7777
- "::ffff:10.10.10.1"
7878
event:
79+
# Uncomment the next line to enable event retention and set the number of days
80+
# retentionDays: 30
7981
eventId:
8082
minLeadingZeroBits: 0
8183
kind:

src/@types/repositories.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ export interface IEventRepository {
1717
upsert(event: Event): Promise<number>
1818
findByFilters(filters: SubscriptionFilter[]): IQueryResult<DBEvent[]>
1919
deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise<number>
20+
deleteExpiredAndRetained(retentionDays?: number): Promise<number>
2021
}
2122

2223
export interface IInvoiceRepository {

src/@types/settings.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export interface EventLimits {
7777
content?: ContentLimits | ContentLimits[]
7878
rateLimits?: EventRateLimit[]
7979
whitelists?: EventWhitelists
80+
retentionDays?: number
8081
}
8182

8283
export interface ClientSubscriptionLimits {

src/app/maintenance-worker.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { IRunnable } from '../@types/base'
33

44
import { createLogger } from '../factories/logger-factory'
55
import { delayMs } from '../utils/misc'
6+
import { IEventRepository } from '../@types/repositories'
67
import { InvoiceStatus } from '../@types/invoice'
78
import { IPaymentsService } from '../@types/services'
89
import { Settings } from '../@types/settings'
@@ -17,6 +18,7 @@ export class MaintenanceWorker implements IRunnable {
1718
public constructor(
1819
private readonly process: NodeJS.Process,
1920
private readonly paymentsService: IPaymentsService,
21+
private readonly eventRepository: IEventRepository,
2022
private readonly settings: () => Settings,
2123
) {
2224
this.process
@@ -34,6 +36,20 @@ export class MaintenanceWorker implements IRunnable {
3436
private async onSchedule(): Promise<void> {
3537
const currentSettings = this.settings()
3638

39+
const retentionDays = currentSettings.limits?.event?.retentionDays
40+
41+
if (typeof retentionDays === 'number' && retentionDays > 0) {
42+
try {
43+
debug('purging deleted and expired events')
44+
const deletedCount = await this.eventRepository.deleteExpiredAndRetained(retentionDays)
45+
if (deletedCount > 0) {
46+
console.info(`[Maintenance] Deleted ${deletedCount} expired and retained events.`)
47+
}
48+
} catch (error) {
49+
console.error('Unable to purge events. Reason:', error)
50+
}
51+
}
52+
3753
if (!path(['payments','enabled'], currentSettings)) {
3854
return
3955
}
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
import { getMasterDbClient, getReadReplicaDbClient } from '../database/client'
12
import { createPaymentsService } from './payments-service-factory'
23
import { createSettings } from './settings-factory'
4+
import { EventRepository } from '../repositories/event-repository'
35
import { MaintenanceWorker } from '../app/maintenance-worker'
46

57
export const maintenanceWorkerFactory = () => {
6-
return new MaintenanceWorker(process, createPaymentsService(), createSettings)
8+
return new MaintenanceWorker(
9+
process,
10+
createPaymentsService(),
11+
new EventRepository(getMasterDbClient(), getReadReplicaDbClient()),
12+
createSettings
13+
)
714
}

src/repositories/event-repository.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,4 +251,20 @@ export class EventRepository implements IEventRepository {
251251
deleted_at: this.masterDbClient.raw('now()'),
252252
})
253253
}
254+
255+
public deleteExpiredAndRetained(retentionDays?: number): Promise<number> {
256+
debug('deleting expired and retained events (retentionDays: %d)', retentionDays)
257+
258+
if (!(typeof retentionDays === 'number' && retentionDays > 0)) {
259+
return this.masterDbClient('events').whereRaw('1 = 0').del()
260+
}
261+
262+
const retentionLimit = Math.floor(Date.now() / 1000) - (retentionDays * 86400)
263+
264+
return this.masterDbClient('events')
265+
.where('expires_at', '<', Math.floor(Date.now() / 1000))
266+
.orWhereNotNull('deleted_at')
267+
.orWhere('event_created_at', '<', retentionLimit)
268+
.del()
269+
}
254270
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import * as chai from 'chai'
2+
import * as sinon from 'sinon'
3+
import sinonChai from 'sinon-chai'
4+
5+
chai.use(sinonChai)
6+
7+
const { expect } = chai
8+
9+
import { IEventRepository } from '../../../src/@types/repositories'
10+
import { IPaymentsService } from '../../../src/@types/services'
11+
import { MaintenanceWorker } from '../../../src/app/maintenance-worker'
12+
import { Settings } from '../../../src/@types/settings'
13+
14+
describe('MaintenanceWorker', () => {
15+
let worker: MaintenanceWorker
16+
let sandbox: sinon.SinonSandbox
17+
let paymentsService: sinon.SinonStubbedInstance<IPaymentsService>
18+
let eventRepository: sinon.SinonStubbedInstance<IEventRepository>
19+
let settings: sinon.SinonStub
20+
let processMock: any
21+
22+
beforeEach(() => {
23+
sandbox = sinon.createSandbox()
24+
paymentsService = {
25+
getPendingInvoices: sandbox.stub(),
26+
getInvoiceFromPaymentsProcessor: sandbox.stub(),
27+
updateInvoiceStatus: sandbox.stub(),
28+
confirmInvoice: sandbox.stub(),
29+
sendInvoiceUpdateNotification: sandbox.stub(),
30+
} as any
31+
eventRepository = {
32+
deleteExpiredAndRetained: sandbox.stub(),
33+
} as any
34+
settings = sandbox.stub()
35+
processMock = {
36+
on: sandbox.stub().returnsThis(),
37+
}
38+
39+
worker = new MaintenanceWorker(
40+
processMock as any,
41+
paymentsService as any,
42+
eventRepository as any,
43+
settings as any,
44+
)
45+
})
46+
47+
afterEach(() => {
48+
sandbox.restore()
49+
})
50+
51+
describe('onSchedule', () => {
52+
it('purges events and processes invoices', async () => {
53+
const currentSettings: Settings = {
54+
info: {} as any,
55+
network: {} as any,
56+
payments: {
57+
enabled: true,
58+
} as any,
59+
limits: {
60+
event: {
61+
retentionDays: 30,
62+
},
63+
} as any,
64+
}
65+
settings.returns(currentSettings)
66+
eventRepository.deleteExpiredAndRetained.resolves(10)
67+
paymentsService.getPendingInvoices.resolves([])
68+
69+
await (worker as any).onSchedule()
70+
71+
expect(eventRepository.deleteExpiredAndRetained).to.have.been.calledOnceWithExactly(30)
72+
expect(paymentsService.getPendingInvoices).to.have.been.calledOnce
73+
})
74+
75+
it('purges events even if payments are disabled', async () => {
76+
const currentSettings: Settings = {
77+
info: {} as any,
78+
network: {} as any,
79+
payments: {
80+
enabled: false,
81+
} as any,
82+
limits: {
83+
event: {
84+
retentionDays: 30,
85+
},
86+
} as any,
87+
}
88+
settings.returns(currentSettings)
89+
eventRepository.deleteExpiredAndRetained.resolves(10)
90+
91+
await (worker as any).onSchedule()
92+
93+
expect(eventRepository.deleteExpiredAndRetained).to.have.been.calledOnceWithExactly(30)
94+
expect(paymentsService.getPendingInvoices).not.to.have.been.called
95+
})
96+
97+
it('does not purge when retentionDays is not configured', async () => {
98+
const currentSettings: Settings = {
99+
info: {} as any,
100+
network: {} as any,
101+
payments: {
102+
enabled: false,
103+
} as any,
104+
}
105+
settings.returns(currentSettings)
106+
107+
await (worker as any).onSchedule()
108+
109+
expect(eventRepository.deleteExpiredAndRetained).not.to.have.been.called
110+
})
111+
112+
it('handles error during purge when retentionDays is configured', async () => {
113+
const currentSettings: Settings = {
114+
info: {} as any,
115+
network: {} as any,
116+
payments: {
117+
enabled: false,
118+
} as any,
119+
limits: {
120+
event: {
121+
retentionDays: 30,
122+
},
123+
} as any,
124+
}
125+
settings.returns(currentSettings)
126+
eventRepository.deleteExpiredAndRetained.rejects(new Error('DB Error'))
127+
128+
// Should not throw
129+
await (worker as any).onSchedule()
130+
131+
expect(eventRepository.deleteExpiredAndRetained).to.have.been.calledOnceWithExactly(30)
132+
})
133+
})
134+
})

test/unit/repositories/event-repository.spec.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,29 @@ describe('EventRepository', () => {
447447
})
448448
})
449449

450+
describe('deleteExpiredAndRetained', () => {
451+
let clock: sinon.SinonFakeTimers
452+
beforeEach(() => {
453+
clock = sinon.useFakeTimers(1000000000) // 1970-01-12T13:46:40.000Z
454+
})
455+
456+
afterEach(() => {
457+
clock.restore()
458+
})
459+
460+
it('does not delete anything when retentionDays is not set', () => {
461+
const query = repository.deleteExpiredAndRetained().toString()
462+
463+
expect(query).to.equal('delete from "events" where 1 = 0')
464+
})
465+
466+
it('deletes expired, deleted and old events when retentionDays is set', () => {
467+
const query = repository.deleteExpiredAndRetained(7).toString()
468+
469+
expect(query).to.equal('delete from "events" where "expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200')
470+
})
471+
})
472+
450473
describe('upsert', () => {
451474
it('replaces event based on event_pubkey and event_kind', () => {
452475
const event: Event = {

0 commit comments

Comments
 (0)