Skip to content

Commit 1c95ee1

Browse files
committed
fix(write-queue): better merging
1 parent 57a0762 commit 1c95ee1

2 files changed

Lines changed: 17 additions & 16 deletions

File tree

src/download/download-engine/streams/download-engine-write-stream/download-engine-write-stream-nodejs.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ export default class DownloadEngineWriteStreamNodejs extends BaseDownloadEngineW
100100
* Buffer a write for the given cursor position.
101101
* Fragments are concatenated into a single Buffer, contiguous regions merged,
102102
* and auto-flushed when the adaptive threshold is exceeded.
103-
* Returns void (synchronous queue); errors surface via ensureBytesSynced() / close().
104103
*/
105104
write(cursor: number, buffers: Uint8Array[]) {
106105
return this._writeQueue.addWrite(cursor, buffers);

src/download/download-engine/streams/download-engine-write-stream/utils/WriteQueue.ts

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {FileHandle} from "fs/promises";
1+
import { FileHandle } from "fs/promises";
22
import WriterIsClosedError from "../errors/writer-is-closed-error.js";
33

44
const MIN_BUFFER_SIZE = 2 * 1024 * 1024; // 2 MB
@@ -66,7 +66,7 @@ export default class WriteQueue {
6666

6767
const merged = this._tryMerge(cursor, buffers, length);
6868
if (!merged) {
69-
this._regions.push({cursor, buffers, length});
69+
this._regions.push({ cursor, buffers, length });
7070
}
7171

7272
this._totalBuffered += length;
@@ -86,14 +86,16 @@ export default class WriteQueue {
8686
if (cursor === regionEnd) {
8787
region.buffers.push(...buffers);
8888
region.length += length;
89-
return true;
90-
}
91-
92-
if (cursor + length === region.cursor) {
89+
} else if (cursor + length === region.cursor) {
9390
region.cursor = cursor;
9491
region.buffers.unshift(...buffers);
9592
region.length += length;
96-
return true;
93+
} else {
94+
continue;
95+
}
96+
97+
if (this._tryMerge(region.cursor, region.buffers, region.length)) {
98+
this._regions.splice(i, 1);
9799
}
98100
}
99101

@@ -104,30 +106,30 @@ export default class WriteQueue {
104106
* Flush all buffered regions to disk as parallel positional writes.
105107
* Non-overlapping positional writes via fd.write(buf, 0, len, position) are safe concurrently.
106108
*/
107-
private _flushNow(flashMetadata = true, flashAll = false): void | Promise<void> {
109+
private _flushNow(flushMetadata = true, flashAll = false): void | Promise<void> {
108110
if (this._regions.length === 0) return;
109111

110112
const regionsToFlush = this._regions;
111113
this._regions = [];
112114
this._totalBuffered = 0;
113115

114-
const flushPromise = this._doFlush(regionsToFlush, flashMetadata)
116+
const flushPromise = this._doFlush(regionsToFlush, flushMetadata)
115117
.finally(() => this._inFlightWrites.delete(flushPromise));
116118

117119
this._inFlightWrites.add(flushPromise);
118120

119121
return flushPromise.then(async () => {
120-
if (this._inFlightWrites.size > 0){
122+
if (this._inFlightWrites.size > 0) {
121123
await this._waitForInFlight();
122124
}
123125

124-
if (this._totalBuffered >= this._maxBufferedBytes || flashAll && this._regions.length > 0){
125-
return this._flushNow(flashMetadata, flashAll);
126+
if (this._totalBuffered >= this._maxBufferedBytes || flashAll && this._regions.length > 0) {
127+
return this._flushNow(flushMetadata, flashAll);
126128
}
127129
});
128130
}
129131

130-
private async _doFlush(regions: PendingRegion[], flashMetadata = true): Promise<void> {
132+
private async _doFlush(regions: PendingRegion[], flushMetadata = true): Promise<void> {
131133
const fdResult = this._options.getFd();
132134
const fd = fdResult instanceof Promise ? await fdResult : fdResult;
133135

@@ -138,7 +140,7 @@ export default class WriteQueue {
138140

139141
await Promise.all(writes);
140142

141-
if (flashMetadata){
143+
if (flushMetadata) {
142144
await this._options.flushMetadata();
143145
}
144146
}
@@ -148,7 +150,7 @@ export default class WriteQueue {
148150
* Called by ensureBytesSynced(), close(), ftruncate().
149151
*/
150152
async drain(): Promise<void> {
151-
if (this._inFlightWrites.size > 0){
153+
if (this._inFlightWrites.size > 0) {
152154
await this._waitForInFlight();
153155
}
154156

0 commit comments

Comments
 (0)