diff --git a/package.json b/package.json index ab69427..a614845 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "batchjs-data", - "version": "1.1.0", + "version": "1.1.1", "author": { "name": "Pablo Alcaraz Martínez", "url": "https://github.com/palcarazm/" @@ -79,7 +79,7 @@ "sqlite3": "^5.1.7" }, "dependencies": { - "batchjs": "^1.2.1" + "batchjs": "^1.2.5" }, "commitlint": { "extends": "@commitlint/config-conventional" diff --git a/src/common/classes/AbstractBatchEntityReaderStream.ts b/src/common/classes/AbstractBatchEntityReaderStream.ts index bd691e6..22c5c83 100644 --- a/src/common/classes/AbstractBatchEntityReaderStream.ts +++ b/src/common/classes/AbstractBatchEntityReaderStream.ts @@ -17,6 +17,8 @@ export interface AbstractBatchEntityReaderStreamOptions extends ObjectReadableOp */ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable { private reading: boolean = false; + private finished: boolean = false; + private awaitingDrain: boolean = false; protected buffer: BatchData = []; private readonly batchSize: number; @@ -40,23 +42,22 @@ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable< * @param {number} [size] - The size parameter for controlling the read operation. */ _read(size: number): void { - if (this.reading) return; - + if (this.reading) + return; this.reading = true; - this.fetch(Math.min(size, this.batchSize)) .then((entities) => { - if (entities.length === 0) { - this.push(null); - } else { + if (entities.length === 0 && !this.finished) { + this.finished=true; + } + else { this.buffer.push(...entities); - this._flush().finally(() => { - this.reading = false; - }); } }) - .catch((error) => { - this.emit("error", error); + .then(()=>this._flush()) + .catch((error) => this.emit("error", error)) + .finally(() => { + this.reading = false; }); } @@ -68,13 +69,22 @@ export abstract class AbstractBatchEntityReaderStream extends ObjectReadable< * @returns {Promise} A promise that resolves when the buffer is flushed. */ private _flush():Promise{ - while (this.buffer.length > 0) { - const chunk = this.buffer.shift() as T; + while (this.buffer.length > 0 && !this.awaitingDrain) { + const chunk = this.buffer.shift(); if (!this.push(chunk)) { - this.once("drain", () => this._flush()); + this.awaitingDrain=true; + const timer = setTimeout(()=>this.emit("drain"), this.drainTimeout); + this.once("drain", () => { + clearTimeout(timer); + this.awaitingDrain=false; + this._flush(); + }); return Promise.resolve(); } } + if(this.buffer.length === 0 && this.finished){ + this.push(null); + } return Promise.resolve(); };