Skip to content

Commit afb4c6f

Browse files
feat: add bulk-disable-import-audit-processor task handler (#244)
Introduces a new bulk handler that processes N sites in a single invocation — disabling imports per site and calling configuration.save() once for all sites — eliminating the race condition that occurred when N parallel disable-import-audit-processor invocations each wrote the shared Configuration object concurrently.
1 parent c33139f commit afb4c6f

3 files changed

Lines changed: 712 additions & 0 deletions

File tree

src/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import dataAccess from './support/data-access.js';
2424

2525
import { runOpportunityStatusProcessor as opportunityStatusProcessor } from './tasks/opportunity-status-processor/handler.js';
2626
import { runDisableImportAuditProcessor as disableImportAuditProcessor } from './tasks/disable-import-audit-processor/handler.js';
27+
import { runBulkDisableImportAuditProcessor as bulkDisableImportAuditProcessor } from './tasks/bulk-disable-import-audit-processor/handler.js';
2728
import { runDemoUrlProcessor as demoUrlProcessor } from './tasks/demo-url-processor/handler.js';
2829
import { runCwvDemoSuggestionsProcessor as cwvDemoSuggestionsProcessor } from './tasks/cwv-demo-suggestions-processor/handler.js';
2930
import { runAgentExecutor as agentExecutor } from './tasks/agent-executor/handler.js';
@@ -32,6 +33,7 @@ import { runSlackNotify as slackNotify } from './tasks/slack-notify/handler.js';
3233
const HANDLERS = {
3334
'opportunity-status-processor': opportunityStatusProcessor,
3435
'disable-import-audit-processor': disableImportAuditProcessor,
36+
'bulk-disable-import-audit-processor': bulkDisableImportAuditProcessor,
3537
'demo-url-processor': demoUrlProcessor,
3638
'agent-executor': agentExecutor,
3739
'slack-notify': slackNotify,
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright 2026 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
import { ok, internalServerError } from '@adobe/spacecat-shared-http-utils';
14+
import { Config } from '@adobe/spacecat-shared-data-access';
15+
16+
import { say } from '../../utils/slack-utils.js';
17+
18+
const TASK_TYPE = 'bulk-disable-import-audit-processor';
19+
const SITE_BATCH_SIZE = 10;
20+
21+
async function processSiteEntry(siteEntry, Site, log) {
22+
const {
23+
siteUrl,
24+
siteId,
25+
importTypes = [],
26+
auditTypes = [],
27+
scheduledRun: siteScheduledRun = false,
28+
} = siteEntry;
29+
30+
if (!siteUrl) {
31+
log.warn(`Skipping site entry with missing siteUrl (siteId: ${siteId})`);
32+
return { siteUrl: siteId || 'unknown', status: 'error', error: 'Missing siteUrl' };
33+
}
34+
35+
if (siteScheduledRun) {
36+
log.info(`Scheduled run for site ${siteUrl} - skipping`);
37+
return { siteUrl, status: 'skipped' };
38+
}
39+
40+
try {
41+
const site = await Site.findByBaseURL(siteUrl);
42+
if (!site) {
43+
log.warn(`Site not found for siteUrl: ${siteUrl} (siteId: ${siteId})`);
44+
return { siteUrl, status: 'not_found' };
45+
}
46+
47+
const siteConfig = site.getConfig();
48+
for (const importType of importTypes) {
49+
siteConfig.disableImport(importType);
50+
}
51+
site.setConfig(Config.toDynamoItem(siteConfig));
52+
await site.save();
53+
54+
log.info(`Disabled imports [${importTypes.join(', ')}] and audits [${auditTypes.join(', ')}] for site: ${siteUrl}`);
55+
return {
56+
site, siteUrl, importTypes, auditTypes, status: 'disabled',
57+
};
58+
} catch (error) {
59+
log.error(`Error processing site ${siteUrl}:`, error);
60+
return { siteUrl, status: 'error', error: 'Site processing failed' };
61+
}
62+
}
63+
64+
/**
65+
* Runs the bulk disable import and audit processor for multiple sites.
66+
* Loads Configuration once and saves it once after processing all sites,
67+
* avoiding race conditions from concurrent per-site configuration writes.
68+
*
69+
* @param {object} message - The message object
70+
* @param {Array<{siteId, siteUrl, organizationId, importTypes, auditTypes, scheduledRun}>}
71+
* message.sites
72+
* @param {object} message.taskContext
73+
* @param {object} message.taskContext.slackContext
74+
* @param {object} context - The context object
75+
*/
76+
export async function runBulkDisableImportAuditProcessor(message, context) {
77+
const { log, env, dataAccess } = context;
78+
const { sites = [], taskContext = {} } = message;
79+
const { slackContext, scheduledRun = false } = taskContext;
80+
const { Site, Configuration } = dataAccess;
81+
82+
log.info('Processing bulk disable import and audit request:', {
83+
taskType: TASK_TYPE,
84+
siteCount: sites.length,
85+
scheduledRun,
86+
});
87+
88+
if (scheduledRun) {
89+
log.info('Scheduled run detected - skipping bulk disable of imports and audits');
90+
await say(env, log, slackContext, ':information_source: Scheduled run detected - skipping bulk disable of imports and audits');
91+
return ok({ message: 'Scheduled run - no disable of imports and audits performed' });
92+
}
93+
94+
if (sites.length === 0) {
95+
log.info('No sites to process');
96+
return ok({ message: 'No sites to process' });
97+
}
98+
99+
let configuration;
100+
try {
101+
configuration = await Configuration.findLatest();
102+
} catch (error) {
103+
log.error('Failed to load configuration:', error);
104+
await say(env, log, slackContext, ':x: Bulk disable: failed to load configuration');
105+
return internalServerError('Failed to load configuration');
106+
}
107+
108+
const results = [];
109+
110+
for (let i = 0; i < sites.length; i += SITE_BATCH_SIZE) {
111+
const batch = sites.slice(i, i + SITE_BATCH_SIZE);
112+
// eslint-disable-next-line no-await-in-loop
113+
const batchOutcomes = await Promise.allSettled(
114+
batch.map((siteEntry) => processSiteEntry(siteEntry, Site, log)),
115+
);
116+
117+
for (const outcome of batchOutcomes) {
118+
// processSiteEntry always resolves — rejected case is a safeguard only
119+
const result = outcome.status === 'fulfilled'
120+
? outcome.value
121+
: { siteUrl: 'unknown', status: 'error', error: 'Unexpected processing error' };
122+
123+
if (result.status === 'disabled') {
124+
for (const auditType of result.auditTypes) {
125+
configuration.disableHandlerForSite(auditType, result.site);
126+
}
127+
results.push({
128+
siteUrl: result.siteUrl,
129+
status: 'disabled',
130+
importTypes: result.importTypes,
131+
auditTypes: result.auditTypes,
132+
});
133+
} else {
134+
results.push({ siteUrl: result.siteUrl, status: result.status, error: result.error });
135+
}
136+
}
137+
}
138+
139+
try {
140+
await configuration.save();
141+
log.info(`Saved configuration after processing ${sites.length} sites`);
142+
} catch (error) {
143+
log.error('Failed to save configuration:', error);
144+
await say(env, log, slackContext, `:x: Bulk disable: failed to save configuration after processing ${sites.length} sites`);
145+
return internalServerError('Failed to save configuration');
146+
}
147+
148+
const succeeded = results.filter((r) => r.status === 'disabled');
149+
const failed = results.filter((r) => r.status === 'error' || r.status === 'not_found');
150+
151+
try {
152+
const summaryLines = succeeded.map((r) => {
153+
const importsText = r.importTypes?.length > 0 ? r.importTypes.join(', ') : 'None';
154+
const auditsText = r.auditTypes?.length > 0 ? r.auditTypes.join(', ') : 'None';
155+
return `:broom: *${r.siteUrl}*: disabled imports: ${importsText} | audits: ${auditsText}`;
156+
});
157+
158+
if (summaryLines.length > 0) {
159+
await say(env, log, slackContext, summaryLines.join('\n'));
160+
}
161+
162+
if (failed.length > 0) {
163+
const failedText = failed.map((r) => `${r.siteUrl} (${r.status})`).join(', ');
164+
await say(env, log, slackContext, `:warning: Bulk disable: ${failed.length} site(s) had issues: ${failedText}`);
165+
}
166+
} catch (error) {
167+
log.error('Failed to send Slack summary:', error);
168+
}
169+
170+
return ok({ message: 'Bulk disable import and audit processor completed', results });
171+
}
172+
173+
export default runBulkDisableImportAuditProcessor;

0 commit comments

Comments
 (0)