Skip to content

Commit d5771d4

Browse files
committed
feat: enhance performance and error handling in download engine with improved stream management and chunk processing
1 parent a718684 commit d5771d4

5 files changed

Lines changed: 204 additions & 117 deletions

File tree

src/download/download-engine/download-file/download-engine-file.ts

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { EventEmitter } from "../../../utils/EventEmitter.js";
22
import BaseDownloadEngineWriteStream from "../streams/download-engine-write-stream/base-download-engine-write-stream.js";
3-
import {ChunkStatus, DownloadFile, SaveProgressInfo} from "../types.js";
3+
import { ChunkStatus, DownloadFile, SaveProgressInfo } from "../types.js";
44
import BaseDownloadProgram from "./download-programs/base-download-program.js";
5-
import switchProgram, {AvailablePrograms} from "./download-programs/switch-program.js";
6-
import {DownloaderProgramManager} from "./downloaderProgramManager.js";
7-
import {DownloadFlags, DownloadStatus, ProgressStatus} from "./progress-status-file.js";
8-
import {pushComment} from "./utils/push-comment.js";
9-
import {randomUUID} from "crypto";
5+
import switchProgram, { AvailablePrograms } from "./download-programs/switch-program.js";
6+
import { DownloaderProgramManager } from "./downloaderProgramManager.js";
7+
import { DownloadFlags, DownloadStatus, ProgressStatus } from "./progress-status-file.js";
8+
import { pushComment } from "./utils/push-comment.js";
9+
import { randomUUID } from "crypto";
1010

