Skip to content

Commit 313db46

Browse files
committed
chore: working on chapter 13
1 parent d327e32 commit 313db46

7 files changed

Lines changed: 234 additions & 0 deletions

File tree

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# 09-correlation-id
2+
3+
This sample demonstrates how to implement a request/reply pattern on top of a
4+
simple point-to-point one-way asynchronous channel.
5+
6+
## Dependencies
7+
8+
As pre-requisite to this sample, you first need to
9+
[install RabbitMQ](http://www.rabbitmq.com/download.html)
10+
11+
If you have docker installed, you can run an ephemeral instance of RabbitMQ with
12+
the following command:
13+
14+
```bash
15+
docker run -it -p 5672:5672 --hostname my-rabbit rabbitmq:4
16+
```
17+
18+
This example requires you to install some third-party dependencies from npm.
19+
20+
If you have `pnpm` installed, you can do that with:
21+
22+
```bash
23+
pnpm install
24+
```
25+
26+
Alternatively, if you prefer to use another package manager, make sure to delete
27+
the `pnpm-lock.yaml` file before using it.
28+
29+
If you want to use `npm`, you can run:
30+
31+
```bash
32+
npm install
33+
```
34+
35+
If you want to use `yarn`, you can run:
36+
37+
```bash
38+
yarn install
39+
```
40+
41+
## Run
42+
43+
To run this example, use the following commands (in different terminals):
44+
45+
```bash
46+
node replier.js
47+
node requestor.js
48+
node requestor.js # you can run as many requestors as you want
49+
```
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import amqp from 'amqplib' // v0.10.8
2+
3+
export class AmqpReply {
4+
constructor(requestsQueueName) {
5+
this.requestsQueueName = requestsQueueName
6+
}
7+
8+
async initialize() {
9+
const connection = await amqp.connect('amqp://localhost')
10+
this.channel = await connection.createChannel()
11+
const { queue } = await this.channel.assertQueue(this.requestsQueueName)
12+
this.queue = queue
13+
}
14+
15+
handleRequests(handler) {
16+
this.channel.consume(this.queue, async msg => {
17+
const content = JSON.parse(msg.content.toString())
18+
const replyData = await handler(content)
19+
this.channel.sendToQueue(
20+
msg.properties.replyTo,
21+
Buffer.from(JSON.stringify(replyData)),
22+
{ correlationId: msg.properties.correlationId }
23+
)
24+
this.channel.ack(msg)
25+
})
26+
}
27+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import amqp from 'amqplib' // v0.10.8
2+
import { nanoid } from 'nanoid' // v5.1.5
3+
4+
export class AmqpRequest {
5+
constructor() {
6+
this.correlationMap = new Map()
7+
}
8+
9+
async initialize() {
10+
this.connection = await amqp.connect('amqp://localhost')
11+
this.channel = await this.connection.createChannel()
12+
const { queue } = await this.channel.assertQueue('', { exclusive: true })
13+
this.replyQueue = queue
14+
15+
this.channel.consume(
16+
this.replyQueue,
17+
msg => {
18+
const correlationId = msg.properties.correlationId
19+
const handler = this.correlationMap.get(correlationId)
20+
if (handler) {
21+
handler(JSON.parse(msg.content.toString()))
22+
}
23+
},
24+
{ noAck: true }
25+
)
26+
}
27+
28+
send(queue, message) {
29+
return new Promise((resolve, reject) => {
30+
const id = nanoid()
31+
const replyTimeout = setTimeout(() => {
32+
this.correlationMap.delete(id)
33+
reject(new Error('Request timeout'))
34+
}, 10000)
35+
36+
this.correlationMap.set(id, replyData => {
37+
this.correlationMap.delete(id)
38+
clearTimeout(replyTimeout)
39+
resolve(replyData)
40+
})
41+
42+
this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)), {
43+
correlationId: id,
44+
replyTo: this.replyQueue,
45+
})
46+
})
47+
}
48+
49+
destroy() {
50+
this.channel.close()
51+
this.connection.close()
52+
}
53+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "09-correlation-id",
3+
"version": "1.0.0",
4+
"description": "This sample demonstrates how to implement a request/reply pattern on top of a simple point-to-point one-way asynchronous channel.",
5+
"type": "module",
6+
"scripts": {},
7+
"engines": {
8+
"node": ">=24"
9+
},
10+
"engineStrict": true,
11+
"keywords": [],
12+
"author": "Luciano Mammino and Mario Casciaro",
13+
"license": "MIT",
14+
"dependencies": {
15+
"nanoid": "^5.1.5",
16+
"amqplib": "^0.10.8"
17+
}
18+
}

13-messaging-and-integration-patterns/10-return-address/pnpm-lock.yaml

Lines changed: 59 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { AmqpReply } from './amqpReply.js'
2+
3+
const reply = new AmqpReply('requests_queue')
4+
await reply.initialize()
5+
6+
reply.handleRequests(req => {
7+
console.log('Request received', req)
8+
return { sum: req.a + req.b }
9+
})
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { setTimeout } from 'node:timers/promises'
2+
import { AmqpRequest } from './amqpRequest.js'
3+
4+
const request = new AmqpRequest()
5+
await request.initialize()
6+
7+
async function sendRandomRequest() {
8+
const a = Math.round(Math.random() * 100)
9+
const b = Math.round(Math.random() * 100)
10+
const reply = await request.send('requests_queue', { a, b })
11+
console.log(`${a} + ${b} = ${reply.sum}`)
12+
}
13+
14+
for (let i = 0; i < 20; i++) {
15+
await sendRandomRequest()
16+
await setTimeout(1000)
17+
}
18+
19+
request.destroy()

0 commit comments

Comments
 (0)