Skip to content

Commit 5dcc2be

Browse files
authored
refactor: event-bus inherits EventEmitter (#235)
1 parent 220a4ac commit 5dcc2be

2 files changed

Lines changed: 42 additions & 47 deletions

File tree

lib/event-bus.js

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -6,64 +6,60 @@ import { EventEmitter } from 'node:events'
66
* allowing payloads to be received by one instance of Smee and sent by
77
* others. This allows Smee to be multi-instanced!
88
*/
9-
export default class EventBus {
9+
export default class EventBus extends EventEmitter {
10+
#namespace = ''
11+
12+
#connection = ''
13+
14+
/** @type {Redis} */
15+
#sub
16+
17+
/** @type {Redis} */
18+
#pub
19+
20+
#connected = false
21+
1022
constructor ({ logger = console } = {}) {
11-
if (!(this instanceof EventBus)) {
12-
return new EventBus()
13-
}
14-
this.events = new EventEmitter()
23+
super()
1524

1625
// If Redis isn't enabled, don't try to connect
1726
if (!process.env.REDIS_URL) {
1827
logger.warn('Redis not enabled; events will not be shared between instances')
1928
return
2029
}
2130

22-
this.opts = {
23-
connection: process.env.REDIS_URL,
24-
namespace: 'global'
25-
}
31+
this.#namespace = process.env.REDIS_NAMESPACE || 'global'
32+
33+
this.#connection = process.env.REDIS_URL || ''
2634

2735
// Need two Redis clients; one cannot subscribe and publish.
28-
this.sub = new Redis(this.opts.connection)
29-
this.pub = new Redis(this.opts.connection)
36+
this.#sub = new Redis(this.#connection)
37+
this.#pub = new Redis(this.#connection)
3038

3139
// Subscribe to the Redis event channel
32-
this.sub.subscribe(this.opts.namespace)
33-
34-
logger.info(`Redis enabled; events will be shared between instances using ${this.opts.namespace} as the namespace`)
40+
this.#sub.subscribe(this.#namespace)
3541

42+
3643
// When we get a message, parse it and
3744
// throw it over to the EventEmitter.
38-
this.sub.on('message', (_, message) => {
45+
this.#sub.on('message', (_, message) => {
3946
const channel = message.slice(0, message.indexOf(':'))
40-
if (this.events.listenerCount(channel)) {
41-
return this.events.emit(channel, message.slice(channel.length + 1))
47+
if (this.listenerCount(channel)) {
48+
return super.emit(channel, message.slice(channel.length + 1))
4249
}
4350
})
51+
52+
this.#connected = true
53+
logger.info(`Redis enabled; events will be shared between instances using '${this.#namespace}' as the namespace`)
4454
}
4555

4656
/**
47-
* Emit an event to this machine's in-memory EventEmitter
48-
* @param {object} opts
49-
* @param {string} opts.channel - Channel name
50-
* @param {any} opts.payload
51-
*/
52-
emitLocalEvent (opts) {
53-
return this.events.emit(opts.channel, opts.payload)
54-
}
55-
56-
/**
57-
* Emit an event to the Redis bus, which will tell every subscriber about it
58-
* @param {object} opts
59-
* @param {string} opts.channel - Channel name
60-
* @param {any} opts.payload
57+
* @param {string} channelId
58+
* @param {string} payload
6159
*/
62-
emitEvent (opts) {
63-
if (process.env.REDIS_URL) {
64-
return this.pub.publish(this.opts.namespace, opts.channel + ':' + JSON.stringify(opts.payload))
65-
}
66-
// Only emit local events if Redis isn't configured
67-
return this.emitLocalEvent(opts)
60+
async emit (channelId, payload) {
61+
return this.#connected
62+
? this.#pub.publish(this.#namespace, channelId + ':' + JSON.stringify(payload)) !== 0
63+
: super.emit(channelId, payload)
6864
}
6965
}

lib/server.js

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -227,24 +227,24 @@ const [
227227
})
228228

229229
function close () {
230-
bus.events.removeListener(channel, reply.sse)
230+
bus.removeListener(channel, reply.sse)
231231
keepAlive.stop(reply)
232-
fastify.log.info('Client disconnected', channel, bus.events.listenerCount(channel))
232+
fastify.log.info('Client disconnected', channel, bus.listenerCount(channel))
233233
}
234234

235235
// Listen for events on this channel
236-
bus.events.on(channel, reply.sse.bind(reply))
236+
bus.on(channel, reply.sse.bind(reply))
237237

238238
// Clean up when the client disconnects
239239
reply.raw.on('close', close)
240240

241241
reply.sseReady()
242242

243-
fastify.log.info('Client connected to sse', channel, bus.events.listenerCount(channel))
243+
fastify.log.info('Client connected to sse', channel, bus.listenerCount(channel))
244244
return
245245
}
246246

247-
fastify.log.info('Client connected to web', channel, bus.events.listenerCount(channel))
247+
fastify.log.info('Client connected to web', channel, bus.listenerCount(channel))
248248
return reply
249249
.status(200)
250250
.header('Content-Type', 'text/html; charset=utf-8')
@@ -271,14 +271,13 @@ const [
271271
}
272272
}, async (req, reply) => {
273273
// Emit an event to the Redis bus
274-
await bus.emitEvent({
275-
channel: req.params.channel,
276-
payload: {
274+
await bus.emit(
275+
req.params.channel,
276+
{
277277
...req.headers,
278278
body: req.body,
279279
query: req.query,
280280
timestamp: Date.now()
281-
}
282281
})
283282

284283
return reply.status(200).send()
@@ -314,7 +313,7 @@ const [
314313
}
315314
}, async (req, reply) => {
316315
// Emit an event to the Redis bus
317-
await bus.emitEvent({
316+
await bus.emit({
318317
channel: req.params.channel,
319318
payload: req.body
320319
})

0 commit comments

Comments
 (0)