|
| 1 | +import { createServer } from 'node:http' |
| 2 | +import amqp from 'amqplib' // v0.10.8 |
| 3 | +import staticHandler from 'serve-handler' // v6.1.6 |
| 4 | +import { WebSocketServer } from 'ws' // v8.18.2 |
| 5 | + |
| 6 | +const httpPort = process.argv[2] || 8080 |
| 7 | + |
| 8 | +// register the server with RabbitMQ and create a queue |
| 9 | +const connection = await amqp.connect('amqp://localhost') |
| 10 | +const channel = await connection.createChannel() |
| 11 | +await channel.assertExchange('chat', 'fanout') |
| 12 | +const { queue } = await channel.assertQueue(`chat_srv_${httpPort}`, { |
| 13 | + exclusive: true, |
| 14 | +}) |
| 15 | +await channel.bindQueue(queue, 'chat') |
| 16 | +channel.consume( |
| 17 | + queue, |
| 18 | + msg => { |
| 19 | + msg = msg.content.toString() |
| 20 | + console.log(`From queue: ${msg}`) |
| 21 | + broadcast(Buffer.from(msg)) |
| 22 | + }, |
| 23 | + { noAck: true } |
| 24 | +) |
| 25 | + |
| 26 | +// serve static files |
| 27 | +const server = createServer((req, res) => { |
| 28 | + return staticHandler(req, res, { public: 'web' }) |
| 29 | +}) |
| 30 | + |
| 31 | +const wss = new WebSocketServer({ server }) |
| 32 | +wss.on('connection', async client => { |
| 33 | + console.log('Client connected') |
| 34 | + client.on('message', msg => { |
| 35 | + console.log(`Sending message: ${msg}`) |
| 36 | + channel.publish( |
| 37 | + 'chat', |
| 38 | + '', |
| 39 | + Buffer.from( |
| 40 | + JSON.stringify({ |
| 41 | + text: msg.toString(), |
| 42 | + timestamp: Date.now(), |
| 43 | + }) |
| 44 | + ) |
| 45 | + ) |
| 46 | + }) |
| 47 | + |
| 48 | + // load previous messages from the history service |
| 49 | + const res = await fetch('http://localhost:8090') |
| 50 | + const messages = await res.json() |
| 51 | + for (const message of messages) { |
| 52 | + client.send(Buffer.from(JSON.stringify(message))) |
| 53 | + } |
| 54 | +}) |
| 55 | + |
| 56 | +function broadcast(msg) { |
| 57 | + for (const client of wss.clients) { |
| 58 | + if (client.readyState === WebSocket.OPEN) { |
| 59 | + client.send(msg) |
| 60 | + } |
| 61 | + } |
| 62 | +} |
| 63 | + |
| 64 | +server.listen(httpPort) |
0 commit comments