Skip to content

Commit 733ef5c

Browse files
Copiloticlanton
andauthored
Add streaming support to Amazon S3 and Azure Storage build cache plugins
Agent-Logs-Url: https://github.com/microsoft/rushstack/sessions/6212baeb-266c-4823-94df-251c69a8f74c Co-authored-by: iclanton <5010588+iclanton@users.noreply.github.com>
1 parent 91161d8 commit 733ef5c

4 files changed

Lines changed: 284 additions & 12 deletions

File tree

common/reviews/api/rush-amazon-s3-build-cache-plugin.api.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import type { IRushPlugin } from '@rushstack/rush-sdk';
1010
import { ITerminal } from '@rushstack/terminal';
11+
import type { Readable } from 'node:stream';
1112
import type { RushConfiguration } from '@rushstack/rush-sdk';
1213
import type { RushSession } from '@rushstack/rush-sdk';
1314
import { WebClient } from '@rushstack/rush-sdk/lib/utilities/WebClient';
@@ -18,6 +19,8 @@ export class AmazonS3Client {
1819
// (undocumented)
1920
getObjectAsync(objectName: string): Promise<Buffer | undefined>;
2021
// (undocumented)
22+
getObjectStreamAsync(objectName: string): Promise<Readable | undefined>;
23+
// (undocumented)
2124
_getSha256Hmac(key: string | Buffer, data: string): Buffer;
2225
// (undocumented)
2326
_getSha256Hmac(key: string | Buffer, data: string, encoding: 'hex'): string;
@@ -26,6 +29,8 @@ export class AmazonS3Client {
2629
// (undocumented)
2730
uploadObjectAsync(objectName: string, objectBuffer: Buffer): Promise<void>;
2831
// (undocumented)
32+
uploadObjectStreamAsync(objectName: string, objectStream: Readable): Promise<void>;
33+
// (undocumented)
2934
static UriEncode(input: string): string;
3035
}
3136

rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3BuildCacheProvider.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
22
// See LICENSE in the project root for license information.
33

4+
import type { Readable } from 'node:stream';
5+
46
import { type ICredentialCacheEntry, CredentialCache } from '@rushstack/credential-cache';
57
import type { ITerminal } from '@rushstack/terminal';
68
import {
@@ -182,6 +184,44 @@ export class AmazonS3BuildCacheProvider implements ICloudBuildCacheProvider {
182184
}
183185
}
184186

187+
public async tryGetCacheEntryStreamByIdAsync(
188+
terminal: ITerminal,
189+
cacheId: string
190+
): Promise<Readable | undefined> {
191+
try {
192+
const client: AmazonS3Client = await this._getS3ClientAsync(terminal);
193+
return await client.getObjectStreamAsync(this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId);
194+
} catch (e) {
195+
terminal.writeWarningLine(`Error getting cache entry stream from S3: ${e}`);
196+
return undefined;
197+
}
198+
}
199+
200+
public async trySetCacheEntryStreamAsync(
201+
terminal: ITerminal,
202+
cacheId: string,
203+
entryStream: Readable
204+
): Promise<boolean> {
205+
if (!this.isCacheWriteAllowed) {
206+
terminal.writeErrorLine('Writing to S3 cache is not allowed in the current configuration.');
207+
return false;
208+
}
209+
210+
terminal.writeDebugLine('Uploading object stream with cacheId: ', cacheId);
211+
212+
try {
213+
const client: AmazonS3Client = await this._getS3ClientAsync(terminal);
214+
await client.uploadObjectStreamAsync(
215+
this._s3Prefix ? `${this._s3Prefix}/${cacheId}` : cacheId,
216+
entryStream
217+
);
218+
return true;
219+
} catch (e) {
220+
terminal.writeWarningLine(`Error uploading cache entry stream to S3: ${e}`);
221+
return false;
222+
}
223+
}
224+
185225
public async updateCachedCredentialAsync(terminal: ITerminal, credential: string): Promise<void> {
186226
await CredentialCache.usingAsync(
187227
{

rush-plugins/rush-amazon-s3-build-cache-plugin/src/AmazonS3Client.ts

Lines changed: 121 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22
// See LICENSE in the project root for license information.
33

44
import * as crypto from 'node:crypto';
5+
import type { Readable } from 'node:stream';
56

67
import { Async } from '@rushstack/node-core-library';
78
import { Colorize, type ITerminal } from '@rushstack/terminal';
89
import {
910
type IGetFetchOptions,
1011
type IFetchOptionsWithBody,
1112
type IWebClientResponse,
13+
type IWebClientStreamResponse,
1214
type WebClient,
1315
AUTHORIZATION_HEADER_NAME
1416
} from '@rushstack/rush-sdk/lib/utilities/WebClient';
@@ -21,6 +23,13 @@ const DATE_HEADER_NAME: 'x-amz-date' = 'x-amz-date';
2123
const HOST_HEADER_NAME: 'host' = 'host';
2224
const SECURITY_TOKEN_HEADER_NAME: 'x-amz-security-token' = 'x-amz-security-token';
2325

26+
/**
27+
* AWS Signature V4 allows using this sentinel value as the content hash when the request
28+
* payload is not signed. This is used for streaming uploads where the body cannot be hashed
29+
* upfront.
30+
*/
31+
const UNSIGNED_PAYLOAD: 'UNSIGNED-PAYLOAD' = 'UNSIGNED-PAYLOAD';
32+
2433
interface IIsoDateString {
2534
date: string;
2635
dateTime: string;
@@ -178,6 +187,73 @@ export class AmazonS3Client {
178187
});
179188
}
180189

190+
public async getObjectStreamAsync(objectName: string): Promise<Readable | undefined> {
191+
this._writeDebugLine('Reading object stream from S3');
192+
return await this._sendCacheRequestWithRetriesAsync(async () => {
193+
const response: IWebClientStreamResponse = await this._makeStreamRequestAsync('GET', objectName);
194+
if (response.ok) {
195+
return {
196+
hasNetworkError: false,
197+
response: response.stream
198+
};
199+
} else if (response.status === 404) {
200+
response.stream.resume();
201+
return {
202+
hasNetworkError: false,
203+
response: undefined
204+
};
205+
} else if (
206+
(response.status === 400 || response.status === 401 || response.status === 403) &&
207+
!this._credentials
208+
) {
209+
response.stream.resume();
210+
this._writeWarningLine(
211+
`No credentials found and received a ${response.status}`,
212+
' response code from the cloud storage.',
213+
' Maybe run rush update-cloud-credentials',
214+
' or set the RUSH_BUILD_CACHE_CREDENTIAL env'
215+
);
216+
return {
217+
hasNetworkError: false,
218+
response: undefined
219+
};
220+
} else if (response.status === 400 || response.status === 401 || response.status === 403) {
221+
response.stream.resume();
222+
throw new Error(
223+
`Amazon S3 responded with status code ${response.status} (${response.statusText})`
224+
);
225+
} else {
226+
response.stream.resume();
227+
return {
228+
hasNetworkError: true,
229+
error: new Error(
230+
`Amazon S3 responded with status code ${response.status} (${response.statusText})`
231+
)
232+
};
233+
}
234+
});
235+
}
236+
237+
public async uploadObjectStreamAsync(objectName: string, objectStream: Readable): Promise<void> {
238+
if (!this._credentials) {
239+
throw new Error('Credentials are required to upload objects to S3.');
240+
}
241+
242+
// Streaming uploads cannot be retried because the stream is consumed after the first attempt.
243+
const response: IWebClientStreamResponse = await this._makeStreamRequestAsync(
244+
'PUT',
245+
objectName,
246+
objectStream
247+
);
248+
if (!response.ok) {
249+
response.stream.resume();
250+
throw new Error(
251+
`Amazon S3 responded with status code ${response.status} (${response.statusText})`
252+
);
253+
}
254+
response.stream.resume();
255+
}
256+
181257
private _writeDebugLine(...messageParts: string[]): void {
182258
// if the terminal has been closed then don't bother sending a debug message
183259
try {
@@ -201,8 +277,51 @@ export class AmazonS3Client {
201277
objectName: string,
202278
body?: Buffer
203279
): Promise<IWebClientResponse> {
204-
const isoDateString: IIsoDateString = this._getIsoDateString();
205280
const bodyHash: string = this._getSha256(body);
281+
const { url, headers } = this._buildSignedRequest(verb, objectName, bodyHash);
282+
283+
const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = {
284+
verb,
285+
headers
286+
};
287+
if (verb === 'PUT') {
288+
(webFetchOptions as IFetchOptionsWithBody).body = body;
289+
}
290+
291+
const response: IWebClientResponse = await this._webClient.fetchAsync(url, webFetchOptions);
292+
293+
return response;
294+
}
295+
296+
private async _makeStreamRequestAsync(
297+
verb: 'GET' | 'PUT',
298+
objectName: string,
299+
body?: Readable
300+
): Promise<IWebClientStreamResponse> {
301+
// For streaming uploads, the body cannot be hashed upfront, so we use UNSIGNED-PAYLOAD.
302+
const bodyHash: string = body ? UNSIGNED_PAYLOAD : this._getSha256(undefined);
303+
const { url, headers } = this._buildSignedRequest(verb, objectName, bodyHash);
304+
305+
const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = {
306+
verb,
307+
headers
308+
};
309+
if (verb === 'PUT' && body) {
310+
(webFetchOptions as IFetchOptionsWithBody).body = body;
311+
}
312+
313+
return await this._webClient.fetchStreamAsync(url, webFetchOptions);
314+
}
315+
316+
/**
317+
* Builds an AWS Signature V4 signed request, returning the URL and signed headers.
318+
*/
319+
private _buildSignedRequest(
320+
verb: 'GET' | 'PUT',
321+
objectName: string,
322+
bodyHash: string
323+
): { url: string; headers: Record<string, string> } {
324+
const isoDateString: IIsoDateString = this._getIsoDateString();
206325
const headers: Record<string, string> = {};
207326
headers[DATE_HEADER_NAME] = isoDateString.dateTime;
208327
headers[CONTENT_HASH_HEADER_NAME] = bodyHash;
@@ -299,14 +418,6 @@ export class AmazonS3Client {
299418
}
300419
}
301420

302-
const webFetchOptions: IGetFetchOptions | IFetchOptionsWithBody = {
303-
verb,
304-
headers
305-
};
306-
if (verb === 'PUT') {
307-
(webFetchOptions as IFetchOptionsWithBody).body = body;
308-
}
309-
310421
const url: string = `${this._s3Endpoint}${canonicalUri}`;
311422

312423
this._writeDebugLine(Colorize.bold(Colorize.underline('Sending request to S3')));
@@ -316,9 +427,7 @@ export class AmazonS3Client {
316427
this._writeDebugLine(Colorize.cyan(`\t${name}: ${value}`));
317428
}
318429

319-
const response: IWebClientResponse = await this._webClient.fetchAsync(url, webFetchOptions);
320-
321-
return response;
430+
return { url, headers };
322431
}
323432

324433
public _getSha256Hmac(key: string | Buffer, data: string): Buffer;

0 commit comments

Comments
 (0)