Skip to content

Commit f67fef8

Browse files
Fixed rate limit error in bulk publish
1 parent 606872e commit f67fef8

4 files changed

Lines changed: 134 additions & 80 deletions

File tree

packages/contentstack-bulk-publish/src/consumer/publish.js

Lines changed: 105 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ const path = require('path');
77
const { formatError } = require('../util');
88
const apiVersionForNRP = '3.2';
99
const nrpApiVersionWarning = `Provided apiVersion is invalid. ${apiVersionForNRP} is only supported value. Continuing with regular bulk-publish for now.`;
10-
10+
const { handleRateLimit } = require('../util/common-utility');
1111
const { getLoggerInstance, addLogs, getLogsDirPath } = require('../util/logger');
1212
const { sanitizePath } = require('@contentstack/cli-utilities');
13+
const { delay } = require('bluebird');
1314
const logsDir = getLogsDirPath();
1415

1516
let logger;
@@ -234,6 +235,7 @@ async function performBulkPublish(data, _config, queue) {
234235
// add validation for user uid
235236
// if user not logged in, then user uid won't be available and NRP too won't work
236237
let conf;
238+
let xRateLimitRemaining;
237239
const bulkPublishObj = data.obj;
238240
const stack = bulkPublishObj.stack;
239241
let payload = {};
@@ -261,6 +263,7 @@ async function performBulkPublish(data, _config, queue) {
261263
.publish(payload)
262264
.then((bulkPublishEntriesResponse) => {
263265
if (!bulkPublishEntriesResponse.error_message) {
266+
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
264267
const sanitizedData = removePublishDetails(bulkPublishObj.entries);
265268
console.log(
266269
chalk.green(`Bulk entries sent for publish`),
@@ -277,10 +280,14 @@ async function performBulkPublish(data, _config, queue) {
277280
throw bulkPublishEntriesResponse;
278281
}
279282
})
280-
.catch((error) => {
283+
.catch(async (error) => {
281284
if (error.errorCode === 429 && data.retry < 2) {
282285
data.retry++;
283-
queue.Enqueue(data);
286+
// Call the handleRateLimit function
287+
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
288+
if (delayApplied) {
289+
queue.Enqueue(data);
290+
}
284291
} else {
285292
delete bulkPublishObj.stack;
286293
console.log(chalk.red(`Bulk entries failed to publish with error ${formatError(error)}`));
@@ -316,6 +323,7 @@ async function performBulkPublish(data, _config, queue) {
316323
.publish(payload)
317324
.then((bulkPublishAssetsResponse) => {
318325
if (!bulkPublishAssetsResponse.error_message) {
326+
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
319327
console.log(
320328
chalk.green(
321329
`Bulk assets sent for publish`,
@@ -334,10 +342,14 @@ async function performBulkPublish(data, _config, queue) {
334342
throw bulkPublishAssetsResponse;
335343
}
336344
})
337-
.catch((error) => {
345+
.catch(async (error) => {
338346
if (error.errorCode === 429 && data.retry < 2) {
339347
data.retry++;
340-
queue.Enqueue(data);
348+
// Call the handleRateLimit function
349+
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
350+
if (delayApplied) {
351+
queue.Enqueue(data);
352+
}
341353
} else {
342354
delete bulkPublishObj.stack;
343355
console.log(chalk.red(`Bulk assets failed to publish with error ${formatError(error)}`));
@@ -380,49 +392,55 @@ async function performBulkUnPublish(data, _config, queue) {
380392
}
381393
}
382394
}
383-
stack
384-
.bulkOperation()
385-
.unpublish(payload)
386-
.then((bulkUnPublishEntriesResponse) => {
387-
if (!bulkUnPublishEntriesResponse.error_message) {
388-
delete bulkUnPublishObj.stack;
389395

390-
console.log(
391-
chalk.green(
392-
`Bulk entries sent for Unpublish`,
393-
bulkUnPublishEntriesResponse.job_id
394-
? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`)
395-
: '',
396-
),
397-
);
398-
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
399-
displayEntriesDetails(sanitizedData);
400-
addLogs(
401-
logger,
402-
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
403-
'info',
404-
);
405-
} else {
406-
throw bulkUnPublishEntriesResponse;
407-
}
408-
})
409-
.catch((error) => {
410-
if (error.errorCode === 429 && data.retry < 2) {
411-
data.retry++;
396+
try {
397+
const bulkUnPublishEntriesResponse = await stack.bulkOperation().unpublish(payload);
398+
399+
if (!bulkUnPublishEntriesResponse.error_message) {
400+
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
401+
delete bulkUnPublishObj.stack;
402+
console.log(
403+
chalk.green(
404+
`Bulk entries sent for Unpublish`,
405+
bulkUnPublishEntriesResponse.job_id
406+
? chalk.yellow(`job_id: ${bulkUnPublishEntriesResponse.job_id}`)
407+
: '',
408+
),
409+
);
410+
411+
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
412+
displayEntriesDetails(sanitizedData);
413+
addLogs(logger, {
414+
options: bulkUnPublishObj,
415+
api_key: stack.stackHeaders.api_key,
416+
alias: stack.alias,
417+
host: stack.host,
418+
}, 'info');
419+
} else {
420+
throw bulkUnPublishEntriesResponse;
421+
}
422+
} catch (error) {
423+
if (error.errorCode === 429 && data.retry < 2) {
424+
data.retry++;
425+
// Call the handleRateLimit function
426+
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
427+
if (delayApplied) {
412428
queue.Enqueue(data);
413-
} else {
414-
delete bulkUnPublishObj.stack;
415-
console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`));
416-
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
417-
displayEntriesDetails(sanitizedData);
418-
addLogs(
429+
}
430+
} else {
431+
delete bulkUnPublishObj.stack;
432+
console.log(chalk.red(`Bulk entries failed to Unpublish with error ${formatError(error)}`));
433+
let sanitizedData = removePublishDetails(bulkUnPublishObj.entries);
434+
displayEntriesDetails(sanitizedData);
435+
addLogs(
419436
logger,
420437
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
421438
'error',
422439
);
423-
}
424-
});
440+
}
441+
}
425442
break;
443+
426444
case 'asset':
427445
conf = {
428446
assets: removePublishDetails(bulkUnPublishObj.assets),
@@ -433,52 +451,60 @@ async function performBulkUnPublish(data, _config, queue) {
433451
if (bulkUnPublishObj.apiVersion) {
434452
if (!isNaN(bulkUnPublishObj.apiVersion) && bulkUnPublishObj.apiVersion === apiVersionForNRP) {
435453
payload['api_version'] = bulkUnPublishObj.apiVersion;
454+
} else if (bulkUnPublishObj.apiVersion !== '3') {
455+
console.log(chalk.yellow(nrpApiVersionWarning));
456+
}
457+
}
458+
459+
try {
460+
const bulkUnPublishAssetsResponse = await stack.bulkOperation().unpublish(payload);
461+
462+
if (!bulkUnPublishAssetsResponse.error_message) {
463+
xRateLimitRemaining = parseInt(bulkPublishEntriesResponse.stackHeaders.responseHeaders['x-ratelimit-remaining'], 10);
464+
delete bulkUnPublishObj.stack;
465+
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
466+
console.log(
467+
chalk.green(
468+
`Bulk assets sent for Unpublish`,
469+
bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '',
470+
),
471+
);
472+
473+
displayAssetsDetails(sanitizedData);
474+
addLogs(logger, {
475+
options: bulkUnPublishObj,
476+
api_key: stack.stackHeaders.api_key,
477+
alias: stack.alias,
478+
host: stack.host,
479+
}, 'info');
436480
} else {
437481
if (bulkUnPublishObj.apiVersion !== '3') {
438482
// because 3 is the default value for api-version, and it exists for the purpose of display only
439483
console.log(chalk.yellow(nrpApiVersionWarning));
440484
}
485+
throw bulkUnPublishAssetsResponse;
441486
}
442-
}
443-
stack
444-
.bulkOperation()
445-
.unpublish(payload)
446-
.then((bulkUnPublishAssetsResponse) => {
447-
if (!bulkUnPublishAssetsResponse.error_message) {
448-
delete bulkUnPublishObj.stack;
449-
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
450-
console.log(
451-
chalk.green(
452-
`Bulk assets sent for Unpublish`,
453-
bulkUnPublishAssetsResponse.job_id ? chalk.yellow(`job_id: ${bulkUnPublishAssetsResponse.job_id}`) : '',
454-
),
455-
);
456-
displayAssetsDetails(sanitizedData);
457-
addLogs(
458-
logger,
459-
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
460-
'info',
461-
);
462-
} else {
463-
throw bulkUnPublishAssetsResponse;
464-
}
465-
})
466-
.catch((error) => {
467-
if (error.errorCode === 429 && data.retry < 2) {
468-
data.retry++;
487+
} catch (error) {
488+
if (error.errorCode === 429 && data.retry < 2) {
489+
data.retry++;
490+
// Call the handleRateLimit function
491+
const delayApplied = await handleRateLimit(error, data, delay, xRateLimitRemaining);
492+
if (delayApplied) {
469493
queue.Enqueue(data);
470-
} else {
471-
delete bulkUnPublishObj.stack;
472-
console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`));
473-
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
474-
displayAssetsDetails(sanitizedData);
475-
addLogs(
476-
logger,
477-
{ options: bulkUnPublishObj, api_key: stack.stackHeaders.api_key, alias: stack.alias, host: stack.host },
478-
'error',
479-
);
480494
}
481-
});
495+
} else {
496+
delete bulkUnPublishObj.stack;
497+
console.log(chalk.red(`Bulk assets failed to Unpublish with error ${formatError(error)}`));
498+
let sanitizedData = removePublishDetails(bulkUnPublishObj.assets);
499+
displayAssetsDetails(sanitizedData);
500+
addLogs(logger, {
501+
options: bulkUnPublishObj,
502+
api_key: stack.stackHeaders.api_key,
503+
alias: stack.alias,
504+
host: stack.host,
505+
}, 'error');
506+
}
507+
}
482508
break;
483509
default:
484510
console.log('No such type');

packages/contentstack-bulk-publish/src/util/client.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ async function getStack(data) {
66
const options = {
77
host: data.host,
88
branchName: data.branch,
9+
headers: {includeResHeaders: true},
910
};
1011
const stackOptions = {};
1112
if (data.alias) {

packages/contentstack-bulk-publish/src/util/common-utility.js

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,27 @@ function fetchBulkPublishLimit(orgUid) {
2727
return bulkPublishLimit;
2828
}
2929

30-
module.exports = { fetchBulkPublishLimit };
30+
/**
31+
* Handles the rate limit checking and adds delay if necessary.
32+
* @param {Object} error - The error object containing the response headers.
33+
* @param {Object} data - The data being processed, including the batch size.
34+
* @param {Function} delay - The delay function to use for waiting.
35+
* @param {number} xRateLimitRemaining - The xRateLimitRemaining containing the remaining balance.
36+
* @returns {boolean} - Returns `true` if delay was applied, `false` otherwise.
37+
*/
38+
async function handleRateLimit(error, data, delay, xRateLimitRemaining) {
39+
// Check if rate limit is exhausted or batch size exceeds remaining limit
40+
if (xRateLimitRemaining === 0 || data.length > xRateLimitRemaining) {
41+
cliux.print(
42+
'Bulk rate limit reached or batch size exceeds remaining limit. Retrying in 2 seconds...',
43+
{ color: 'yellow' },
44+
);
45+
await delay(2000); // Wait for 2 seconds before retrying
46+
return true;
47+
} else {
48+
return false;
49+
}
50+
}
51+
52+
53+
module.exports = { fetchBulkPublishLimit, handleRateLimit };

packages/contentstack-utilities/src/contentstack-management-sdk.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ class ManagementSDKInitiator {
8585
if (!option.headers) option.headers = {};
8686
option.headers['X-CS-CLI'] = this.analyticsInfo;
8787
}
88+
if (config.headers?.includeResHeaders) {
89+
if (!option.headers) option.headers = {};
90+
option.headers['includeResHeaders'] = true;
91+
}
8892
if (!config.management_token) {
8993
const authorisationType = configStore.get('authorisationType');
9094
if (authorisationType === 'BASIC') {

0 commit comments

Comments
 (0)