Hi there, I'm reading multiplexing in Chapter 6, and I ran into a problem that I can't figure out.
First I copied the contents of the client.js and server.js into my local file, but instead use file read/write stream rather than stdout or stderr, and the code runs smooth without any problem. The complete code is shown below:
// client.mjs
import { connect } from 'net'
import { createReadStream } from 'fs'
function multiplexChannels (sources, destination) {
let openChannels = sources.length
for (let i = 0; i < sources.length; i++) {
sources[i]
.on('readable', function () { // ①
let chunk
while ((chunk = this.read()) !== null) {
const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ②
outBuff.writeUInt8(i, 0)
outBuff.writeUInt32BE(chunk.length, 1)
chunk.copy(outBuff, 5)
console.log(`Sending packet to channel: ${i}`)
destination.write(outBuff) // ③
}
})
.on('end', () => { // ④
if (--openChannels === 0) {
destination.end()
}
})
}
}
const socket = connect(3000, '127.0.0.1', () => {
+ const sourceFiles = [
+ createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
+ createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
+ ]
multiplexChannels(sourceFiles, socket)
})
// server.mjs
import { createWriteStream } from 'fs'
import { createServer } from 'net'
import { resolve } from 'path'
function demultiplexChannel (source, destinations) {
let currentChannel = null
let currentLength = null
source
.on('readable', () => { // ①
let chunk
if (currentChannel === null) { // ②
chunk = source.read(1)
currentChannel = chunk && chunk.readUInt8(0)
}
if (currentLength === null) { // ③
chunk = source.read(4)
currentLength = chunk && chunk.readUInt32BE(0)
if (currentLength === null) {
return null
}
}
chunk = source.read(currentLength) // ④
if (chunk === null) {
return null
}
console.log(`Received packet from: ${currentChannel}`)
destinations[currentChannel].write(chunk) // ⑤
currentChannel = null
currentLength = null
})
.on('end', () => { // ⑥
destinations.forEach(destination => destination.end())
console.log('Source channel closed')
})
}
const server = createServer(socket => {
+ const destFiles = [
+ createWriteStream('/tmp/filetcp/1.txt'),
+ createWriteStream('/tmp/filetcp/2.txt'),
+ ]
demultiplexChannel(socket, destFiles)
})
server.listen(3000, () => console.log('Server started'))
But when I add highWaterMark option to file read stream of client.mjs like below:
-const sourceFiles = [
- createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
- createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
- ]
+ const sourceFiles = [
+ createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs', { highWaterMark: 8 }),
+ createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs', { highWaterMark: 8 }),
+ ]
and run client.mjs again, something strange happened: both client and server socket hang forever. And the content of the server-generated file is incomplete, eg. 1.txt has partial content of client.mjs in this case.
I know that hightWaterMark is the maximum number of bytes of the internal buffer, but I can't figure out this has something to do with the problem, is there something that I missing?
Hi there, I'm reading multiplexing in Chapter 6, and I ran into a problem that I can't figure out.
First I copied the contents of the
client.jsandserver.jsinto my local file, but instead use file read/write stream rather thanstdoutorstderr, and the code runs smooth without any problem. The complete code is shown below:// client.mjs import { connect } from 'net' import { createReadStream } from 'fs' function multiplexChannels (sources, destination) { let openChannels = sources.length for (let i = 0; i < sources.length; i++) { sources[i] .on('readable', function () { // ① let chunk while ((chunk = this.read()) !== null) { const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ② outBuff.writeUInt8(i, 0) outBuff.writeUInt32BE(chunk.length, 1) chunk.copy(outBuff, 5) console.log(`Sending packet to channel: ${i}`) destination.write(outBuff) // ③ } }) .on('end', () => { // ④ if (--openChannels === 0) { destination.end() } }) } } const socket = connect(3000, '127.0.0.1', () => { + const sourceFiles = [ + createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'), + createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'), + ] multiplexChannels(sourceFiles, socket) })// server.mjs import { createWriteStream } from 'fs' import { createServer } from 'net' import { resolve } from 'path' function demultiplexChannel (source, destinations) { let currentChannel = null let currentLength = null source .on('readable', () => { // ① let chunk if (currentChannel === null) { // ② chunk = source.read(1) currentChannel = chunk && chunk.readUInt8(0) } if (currentLength === null) { // ③ chunk = source.read(4) currentLength = chunk && chunk.readUInt32BE(0) if (currentLength === null) { return null } } chunk = source.read(currentLength) // ④ if (chunk === null) { return null } console.log(`Received packet from: ${currentChannel}`) destinations[currentChannel].write(chunk) // ⑤ currentChannel = null currentLength = null }) .on('end', () => { // ⑥ destinations.forEach(destination => destination.end()) console.log('Source channel closed') }) } const server = createServer(socket => { + const destFiles = [ + createWriteStream('/tmp/filetcp/1.txt'), + createWriteStream('/tmp/filetcp/2.txt'), + ] demultiplexChannel(socket, destFiles) }) server.listen(3000, () => console.log('Server started'))But when I add
highWaterMarkoption to file read stream ofclient.mjslike below:and run
client.mjsagain, something strange happened: both client and server socket hang forever. And the content of the server-generated file is incomplete, eg.1.txthas partial content ofclient.mjsin this case.I know that
hightWaterMarkis the maximum number of bytes of the internal buffer, but I can't figure out this has something to do with the problem, is there something that I missing?