-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAbstractBatchEntityReaderStream.ts
More file actions
102 lines (95 loc) · 3.87 KB
/
AbstractBatchEntityReaderStream.ts
File metadata and controls
102 lines (95 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import { ObjectReadable, ObjectReadableOptions, BatchData } from "batchjs";
/**
* @interface
* Options for the AbstractBatchEntityReaderStream.
* Adds the batch size to the options accepted by the underlying ObjectReadable.
* @extends ObjectReadableOptions
*/
export interface AbstractBatchEntityReaderStreamOptions extends ObjectReadableOptions {
/** The maximum number of elements in a batch; used as the upper bound of the size requested on each fetch. */
batchSize: number;
}
/**
 * @class
 * Base class for readable object streams that pull entities from a data storage in batches of a bounded size.
 * Subclasses only implement `fetch`; this class takes care of buffering the fetched entities,
 * pushing them to the consumer and signalling the end of the stream.
 * @extends ObjectReadable
 * @template T
 */
export abstract class AbstractBatchEntityReaderStream<T> extends ObjectReadable<T> {
    /** True while a `fetch` call started by `_read` has not settled yet. */
    private reading: boolean = false;
    /** True once `fetch` has returned an empty batch, i.e. the storage is exhausted. */
    private finished: boolean = false;
    /** True while `_flush` is paused after `push` reported backpressure. */
    private awaitingDrain: boolean = false;
    /** Entities already fetched but not yet pushed to the consumer. */
    protected buffer: BatchData<T> = [];
    /** Upper bound for the number of entities requested per `fetch` call. */
    private readonly batchSize: number;

    /**
     * @constructor
     * @param {AbstractBatchEntityReaderStreamOptions} options - The options for the AbstractBatchEntityReaderStream.
     * @param options.batchSize {number} - The maximum number of elements in a batch.
     */
    constructor(options: AbstractBatchEntityReaderStreamOptions) {
        super(options);
        this.batchSize = options.batchSize;
    }

    /**
     * Reads a batch of data from the data storage and pushes it to the consumer stream.
     * If the size parameter is not a finite number, it reads the number of entities specified in the `batchSize` option.
     * Otherwise it reads the minimum of the size and the `batchSize` option.
     * If no data is available, the stream is marked as finished and null is pushed once the buffer is empty.
     * If an error occurs while reading data, the stream is destroyed with that error, which emits the error event.
     * Calls made while a fetch is in flight, or after the storage has been exhausted, are ignored.
     *
     * @param {number} [size] - The size parameter for controlling the read operation.
     */
    _read(size: number): void {
        // Never overlap fetches, and never query the storage again once it has
        // reported exhaustion: a late non-empty result could be pushed after EOF.
        if (this.reading || this.finished) {
            return;
        }
        this.reading = true;
        // Math.min(undefined, n) is NaN, so fall back to batchSize for non-finite sizes.
        const limit = Number.isFinite(size) ? Math.min(size, this.batchSize) : this.batchSize;
        this.fetch(limit)
            .then((entities) => {
                if (entities.length === 0) {
                    this.finished = true;
                } else {
                    this.buffer.push(...entities);
                }
            })
            .then(() => this._flush())
            .catch((error) => {
                // Emitting "error" by hand would leave the stream open without ever
                // ending; destroy() emits the error and tears the stream down.
                this.destroy(error);
            })
            .finally(() => {
                this.reading = false;
            });
    }

    /**
     * Flushes the buffer by pushing its content to the consumer stream, one entity at a time.
     * When `push` reports backpressure, flushing pauses until a "drain" event is observed on this stream;
     * a timer emits that event after `drainTimeout` so flushing always resumes.
     * Once the buffer is empty and the storage is exhausted, null is pushed to end the stream.
     * Does nothing if the stream has already been destroyed.
     *
     * @private
     * @returns {Promise<void>} An already-resolved promise; flushing resumed after a drain continues after it has resolved.
     */
    private _flush(): Promise<void> {
        // A destroyed stream discards pushes; without this guard every remaining
        // chunk would be shifted out through a chain of pointless timers.
        if (this.destroyed) {
            return Promise.resolve();
        }
        while (this.buffer.length > 0 && !this.awaitingDrain) {
            const chunk = this.buffer.shift();
            // push() returning false still accepts the chunk; it only signals backpressure.
            if (!this.push(chunk)) {
                this.awaitingDrain = true;
                // NOTE(review): `drainTimeout` is not declared in this class; presumably it is
                // inherited from ObjectReadable - confirm. Node's Readable does not define a
                // "drain" event itself, so unless the base class emits one this timer is what
                // resumes the flush.
                const timer = setTimeout(() => this.emit("drain"), this.drainTimeout);
                this.once("drain", () => {
                    clearTimeout(timer);
                    this.awaitingDrain = false;
                    void this._flush();
                });
                return Promise.resolve();
            }
        }
        if (this.buffer.length === 0 && this.finished) {
            this.push(null);
        }
        return Promise.resolve();
    }

    /**
     * Abstract method for fetching data from the data storage. This method should be implemented
     * by subclasses to define the specific logic for reading a batch of data.
     * Resolving with an empty array tells the stream that no more data is available.
     *
     * @protected
     * @abstract
     * @param size {number} - The size parameter for controlling the read operation.
     * @returns {Promise<T[]>} A promise that resolves with an array of entities.
     */
    protected abstract fetch(size: number): Promise<T[]>;
}