1111
export type DownloadEngineFilePerPartOptions = {
1212
parallelStreams: number;
@@ -88,7 +88,7 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
8888
public constructor(file: DownloadFile, options: DownloadEngineFileOptions) {
8989
super();
9090
this.file = file;
91-
this.options = {...DEFAULT_OPTIONS, ...options};
91+
this.options = { ...DEFAULT_OPTIONS, ...options };
9292
this._progressStatus = {
9393
totalDownloadParts: file.parts.length,
9494
fileName: file.localFileName,
@@ -132,23 +132,35 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
132132
}
133133

134134
public get status(): ProgressStatus {
135+
let retrying = false;
136+
let retryingTotalAttempts = 0;
137+
let streamsNotResponding = 0;
138+
let streamingBytes = 0;
139+
140+
for (const key in this._activeStreamContext) {
141+
const streamContext = this._activeStreamContext[key];
142+
streamingBytes += streamContext.streamBytes;
143+
retrying ||= Boolean(streamContext.isRetrying);
144+
retryingTotalAttempts = Math.max(retryingTotalAttempts, streamContext.retryingAttempts);
145+
streamsNotResponding += Number(Boolean(streamContext.isStreamNotResponding));
146+
}
147+
148+
const transferredBytes = this._getAllTransferredBytes(streamingBytes);
149+
135150
const thisStatus: ProgressStatus = {
136151
...this._progressStatus as ProgressStatus,
137152
transferAction: this._activePart.fetchStream.transferAction,
138153
downloadId: this._progress.downloadId,
139154
downloadPart: this._progress.part + 1,
140-
transferredBytes: this.transferredBytes,
155+
transferredBytes,
141156
totalBytes: this.downloadSize,
142157
downloadStatus: this._downloadStatus,
143-
comment: this.options.comment
158+
comment: this.options.comment,
159+
retrying,
160+
retryingTotalAttempts,
161+
streamsNotResponding
144162
};
145163

146-
const streamContexts = Object.values(this._activeStreamContext);
147-
148-
thisStatus.retrying = streamContexts.some(c => c.isRetrying);
149-
thisStatus.retryingTotalAttempts = Math.max(0, ...streamContexts.map(x => x.retryingAttempts));
150-
thisStatus.streamsNotResponding = streamContexts.reduce((acc, cur) => acc + (cur.isStreamNotResponding ? 1 : 0), 0);
151-
152164
return thisStatus;
153165
}
154166

@@ -173,6 +185,10 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
173185
const streamingBytes = Object.values(this._activeStreamContext)
174186
.reduce((acc, cur) => acc + cur.streamBytes, 0);
175187

188+
return this._getAllTransferredBytes(streamingBytes);
189+
}
190+
191+
private _getAllTransferredBytes(streamingBytes: number) {
176192
const streamBytes = this._activeDownloadedChunkSize + streamingBytes;
177193
const streamBytesMin = Math.min(streamBytes, this._activePart.downloadSize || streamBytes);
178194

@@ -304,7 +320,7 @@ export default class DownloadEngineFile extends EventEmitter<DownloadEngineFileE
304320
}
305321

306322
protected async _downloadSlice(startChunk: number, endChunk: number) {
307-
const getContext = () => this._activeStreamContext[startChunk] ??= {streamBytes: 0, retryingAttempts: 0};
323+
const getContext = () => this._activeStreamContext[startChunk] ??= { streamBytes: 0, retryingAttempts: 0 };
308324

309325
const fetchState = this._activePart.fetchStream.withSubState({
310326
chunkSize: this._progress.chunkSize,

src/download/download-engine/streams/download-engine-fetch-stream/base-download-engine-fetch-stream.ts

Lines changed: 56 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import retry from "async-retry";
22
import { EventEmitter } from "../../../../utils/EventEmitter.js";
3-
import {withLock} from "lifecycle-utils";
3+
import { withLock } from "lifecycle-utils";
44
import prettyMillisecondsCompact from "../../../transfer-visualize/utils/prettyMSFast.js";
5-
import {AvailablePrograms} from "../../download-file/download-programs/switch-program.js";
6-
import {InputRange} from "../../engine/base-download-engine.js";
7-
import {sleepPromise} from "../../utils/sleepPromise.js";
5+
import { AvailablePrograms } from "../../download-file/download-programs/switch-program.js";
6+
import { InputRange } from "../../engine/base-download-engine.js";
7+
import { sleepPromise } from "../../utils/sleepPromise.js";
88
import HttpError from "./errors/http-error.js";
99
import StatusCodeError from "./errors/status-code-error.js";
10-
import {retryAsyncStatementSimple} from "./utils/retry-async-statement.js";
10+
import { retryAsyncStatementSimple } from "./utils/retry-async-statement.js";
1111

12-
export const STREAM_NOT_RESPONDING_TIMEOUT = 1000 * 3;
1312
export const MIN_LENGTH_FOR_MORE_INFO_REQUEST = 1024 * 1024 * 3; // 3MB
1413

1514
const TOKEN_EXPIRED_ERROR_CODES = [401, 403, 419, 440, 498, 499];
@@ -19,7 +18,15 @@ export type BaseDownloadEngineFetchStreamOptions = {
1918
retryFetchDownloadInfo?: retry.Options;
2019
range?: InputRange;
2120
/**
22-
* Max wait for next data stream
21+
* Interval read data and check if stream is not responding (default: 1s)
22+
*/
23+
streamCheckInterval?: number;
24+
/**
25+
* Max wait for stream to respond with data before raising stream not responding event (default: 3s)
26+
*/
27+
streamWaitAlert?: number;
28+
/**
29+
* Max wait for next data stream before aborting and retrying (default: 15s)
2330
*/
2431
maxStreamWait?: number;
2532
/**
@@ -41,19 +48,19 @@ export type BaseDownloadEngineFetchStreamOptions = {
4148
*/
4249
progressThrottleMs?: number;
4350
} & (
44-
{
45-
defaultFetchDownloadInfo?: { length: number, acceptRange: boolean; };
46-
} |
47-
{
48-
/**
49-
* Try different headers to see if any authentication is needed
50-
*/
51-
tryHeaders?: Record<string, string>[];
52-
/**
53-
* Delay between trying different headers
54-
*/
55-
tryHeadersDelay?: number;
56-
});
51+
{
52+
defaultFetchDownloadInfo?: { length: number, acceptRange: boolean; };
53+
} |
54+
{
55+
/**
56+
* Try different headers to see if any authentication is needed
57+
*/
58+
tryHeaders?: Record<string, string>[];
59+
/**
60+
* Delay between trying different headers
61+
*/
62+
tryHeadersDelay?: number;
63+
});
5764

5865
export type DownloadInfoResponse = {
5966
length: number,
@@ -93,6 +100,8 @@ export type WriteCallback = (data: Uint8Array[], position: number, index: number
93100

94101
const DEFAULT_OPTIONS: BaseDownloadEngineFetchStreamOptions = {
95102
retryOnServerError: true,
103+
streamCheckInterval: 10,
104+
streamWaitAlert: 1000 * 3,
96105
maxStreamWait: 1000 * 15,
97106
headersTimeout: 1000 * 30,
98107
retry: {
@@ -111,8 +120,7 @@ const DEFAULT_OPTIONS: BaseDownloadEngineFetchStreamOptions = {
111120
range: {
112121
start: 0,
113122
end: -1
114-
},
115-
progressThrottleMs: 15
123+
}
116124
};
117125

118126
export default abstract class BaseDownloadEngineFetchStream extends EventEmitter<BaseDownloadEngineFetchStreamEvents> {
@@ -127,13 +135,16 @@ export default abstract class BaseDownloadEngineFetchStream extends EventEmitter
127135
public aborted = false;
128136
protected _pausedResolve?: () => void;
129137
protected _cleanupClonedStateListeners?: () => void;
130-
public errorCount = {value: 0};
138+
public errorCount = { value: 0 };
131139
public lastFetchTime = 0;
132140
private _closed = false;
141+
private _watchDogCalls = new Set<() => void>();
142+
private _watchDogInterval?: NodeJS.Timeout;
133143

134144
constructor(options: Partial<BaseDownloadEngineFetchStreamOptions> = {}) {
135145
super();
136-
this.options = {...DEFAULT_OPTIONS, ...options};
146+
this.options = { ...DEFAULT_OPTIONS, ...options };
147+
this.watchDog = this.watchDog.bind(this);
137148
this.initEvents();
138149
}
139150

@@ -189,6 +200,8 @@ export default abstract class BaseDownloadEngineFetchStream extends EventEmitter
189200
fetchStream._cleanupClonedStateListeners = undefined;
190201
};
191202

203+
this.watchDog = fetchStream.watchDog;
204+
192205
return fetchStream;
193206
}
194207

@@ -338,6 +351,25 @@ export default abstract class BaseDownloadEngineFetchStream extends EventEmitter
338351
return parsed.href;
339352
}
340353

354+
protected watchDog(callback: () => void) {
355+
this._watchDogCalls.add(callback);
356+
if (!this._watchDogInterval) {
357+
this._watchDogInterval = setInterval(() => {
358+
for (const cb of this._watchDogCalls) {
359+
cb();
360+
}
361+
}, this.options.streamCheckInterval!);
362+
}
363+
364+
return () => {
365+
this._watchDogCalls.delete(callback);
366+
if (this._watchDogCalls.size === 0 && this._watchDogInterval) {
367+
clearInterval(this._watchDogInterval);
368+
this._watchDogInterval = undefined;
369+
}
370+
};
371+
}
372+
341373
protected retryOnServerError(error: Error): error is StatusCodeError {
342374
return Boolean(this.options.retryOnServerError) && error instanceof StatusCodeError &&
343375
(error.statusCode >= 500 || error.statusCode === 429);

0 commit comments

Comments
 (0)