From b68af159603b99f2358e8383ea318117d4929ae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?= Date: Tue, 9 Sep 2025 18:40:01 +0200 Subject: [PATCH 1/2] fix: handle back pressure on reader prevent multiple drain listener --- package.json | 2 +- .../AbstractBatchEntityReaderStream.ts | 38 ++++++++++++------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/package.json b/package.json index ab69427..5499fa1 100644 --- a/package.json +++ b/package.json @@ -79,7 +79,7 @@ "sqlite3": "^5.1.7" }, "dependencies": { - "batchjs": "^1.2.1" + "batchjs": "^1.2.5" }, "commitlint": { "extends": "@commitlint/config-conventional" diff --git a/src/common/classes/AbstractBatchEntityReaderStream.ts b/src/common/classes/AbstractBatchEntityReaderStream.ts index bd691e6..22c5c83 100644 --- a/src/common/classes/AbstractBatchEntityReaderStream.ts +++ b/src/common/classes/AbstractBatchEntityReaderStream.ts @@ -17,6 +17,8 @@ export interface AbstractBatchEntityReaderStreamOptions extends ObjectReadableOp */ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable { private reading: boolean = false; + private finished: boolean = false; + private awaitingDrain: boolean = false; protected buffer: BatchData = []; private readonly batchSize: number; @@ -40,23 +42,22 @@ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable< * @param {number} [size] - The size parameter for controlling the read operation. */ _read(size: number): void { - if (this.reading) return; - + if (this.reading) + return; this.reading = true; - this.fetch(Math.min(size, this.batchSize)) .then((entities) => { - if (entities.length === 0) { - this.push(null); - } else { + if (entities.length === 0 && !this.finished) { + this.finished=true; + } + else { this.buffer.push(...entities); - this._flush().finally(() => { - this.reading = false; - }); } }) - .catch((error) => { - this.emit("error", error); + .then(()=>this._flush()) + .catch((error) => this.emit("error", error)) + .finally(() => { + this.reading = false; }); } @@ -68,13 +69,22 @@ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable< * @returns {Promise} A promise that resolves when the buffer is flushed. */ private _flush():Promise{ - while (this.buffer.length > 0) { - const chunk = this.buffer.shift() as T; + while (this.buffer.length > 0 && !this.awaitingDrain) { + const chunk = this.buffer.shift(); if (!this.push(chunk)) { - this.once("drain", () => this._flush()); + this.awaitingDrain=true; + const timer = setTimeout(()=>this.emit("drain"), this.drainTimeout); + this.once("drain", () => { + clearTimeout(timer); + this.awaitingDrain=false; + this._flush(); + }); return Promise.resolve(); } } + if(this.buffer.length === 0 && this.finished){ + this.push(null); + } return Promise.resolve(); }; From 9f845e0656368081394176a288d3e10f2e7092b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Alcaraz=20Mart=C3=ADnez?= Date: Tue, 9 Sep 2025 18:42:38 +0200 Subject: [PATCH 2/2] 1.1.1 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 5499fa1..a614845 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "batchjs-data", - "version": "1.1.0", + "version": "1.1.1", "author": { "name": "Pablo Alcaraz Martínez", "url": "https://github.com/palcarazm/"