Skip to content

Commit d327e32

Browse files
committed
chore: working on chapter 13
1 parent 40c2953 commit d327e32

7 files changed

Lines changed: 168 additions & 0 deletions

File tree

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# 09-correlation-id
2+
3+
This sample demonstrates how to implement a request/reply pattern on top of a
4+
simple point-to-point one-way asynchronous channel.
5+
6+
## Dependencies
7+
8+
This example requires you to install some third-party dependencies from npm.
9+
10+
If you have `pnpm` installed, you can do that with:
11+
12+
```bash
13+
pnpm install
14+
```
15+
16+
Alternatively, if you prefer to use another package manager, make sure to delete
17+
the `pnpm-lock.yaml` file before using it.
18+
19+
If you want to use `npm`, you can run:
20+
21+
```bash
22+
npm install
23+
```
24+
25+
If you want to use `yarn`, you can run:
26+
27+
```bash
28+
yarn install
29+
```
30+
31+
## Run
32+
33+
To run this example, use the following command:
34+
35+
```bash
36+
node requestor.js
37+
```
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
export function createReplyChannel(channel) {
2+
return function registerHandler(handler) {
3+
channel.on('message', async message => {
4+
if (message.type !== 'request') {
5+
return
6+
}
7+
8+
const replyData = await handler(message.data)
9+
channel.send({
10+
type: 'response',
11+
data: replyData,
12+
inReplyTo: message.id,
13+
})
14+
})
15+
}
16+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { nanoid } from 'nanoid' // v5.1.5
2+
3+
export function createRequestChannel(channel) {
4+
const correlationMap = new Map()
5+
6+
function sendRequest(data) {
7+
console.log('Sending request', data)
8+
return new Promise((resolve, reject) => {
9+
const correlationId = nanoid()
10+
11+
const replyTimeout = setTimeout(() => {
12+
correlationMap.delete(correlationId)
13+
reject(new Error('Request timeout'))
14+
}, 10000)
15+
16+
correlationMap.set(correlationId, replyData => {
17+
correlationMap.delete(correlationId)
18+
clearTimeout(replyTimeout)
19+
resolve(replyData)
20+
})
21+
22+
channel.send({
23+
type: 'request',
24+
data,
25+
id: correlationId,
26+
})
27+
})
28+
}
29+
30+
channel.on('message', message => {
31+
const replyCb = correlationMap.get(message.inReplyTo)
32+
if (replyCb) {
33+
replyCb(message.data)
34+
}
35+
})
36+
37+
return sendRequest
38+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "09-correlation-id",
3+
"version": "1.0.0",
4+
"description": "This sample demonstrates how to implement a request/reply pattern on top of a simple point-to-point one-way asynchronous channel.",
5+
"type": "module",
6+
"scripts": {},
7+
"engines": {
8+
"node": ">=24"
9+
},
10+
"engineStrict": true,
11+
"keywords": [],
12+
"author": "Luciano Mammino and Mario Casciaro",
13+
"license": "MIT",
14+
"dependencies": {
15+
"nanoid": "^5.1.5"
16+
}
17+
}

13-messaging-and-integration-patterns/09-correlation-id/pnpm-lock.yaml

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { createReplyChannel } from './createReplyChannel.js'
2+
3+
const registerReplyHandler = createReplyChannel(process)
4+
5+
registerReplyHandler(req => {
6+
return new Promise(resolve => {
7+
setTimeout(() => {
8+
resolve({ sum: req.a + req.b })
9+
}, req.delay)
10+
})
11+
})
12+
13+
process.send('ready')
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import { fork } from 'node:child_process'
2+
import { once } from 'node:events'
3+
import { join } from 'node:path'
4+
import { createRequestChannel } from './createRequestChannel.js'
5+
6+
const channel = fork(join(import.meta.dirname, 'replier.js'))
7+
const request = createRequestChannel(channel)
8+
9+
try {
10+
const [message] = await once(channel, 'message')
11+
console.log(`Child process initialized: ${message}`)
12+
const p1 = request({ a: 1, b: 2, delay: 500 }).then(res => {
13+
console.log(`Reply: 1 + 2 = ${res.sum}`)
14+
})
15+
16+
const p2 = request({ a: 6, b: 1, delay: 100 }).then(res => {
17+
console.log(`Reply: 6 + 1 = ${res.sum}`)
18+
})
19+
20+
await Promise.all([p1, p2])
21+
} finally {
22+
channel.disconnect()
23+
}

0 commit comments

Comments
 (0)