diff --git a/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json b/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json new file mode 100644 index 00000000000..fa3539c5131 --- /dev/null +++ b/common/changes/@microsoft/rush/copilot-stream-cache-entry-for-http-plugin_2026-04-05-03-56.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "comment": "Add optional file-based transfer APIs (`tryDownloadCacheEntryToFileAsync`, `tryUploadCacheEntryFromFileAsync`) to `ICloudBuildCacheProvider`, allowing cache plugins to transfer cache entries directly to and from files on disk without buffering entire contents in memory. Implement in `@rushstack/rush-http-build-cache-plugin`, `@rushstack/rush-amazon-s3-build-cache-plugin`, and `@rushstack/rush-azure-storage-build-cache-plugin`. Gated behind the `useDirectFileTransfersForBuildCache` experiment.", + "type": "none", + "packageName": "@microsoft/rush" + } + ], + "packageName": "@microsoft/rush", + "email": "198982749+Copilot@users.noreply.github.com" +} \ No newline at end of file diff --git a/common/changes/@rushstack/node-core-library/copilot-stream-cache-entry-for-http-plugin_2026-04-05-04-27.json b/common/changes/@rushstack/node-core-library/copilot-stream-cache-entry-for-http-plugin_2026-04-05-04-27.json new file mode 100644 index 00000000000..7c7d4cd77b9 --- /dev/null +++ b/common/changes/@rushstack/node-core-library/copilot-stream-cache-entry-for-http-plugin_2026-04-05-04-27.json @@ -0,0 +1,11 @@ +{ + "changes": [ + { + "comment": "Add `FileSystem.createReadStream`, `FileSystem.createWriteStream`, and `FileSystem.createWriteStreamAsync` APIs for creating read and write filesystem streams.", + "type": "minor", + "packageName": "@rushstack/node-core-library" + } + ], + "packageName": "@rushstack/node-core-library", + "email": "198982749+Copilot@users.noreply.github.com" +} \ No newline at end of file diff --git 
a/common/reviews/api/node-core-library.api.md b/common/reviews/api/node-core-library.api.md index c5b2ebe4f9b..2a557ba9e75 100644 --- a/common/reviews/api/node-core-library.api.md +++ b/common/reviews/api/node-core-library.api.md @@ -166,12 +166,15 @@ export class FileSystem { static copyFilesAsync(options: IFileSystemCopyFilesAsyncOptions): Promise; static createHardLink(options: IFileSystemCreateLinkOptions): void; static createHardLinkAsync(options: IFileSystemCreateLinkOptions): Promise; + static createReadStream(filePath: string): FileSystemReadStream; static createSymbolicLinkFile(options: IFileSystemCreateLinkOptions): void; static createSymbolicLinkFileAsync(options: IFileSystemCreateLinkOptions): Promise; static createSymbolicLinkFolder(options: IFileSystemCreateLinkOptions): void; static createSymbolicLinkFolderAsync(options: IFileSystemCreateLinkOptions): Promise; static createSymbolicLinkJunction(options: IFileSystemCreateLinkOptions): void; static createSymbolicLinkJunctionAsync(options: IFileSystemCreateLinkOptions): Promise; + static createWriteStream(filePath: string, options?: IFileSystemCreateWriteStreamOptions): FileSystemWriteStream; + static createWriteStreamAsync(filePath: string, options?: IFileSystemCreateWriteStreamOptions): Promise; static deleteFile(filePath: string, options?: IFileSystemDeleteFileOptions): void; static deleteFileAsync(filePath: string, options?: IFileSystemDeleteFileOptions): Promise; static deleteFolder(folderPath: string): void; @@ -225,9 +228,15 @@ export type FileSystemCopyFilesAsyncFilter = (sourcePath: string, destinationPat // @public export type FileSystemCopyFilesFilter = (sourcePath: string, destinationPath: string) => boolean; +// @public +export type FileSystemReadStream = fs.ReadStream; + // @public export type FileSystemStats = fs.Stats; +// @public +export type FileSystemWriteStream = fs.WriteStream; + // @public export class FileWriter { close(): void; @@ -336,15 +345,18 @@ export interface 
IFileSystemCreateLinkOptions { newLinkPath: string; } +// @public +export interface IFileSystemCreateWriteStreamOptions extends IFileSystemWriteFileOptionsBase { +} + // @public export interface IFileSystemDeleteFileOptions { throwIfNotExists?: boolean; } // @public -export interface IFileSystemMoveOptions { +export interface IFileSystemMoveOptions extends IFileSystemWriteFileOptionsBase { destinationPath: string; - ensureFolderExists?: boolean; overwrite?: boolean; sourcePath: string; } @@ -367,8 +379,7 @@ export interface IFileSystemUpdateTimeParameters { } // @public -export interface IFileSystemWriteBinaryFileOptions { - ensureFolderExists?: boolean; +export interface IFileSystemWriteBinaryFileOptions extends IFileSystemWriteFileOptionsBase { } // @public @@ -377,6 +388,11 @@ export interface IFileSystemWriteFileOptions extends IFileSystemWriteBinaryFileO encoding?: Encoding; } +// @public (undocumented) +export interface IFileSystemWriteFileOptionsBase { + ensureFolderExists?: boolean; +} + // @public export interface IFileWriterFlags { append?: boolean; diff --git a/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md b/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md index e0d5fe032ad..05745fddbb8 100644 --- a/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md +++ b/common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md @@ -15,6 +15,7 @@ import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; // @public export class AmazonS3Client { constructor(credentials: IAmazonS3Credentials | undefined, options: IAmazonS3BuildCacheProviderOptionsAdvanced, webClient: WebClient, terminal: ITerminal); + downloadObjectToFileAsync(objectName: string, localFilePath: string): Promise; // (undocumented) getObjectAsync(objectName: string): Promise; // (undocumented) @@ -25,6 +26,7 @@ export class AmazonS3Client { static tryDeserializeCredentials(credentialString: string | undefined): IAmazonS3Credentials | undefined; // 
(undocumented) uploadObjectAsync(objectName: string, objectBuffer: Buffer): Promise; + uploadObjectFromFileAsync(objectName: string, localFilePath: string): Promise; // (undocumented) static UriEncode(input: string): string; } diff --git a/common/reviews/api/rush-lib.api.md b/common/reviews/api/rush-lib.api.md index dbbb440e40b..9532ccf52e6 100644 --- a/common/reviews/api/rush-lib.api.md +++ b/common/reviews/api/rush-lib.api.md @@ -312,7 +312,7 @@ export class ExperimentsConfiguration { // @beta export class FileSystemBuildCacheProvider { constructor(options: IFileSystemBuildCacheProviderOptions); - getCacheEntryPath(cacheId: string): string; + readonly getCacheEntryPath: (cacheId: string) => string; tryGetCacheEntryPathByIdAsync(terminal: ITerminal, cacheId: string): Promise; trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; } @@ -345,10 +345,12 @@ export interface ICloudBuildCacheProvider { deleteCachedCredentialsAsync(terminal: ITerminal): Promise; // (undocumented) readonly isCacheWriteAllowed: boolean; + tryDownloadCacheEntryToFileAsync?(terminal: ITerminal, cacheId: string, localFilePath: string): Promise; // (undocumented) tryGetCacheEntryBufferByIdAsync(terminal: ITerminal, cacheId: string): Promise; // (undocumented) trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; + tryUploadCacheEntryFromFileAsync?(terminal: ITerminal, cacheId: string, localFilePath: string): Promise; // (undocumented) updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise; // (undocumented) @@ -481,6 +483,7 @@ export interface IExperimentsJson { omitImportersFromPreventManualShrinkwrapChanges?: boolean; printEventHooksOutputToConsole?: boolean; rushAlerts?: boolean; + useDirectFileTransfersForBuildCache?: boolean; useIPCScriptsInWatchMode?: boolean; usePnpmFrozenLockfileForRushInstall?: boolean; usePnpmLockfileOnlyThenFrozenLockfileForRushUpdate?: boolean; @@ -594,6 
+597,7 @@ export interface _IOperationBuildCacheOptions { buildCacheConfiguration: BuildCacheConfiguration; excludeAppleDoubleFiles: boolean; terminal: ITerminal; + useDirectFileTransfersForBuildCache: boolean; } // @alpha diff --git a/libraries/node-core-library/src/FileSystem.ts b/libraries/node-core-library/src/FileSystem.ts index a2ff0b74d2f..0975847f35d 100644 --- a/libraries/node-core-library/src/FileSystem.ts +++ b/libraries/node-core-library/src/FileSystem.ts @@ -28,6 +28,24 @@ export type FileSystemStats = fs.Stats; */ export type FolderItem = fs.Dirent; +/** + * An alias for the Node.js `fs.ReadStream` object. + * + * @remarks + * This avoids the need to import the `fs` package when using the {@link FileSystem} API. + * @public + */ +export type FileSystemReadStream = fs.ReadStream; + +/** + * An alias for the Node.js `fs.WriteStream` object. + * + * @remarks + * This avoids the need to import the `fs` package when using the {@link FileSystem} API. + * @public + */ +export type FileSystemWriteStream = fs.WriteStream; + // The PosixModeBits are intended to be used with bitwise operations. /* eslint-disable no-bitwise */ @@ -44,10 +62,9 @@ export interface IFileSystemReadFolderOptions { } /** - * The options for {@link FileSystem.writeBuffersToFile} * @public */ -export interface IFileSystemWriteBinaryFileOptions { +export interface IFileSystemWriteFileOptionsBase { /** * If true, will ensure the folder is created before writing the file. 
* @defaultValue false @@ -55,6 +72,12 @@ export interface IFileSystemWriteBinaryFileOptions { ensureFolderExists?: boolean; } +/** + * The options for {@link FileSystem.writeBuffersToFile} + * @public + */ +export interface IFileSystemWriteBinaryFileOptions extends IFileSystemWriteFileOptionsBase {} + /** * The options for {@link FileSystem.writeFile} * @public @@ -95,7 +118,7 @@ export interface IFileSystemReadFileOptions { * The options for {@link FileSystem.move} * @public */ -export interface IFileSystemMoveOptions { +export interface IFileSystemMoveOptions extends IFileSystemWriteFileOptionsBase { /** * The path of the existing object to be moved. * The path may be absolute or relative. @@ -113,12 +136,6 @@ export interface IFileSystemMoveOptions { * @defaultValue true */ overwrite?: boolean; - - /** - * If true, will ensure the folder is created before writing the file. - * @defaultValue false - */ - ensureFolderExists?: boolean; } /** @@ -258,6 +275,12 @@ export interface IFileSystemCopyFilesOptions extends IFileSystemCopyFilesAsyncOp filter?: FileSystemCopyFilesFilter; // narrow the type to exclude FileSystemCopyFilesAsyncFilter } +/** + * The options for {@link FileSystem.createWriteStream} + * @public + */ +export interface IFileSystemCreateWriteStreamOptions extends IFileSystemWriteFileOptionsBase {} + /** * The options for {@link FileSystem.deleteFile} * @public @@ -750,10 +773,11 @@ export class FileSystem { * Writes a text string to a file on disk, overwriting the file if it already exists. * Behind the scenes it uses `fs.writeFileSync()`. * @remarks - * Throws an error if the folder doesn't exist, unless ensureFolder=true. + * Throws an error if the folder doesn't exist, unless {@link IFileSystemWriteFileOptionsBase.ensureFolderExists} + * is set to `true`. * @param filePath - The absolute or relative path of the file. * @param contents - The text that should be written to the file. - * @param options - Optional settings that can change the behavior. 
Type: `IWriteFileOptions` + * @param options - Optional settings that can change the behavior. */ public static writeFile( filePath: string, @@ -796,7 +820,8 @@ export class FileSystem { * multiple sources. * * @remarks - * Throws an error if the folder doesn't exist, unless ensureFolder=true. + * Throws an error if the folder doesn't exist, unless {@link IFileSystemWriteFileOptionsBase.ensureFolderExists} + * is set to `true`. * @param filePath - The absolute or relative path of the file. * @param contents - The content that should be written to the file. * @param options - Optional settings that can change the behavior. @@ -956,10 +981,11 @@ export class FileSystem { * Writes a text string to a file on disk, appending to the file if it already exists. * Behind the scenes it uses `fs.appendFileSync()`. * @remarks - * Throws an error if the folder doesn't exist, unless ensureFolder=true. + * Throws an error if the folder doesn't exist, unless {@link IFileSystemWriteFileOptionsBase.ensureFolderExists} + * is set to `true`. * @param filePath - The absolute or relative path of the file. * @param contents - The text that should be written to the file. - * @param options - Optional settings that can change the behavior. Type: `IWriteFileOptions` + * @param options - Optional settings that can change the behavior. */ public static appendToFile( filePath: string, @@ -1237,6 +1263,61 @@ export class FileSystem { }); } + /** + * Creates a readable stream for an existing file. + * Behind the scenes it uses `fs.createReadStream()`. + * + * @param filePath - The path to the file. The path may be absolute or relative. + * @returns A new readable stream for the file. + */ + public static createReadStream(filePath: string): FileSystemReadStream { + return FileSystem._wrapException(() => { + return fs.createReadStream(filePath); + }); + } + + /** + * Creates a writable stream for writing to a file. + * Behind the scenes it uses `fs.createWriteStream()`. 
+ * + * @remarks + * If the parent folder doesn't exist, the returned stream emits an `error` event when opening the file + * (`fs.createWriteStream()` opens asynchronously, so no synchronous exception is thrown), unless + * {@link IFileSystemWriteFileOptionsBase.ensureFolderExists} is set to `true`. + * @param filePath - The path to the file. The path may be absolute or relative. + * @param options - Optional settings that can change the behavior. + * @returns A new writable stream for the file. + */ + public static createWriteStream( + filePath: string, + options?: IFileSystemCreateWriteStreamOptions + ): FileSystemWriteStream { + return FileSystem._wrapException(() => { + if (options?.ensureFolderExists) { + const folderPath: string = nodeJsPath.dirname(filePath); + FileSystem.ensureFolder(folderPath); + } + + return fs.createWriteStream(filePath); + }); + } + + /** + * An async version of {@link FileSystem.createWriteStream}. + */ + public static async createWriteStreamAsync( + filePath: string, + options?: IFileSystemCreateWriteStreamOptions + ): Promise { + return await FileSystem._wrapExceptionAsync(async () => { + if (options?.ensureFolderExists) { + const folderPath: string = nodeJsPath.dirname(filePath); + await FileSystem.ensureFolderAsync(folderPath); + } + + return fs.createWriteStream(filePath); + }); + } + // =============== // LINK OPERATIONS // =============== diff --git a/libraries/node-core-library/src/index.ts b/libraries/node-core-library/src/index.ts index c48a9c950a3..3f4592cf6b1 100644 --- a/libraries/node-core-library/src/index.ts +++ b/libraries/node-core-library/src/index.ts @@ -50,8 +50,11 @@ export { type IFileErrorOptions, type IFileErrorFormattingOptions, FileError } f export { AlreadyExistsBehavior, FileSystem, + type IFileSystemWriteFileOptionsBase, type FileSystemCopyFilesAsyncFilter, type FileSystemCopyFilesFilter, + type FileSystemReadStream, + type FileSystemWriteStream, type FolderItem, type FileSystemStats, type IFileSystemCopyFileBaseOptions, @@ -59,6 +62,7 @@ export { type IFileSystemCopyFilesAsyncOptions, type IFileSystemCopyFilesOptions, type 
IFileSystemCreateLinkOptions, + type IFileSystemCreateWriteStreamOptions, type IFileSystemDeleteFileOptions, type IFileSystemMoveOptions, type IFileSystemReadFileOptions, diff --git a/libraries/rush-lib/src/api/ExperimentsConfiguration.ts b/libraries/rush-lib/src/api/ExperimentsConfiguration.ts index f8a9ae66e34..563917d1062 100644 --- a/libraries/rush-lib/src/api/ExperimentsConfiguration.ts +++ b/libraries/rush-lib/src/api/ExperimentsConfiguration.ts @@ -136,6 +136,14 @@ export interface IExperimentsJson { * be included in the shared build cache. */ omitAppleDoubleFilesFromBuildCache?: boolean; + + /** + * If true, the build cache will use file-based APIs to transfer cache entries to and from cloud + * storage. This avoids loading the entire cache entry into memory, which can prevent out-of-memory + * errors for large build outputs. The cloud cache provider plugin must implement the optional + * file-based methods for this to take effect; otherwise it falls back to the buffer-based approach. 
+ */ + useDirectFileTransfersForBuildCache?: boolean; } const _EXPERIMENTS_JSON_SCHEMA: JsonSchema = JsonSchema.fromLoadedObject(schemaJson); diff --git a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts index fd41ff7ce7c..90b435d9095 100644 --- a/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts +++ b/libraries/rush-lib/src/cli/scriptActions/PhasedScriptAction.ts @@ -350,6 +350,11 @@ export class PhasedScriptAction extends BaseScriptAction i public async runAsync(): Promise { const stopwatch: Stopwatch = Stopwatch.start(); + const { + defaultSubspace, + subspacesFeatureEnabled, + pnpmOptions: { useWorkspaces = false } = {} + } = this.rushConfiguration; if (this._alwaysInstall || this._installParameter?.value) { await measureAsyncFn(`${PERF_PREFIX}:install`, async () => { const { doBasicInstallAsync } = await import( @@ -373,7 +378,7 @@ export class PhasedScriptAction extends BaseScriptAction i afterInstallAsync: (subspace: Subspace) => this.rushSession.hooks.afterInstall.promise(this, subspace, variant), // Eventually we may want to allow a subspace to be selected here - subspace: this.rushConfiguration.defaultSubspace + subspace: defaultSubspace }); }); } @@ -382,14 +387,12 @@ await measureAsyncFn(`${PERF_PREFIX}:checkInstallFlag`, async () => { // TODO: Replace with last-install.flag when "rush link" and "rush unlink" are removed const lastLinkFlag: FlagFile = new FlagFile( - this.rushConfiguration.defaultSubspace.getSubspaceTempFolderPath(), + defaultSubspace.getSubspaceTempFolderPath(), RushConstants.lastLinkFlagFilename, {} ); // Only check for a valid link flag when subspaces is not enabled - if (!(await lastLinkFlag.isValidAsync()) && !this.rushConfiguration.subspacesFeatureEnabled) { - const useWorkspaces: boolean = - this.rushConfiguration.pnpmOptions && this.rushConfiguration.pnpmOptions.useWorkspaces; + if (!(await 
lastLinkFlag.isValidAsync()) && !subspacesFeatureEnabled) { if (useWorkspaces) { throw new Error('Link flag invalid.\nDid you run "rush install" or "rush update"?'); } else { @@ -513,18 +516,27 @@ export class PhasedScriptAction extends BaseScriptAction i ).IPCOperationRunnerPlugin().apply(this.hooks); } + const { + experimentsConfiguration: { + configuration: { + buildCacheWithAllowWarningsInSuccessfulBuild = false, + buildSkipWithAllowWarningsInSuccessfulBuild, + omitAppleDoubleFilesFromBuildCache: excludeAppleDoubleFiles = false, + useDirectFileTransfersForBuildCache = false, + usePnpmSyncForInjectedDependencies + } + }, + isPnpm + } = this.rushConfiguration; if (buildCacheConfiguration?.buildCacheEnabled) { terminal.writeVerboseLine(`Incremental strategy: cache restoration`); new CacheableOperationPlugin({ - allowWarningsInSuccessfulBuild: - !!this.rushConfiguration.experimentsConfiguration.configuration - .buildCacheWithAllowWarningsInSuccessfulBuild, + allowWarningsInSuccessfulBuild: buildCacheWithAllowWarningsInSuccessfulBuild, buildCacheConfiguration, cobuildConfiguration, terminal, - excludeAppleDoubleFiles: - !!this.rushConfiguration.experimentsConfiguration.configuration - .omitAppleDoubleFilesFromBuildCache + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }).apply(this.hooks); if (this._debugBuildCacheIdsParameter.value) { @@ -534,9 +546,7 @@ export class PhasedScriptAction extends BaseScriptAction i terminal.writeVerboseLine(`Incremental strategy: output preservation`); // Explicitly disabling the build cache also disables legacy skip detection. 
new LegacySkipPlugin({ - allowWarningsInSuccessfulBuild: - this.rushConfiguration.experimentsConfiguration.configuration - .buildSkipWithAllowWarningsInSuccessfulBuild, + allowWarningsInSuccessfulBuild: buildSkipWithAllowWarningsInSuccessfulBuild, terminal, changedProjectsOnly, isIncrementalBuildAllowed: this._isIncrementalBuildAllowed @@ -551,12 +561,12 @@ export class PhasedScriptAction extends BaseScriptAction i if (!buildCacheConfiguration?.buildCacheEnabled) { throw new Error('You must have build cache enabled to use this option.'); } + const { BuildPlanPlugin } = await import('../../logic/operations/BuildPlanPlugin'); new BuildPlanPlugin(terminal).apply(this.hooks); } - const { configuration: experiments } = this.rushConfiguration.experimentsConfiguration; - if (this.rushConfiguration?.isPnpm && experiments?.usePnpmSyncForInjectedDependencies) { + if (isPnpm && usePnpmSyncForInjectedDependencies) { const { PnpmSyncCopyOperationPlugin } = await import( '../../logic/operations/PnpmSyncCopyOperationPlugin' ); diff --git a/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts b/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts index 8fdd54bf444..96af3eebb27 100644 --- a/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts +++ b/libraries/rush-lib/src/logic/buildCache/FileSystemBuildCacheProvider.ts @@ -1,8 +1,6 @@ // Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. 
-import * as path from 'node:path'; - import { FileSystem } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; @@ -32,19 +30,17 @@ const DEFAULT_BUILD_CACHE_FOLDER_NAME: string = 'build-cache'; * @beta */ export class FileSystemBuildCacheProvider { - private readonly _cacheFolderPath: string; - - public constructor(options: IFileSystemBuildCacheProviderOptions) { - this._cacheFolderPath = - options.rushUserConfiguration.buildCacheFolder || - path.join(options.rushConfiguration.commonTempFolder, DEFAULT_BUILD_CACHE_FOLDER_NAME); - } - /** * Returns the absolute disk path for the specified cache id. */ - public getCacheEntryPath(cacheId: string): string { - return path.join(this._cacheFolderPath, cacheId); + public readonly getCacheEntryPath: (cacheId: string) => string; + + public constructor(options: IFileSystemBuildCacheProviderOptions) { + const { + rushConfiguration: { commonTempFolder }, + rushUserConfiguration: { buildCacheFolder = `${commonTempFolder}/${DEFAULT_BUILD_CACHE_FOLDER_NAME}` } + } = options; + this.getCacheEntryPath = (cacheId: string) => `${buildCacheFolder}/${cacheId}`; } /** @@ -55,7 +51,8 @@ export class FileSystemBuildCacheProvider { cacheId: string ): Promise { const cacheEntryFilePath: string = this.getCacheEntryPath(cacheId); - if (await FileSystem.existsAsync(cacheEntryFilePath)) { + const cacheEntryExists: boolean = await FileSystem.existsAsync(cacheEntryFilePath); + if (cacheEntryExists) { return cacheEntryFilePath; } else { return undefined; diff --git a/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts b/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts index f55a0870ad8..a10e1019282 100644 --- a/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts +++ b/libraries/rush-lib/src/logic/buildCache/ICloudBuildCacheProvider.ts @@ -11,6 +11,35 @@ export interface ICloudBuildCacheProvider { tryGetCacheEntryBufferByIdAsync(terminal: ITerminal, cacheId: 
string): Promise; trySetCacheEntryBufferAsync(terminal: ITerminal, cacheId: string, entryBuffer: Buffer): Promise; + + /** + * If implemented, the build cache will prefer to use this method over + * {@link ICloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync} to avoid loading the entire + * cache entry into memory, if possible. The implementation should download the cache entry and write it + * to the specified local file path. + * + * @returns `true` if the cache entry was found and written to the file, `false` if it was + * not found. Throws on errors. + */ + tryDownloadCacheEntryToFileAsync?( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise; + /** + * If implemented, the build cache will prefer to use this method over + * {@link ICloudBuildCacheProvider.trySetCacheEntryBufferAsync} to avoid loading the entire + * cache entry into memory, if possible. The implementation should read the cache entry from + * the specified local file path and upload it. + * + * @returns `true` if the cache entry was written to the cache, otherwise `false`. + */ + tryUploadCacheEntryFromFileAsync?( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise; + updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise; updateCachedCredentialInteractiveAsync(terminal: ITerminal): Promise; deleteCachedCredentialsAsync(terminal: ITerminal): Promise; diff --git a/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts b/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts index 0abf76c221b..a4070c52aed 100644 --- a/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts +++ b/libraries/rush-lib/src/logic/buildCache/OperationBuildCache.ts @@ -32,6 +32,11 @@ export interface IOperationBuildCacheOptions { * and a companion file exists in the same directory. 
*/ excludeAppleDoubleFiles: boolean; + /** + * If true, use file-based APIs (when available) to transfer cache entries to and from the + * cloud provider, avoiding buffering the entire entry in memory. + */ + useDirectFileTransfersForBuildCache: boolean; } /** @@ -75,6 +80,7 @@ export class OperationBuildCache { private readonly _projectOutputFolderNames: ReadonlyArray; private readonly _cacheId: string | undefined; private readonly _excludeAppleDoubleFiles: boolean; + private readonly _useDirectFileTransfersForBuildCache: boolean; private constructor(cacheId: string | undefined, options: IProjectBuildCacheOptions) { const { @@ -86,7 +92,8 @@ export class OperationBuildCache { }, project, projectOutputFolderNames, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = options; this._project = project; this._localBuildCacheProvider = localCacheProvider; @@ -96,6 +103,7 @@ export class OperationBuildCache { this._projectOutputFolderNames = projectOutputFolderNames || []; this._cacheId = cacheId; this._excludeAppleDoubleFiles = excludeAppleDoubleFiles && process.platform === 'darwin'; + this._useDirectFileTransfersForBuildCache = useDirectFileTransfersForBuildCache; } private static _tryGetTarUtility(terminal: ITerminal): Promise { @@ -119,7 +127,12 @@ export class OperationBuildCache { executionResult: IOperationExecutionResult, options: IOperationBuildCacheOptions ): OperationBuildCache { - const { buildCacheConfiguration, terminal, excludeAppleDoubleFiles } = options; + const { + buildCacheConfiguration, + terminal, + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache + } = options; const outputFolders: string[] = [...(executionResult.operation.settings?.outputFolderNames ?? 
[])]; if (executionResult.metadataFolderPath) { outputFolders.push(executionResult.metadataFolderPath); @@ -132,7 +145,8 @@ export class OperationBuildCache { phaseName: executionResult.operation.associatedPhase.name, projectOutputFolderNames: outputFolders, operationStateHash: executionResult.getStateHash(), - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }; const cacheId: string | undefined = OperationBuildCache._getCacheId(buildCacheOptions); return new OperationBuildCache(cacheId, buildCacheOptions); @@ -152,32 +166,63 @@ export class OperationBuildCache { let localCacheEntryPath: string | undefined = await this._localBuildCacheProvider.tryGetCacheEntryPathByIdAsync(terminal, cacheId); - let cacheEntryBuffer: Buffer | undefined; + let cloudCacheHit: boolean = false; let updateLocalCacheSuccess: boolean | undefined; if (!localCacheEntryPath && this._cloudBuildCacheProvider) { terminal.writeVerboseLine( 'This project was not found in the local build cache. Querying the cloud build cache.' ); - cacheEntryBuffer = await this._cloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync( - terminal, - cacheId - ); - if (cacheEntryBuffer) { + if ( + this._useDirectFileTransfersForBuildCache && + this._cloudBuildCacheProvider.tryDownloadCacheEntryToFileAsync + ) { + // Use file-based path to avoid loading the entire cache entry into memory. + // The provider downloads directly to the local cache file. 
+ const targetPath: string = this._localBuildCacheProvider.getCacheEntryPath(cacheId); try { - localCacheEntryPath = await this._localBuildCacheProvider.trySetCacheEntryBufferAsync( + cloudCacheHit = await this._cloudBuildCacheProvider.tryDownloadCacheEntryToFileAsync( terminal, cacheId, - cacheEntryBuffer + targetPath ); - updateLocalCacheSuccess = true; + if (cloudCacheHit) { + localCacheEntryPath = targetPath; + updateLocalCacheSuccess = true; + } } catch (e) { + terminal.writeVerboseLine(`Failed to download cache entry to local cache: ${e}`); + // Clean up any partial file left by the failed download so it isn't + // mistaken for a valid cache entry on the next build. + try { + await FileSystem.deleteFileAsync(targetPath); + } catch { + // Ignore cleanup errors (file may not have been created) + } + updateLocalCacheSuccess = false; } + } else { + const cacheEntryBuffer: Buffer | undefined = + await this._cloudBuildCacheProvider.tryGetCacheEntryBufferByIdAsync(terminal, cacheId); + if (cacheEntryBuffer) { + cloudCacheHit = true; + try { + localCacheEntryPath = await this._localBuildCacheProvider.trySetCacheEntryBufferAsync( + terminal, + cacheId, + cacheEntryBuffer + ); + updateLocalCacheSuccess = true; + } catch (e) { + terminal.writeVerboseLine(`Failed to update local cache: ${e}`); + updateLocalCacheSuccess = false; + } + } } } - if (!localCacheEntryPath && !cacheEntryBuffer) { + if (!localCacheEntryPath && !cloudCacheHit) { terminal.writeVerboseLine('This project was not found in the build cache.'); return false; } @@ -300,8 +345,6 @@ export class OperationBuildCache { return false; } - let cacheEntryBuffer: Buffer | undefined; - let setCloudCacheEntryPromise: Promise | undefined; // Note that "writeAllowed" settings (whether in config or environment) always apply to @@ -309,17 +352,29 @@ export class OperationBuildCache { // write to the local build cache. 
if (this._cloudBuildCacheProvider?.isCacheWriteAllowed) { - if (localCacheEntryPath) { - cacheEntryBuffer = await FileSystem.readFileToBufferAsync(localCacheEntryPath); - } else { + if (!localCacheEntryPath) { throw new InternalError('Expected the local cache entry path to be set.'); } - setCloudCacheEntryPromise = this._cloudBuildCacheProvider?.trySetCacheEntryBufferAsync( - terminal, - cacheId, - cacheEntryBuffer - ); + if ( + this._useDirectFileTransfersForBuildCache && + this._cloudBuildCacheProvider.tryUploadCacheEntryFromFileAsync + ) { + // Use file-based upload to avoid loading the entire cache entry into memory. + // The provider reads from the local cache file directly. + setCloudCacheEntryPromise = this._cloudBuildCacheProvider.tryUploadCacheEntryFromFileAsync( + terminal, + cacheId, + localCacheEntryPath + ); + } else { + const cacheEntryBuffer: Buffer = await FileSystem.readFileToBufferAsync(localCacheEntryPath); + setCloudCacheEntryPromise = this._cloudBuildCacheProvider.trySetCacheEntryBufferAsync( + terminal, + cacheId, + cacheEntryBuffer + ); + } } const updateCloudCacheSuccess: boolean | undefined = (await setCloudCacheEntryPromise) ?? 
true; diff --git a/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts b/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts index 08a35a85428..c4f10daa36a 100644 --- a/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts +++ b/libraries/rush-lib/src/logic/buildCache/test/OperationBuildCache.test.ts @@ -59,7 +59,8 @@ describe(OperationBuildCache.name, () => { operationStateHash: '1926f30e8ed24cb47be89aea39e7efd70fcda075', terminal, phaseName: 'build', - excludeAppleDoubleFiles: !!options.excludeAppleDoubleFiles + excludeAppleDoubleFiles: !!options.excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache: false }); return subject; diff --git a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts index 90af287b9dd..5e7468fd8d6 100644 --- a/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts +++ b/libraries/rush-lib/src/logic/operations/CacheableOperationPlugin.ts @@ -77,6 +77,7 @@ export interface ICacheableOperationPluginOptions { cobuildConfiguration: CobuildConfiguration | undefined; terminal: ITerminal; excludeAppleDoubleFiles: boolean; + useDirectFileTransfersForBuildCache: boolean; } interface ITryGetOperationBuildCacheOptionsBase { @@ -84,6 +85,7 @@ interface ITryGetOperationBuildCacheOptionsBase { buildCacheConfiguration: BuildCacheConfiguration | undefined; terminal: ITerminal; excludeAppleDoubleFiles: boolean; + useDirectFileTransfersForBuildCache: boolean; record: TRecord; } @@ -108,7 +110,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { allowWarningsInSuccessfulBuild, buildCacheConfiguration, cobuildConfiguration, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = this._options; hooks.beforeExecuteOperations.tap( @@ -272,7 +275,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { 
buildCacheConfiguration, terminal: buildCacheTerminal, record, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); // Try to acquire the cobuild lock @@ -291,7 +295,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { buildCacheContext, record, terminal: buildCacheTerminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); if (operationBuildCache) { buildCacheTerminal.writeVerboseLine( @@ -585,7 +590,14 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { private _tryGetOperationBuildCache( options: ITryGetOperationBuildCacheOptions ): OperationBuildCache | undefined { - const { buildCacheConfiguration, buildCacheContext, terminal, record, excludeAppleDoubleFiles } = options; + const { + buildCacheConfiguration, + buildCacheContext, + terminal, + record, + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache + } = options; if (!buildCacheContext.operationBuildCache) { const { cacheDisabledReason } = buildCacheContext; if (cacheDisabledReason && !record.operation.settings?.allowCobuildWithoutCache) { @@ -601,7 +613,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { buildCacheContext.operationBuildCache = OperationBuildCache.forOperation(record, { buildCacheConfiguration, terminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache }); } @@ -618,7 +631,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { cobuildConfiguration, record, terminal, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } = options; if (!buildCacheConfiguration?.buildCacheEnabled) { @@ -649,7 +663,8 @@ export class CacheableOperationPlugin implements IPhasedCommandPlugin { terminal, operationStateHash, phaseName: associatedPhase.name, - excludeAppleDoubleFiles + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache 
}); buildCacheContext.operationBuildCache = operationBuildCache; diff --git a/libraries/rush-lib/src/schemas/experiments.schema.json b/libraries/rush-lib/src/schemas/experiments.schema.json index 8a92fa9ee67..d32934010ae 100644 --- a/libraries/rush-lib/src/schemas/experiments.schema.json +++ b/libraries/rush-lib/src/schemas/experiments.schema.json @@ -85,6 +85,10 @@ "omitAppleDoubleFilesFromBuildCache": { "description": "If true, when running on macOS, Rush will omit AppleDouble files (._*) from build cache archives when a companion file exists in the same directory. AppleDouble files are automatically created by macOS to store extended attributes on filesystems that don't support them, and should generally not be included in the shared build cache.", "type": "boolean" + }, + "useDirectFileTransfersForBuildCache": { + "description": "If true, the build cache will use file-based APIs to transfer cache entries to and from cloud storage. This avoids loading the entire cache entry into memory, which can prevent out-of-memory errors for large build outputs. 
The cloud cache provider plugin must implement the optional file-based methods for this to take effect; otherwise it falls back to the buffer-based approach.", + "type": "boolean" } }, "additionalProperties": false diff --git a/libraries/rush-lib/src/utilities/WebClient.ts b/libraries/rush-lib/src/utilities/WebClient.ts index bdd16823332..6b4e0e4140d 100644 --- a/libraries/rush-lib/src/utilities/WebClient.ts +++ b/libraries/rush-lib/src/utilities/WebClient.ts @@ -3,27 +3,44 @@ import * as os from 'node:os'; import * as process from 'node:process'; -import { request as httpRequest, type IncomingMessage, type Agent as HttpAgent } from 'node:http'; +import type { Readable } from 'node:stream'; +import { + request as httpRequest, + type IncomingMessage, + type ClientRequest, + type Agent as HttpAgent +} from 'node:http'; import { request as httpsRequest, type RequestOptions } from 'node:https'; import { Import, LegacyAdapters } from '@rushstack/node-core-library'; const createHttpsProxyAgent: typeof import('https-proxy-agent') = Import.lazy('https-proxy-agent', require); -/** - * For use with {@link WebClient}. - */ -export interface IWebClientResponse { +export interface IWebClientResponseBase { ok: boolean; status: number; statusText?: string; redirected: boolean; headers: Record; +} + +/** + * A response from {@link WebClient.fetchAsync}. + */ +export interface IWebClientResponse extends IWebClientResponseBase { getTextAsync: () => Promise; getJsonAsync: () => Promise; getBufferAsync: () => Promise; } +/** + * A response from {@link WebClient.fetchStreamAsync} that provides the response body as a + * readable stream, avoiding buffering the entire response in memory. + */ +export interface IWebClientStreamResponse extends IWebClientResponseBase { + stream: Readable; +} + /** * For use with {@link WebClient}. 
*/ @@ -49,7 +66,7 @@ export interface IGetFetchOptions extends IWebFetchOptionsBase { */ export interface IFetchOptionsWithBody extends IWebFetchOptionsBase { verb: 'PUT' | 'POST' | 'PATCH'; - body?: Buffer; + body?: Buffer | Readable; } /** @@ -78,140 +95,350 @@ const ACCEPT_HEADER_NAME: 'accept' = 'accept'; const USER_AGENT_HEADER_NAME: 'user-agent' = 'user-agent'; const CONTENT_ENCODING_HEADER_NAME: 'content-encoding' = 'content-encoding'; -const makeRequestAsync: FetchFn = async ( +/** + * Parses the Content-Encoding header into an array of encoding names, + * or returns `undefined` if decoding should be skipped. + */ +function _getContentEncodings( + headers: Record, + noDecode: boolean | undefined +): string[] | undefined { + if (!noDecode) { + const encodings: string | string[] | undefined = headers[CONTENT_ENCODING_HEADER_NAME]; + if (encodings) { + return Array.isArray(encodings) ? encodings : encodings.split(','); + } + } +} + +type StreamFetchFn = ( url: string, options: IRequestOptions, - redirected: boolean = false -) => { - const { body, redirect, noDecode } = options; + isRedirect?: boolean +) => Promise; - return await new Promise( - (resolve: (result: IWebClientResponse) => void, reject: (error: Error) => void) => { +/** + * Shared HTTP request core used by both buffer-based and streaming request functions. + * Handles URL parsing, protocol selection, redirect following, body sending, and error handling. + * The `handleResponse` callback is responsible for processing the response and calling + * `resolve`/`reject` to complete the outer promise. 
+ */ +function _makeRawRequestAsync( + url: string, + options: IRequestOptions, + redirected: boolean, + handleResponse: ( + response: IncomingMessage, + redirected: boolean, + resolve: (result: TResponse | PromiseLike) => void, + reject: (error: Error) => void + ) => void, + requestFnAsync: (url: string, options: IRequestOptions, isRedirect?: boolean) => Promise +): Promise { + const { body, redirect } = options; + + return new Promise( + (resolve: (result: TResponse | PromiseLike) => void, reject: (error: Error) => void) => { const parsedUrl: URL = typeof url === 'string' ? new URL(url) : url; const requestFunction: typeof httpRequest | typeof httpsRequest = parsedUrl.protocol === 'https:' ? httpsRequest : httpRequest; - requestFunction(url, options, (response: IncomingMessage) => { - const responseBuffers: (Buffer | Uint8Array)[] = []; - response.on('data', (chunk: string | Buffer | Uint8Array) => { - responseBuffers.push(Buffer.from(chunk)); - }); - response.on('end', () => { - // Handle retries by calling the method recursively with the redirect URL - const statusCode: number | undefined = response.statusCode; - if (statusCode === 301 || statusCode === 302) { - switch (redirect) { - case 'follow': { - const redirectUrl: string | string[] | undefined = response.headers.location; - if (redirectUrl) { - makeRequestAsync(redirectUrl, options, true).then(resolve).catch(reject); - } else { - reject( - new Error(`Received status code ${response.statusCode} with no location header: ${url}`) - ); - } - - break; + const req: ClientRequest = requestFunction(url, options, (response: IncomingMessage) => { + const { + statusCode, + headers: { location: redirectUrl } + } = response; + if (statusCode === 301 || statusCode === 302) { + switch (redirect) { + case 'follow': { + // Drain the redirect response since we're discarding it + response.resume(); + if (redirectUrl) { + requestFnAsync(redirectUrl, options, true).then(resolve).catch(reject); + } else { + reject(new 
Error(`Received status code ${statusCode} with no location header: ${url}`)); } - case 'error': - reject(new Error(`Received status code ${response.statusCode}: ${url}`)); - return; + return; } + + case 'error': + response.resume(); + reject(new Error(`Received status code ${statusCode}: ${url}`)); + return; } + } - const responseData: Buffer = Buffer.concat(responseBuffers); - const status: number = response.statusCode || 0; - const statusText: string | undefined = response.statusMessage; - const headers: Record = response.headers; - - let bodyString: string | undefined; - let bodyJson: unknown | undefined; - let decodedBuffer: Buffer | undefined; - const result: IWebClientResponse = { - ok: status >= 200 && status < 300, - status, - statusText, - redirected, - headers, - getTextAsync: async () => { - if (bodyString === undefined) { - const buffer: Buffer = await result.getBufferAsync(); - // eslint-disable-next-line require-atomic-updates - bodyString = buffer.toString(); - } + handleResponse(response, redirected, resolve, reject); + }).on('error', (error: Error) => { + reject(error); + }); + + const isStream: boolean = !!body && typeof (body as Readable).pipe === 'function'; + if (isStream) { + (body as Readable).on('error', reject); + (body as Readable).pipe(req); + } else { + req.end(body as Buffer | undefined); + } + } + ); +} - return bodyString; - }, - getJsonAsync: async () => { - if (bodyJson === undefined) { - const text: string = await result.getTextAsync(); - // eslint-disable-next-line require-atomic-updates - bodyJson = JSON.parse(text); - } +const makeRequestAsync: FetchFn = async ( + url: string, + options: IRequestOptions, + redirected: boolean = false +) => { + const { noDecode } = options; + + return _makeRawRequestAsync( + url, + options, + redirected, + ( + response: IncomingMessage, + wasRedirected: boolean, + resolve: (result: IWebClientResponse | PromiseLike) => void + ): void => { + const responseBuffers: (Buffer | Uint8Array)[] = []; + 
response.on('data', (chunk: string | Buffer | Uint8Array) => { + responseBuffers.push(Buffer.from(chunk)); + }); + response.on('end', () => { + const { statusCode: status = 0, statusMessage: statusText, headers } = response; + const responseData: Buffer = Buffer.concat(responseBuffers); + + let bodyString: string | undefined; + let bodyJson: unknown | undefined; + let decodedBuffer: Buffer | undefined; + const result: IWebClientResponse = { + ok: status >= 200 && status < 300, + status, + statusText, + redirected: wasRedirected, + headers, + getTextAsync: async () => { + if (bodyString === undefined) { + const buffer: Buffer = await result.getBufferAsync(); + // eslint-disable-next-line require-atomic-updates + bodyString = buffer.toString(); + } - return bodyJson as TJson; - }, - getBufferAsync: async () => { - // Determine if the buffer is compressed and decode it if necessary - if (decodedBuffer === undefined) { - let encodings: string | string[] | undefined = headers[CONTENT_ENCODING_HEADER_NAME]; - if (!noDecode && encodings !== undefined) { - const zlib: typeof import('zlib') = await import('node:zlib'); - if (!Array.isArray(encodings)) { - encodings = encodings.split(','); - } + return bodyString; + }, + getJsonAsync: async () => { + if (bodyJson === undefined) { + const text: string = await result.getTextAsync(); + // eslint-disable-next-line require-atomic-updates + bodyJson = JSON.parse(text); + } - let buffer: Buffer = responseData; - for (const encoding of encodings) { - let decompressFn: (buffer: Buffer, callback: import('zlib').CompressCallback) => void; - switch (encoding.trim()) { - case DEFLATE_ENCODING: { - decompressFn = zlib.inflate.bind(zlib); - break; - } - case GZIP_ENCODING: { - decompressFn = zlib.gunzip.bind(zlib); - break; - } - case BROTLI_ENCODING: { - decompressFn = zlib.brotliDecompress.bind(zlib); - break; - } - default: { - throw new Error(`Unsupported content-encoding: ${encodings}`); - } + return bodyJson as TJson; + }, + 
getBufferAsync: async () => { + // Determine if the buffer is compressed and decode it if necessary + if (decodedBuffer === undefined) { + const contentEncodings: string[] | undefined = _getContentEncodings(headers, noDecode); + if (contentEncodings) { + const zlib: typeof import('zlib') = await import('node:zlib'); + + let buffer: Buffer = responseData; + for (const encoding of contentEncodings) { + let decompressFn: (buffer: Buffer, callback: import('zlib').CompressCallback) => void; + switch (encoding.trim()) { + case DEFLATE_ENCODING: { + decompressFn = zlib.inflate.bind(zlib); + break; + } + case GZIP_ENCODING: { + decompressFn = zlib.gunzip.bind(zlib); + break; + } + case BROTLI_ENCODING: { + decompressFn = zlib.brotliDecompress.bind(zlib); + break; + } + default: { + throw new Error(`Unsupported content-encoding: ${encoding.trim()}`); } - - buffer = await LegacyAdapters.convertCallbackToPromise(decompressFn, buffer); } - // eslint-disable-next-line require-atomic-updates - decodedBuffer = buffer; - } else { - decodedBuffer = responseData; + buffer = await LegacyAdapters.convertCallbackToPromise(decompressFn, buffer); } + + // eslint-disable-next-line require-atomic-updates + decodedBuffer = buffer; + } else { + decodedBuffer = responseData; } + } + + return decodedBuffer; + } + }; + resolve(result); + }); + }, + makeRequestAsync + ); +}; - return decodedBuffer; +const makeStreamRequestAsync: StreamFetchFn = async ( + url: string, + options: IRequestOptions, + redirected: boolean = false +) => { + const { noDecode } = options; + + return _makeRawRequestAsync( + url, + options, + redirected, + ( + response: IncomingMessage, + wasRedirected: boolean, + resolve: (result: IWebClientStreamResponse | PromiseLike) => void + ): void => { + const { statusCode: status = 0, statusMessage: statusText, headers } = response; + + const buildResult = (stream: Readable): IWebClientStreamResponse => ({ + ok: status >= 200 && status < 300, + status, + statusText, + redirected: 
wasRedirected, + headers, + stream + }); + + // Handle Content-Encoding decompression for streaming responses, + // matching the buffer-based path's behavior in getBufferAsync() + const contentEncodings: string[] | undefined = _getContentEncodings(headers, noDecode); + + if (contentEncodings) { + // Resolve with a promise so we can lazily import zlib (same pattern as buffer path) + resolve( + (async () => { + const zlib: typeof import('zlib') = await import('node:zlib'); + + let resultStream: Readable = response; + for (const encoding of contentEncodings) { + switch (encoding.trim()) { + case DEFLATE_ENCODING: { + resultStream = resultStream.pipe(zlib.createInflate()); + break; + } + case GZIP_ENCODING: { + resultStream = resultStream.pipe(zlib.createGunzip()); + break; + } + case BROTLI_ENCODING: { + resultStream = resultStream.pipe(zlib.createBrotliDecompress()); + break; + } + default: { + throw new Error(`Unsupported content-encoding: ${encoding.trim()}`); + } + } } - }; - resolve(result); - }); - }) - .on('error', (error: Error) => { - reject(error); - }) - .end(body); - } + + return buildResult(resultStream); + })() + ); + } else { + resolve(buildResult(response)); + } + }, + makeStreamRequestAsync ); }; +// Module-level mutable state for mock injection. These must NOT be private members +// of WebClient because rush-sdk re-exports WebClient as a separate type declaration, +// and TypeScript's structural typing treats private members nominally, causing type +// incompatibility between the rush-lib and rush-sdk versions. +let _requestFnAsync: FetchFn = makeRequestAsync; +let _streamRequestFnAsync: StreamFetchFn = makeStreamRequestAsync; + +function _mergeHeaders(target: Record, source: Record): void { + for (const [name, value] of Object.entries(source)) { + target[name] = value; + } +} + +/** + * Builds the low-level IRequestOptions from WebClient instance state and caller-provided options. 
+ * This is a module-level function (not a private method) to avoid the rush-sdk type mismatch. + */ +function buildRequestOptions( + webClient: WebClient, + options?: IGetFetchOptions | IFetchOptionsWithBody +): IRequestOptions { + const { + headers: optionsHeaders, + timeoutMs = 15 * 1000, + verb, + redirect, + body, + noDecode + } = (options as IFetchOptionsWithBody | undefined) ?? {}; + + const headers: Record = {}; + + const { standardHeaders, userAgent, accept, proxy } = webClient; + + _mergeHeaders(headers, standardHeaders); + + if (optionsHeaders) { + _mergeHeaders(headers, optionsHeaders); + } + + if (userAgent) { + headers[USER_AGENT_HEADER_NAME] = userAgent; + } + + if (accept) { + headers[ACCEPT_HEADER_NAME] = accept; + } + + let proxyUrl: string = ''; + + switch (proxy) { + case WebClientProxy.Detect: + if (process.env.HTTPS_PROXY) { + proxyUrl = process.env.HTTPS_PROXY; + } else if (process.env.HTTP_PROXY) { + proxyUrl = process.env.HTTP_PROXY; + } + break; + + case WebClientProxy.Fiddler: + // For debugging, disable cert validation + // eslint-disable-next-line + process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'; + proxyUrl = 'http://localhost:8888/'; + break; + } + + let agent: HttpAgent | undefined = undefined; + if (proxyUrl) { + agent = createHttpsProxyAgent(proxyUrl); + } + + return { + method: verb, + headers, + agent, + timeout: timeoutMs, + redirect, + body, + noDecode + }; +} + /** * A helper for issuing HTTP requests. 
*/ export class WebClient { - private static _requestFn: FetchFn = makeRequestAsync; - public readonly standardHeaders: Record = {}; public accept: string | undefined = '*/*'; @@ -220,17 +447,23 @@ export class WebClient { public proxy: WebClientProxy = WebClientProxy.Detect; public static mockRequestFn(fn: FetchFn): void { - WebClient._requestFn = fn; + _requestFnAsync = fn; } public static resetMockRequestFn(): void { - WebClient._requestFn = makeRequestAsync; + _requestFnAsync = makeRequestAsync; + } + + public static mockStreamRequestFn(fn: StreamFetchFn): void { + _streamRequestFnAsync = fn; + } + + public static resetMockStreamRequestFn(): void { + _streamRequestFnAsync = makeStreamRequestAsync; } public static mergeHeaders(target: Record, source: Record): void { - for (const [name, value] of Object.entries(source)) { - target[name] = value; - } + _mergeHeaders(target, source); } public addBasicAuthHeader(userName: string, password: string): void { @@ -242,65 +475,19 @@ export class WebClient { url: string, options?: IGetFetchOptions | IFetchOptionsWithBody ): Promise { - const { - headers: optionsHeaders, - timeoutMs = 15 * 1000, - verb, - redirect, - body, - noDecode - } = (options as IFetchOptionsWithBody | undefined) ?? 
{}; - - const headers: Record = {}; - - WebClient.mergeHeaders(headers, this.standardHeaders); - - if (optionsHeaders) { - WebClient.mergeHeaders(headers, optionsHeaders); - } - - if (this.userAgent) { - headers[USER_AGENT_HEADER_NAME] = this.userAgent; - } - - if (this.accept) { - headers[ACCEPT_HEADER_NAME] = this.accept; - } - - let proxyUrl: string = ''; - - switch (this.proxy) { - case WebClientProxy.Detect: - if (process.env.HTTPS_PROXY) { - proxyUrl = process.env.HTTPS_PROXY; - } else if (process.env.HTTP_PROXY) { - proxyUrl = process.env.HTTP_PROXY; - } - break; - - case WebClientProxy.Fiddler: - // For debugging, disable cert validation - // eslint-disable-next-line - process.env['NODE_TLS_REJECT_UNAUTHORIZED'] = '0'; - proxyUrl = 'http://localhost:8888/'; - break; - } - - let agent: HttpAgent | undefined = undefined; - if (proxyUrl) { - agent = createHttpsProxyAgent(proxyUrl); - } + const requestInit: IRequestOptions = buildRequestOptions(this, options); + return await _requestFnAsync(url, requestInit); + } - const requestInit: IRequestOptions = { - method: verb, - headers, - agent, - timeout: timeoutMs, - redirect, - body, - noDecode - }; - - return await WebClient._requestFn(url, requestInit); + /** + * Makes an HTTP request that resolves as soon as headers are received, providing the + * response body as a readable stream. This avoids buffering the entire response in memory. 
+ */ + public async fetchStreamAsync( + url: string, + options?: IGetFetchOptions | IFetchOptionsWithBody + ): Promise { + const requestInit: IRequestOptions = buildRequestOptions(this, options); + return await _streamRequestFnAsync(url, requestInit); } } diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts index ea8d7b903c0..1410294476b 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts @@ -153,7 +153,7 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { ): Promise { try { const client: AmazonS3Client = await this._getS3ClientAsync(terminal); - return await client.getObjectAsync(this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId); + return await client.getObjectAsync(this._getObjectName(cacheId)); } catch (e) { terminal.writeWarningLine(`Error getting cache entry from S3: ${e}`); return undefined; @@ -165,16 +165,46 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { cacheId: string, objectBuffer: Buffer ): Promise { - if (!this.isCacheWriteAllowed) { - terminal.writeErrorLine('Writing to S3 cache is not allowed in the current configuration.'); + if (!this._validateWriteAllowed(terminal, cacheId)) { return false; } - terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + try { + const client: AmazonS3Client = await this._getS3ClientAsync(terminal); + await client.uploadObjectAsync(this._getObjectName(cacheId), objectBuffer); + return true; + } catch (e) { + terminal.writeWarningLine(`Error uploading cache entry to S3: ${e}`); + return false; + } + } + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { try { const client: AmazonS3Client = await 
this._getS3ClientAsync(terminal); - await client.uploadObjectAsync(this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId, objectBuffer); + return await client.downloadObjectToFileAsync(this._getObjectName(cacheId), localFilePath); + } catch (e) { + terminal.writeWarningLine(`Error downloading cache entry from S3: ${e}`); + return false; + } + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + if (!this._validateWriteAllowed(terminal, cacheId)) { + return false; + } + + try { + const client: AmazonS3Client = await this._getS3ClientAsync(terminal); + await client.uploadObjectFromFileAsync(this._getObjectName(cacheId), localFilePath); return true; } catch (e) { terminal.writeWarningLine(`Error uploading cache entry to S3: ${e}`); @@ -182,6 +212,20 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider { } } + private _getObjectName(cacheId: string): string { + return this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId; + } + + private _validateWriteAllowed(terminal: ITerminal, cacheId: string): boolean { + if (!this.isCacheWriteAllowed) { + terminal.writeErrorLine('Writing to S3 cache is not allowed in the current configuration.'); + return false; + } + + terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + return true; + } + public async updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise { await CredentialCache.usingAsync( { diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts index 890aeba8593..89fefa89cad 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts @@ -2,13 +2,21 @@ // See LICENSE in the project root for license information. 
import * as crypto from 'node:crypto'; +import type { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; -import { Async } from '@rushstack/node-core-library'; +import { + Async, + FileSystem, + type FileSystemReadStream, + type FileSystemWriteStream +} from '@rushstack/node-core-library'; import { Colorize, type ITerminal } from '@rushstack/terminal'; import { type IGetFetchOptions, type IFetchOptionsWithBody, type IWebClientResponse, + type IWebClientStreamResponse, type WebClient, AUTHORIZATION_HEADER_NAME } from '@rushstack/rush-sdk/lib/utilities/WebClient'; @@ -16,7 +24,8 @@ import { import type { IAmazonS3BuildCacheProviderOptionsAdvanced } from './AmazonS3BuildCacheProvider'; import { type IAmazonS3Credentials, fromRushEnv } from './AmazonS3Credentials'; -const CONTENT_HASH_HEADER_NAME: 'x-amz-content-sha256' = 'x-amz-content-sha256'; +const HASH_ALGORITHM: 'sha256' = 'sha256'; +const CONTENT_HASH_HEADER_NAME: `x-amz-content-${typeof HASH_ALGORITHM}` = `x-amz-content-${HASH_ALGORITHM}`; const DATE_HEADER_NAME: 'x-amz-date' = 'x-amz-date'; const HOST_HEADER_NAME: 'host' = 'host'; const SECURITY_TOKEN_HEADER_NAME: 'x-amz-security-token' = 'x-amz-security-token'; @@ -60,6 +69,18 @@ const storageRetryOptions: IStorageRetryOptions = { retryPolicyType: StorageRetryPolicyType.EXPONENTIAL }; +/** + * Computes the SHA-256 hash of a file on disk using streaming reads. 
+ */ +async function _hashFileAsync(filePath: string): Promise { + return await new Promise((resolve, reject) => { + const hash: crypto.Hash = crypto.createHash(HASH_ALGORITHM); + const stream: FileSystemReadStream = FileSystem.createReadStream(filePath); + stream.on('data', (chunk: string | Buffer) => hash.update(chunk)); + stream.on('end', () => resolve(hash.digest('hex'))); + stream.on('error', reject); + }); +} /** * A helper for reading and updating objects on Amazon S3 * @@ -119,42 +140,14 @@ export class AmazonS3Client { public async getObjectAsync(objectName: string): Promise { this._writeDebugLine('Reading object from S3'); return await this._sendCacheRequestWithRetriesAsync(async () => { - const response: IWebClientResponse = await this._makeRequestAsync('GET', objectName); - if (response.ok) { - return { - hasNetworkError: false, - response: await response.getBufferAsync() - }; - } else if (response.status === 404) { - return { - hasNetworkError: false, - response: undefined - }; - } else if ( - (response.status === 400 || response.status === 401 || response.status === 403) && - !this._credentials - ) { - // unauthorized due to not providing credentials, - // silence error for better DX when e.g. 
running locally without credentials - this._writeWarningLine( - `No credentials found and received a ${response.status}`, - ' response code from the cloud storage.', - ' Maybe run rush update-cloud-credentials', - ' or set the RUSH_BUILD_CACHE_CREDENTIAL env' - ); - return { - hasNetworkError: false, - response: undefined - }; - } else if (response.status === 400 || response.status === 401 || response.status === 403) { - throw await this._getS3ErrorAsync(response); - } else { - const error: Error = await this._getS3ErrorAsync(response); - return { - hasNetworkError: true, - error - }; - } + const response: IWebClientResponse = await this._makeSignedRequestAsync('GET', objectName); + return this._handleGetResponseAsync( + response.status, + response.statusText, + response.ok, + async () => await response.getBufferAsync(), + async () => await this._getS3ErrorAsync(response) + ); }); } @@ -164,7 +157,11 @@ export class AmazonS3Client { } await this._sendCacheRequestWithRetriesAsync(async () => { - const response: IWebClientResponse = await this._makeRequestAsync('PUT', objectName, objectBuffer); + const response: IWebClientResponse = await this._makeSignedRequestAsync( + 'PUT', + objectName, + objectBuffer + ); if (!response.ok) { return { hasNetworkError: true, @@ -178,6 +175,74 @@ export class AmazonS3Client { }); } + /** + * Downloads an S3 object directly to a local file path, using streaming to avoid + * buffering the entire object in memory. Retries on transient network errors. + * + * @returns `true` if the object was found and written to the file, `false` if not found. 
+ */ + public async downloadObjectToFileAsync(objectName: string, localFilePath: string): Promise { + this._writeDebugLine('Downloading object from S3 to file'); + const result: boolean | undefined = await this._sendCacheRequestWithRetriesAsync(async () => { + const response: IWebClientStreamResponse = await this._makeSignedRequestAsync( + 'GET', + objectName, + undefined, + true + ); + return this._handleGetResponseAsync( + response.status, + response.statusText, + response.ok, + async () => { + const writeStream: FileSystemWriteStream = await FileSystem.createWriteStreamAsync(localFilePath, { + ensureFolderExists: true + }); + await pipeline(response.stream, writeStream); + return true; + }, + async () => { + response.stream.resume(); + return new Error( + `Amazon S3 responded with status code ${response.status} (${response.statusText})` + ); + }, + () => response.stream.resume() + ); + }); + + return result ?? false; + } + + /** + * Uploads a local file to S3 using streaming, with the file's SHA-256 hash included in + * the AWS Signature V4 request for payload integrity verification. Does not retry + * because the stream is consumed after the first attempt. + */ + public async uploadObjectFromFileAsync(objectName: string, localFilePath: string): Promise { + if (!this._credentials) { + throw new Error('Credentials are required to upload objects to S3.'); + } + + // Compute SHA-256 hash of the file before uploading so we can sign the payload + const contentHash: string = await _hashFileAsync(localFilePath); + const entryStream: FileSystemReadStream = FileSystem.createReadStream(localFilePath); + + // Streaming uploads cannot be retried because the stream is consumed after the first attempt. 
+ const response: IWebClientStreamResponse = await this._makeSignedRequestAsync( + 'PUT', + objectName, + entryStream as Readable, + true, + contentHash + ); + if (!response.ok) { + response.stream.resume(); + throw new Error(`Amazon S3 responded with status code ${response.status} (${response.statusText})`); + } + response.stream.resume(); + } + private _writeDebugLine(...messageParts: string[]): void { // if the terminal has been closed then don't bother sending a debug message try { @@ -196,13 +261,106 @@ export class AmazonS3Client { } } - private async _makeRequestAsync( + /** + * Shared response handling for GET requests (both buffer and stream). + * The `getSuccessResult` callback extracts the response payload (Buffer or Readable). + * The `getError` callback constructs an error from the response. + * The optional `cleanup` callback drains stream responses on non-success paths. + */ + private async _handleGetResponseAsync( + status: number, + statusText: string | undefined, + ok: boolean, + getSuccessResult: () => T | Promise, + getError: () => Promise, + cleanup?: () => void + ): Promise> { + if (ok) { + return { + hasNetworkError: false, + response: await getSuccessResult() + }; + } else if (status === 404) { + cleanup?.(); + return { + hasNetworkError: false, + response: undefined + }; + } else if ((status === 400 || status === 401 || status === 403) && !this._credentials) { + cleanup?.(); + // unauthorized due to not providing credentials, + // silence error for better DX when e.g. 
running locally without credentials + this._writeWarningLine( + `No credentials found and received a ${status}`, + ' response code from the cloud storage.', + ' Maybe run rush update-cloud-credentials', + ' or set the RUSH_BUILD_CACHE_CREDENTIAL env' + ); + return { + hasNetworkError: false, + response: undefined + }; + } else if (status === 400 || status === 401 || status === 403) { + cleanup?.(); + throw await getError(); + } else { + cleanup?.(); + const error: Error = await getError(); + return { + hasNetworkError: true, + error + }; + } + } + + private async _makeSignedRequestAsync( verb: 'GET' | 'PUT', objectName: string, body?: Buffer - ): Promise { + ): Promise; + private async _makeSignedRequestAsync( + verb: 'GET' | 'PUT', + objectName: string, + body: Readable | undefined, + stream: true, + contentHash?: string + ): Promise; + private async _makeSignedRequestAsync( + verb: 'GET' | 'PUT', + objectName: string, + body?: Buffer | Readable, + stream?: boolean, + contentHash?: string + ): Promise { + // Use the provided content hash if available (e.g. pre-computed from a file on disk), + // otherwise compute from the buffer body, or use the empty hash for GET requests. + const bodyHash: string = contentHash ?? this._getBufferSha256(Buffer.isBuffer(body) ? body : undefined); + const { url, headers } = this._buildSignedRequest(verb, objectName, bodyHash); + + const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { + verb, + headers + }; + if (verb === 'PUT' && body) { + (webFetchOptions as IFetchOptionsWithBody).body = body; + } + + if (stream) { + return await this._webClient.fetchStreamAsync(url, webFetchOptions); + } else { + return await this._webClient.fetchAsync(url, webFetchOptions); + } + } + + /** + * Builds an AWS Signature V4 signed request, returning the URL and signed headers. 
+ */ + private _buildSignedRequest( + verb: 'GET' | 'PUT', + objectName: string, + bodyHash: string + ): { url: string; headers: Record } { const isoDateString: IIsoDateString = this._getIsoDateString(); - const bodyHash: string = this._getSha256(body); const headers: Record = {}; headers[DATE_HEADER_NAME] = isoDateString.dateTime; headers[CONTENT_HASH_HEADER_NAME] = bodyHash; @@ -266,7 +424,7 @@ export class AmazonS3Client { signedHeaderNamesString, bodyHash ].join('\n'); - const canonicalRequestHash: string = this._getSha256(canonicalRequest); + const canonicalRequestHash: string = this._getBufferSha256(canonicalRequest); const scope: string = `${isoDateString.date}/${this._s3Region}/s3/aws4_request`; // The string to sign looks like this: @@ -299,14 +457,6 @@ export class AmazonS3Client { } } - const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { - verb, - headers - }; - if (verb === 'PUT') { - (webFetchOptions as IFetchOptionsWithBody).body = body; - } - const url: string = `${this._s3Endpoint}${canonicalUri}`; this._writeDebugLine(Colorize.bold(Colorize.underline('Sending request to S3'))); @@ -316,9 +466,7 @@ export class AmazonS3Client { this._writeDebugLine(Colorize.cyan(`\t${name}: ${value}`)); } - const response: IWebClientResponse = await this._webClient.fetchAsync(url, webFetchOptions); - - return response; + return { url, headers }; } public _getSha256Hmac(key: string | Buffer, data: string): Buffer; @@ -333,9 +481,9 @@ export class AmazonS3Client { } } - private _getSha256(data?: string | Buffer): string { + private _getBufferSha256(data?: string | Buffer): string { if (data) { - const hash: crypto.Hash = crypto.createHash('sha256'); + const hash: crypto.Hash = crypto.createHash(HASH_ALGORITHM); hash.update(data); return hash.digest('hex'); } else { @@ -452,7 +600,7 @@ export class AmazonS3Client { } delay = Math.min(maxRetryDelayInMs, delay); - log(`Will retry request in ${delay}s...`); + log(`Will retry request in ${delay}ms...`); 
await Async.sleepAsync(delay); const retryResponse: RetryableRequestResponse = await sendRequest(); diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts index 07b5eddfc73..43e0739cf5a 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/AmazonS3Client.test.ts @@ -5,7 +5,15 @@ jest.mock('@rushstack/rush-sdk/lib/utilities/WebClient', () => { return jest.requireActual('@microsoft/rush-lib/lib/utilities/WebClient'); }); +jest.mock('node:stream/promises', () => ({ + pipeline: jest.fn().mockResolvedValue(undefined) +})); + +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { ConsoleTerminalProvider, Terminal } from '@rushstack/terminal'; +import { FileSystem } from '@rushstack/node-core-library'; import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; import type { IAmazonS3BuildCacheProviderOptionsAdvanced } from '../AmazonS3BuildCacheProvider'; @@ -634,4 +642,233 @@ describe(AmazonS3Client.name, () => { ); }); }); + + describe('File-based requests', () => { + let realDate: typeof Date; + let realSetTimeout: typeof setTimeout; + beforeEach(() => { + // mock date + realDate = global.Date; + global.Date = MockedDate as typeof Date; + + // mock setTimeout + realSetTimeout = global.setTimeout; + global.setTimeout = ((callback: () => void, time: number) => { + return realSetTimeout(callback, 1); + }).bind(global) as typeof global.setTimeout; + + jest.spyOn(FileSystem, 'ensureFolderAsync').mockResolvedValue(); + jest + .spyOn(FileSystem, 'createWriteStreamAsync') + .mockResolvedValue({} as unknown as Awaited>); + // Return a Readable that immediately ends, so _hashFileAsync completes with the null hash + jest.spyOn(FileSystem, 'createReadStream').mockReturnValue( + new Readable({ + read() { + 
this.push(null); + } + }) as unknown as ReturnType + ); + }); + + afterEach(() => { + jest.restoreAllMocks(); + global.Date = realDate; + global.setTimeout = realSetTimeout.bind(global); + }); + + describe('Downloading an object to file', () => { + async function makeFileGetRequestAsync( + credentials: IAmazonS3Credentials | undefined, + options: IAmazonS3BuildCacheProviderOptionsAdvanced, + objectName: string, + status: number, + statusText?: string + ): Promise<{ result: boolean; spy: jest.SpyInstance }> { + const mockStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: mockStream, + headers: {}, + status, + statusText, + ok: status >= 200 && status < 300, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client(credentials, options, webClient, terminal); + const result = await s3Client.downloadObjectToFileAsync(objectName, '/tmp/cache-entry'); + return { result, spy }; + } + + it('Can download an object to file', async () => { + const { result, spy } = await makeFileGetRequestAsync( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + 'abc123', + 200 + ); + expect(result).toBe(true); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.calls[0]).toMatchSnapshot(); + expect(pipeline).toHaveBeenCalled(); + spy.mockRestore(); + }); + + it('Returns false for a 404 (missing) object', async () => { + const { result, spy } = await makeFileGetRequestAsync( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + 'abc123', + 404, + 'Not Found' + ); + expect(result).toBe(false); + expect(spy).toHaveBeenCalledTimes(1); + expect(pipeline).not.toHaveBeenCalled(); + spy.mockRestore(); + }); + + it('Retries on transient server errors', async () => { + let callCount: number = 0; + const spy: 
jest.SpyInstance = jest + .spyOn(WebClient.prototype, 'fetchStreamAsync') + .mockImplementation(async () => { + callCount++; + const mockStream = new Readable({ read() {} }); + if (callCount < 3) { + return { + stream: mockStream, + headers: {}, + status: 500, + statusText: 'InternalServerError', + ok: false, + redirected: false + }; + } + return { + stream: mockStream, + headers: {}, + status: 200, + statusText: 'OK', + ok: true, + redirected: false + }; + }); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + const result = await s3Client.downloadObjectToFileAsync('abc123', '/tmp/cache-entry'); + expect(result).toBe(true); + // First two attempts fail with 500, third succeeds + expect(spy).toHaveBeenCalledTimes(3); + spy.mockRestore(); + }); + }); + + describe('Uploading an object from file', () => { + it('Throws an error if credentials are not provided', async () => { + const s3Client: AmazonS3Client = new AmazonS3Client( + undefined, + { s3Endpoint: 'http://foo.bar.baz', ...DUMMY_OPTIONS_WITHOUT_ENDPOINT }, + webClient, + terminal + ); + + await expect(s3Client.uploadObjectFromFileAsync('temp', '/tmp/cache-entry')).rejects.toThrow( + 'Credentials are required to upload objects to S3.' 
+ ); + }); + + it('Uploads from file with signed payload hash', async () => { + const responseStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: responseStream, + headers: {}, + status: 200, + statusText: 'OK', + ok: true, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + await s3Client.uploadObjectFromFileAsync('abc123', '/tmp/cache-entry'); + + expect(spy).toHaveBeenCalledTimes(1); + const [url, options] = spy.mock.calls[0]; + expect(url).toBe('http://localhost:9000/abc123'); + expect(options.verb).toBe('PUT'); + // Verify the content hash is a real SHA-256 hex string, NOT UNSIGNED-PAYLOAD + expect(options.headers['x-amz-content-sha256']).toMatch(/^[0-9a-f]{64}$/); + expect(options.headers['x-amz-date']).toBe('20200418T123242Z'); + // eslint-disable-next-line dot-notation + expect(options.headers['Authorization']).toContain('AWS4-HMAC-SHA256'); + spy.mockRestore(); + }); + + it('Does not retry on failure (stream consumed)', async () => { + const responseStream = new Readable({ read() {} }); + + const spy: jest.SpyInstance = jest.spyOn(WebClient.prototype, 'fetchStreamAsync').mockReturnValue( + Promise.resolve({ + stream: responseStream, + headers: {}, + status: 500, + statusText: 'InternalServerError', + ok: false, + redirected: false + }) + ); + + const s3Client: AmazonS3Client = new AmazonS3Client( + { + accessKeyId: 'accessKeyId', + secretAccessKey: 'secretAccessKey', + sessionToken: undefined + }, + DUMMY_OPTIONS, + webClient, + terminal + ); + + await expect(s3Client.uploadObjectFromFileAsync('abc123', '/tmp/cache-entry')).rejects.toThrow('500'); + + // Only 1 call - no retry for file-based uploads + expect(spy).toHaveBeenCalledTimes(1); + 
spy.mockRestore(); + }); + }); + }); }); diff --git a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap index 9481b3d6983..d6297a81131 100644 --- a/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap +++ b/rush-plugins/rush-amazon-s3-build-cache-plugin/src/test/__snapshots__/AmazonS3Client.test.ts.snap @@ -548,3 +548,18 @@ exports[`AmazonS3Client Rejects invalid S3 endpoint values 9`] = `"Invalid S3 en exports[`AmazonS3Client Rejects invalid S3 endpoint values 10`] = `"Invalid S3 endpoint. Some part of the hostname contains invalid characters or is too long"`; exports[`AmazonS3Client Rejects invalid S3 endpoint values 11`] = `"Invalid S3 endpoint. Some part of the hostname contains invalid characters or is too long"`; + +exports[`AmazonS3Client File-based requests Downloading an object to file Can download an object to file 1`] = ` +Array [ + "http://localhost:9000/abc123", + Object { + "headers": Object { + "Authorization": "AWS4-HMAC-SHA256 Credential=accessKeyId/20200418/us-east-1/s3/aws4_request,SignedHeaders=host;x-amz-content-sha256;x-amz-date,Signature=194608e9e7ba6d8aa4a019b3b6fd237e6b09ef1f45ff7fa60cbb81c1875538be", + "x-amz-content-sha256": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + "x-amz-date": "20200418T123242Z", + }, + "verb": "GET", + }, +] +`; + diff --git a/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts b/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts index 146ee2e8f00..cfc91b25ffa 100644 --- a/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts +++ b/rush-plugins/rush-azure-storage-build-cache-plugin/src/AzureStorageBuildCacheProvider.ts @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. 
All rights reserved. Licensed under the MIT license. // See LICENSE in the project root for license information. +import * as path from 'node:path'; + import { type BlobClient, BlobServiceClient, @@ -9,6 +11,7 @@ import { } from '@azure/storage-blob'; import { AzureAuthorityHosts } from '@azure/identity'; +import { FileSystem } from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { type ICloudBuildCacheProvider, @@ -75,66 +78,83 @@ export class AzureStorageBuildCacheProvider terminal: ITerminal, cacheId: string ): Promise { + return await this._tryGetBlobDataAsync(terminal, cacheId, async (blobClient: BlobClient) => { + return await blobClient.downloadToBuffer(); + }); + } + + public async trySetCacheEntryBufferAsync( + terminal: ITerminal, + cacheId: string, + entryBuffer: Buffer + ): Promise { + return await this._trySetBlobDataAsync(terminal, cacheId, async (blockBlobClient: BlockBlobClient) => { + await blockBlobClient.upload(entryBuffer, entryBuffer.length); + }); + } + + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + const result: boolean | undefined = await this._tryGetBlobDataAsync( + terminal, + cacheId, + async (blobClient: BlobClient) => { + // TODO: Determine if this is necessary, or if the Azure Storage SDK handles this internally. + await FileSystem.ensureFolderAsync(path.dirname(localFilePath)); + await blobClient.downloadToFile(localFilePath); + return true; + } + ); + + return result ?? false; + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + return await this._trySetBlobDataAsync(terminal, cacheId, async (blockBlobClient: BlockBlobClient) => { + await blockBlobClient.uploadFile(localFilePath); + }); + } + + /** + * Shared logic for both buffer-based and file-based GET operations. 
+ * Checks if the blob exists, retrieves data via the provided callback, and handles errors. + */ + private async _tryGetBlobDataAsync( + terminal: ITerminal, + cacheId: string, + getBlobDataAsync: (blobClient: BlobClient) => Promise + ): Promise { const blobClient: BlobClient = await this._getBlobClientForCacheIdAsync(cacheId, terminal); try { const blobExists: boolean = await blobClient.exists(); if (blobExists) { - return await blobClient.downloadToBuffer(); + return await getBlobDataAsync(blobClient); } else { return undefined; } } catch (err) { - const e: IBlobError = err as IBlobError; - const errorMessage: string = - 'Error getting cache entry from Azure Storage: ' + - [e.name, e.message, e.response?.status, e.response?.parsedHeaders?.errorCode] - .filter((piece: string | undefined) => piece) - .join(' '); - - if (e.response?.parsedHeaders?.errorCode === 'PublicAccessNotPermitted') { - // This error means we tried to read the cache with no credentials, but credentials are required. - // We'll assume that the configuration of the cache is correct and the user has to take action. - terminal.writeWarningLine( - `${errorMessage}\n\n` + - `You need to configure Azure Storage SAS credentials to access the build cache.\n` + - `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + - `or provide a SAS in the ` + - `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` - ); - } else if (e.response?.parsedHeaders?.errorCode === 'AuthenticationFailed') { - // This error means the user's credentials are incorrect, but not expired normally. They might have - // gotten corrupted somehow, or revoked manually in Azure Portal. 
- terminal.writeWarningLine( - `${errorMessage}\n\n` + - `Your Azure Storage SAS credentials are not valid.\n` + - `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + - `or provide a SAS in the ` + - `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` - ); - } else if (e.response?.parsedHeaders?.errorCode === 'AuthorizationPermissionMismatch') { - // This error is not solvable by the user, so we'll assume it is a configuration error, and revert - // to providing likely next steps on configuration. (Hopefully this error is rare for a regular - // developer, more likely this error will appear while someone is configuring the cache for the - // first time.) - terminal.writeWarningLine( - `${errorMessage}\n\n` + - `Your Azure Storage SAS credentials are valid, but do not have permission to read the build cache.\n` + - `Make sure you have added the role 'Storage Blob Data Reader' to the appropriate user(s) or group(s)\n` + - `on your storage account in the Azure Portal.` - ); - } else { - // We don't know what went wrong, hopefully we'll print something useful. - terminal.writeWarningLine(errorMessage); - } + this._logBlobError(terminal, err, 'Error getting cache entry from Azure Storage: '); return undefined; } } - public async trySetCacheEntryBufferAsync( + /** + * Shared logic for both buffer-based and file-based SET operations. + * Checks write permission, whether the blob already exists, uploads via the provided callback, + * and handles 409 conflict errors. 
+ */ + private async _trySetBlobDataAsync( terminal: ITerminal, cacheId: string, - entryStream: Buffer + uploadAsync: (blockBlobClient: BlockBlobClient) => Promise ): Promise { if (!this.isCacheWriteAllowed) { terminal.writeErrorLine( @@ -170,7 +190,7 @@ export class AzureStorageBuildCacheProvider return true; } else { try { - await blockBlobClient.upload(entryStream, entryStream.length); + await uploadAsync(blockBlobClient); return true; } catch (e) { if ((e as IBlobError).statusCode === 409 /* conflict */) { @@ -196,6 +216,42 @@ export class AzureStorageBuildCacheProvider return client.getBlobClient(blobName); } + private _logBlobError(terminal: ITerminal, err: unknown, prefix: string): void { + const e: IBlobError = err as IBlobError; + const errorMessage: string = + prefix + + [e.name, e.message, e.response?.status, e.response?.parsedHeaders?.errorCode] + .filter((piece: string | undefined) => piece) + .join(' '); + + if (e.response?.parsedHeaders?.errorCode === 'PublicAccessNotPermitted') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `You need to configure Azure Storage SAS credentials to access the build cache.\n` + + `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + + `or provide a SAS in the ` + + `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` + ); + } else if (e.response?.parsedHeaders?.errorCode === 'AuthenticationFailed') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `Your Azure Storage SAS credentials are not valid.\n` + + `Update the credentials by running "rush ${RushConstants.updateCloudCredentialsCommandName}", \n` + + `or provide a SAS in the ` + + `${EnvironmentVariableNames.RUSH_BUILD_CACHE_CREDENTIAL} environment variable.` + ); + } else if (e.response?.parsedHeaders?.errorCode === 'AuthorizationPermissionMismatch') { + terminal.writeWarningLine( + `${errorMessage}\n\n` + + `Your Azure Storage SAS credentials are valid, but do not have 
permission to read the build cache.\n` + + `Make sure you have added the role 'Storage Blob Data Reader' to the appropriate user(s) or group(s)\n` + + `on your storage account in the Azure Portal.` + ); + } else { + terminal.writeWarningLine(errorMessage); + } + } + private async _getContainerClientAsync(terminal: ITerminal): Promise { if (!this._containerClient) { let sasString: string | undefined = this._environmentCredential; diff --git a/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts b/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts index f0cd91e4d52..edfcb890b97 100644 --- a/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts +++ b/rush-plugins/rush-bridge-cache-plugin/src/BridgeCachePlugin.ts @@ -86,7 +86,10 @@ export class BridgeCachePlugin implements IRushPlugin { buildCacheConfiguration, rushConfiguration: { experimentsConfiguration: { - configuration: { omitAppleDoubleFilesFromBuildCache } + configuration: { + omitAppleDoubleFilesFromBuildCache: excludeAppleDoubleFiles = false, + useDirectFileTransfersForBuildCache = false + } } } } = context; @@ -119,7 +122,8 @@ export class BridgeCachePlugin implements IRushPlugin { { buildCacheConfiguration, terminal, - excludeAppleDoubleFiles: !!omitAppleDoubleFilesFromBuildCache + excludeAppleDoubleFiles, + useDirectFileTransfersForBuildCache } ); diff --git a/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts b/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts index 0ff34473e7e..511a3b586fa 100644 --- a/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts +++ b/rush-plugins/rush-http-build-cache-plugin/src/HttpBuildCacheProvider.ts @@ -2,16 +2,30 @@ // See LICENSE in the project root for license information. 
import type { SpawnSyncReturns } from 'node:child_process'; +import type { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; import { type ICredentialCacheEntry, CredentialCache } from '@rushstack/credential-cache'; -import { Executable, Async } from '@rushstack/node-core-library'; +import { + Executable, + Async, + FileSystem, + type FileSystemWriteStream, + type FileSystemReadStream +} from '@rushstack/node-core-library'; import type { ITerminal } from '@rushstack/terminal'; import { type ICloudBuildCacheProvider, type RushSession, EnvironmentConfiguration } from '@rushstack/rush-sdk'; -import { WebClient, type IWebClientResponse } from '@rushstack/rush-sdk/lib/utilities/WebClient'; +import { + WebClient, + type IGetFetchOptions, + type IFetchOptionsWithBody, + type IWebClientResponse, + type IWebClientStreamResponse +} from '@rushstack/rush-sdk/lib/utilities/WebClient'; enum CredentialsOptions { Optional, @@ -66,6 +80,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private readonly _cacheKeyPrefix: string; private readonly _tokenHandler: IHttpBuildCacheTokenHandler | undefined; private readonly _minHttpRetryDelayMs: number; + private readonly _webClient: WebClient = new WebClient(); private __credentialCacheId: string | undefined; public get isCacheWriteAllowed(): boolean { @@ -104,7 +119,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { ): Promise { try { const result: boolean | Buffer = await this._makeHttpRequestAsync({ - terminal: terminal, + terminal, relUrl: `${this._cacheKeyPrefix}${cacheId}`, method: 'GET', body: undefined, @@ -125,16 +140,13 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { cacheId: string, objectBuffer: Buffer ): Promise { - if (!this.isCacheWriteAllowed) { - terminal.writeErrorLine('Writing to cache is not allowed in the current configuration.'); + if (!this._validateWriteAllowed(terminal, cacheId)) { return false; } - 
terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); - try { const result: boolean | Buffer = await this._makeHttpRequestAsync({ - terminal: terminal, + terminal, relUrl: `${this._cacheKeyPrefix}${cacheId}`, method: this._uploadMethod, body: objectBuffer, @@ -150,6 +162,68 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { } } + public async tryDownloadCacheEntryToFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + try { + const result: IWebClientStreamResponse | false = await this._makeHttpStreamRequestAsync({ + terminal, + relUrl: `${this._cacheKeyPrefix}${cacheId}`, + method: 'GET', + body: undefined, + warningText: 'Could not get cache entry', + maxAttempts: MAX_HTTP_CACHE_ATTEMPTS + }); + + if (result === false) { + return false; + } + + const writeStream: FileSystemWriteStream = await FileSystem.createWriteStreamAsync(localFilePath, { + ensureFolderExists: true + }); + await pipeline(result.stream, writeStream); + return true; + } catch (e) { + terminal.writeWarningLine(`Error getting cache entry: ${e}`); + return false; + } + } + + public async tryUploadCacheEntryFromFileAsync( + terminal: ITerminal, + cacheId: string, + localFilePath: string + ): Promise { + if (!this._validateWriteAllowed(terminal, cacheId)) { + return false; + } + + try { + const entryStream: FileSystemReadStream = FileSystem.createReadStream(localFilePath); + const result: IWebClientStreamResponse | false = await this._makeHttpStreamRequestAsync({ + terminal, + relUrl: `${this._cacheKeyPrefix}${cacheId}`, + method: this._uploadMethod, + body: entryStream, + warningText: 'Could not write cache entry', + // Streaming uploads cannot be retried because the stream is consumed + maxAttempts: 1 + }); + + if (result !== false) { + // Drain the response body + result.stream.resume(); + } + return result !== false; + } catch (e) { + terminal.writeWarningLine(`Error uploading cache entry: ${e}`); + return false; + } 
+ } + public async updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise { await CredentialCache.usingAsync( { @@ -157,7 +231,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { }, async (credentialsCache: CredentialCache) => { credentialsCache.setCacheEntry(this._credentialCacheId, { - credential: credential + credential }); await credentialsCache.saveIfModifiedAsync(); } @@ -224,6 +298,20 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { return this.__credentialCacheId; } + /** + * Common validation for write operations. Returns `true` if writing is allowed, + * `false` if it is not (and logs an error to the terminal). + */ + private _validateWriteAllowed(terminal: ITerminal, cacheId: string): boolean { + if (!this.isCacheWriteAllowed) { + terminal.writeErrorLine('Writing to cache is not allowed in the current configuration.'); + return false; + } + + terminal.writeDebugLine('Uploading object with cacheId: ', cacheId); + return true; + } + private async _makeHttpRequestAsync(options: { terminal: ITerminal; relUrl: string; @@ -234,7 +322,68 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { maxAttempts: number; credentialOptions?: CredentialsOptions; }): Promise { - const { terminal, relUrl, method, body, warningText, readBody, credentialOptions } = options; + const { readBody } = options; + // The stream: false flag ensures the response is an IWebClientResponse + const response: IWebClientResponse | false = (await this._makeHttpCoreRequestAsync({ + ...options, + stream: false + })) as IWebClientResponse | false; + + if (response === false) { + return false; + } + + const result: Buffer | boolean = readBody ? await response.getBufferAsync() : true; + options.terminal.writeDebugLine( + `[http-build-cache] actual response: ${response.status} ${new URL(options.relUrl, this._url).href} ${ + result === true ? 
'true' : result.length + } bytes` + ); + + return result; + } + + private async _makeHttpStreamRequestAsync(options: { + terminal: ITerminal; + relUrl: string; + method: 'GET' | UploadMethod; + body: Readable | undefined; + warningText: string; + maxAttempts: number; + credentialOptions?: CredentialsOptions; + }): Promise { + // The stream: true flag ensures the response is an IWebClientStreamResponse + const response: IWebClientStreamResponse | false = (await this._makeHttpCoreRequestAsync({ + ...options, + stream: true + })) as IWebClientStreamResponse | false; + + if (response === false) { + return false; + } + + options.terminal.writeDebugLine( + `[http-build-cache] stream response: ${response.status} ${new URL(options.relUrl, this._url).href}` + ); + + return response; + } + + /** + * Shared request core for both buffer-based and streaming HTTP requests. + * Handles credentials resolution, header construction, retry logic, and failure reporting. + */ + private async _makeHttpCoreRequestAsync(options: { + terminal: ITerminal; + relUrl: string; + method: 'GET' | UploadMethod; + body: Buffer | Readable | undefined; + warningText: string; + maxAttempts: number; + credentialOptions?: CredentialsOptions; + stream: boolean; + }): Promise { + const { terminal, relUrl, method, body, warningText, credentialOptions, stream } = options; const safeCredentialOptions: CredentialsOptions = credentialOptions ?? CredentialsOptions.Optional; const credentials: string | undefined = await this._tryGetCredentialsAsync(safeCredentialOptions); const url: string = new URL(relUrl, this._url).href; @@ -250,20 +399,28 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { } } - const bodyLength: number | 'unknown' = (body as { length: number })?.length || 'unknown'; + const bodyLengthDesc: string = Buffer.isBuffer(body) ? 
`${body.length} bytes` : 'unknown length'; - terminal.writeDebugLine(`[http-build-cache] request: ${method} ${url} ${bodyLength} bytes`); + terminal.writeDebugLine(`[http-build-cache] request: ${method} ${url} ${bodyLengthDesc}`); - const webClient: WebClient = new WebClient(); - const response: IWebClientResponse = await webClient.fetchAsync(url, { + const fetchOptions: IGetFetchOptions | IFetchOptionsWithBody = { verb: method, - headers: headers, - body: body, + headers, + body, redirect: 'follow', - timeoutMs: 0 // Use the default timeout - }); + timeoutMs: 0 // Disable timeout for streaming transfers of large cache entries + }; + + const response: IWebClientResponse | IWebClientStreamResponse = stream + ? await this._webClient.fetchStreamAsync(url, fetchOptions) + : await this._webClient.fetchAsync(url, fetchOptions); if (!response.ok) { + // Drain the response body on stream responses so the connection can be reused + if ('stream' in response) { + response.stream.resume(); + } + const isNonCredentialResponse: boolean = response.status >= 500 && response.status < 600; if ( @@ -271,15 +428,20 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { typeof credentials !== 'string' && safeCredentialOptions === CredentialsOptions.Optional ) { - // If we don't already have credentials yet, and we got a response from the server - // that is a "normal" failure (4xx), then we assume that credentials are probably - // required. Re-attempt the request, requiring credentials this time. - // - // This counts as part of the "first attempt", so it is not included in the max attempts - return await this._makeHttpRequestAsync({ - ...options, - credentialOptions: CredentialsOptions.Required - }); + // Skip credential fallback for stream bodies since the stream has already been consumed + // by the first attempt and cannot be replayed. 
+ const isStreamBody: boolean = !!body && typeof (body as Readable).pipe === 'function'; + if (!isStreamBody) { + // If we don't already have credentials yet, and we got a response from the server + // that is a "normal" failure (4xx), then we assume that credentials are probably + // required. Re-attempt the request, requiring credentials this time. + // + // This counts as part of the "first attempt", so it is not included in the max attempts + return await this._makeHttpCoreRequestAsync({ + ...options, + credentialOptions: CredentialsOptions.Required + }); + } } if (options.maxAttempts > 1) { @@ -291,22 +453,14 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { await Async.sleepAsync(retryDelay); - return await this._makeHttpRequestAsync({ ...options, maxAttempts: options.maxAttempts - 1 }); + return await this._makeHttpCoreRequestAsync({ ...options, maxAttempts: options.maxAttempts - 1 }); } this._reportFailure(terminal, method, response, false, warningText); return false; } - const result: Buffer | boolean = readBody ? await response.getBufferAsync() : true; - - terminal.writeDebugLine( - `[http-build-cache] actual response: ${response.status} ${url} ${ - result === true ? 
'true' : result.length - } bytes` - ); - - return result; + return response; } private async _tryGetCredentialsAsync(options: CredentialsOptions.Required): Promise; @@ -363,7 +517,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private _getFailureType( requestMethod: string, - response: IWebClientResponse, + response: IWebClientResponse | IWebClientStreamResponse, isRedirect: boolean ): FailureType { if (response.ok) { @@ -415,7 +569,7 @@ export class HttpBuildCacheProvider implements ICloudBuildCacheProvider { private _reportFailure( terminal: ITerminal, requestMethod: string, - response: IWebClientResponse, + response: IWebClientResponse | IWebClientStreamResponse, isRedirect: boolean, message: string ): void { diff --git a/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts b/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts index a86ddb342de..b8608bb875e 100644 --- a/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts +++ b/rush-plugins/rush-http-build-cache-plugin/src/test/HttpBuildCacheProvider.test.ts @@ -5,7 +5,16 @@ jest.mock('@rushstack/rush-sdk/lib/utilities/WebClient', () => { return jest.requireActual('@microsoft/rush-lib/lib/utilities/WebClient'); }); +jest.mock('node:stream/promises', () => ({ + pipeline: jest.fn().mockResolvedValue(undefined) +})); + +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; + import { type RushSession, EnvironmentConfiguration } from '@rushstack/rush-sdk'; +import { type ICredentialCacheEntry, CredentialCache } from '@rushstack/credential-cache'; +import { FileSystem } from '@rushstack/node-core-library'; import { StringBufferTerminalProvider, Terminal } from '@rushstack/terminal'; import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient'; @@ -24,24 +33,44 @@ const EXAMPLE_OPTIONS: IHttpBuildCacheProviderOptions = { minHttpRetryDelayMs: 1 }; +const 
WRITE_ALLOWED_OPTIONS: IHttpBuildCacheProviderOptions = { + ...EXAMPLE_OPTIONS, + isCacheWriteAllowed: true +}; + type FetchFnType = Parameters[0]; +type StreamFetchFnType = Parameters[0]; describe('HttpBuildCacheProvider', () => { let terminalBuffer: StringBufferTerminalProvider; let terminal!: Terminal; let fetchFn: jest.Mock; + let streamFetchFn: jest.Mock; beforeEach(() => { terminalBuffer = new StringBufferTerminalProvider(); terminal = new Terminal(terminalBuffer); fetchFn = jest.fn(); + streamFetchFn = jest.fn(); WebClient.mockRequestFn(fetchFn as unknown as FetchFnType); + WebClient.mockStreamRequestFn(streamFetchFn as unknown as StreamFetchFnType); + jest + .spyOn(FileSystem, 'createReadStream') + .mockReturnValue({ pipe: jest.fn() } as unknown as ReturnType); + jest + .spyOn(FileSystem, 'createWriteStreamAsync') + .mockResolvedValue({} as unknown as Awaited>); + jest.spyOn(FileSystem, 'ensureFolderAsync').mockResolvedValue(); }); afterEach(() => { WebClient.resetMockRequestFn(); + WebClient.resetMockStreamRequestFn(); + jest.restoreAllMocks(); }); + // ── Buffer-based GET ────────────────────────────────────────────────────── + describe('tryGetCacheEntryBufferByIdAsync', () => { it('prints warning if read credentials are not available', async () => { jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); @@ -98,7 +127,7 @@ Array [ }); mocked(fetchFn).mockResolvedValueOnce({ status: 504, - statusText: 'BadGateway', + statusText: 'Gateway Timeout', ok: false }); @@ -134,9 +163,323 @@ Array [ "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", "[ debug] [http-build-cache] request: GET https://buildcache.example.acme.com/some-key unknown bytes[n]", - "[warning] Could not get cache entry: HTTP 504: BadGateway[n]", + "[warning] Could not get cache entry: HTTP 504: 
Gateway Timeout[n]", ] `); }); + + it('returns a buffer on a successful response', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const expectedBuffer = Buffer.from('cache-contents'); + + mocked(fetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + getBufferAsync: () => Promise.resolve(expectedBuffer) + }); + + const result = await provider.tryGetCacheEntryBufferByIdAsync(terminal, 'some-key'); + expect(result).toEqual(expectedBuffer); + expect(fetchFn).toHaveBeenCalledTimes(1); + }); + }); + + // ── Buffer-based SET ────────────────────────────────────────────────────── + + describe('trySetCacheEntryBufferAsync', () => { + it('returns false when cache write is not allowed', async () => { + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); // write not allowed + + const result = await provider.trySetCacheEntryBufferAsync(terminal, 'some-key', Buffer.from('data')); + + expect(result).toBe(false); + expect(fetchFn).not.toHaveBeenCalled(); + }); + + it('uploads a buffer successfully', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + + mocked(fetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {} + }); + + const result = await provider.trySetCacheEntryBufferAsync( + terminal, + 'some-key', + Buffer.from('cache-data') + ); + + expect(result).toBe(true); + expect(fetchFn).toHaveBeenCalledTimes(1); + expect(fetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + 
expect.objectContaining({ + method: 'POST' + }) + ); + }); + + it('retries up to 3 times on server error', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + + mocked(fetchFn).mockResolvedValue({ + status: 500, + statusText: 'InternalServerError', + ok: false + }); + + const result = await provider.trySetCacheEntryBufferAsync(terminal, 'some-key', Buffer.from('data')); + + expect(result).toBe(false); + expect(fetchFn).toHaveBeenCalledTimes(3); + }); + }); + + // ── File-based GET ────────────────────────────────────────────────────── + + describe('tryDownloadCacheEntryToFileAsync', () => { + it('downloads to file on a successful response', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(true); + expect(streamFetchFn).toHaveBeenCalledTimes(1); + expect(streamFetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + expect.objectContaining({ + method: 'GET', + redirect: 'follow' + }) + ); + expect(pipeline).toHaveBeenCalledWith(mockStream, expect.anything()); + }); + + it('returns false on 404 cache miss', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new 
HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 404, + statusText: 'Not Found', + ok: false, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + expect(pipeline).not.toHaveBeenCalled(); + }); + + it('returns false on credential failure', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const mockStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 401, + statusText: 'Unauthorized', + ok: false, + stream: mockStream + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + }); + + it('retries up to 3 times on server error', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); + const createMockStream = (): Readable => new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValueOnce({ + status: 500, + statusText: 'InternalServiceError', + ok: false, + stream: createMockStream() + }); + mocked(streamFetchFn).mockResolvedValueOnce({ + status: 503, + statusText: 'ServiceUnavailable', + ok: false, + stream: createMockStream() + }); + mocked(streamFetchFn).mockResolvedValueOnce({ + status: 504, + statusText: 'Gateway Timeout', + ok: false, + stream: createMockStream() + }); + + const result = await provider.tryDownloadCacheEntryToFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + expect(result).toBe(false); + 
expect(streamFetchFn).toHaveBeenCalledTimes(3); + }); + }); + + // ── File-based SET ────────────────────────────────────────────────────── + + describe('tryUploadCacheEntryFromFileAsync', () => { + it('returns false when cache write is not allowed', async () => { + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(EXAMPLE_OPTIONS, session); // write not allowed + + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + expect(streamFetchFn).not.toHaveBeenCalled(); + }); + + it('uploads from file successfully', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 200, + statusText: 'OK', + ok: true, + redirected: false, + headers: {}, + stream: responseStream + }); + + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(true); + expect(streamFetchFn).toHaveBeenCalledTimes(1); + expect(streamFetchFn).toHaveBeenCalledWith( + 'https://buildcache.example.acme.com/some-key', + expect.objectContaining({ + method: 'POST' + }) + ); + }); + + it('does not retry on failure (file stream already consumed)', async () => { + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue('token123'); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 500, + statusText: 'InternalServerError', + ok: false, + stream: responseStream + }); + + const result = 
await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + // maxAttempts is 1 for file-based uploads, so only 1 call + expect(streamFetchFn).toHaveBeenCalledTimes(1); + }); + + it('skips credential fallback for file-based uploads on 4xx', async () => { + // No credential in env for the first attempt + jest.spyOn(EnvironmentConfiguration, 'buildCacheCredential', 'get').mockReturnValue(undefined); + // But credentials ARE available in the credential cache — without the stream-body + // guard, the credential fallback would resolve these and make a second HTTP request + // with the already-consumed stream body. + jest + .spyOn(CredentialCache, 'usingAsync') + // eslint-disable-next-line @typescript-eslint/naming-convention + .mockImplementation(async (_options, fn) => { + await (fn as (cache: CredentialCache) => Promise)({ + tryGetCacheEntry: (): ICredentialCacheEntry => ({ credential: 'cached-token' }) + } as unknown as CredentialCache); + }); + + const session: RushSession = {} as RushSession; + const provider = new HttpBuildCacheProvider(WRITE_ALLOWED_OPTIONS, session); + const responseStream = new Readable({ read() {} }); + + mocked(streamFetchFn).mockResolvedValue({ + status: 401, + statusText: 'Unauthorized', + ok: false, + stream: responseStream + }); + + // Even though credentials are optional and we got a 4xx, the stream body + // should prevent the credential fallback retry since the stream is consumed + const result = await provider.tryUploadCacheEntryFromFileAsync( + terminal, + 'some-key', + '/tmp/cache-entry' + ); + + expect(result).toBe(false); + // Should only be called once (no credential fallback retry with consumed stream) + expect(streamFetchFn).toHaveBeenCalledTimes(1); + }); }); });