Skip to content

Commit a658951

Browse files
committed
fix: added expired_at filter to message pipeline
1 parent 6a8ccb4 commit a658951

2 files changed

Lines changed: 65 additions & 2 deletions

File tree

src/handlers/subscribe-message-handler.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { pipeline } from 'stream/promises'
44

55
import { createEndOfStoredEventsNoticeMessage, createNoticeMessage, createOutgoingEventMessage } from '../utils/messages'
66
import { IAbortable, IMessageHandler } from '../@types/message-handlers'
7-
import { isEventMatchingFilter, toNostrEvent } from '../utils/event'
7+
import { isEventMatchingFilter, isExpiredEvent, toNostrEvent } from '../utils/event'
88
import { streamEach, streamEnd, streamFilter, streamMap } from '../utils/stream'
99
import { SubscriptionFilter, SubscriptionId } from '../@types/subscription'
1010
import { createLogger } from '../factories/logger-factory'
@@ -55,6 +55,12 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
5555
const sendEOSE = () =>
5656
this.webSocket.emit(WebSocketAdapterEvent.Message, createEndOfStoredEventsNoticeMessage(subscriptionId))
5757
const isSubscribedToEvent = SubscribeMessageHandler.isClientSubscribedToEvent(filters)
58+
const isNotExpired = (event: Event)=>{
59+
if (isExpiredEvent(event)) {
60+
return false
61+
}
62+
return true
63+
}
5864

5965
const findEvents = this.eventRepository.findByFilters(filters).stream()
6066

@@ -65,6 +71,7 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
6571
findEvents,
6672
streamFilter(propSatisfies(isNil, 'deleted_at')),
6773
streamMap(toNostrEvent),
74+
streamFilter(isNotExpired),
6875
streamFilter(isSubscribedToEvent),
6976
streamEach(sendEvent),
7077
streamEnd(sendEOSE),
@@ -117,3 +124,4 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
117124

118125
}
119126
}
127+

test/unit/handlers/subscribe-message-handler.spec.ts

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import chai from 'chai'
33
import chaiAsPromised from 'chai-as-promised'
44
import EventEmitter from 'events'
55
import Sinon from 'sinon'
6+
import sinonChai from 'sinon-chai'
67

78
import { IAbortable, IMessageHandler } from '../../../src/@types/message-handlers'
89
import { MessageType, SubscribeMessage } from '../../../src/@types/messages'
@@ -14,17 +15,22 @@ import { PassThrough } from 'stream'
1415
import { SubscribeMessageHandler } from '../../../src/handlers/subscribe-message-handler'
1516
import { WebSocketAdapterEvent } from '../../../src/constants/adapter'
1617

18+
chai.use(sinonChai)
1719
chai.use(chaiAsPromised)
1820
const { expect } = chai
1921

20-
const toDbEvent = (event: Event) => ({
22+
const toDbEvent = (
23+
event: Event,
24+
metadata: { expires_at?: number, deleted_at?: Date | null } = {},
25+
) => ({
2126
event_id: Buffer.from(event.id, 'hex'),
2227
event_kind: event.kind,
2328
event_pubkey: Buffer.from(event.pubkey, 'hex'),
2429
event_created_at: event.created_at,
2530
event_content: event.content,
2631
event_tags: event.tags,
2732
event_signature: Buffer.from(event.sig, 'hex'),
33+
...metadata,
2834
})
2935

3036
describe('SubscribeMessageHandler', () => {
@@ -112,11 +118,13 @@ describe('SubscribeMessageHandler', () => {
112118

113119
describe('#fetchAndSend', () => {
114120
let event: Event
121+
let clock: Sinon.SinonFakeTimers
115122
let webSocketOnMessageStub: Sinon.SinonStub
116123
let webSocketOnSubscribeStub: Sinon.SinonStub
117124
let isClientSubscribedToEventStub: Sinon.SinonStub
118125

119126
beforeEach(() => {
127+
clock = sandbox.useFakeTimers(1665546189000)
120128
event = {
121129
'id': 'b1601d26958e6508b7b9df0af609c652346c09392b6534d93aead9819a51b4ef',
122130
'pubkey': '22e804d26ed16b68db5259e78449e96dab5d464c8f470bda3eb1a70467f2c793',
@@ -165,6 +173,53 @@ describe('SubscribeMessageHandler', () => {
165173
)
166174
})
167175

176+
it('does not send expired events', async () => {
177+
isClientSubscribedToEventStub.returns(always(true))
178+
179+
const now = Math.floor(clock.now / 1000)
180+
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
181+
182+
const expiredEvent: Event = {
183+
...event,
184+
tags: [['expiration', String(now - 1)] as any],
185+
}
186+
187+
stream.write(toDbEvent(expiredEvent))
188+
stream.end()
189+
190+
await promise
191+
192+
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
193+
expect(webSocketOnMessageStub).to.have.been.calledOnceWithExactly(
194+
['EOSE', subscriptionId],
195+
)
196+
})
197+
198+
it('sends event if expiration is in the future', async () => {
199+
isClientSubscribedToEventStub.returns(always(true))
200+
201+
const now = Math.floor(clock.now / 1000)
202+
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
203+
204+
const eventWithFutureExpiration: Event = {
205+
...event,
206+
tags: [['expiration', String(now + 60)] as any],
207+
}
208+
209+
stream.write(toDbEvent(eventWithFutureExpiration))
210+
stream.end()
211+
212+
await promise
213+
214+
expect(eventRepositoryFindByFiltersStub).to.have.been.calledOnceWithExactly(filters)
215+
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
216+
['EVENT', subscriptionId, eventWithFutureExpiration],
217+
)
218+
expect(webSocketOnMessageStub).to.have.been.calledWithExactly(
219+
['EOSE', subscriptionId],
220+
)
221+
})
222+
168223
it('sends EOSE', async () => {
169224
const promise = (handler as any).fetchAndSend(subscriptionId, filters)
170225

0 commit comments

Comments
 (0)