|
1 | 1 | const async = require('async'); |
2 | | -const assert = require('assert'); |
| 2 | +const { callbackify } = require('util'); |
3 | 3 | const { v4: uuidv4 } = require('uuid'); |
4 | | - |
5 | | -const { makeGcpRequest } = require('./makeRequest'); |
| 4 | +const { HeadBucketCommand } = require('@aws-sdk/client-s3'); |
6 | 5 |
|
7 | 6 | const genUniqID = () => uuidv4().replace(/-/g, ''); |
8 | 7 |
|
9 | | -function gcpRequestRetry(params, retry, callback) { |
10 | | - const maxRetries = 4; |
11 | | - const timeout = Math.pow(2, retry) * 1000; |
12 | | - return setTimeout(makeGcpRequest, timeout, params, (err, res) => { |
13 | | - if (err) { |
14 | | - if (retry <= maxRetries && err.statusCode === 429) { |
15 | | - return gcpRequestRetry(params, retry + 1, callback); |
| 8 | +const defaultShouldRetry = err => |
| 9 | + err && (err.name === 'SlowDown' || err.$metadata?.httpStatusCode === 429); |
| 10 | + |
| 11 | +async function gcpRetryCall(callFn, retryOptions) { |
| 12 | + const { |
| 13 | + maxAttempts = 3, |
| 14 | + shouldRetry = defaultShouldRetry, |
| 15 | + getDelayMs = attempt => Math.pow(2, attempt) * 1000, |
| 16 | + } = retryOptions || {}; |
| 17 | + |
| 18 | + let lastError; |
| 19 | + for (let attempt = 0; attempt < maxAttempts; attempt++) { |
| 20 | + try { |
| 21 | + |
| 22 | + return await callFn(); |
| 23 | + } catch (err) { |
| 24 | + lastError = err; |
| 25 | + if (!shouldRetry(err, attempt) || attempt === maxAttempts - 1) { |
| 26 | + throw err; |
16 | 27 | } |
17 | | - return callback(err); |
| 28 | + const delay = getDelayMs(attempt); |
| 29 | + process.stdout.write( |
| 30 | + 'Retryable error from GCP, retrying in ' + |
| 31 | + `${delay}ms (attempt ${attempt + 1}): ${err}\n`); |
| 32 | + |
| 33 | + await new Promise(resolve => setTimeout(resolve, delay)); |
18 | 34 | } |
19 | | - return callback(null, res); |
20 | | - }); |
| 35 | + } |
| 36 | + |
| 37 | + throw lastError; |
21 | 38 | } |
22 | 39 |
|
23 | | -function gcpClientRetry(fn, params, callback, retry = 0) { |
24 | | - const maxRetries = 4; |
25 | | - const timeout = Math.pow(2, retry) * 1000; |
26 | | - return setTimeout(fn, timeout, params, (err, res) => { |
27 | | - if (err) { |
28 | | - if (retry <= maxRetries && err.statusCode === 429) { |
29 | | - return gcpClientRetry(fn, params, callback, retry + 1); |
| 40 | +async function gcpRetry(gcpClient, command, retryOptions, cb) { |
| 41 | + if (cb) { |
| 42 | + return callbackify(() => gcpRetry(gcpClient, command, |
| 43 | + retryOptions))(cb); |
| 44 | + } |
| 45 | + |
| 46 | + return gcpRetryCall(() => gcpClient.send(command), retryOptions); |
| 47 | +} |
| 48 | + |
| 49 | +const defaultShouldRetryUpload = err => err && ( |
| 50 | + err.name === 'NoSuchBucket' |
| 51 | + || err.name === 'NotFound' |
| 52 | + || err.$metadata?.httpStatusCode === 404 |
| 53 | + || err.name === 'SlowDown' |
| 54 | + || err.$metadata?.httpStatusCode === 429 |
| 55 | + || (typeof err.message === 'string' |
| 56 | + && (err.message.includes('NoSuchBucket') |
| 57 | + || err.message.includes('unable to complete upload'))) |
| 58 | +); |
| 59 | + |
| 60 | +const defaultShouldRetryMpuCreate = err => err && ( |
| 61 | + err.name === 'NoSuchBucket' |
| 62 | + || err.name === 'NotFound' |
| 63 | + || err.$metadata?.httpStatusCode === 404 |
| 64 | + || err.name === 'SlowDown' |
| 65 | + || err.$metadata?.httpStatusCode === 429 |
| 66 | +); |
| 67 | + |
| 68 | +async function gcpUploadWithRetry(gcpClient, params, retryOptions) { |
| 69 | + const callFn = () => new Promise((resolve, reject) => { |
| 70 | + gcpClient.upload(params, (err, data) => { |
| 71 | + if (err) { |
| 72 | + return reject(err); |
30 | 73 | } |
31 | | - return callback(err); |
32 | | - } |
33 | | - return callback(null, res); |
| 74 | + return resolve(data); |
| 75 | + }); |
| 76 | + }); |
| 77 | + |
| 78 | + return gcpRetryCall(callFn, { |
| 79 | + maxAttempts: 6, |
| 80 | + shouldRetry: defaultShouldRetryUpload, |
| 81 | + getDelayMs: attempt => (attempt + 1) * 1000, |
| 82 | + ...retryOptions, |
| 83 | + }); |
| 84 | +} |
| 85 | + |
| 86 | +async function gcpCreateMultipartUploadWithRetry(gcpClient, params, retryOptions) { |
| 87 | + const callFn = () => new Promise((resolve, reject) => { |
| 88 | + gcpClient.createMultipartUpload(params, |
| 89 | + (err, res) => (err ? reject(err) : resolve(res))); |
| 90 | + }); |
| 91 | + return gcpRetryCall(callFn, { |
| 92 | + maxAttempts: 6, |
| 93 | + shouldRetry: defaultShouldRetryMpuCreate, |
| 94 | + getDelayMs: attempt => (attempt + 1) * 1000, |
| 95 | + ...retryOptions, |
34 | 96 | }); |
35 | 97 | } |
36 | 98 |
|
37 | 99 | // mpu test helpers |
38 | 100 | function gcpMpuSetup(params, callback) { |
39 | 101 | const { gcpClient, bucketNames, key, partCount, partSize } = params; |
| 102 | + |
40 | 103 | return async.waterfall([ |
41 | | - next => gcpClient.createMultipartUpload({ |
42 | | - Bucket: bucketNames.mpu.Name, |
43 | | - Key: key, |
44 | | - }, (err, res) => { |
45 | | - assert.equal(err, null, |
46 | | - `Expected success, but got error ${err}`); |
47 | | - return next(null, res.UploadId); |
48 | | - }), |
| 104 | + next => gcpCreateMultipartUploadWithRetry(gcpClient, { |
| 105 | + Bucket: bucketNames.mpu.Name, |
| 106 | + Key: key, |
| 107 | + }) |
| 108 | + .then(res => next(null, res.UploadId)) |
| 109 | + .catch(err => next(err)), |
49 | 110 | (uploadId, next) => { |
50 | 111 | if (partCount <= 0) { |
51 | 112 | return next('SkipPutPart', { uploadId }); |
@@ -141,13 +202,26 @@ function setBucketClass(storageClass) { |
141 | 202 | '</CreateBucketConfiguration>'; |
142 | 203 | } |
143 | 204 |
|
| 205 | +async function waitForBucketReady(gcpClient, bucketName, retryOptions) { |
| 206 | + const cmd = new HeadBucketCommand({ Bucket: bucketName }); |
| 207 | + return await gcpRetry(gcpClient, cmd, { |
| 208 | + maxAttempts: 6, |
| 209 | + shouldRetry: defaultShouldRetryMpuCreate, |
| 210 | + getDelayMs: attempt => (attempt + 1) * 1000, |
| 211 | + ...retryOptions, |
| 212 | + }); |
| 213 | +} |
| 214 | + |
144 | 215 | module.exports = { |
145 | | - gcpRequestRetry, |
146 | | - gcpClientRetry, |
147 | 216 | setBucketClass, |
148 | 217 | gcpMpuSetup, |
149 | 218 | genPutTagObj, |
150 | 219 | genGetTagObj, |
151 | 220 | genDelTagObj, |
152 | 221 | genUniqID, |
| 222 | + gcpRetryCall, |
| 223 | + gcpRetry, |
| 224 | + gcpCreateMultipartUploadWithRetry, |
| 225 | + gcpUploadWithRetry, |
| 226 | + waitForBucketReady, |
153 | 227 | }; |
0 commit comments