Skip to content

Commit e3889d1

Browse files
feat: add trigger and batch opportunity status job handlers
Introduces two new task handlers for opportunity status reporting: - trigger-opportunity-status-job: per-site post-audit scan that classifies opportunities as found/not-found and detects audit-driven updates - batch-opportunity-status-job: Step Functions fan-out handler that checks data source availability (RUM, AHREFS Import, Scraping) and fetches suggestion counts per opportunity - batch-opportunity-status-notifier: aggregates Map state results and posts a consolidated Slack summary Exports isRUMAvailable, isAHREFSImportDataAvailable, and isScrapingAvailable from opportunity-status-processor so the batch handler can reuse them. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 05a7d86 commit e3889d1

6 files changed

Lines changed: 810 additions & 3 deletions

File tree

src/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ import { runDemoUrlProcessor as demoUrlProcessor } from './tasks/demo-url-proces
2828
import { runCwvDemoSuggestionsProcessor as cwvDemoSuggestionsProcessor } from './tasks/cwv-demo-suggestions-processor/handler.js';
2929
import { runAgentExecutor as agentExecutor } from './tasks/agent-executor/handler.js';
3030
import { runSlackNotify as slackNotify } from './tasks/slack-notify/handler.js';
31+
import { runTriggerOpportunityStatusJob as triggerOpportunityStatusJob } from './tasks/trigger-opportunity-status-job/handler.js';
32+
import { runBatchOpportunityStatusJob as batchOpportunityStatusJob } from './tasks/batch-opportunity-status-job/handler.js';
33+
import { runBatchOpportunityStatusNotifier as batchOpportunityStatusNotifier } from './tasks/batch-opportunity-status-job/notifier.js';
3134

3235
const HANDLERS = {
3336
'opportunity-status-processor': opportunityStatusProcessor,
@@ -36,6 +39,9 @@ const HANDLERS = {
3639
'agent-executor': agentExecutor,
3740
'slack-notify': slackNotify,
3841
'cwv-demo-suggestions-processor': cwvDemoSuggestionsProcessor,
42+
'trigger-opportunity-status-job': triggerOpportunityStatusJob,
43+
'batch-opportunity-status-job': batchOpportunityStatusJob,
44+
'batch-opportunity-status-notifier': batchOpportunityStatusNotifier,
3945
dummy: (message) => ok(message), // for tests
4046
};
4147

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Copyright 2025 Adobe. All rights reserved.
3+
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License. You may obtain a copy
5+
* of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* Unless required by applicable law or agreed to in writing, software distributed under
8+
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
9+
* OF ANY KIND, either express or implied. See the License for the specific language
10+
* governing permissions and limitations under the License.
11+
*/
12+
13+
import { ok } from '@adobe/spacecat-shared-http-utils';
14+
import { resolveCanonicalUrl } from '@adobe/spacecat-shared-utils';
15+
import { getAllOpportunityTypes } from '../opportunity-status-processor/audit-opportunity-map.js';
16+
import {
17+
isRUMAvailable,
18+
isAHREFSImportDataAvailable,
19+
isScrapingAvailable,
20+
} from '../opportunity-status-processor/handler.js';
21+
22+
const TASK_TYPE = 'batch-opportunity-status-job';
23+
24+
const EMPTY_DATA_SOURCES = {
25+
rum: false,
26+
ahrefsImport: false,
27+
scraping: false,
28+
scrapingStats: null,
29+
};
30+
31+
/**
32+
* Checks data source availability (RUM, AHREFS Import, Scraping) for a site.
33+
*
34+
* @param {string} siteId - Site ID
35+
* @param {string} siteUrl - Site base URL
36+
* @param {object} dataAccess - Data access layer
37+
* @param {object} context - Universal serverless context
38+
* @returns {Promise<{rum: boolean, ahrefsImport: boolean, scraping: boolean}>}
39+
*/
40+
async function checkDataSources(siteId, siteUrl, dataAccess, context) {
41+
const { log } = context;
42+
const dataSources = { ...EMPTY_DATA_SOURCES };
43+
44+
try {
45+
const resolvedUrl = await resolveCanonicalUrl(siteUrl);
46+
if (resolvedUrl) {
47+
const domain = new URL(resolvedUrl).hostname;
48+
dataSources.rum = await isRUMAvailable(domain, context);
49+
}
50+
} catch (err) {
51+
log.warn(`[${TASK_TYPE}] RUM check failed for ${siteUrl}: ${err.message}`);
52+
}
53+
54+
try {
55+
dataSources.ahrefsImport = await isAHREFSImportDataAvailable(siteId, dataAccess, context);
56+
} catch (err) {
57+
log.warn(`[${TASK_TYPE}] AHREFS Import check failed for ${siteUrl}: ${err.message}`);
58+
}
59+
60+
try {
61+
const scrapingCheck = await isScrapingAvailable(siteUrl, context);
62+
dataSources.scraping = scrapingCheck.available;
63+
dataSources.scrapingStats = scrapingCheck.stats || null;
64+
} catch (err) {
65+
log.warn(`[${TASK_TYPE}] Scraping check failed for ${siteUrl}: ${err.message}`);
66+
}
67+
68+
return dataSources;
69+
}
70+
71+
async function processSite(message, context) {
72+
const { log, dataAccess } = context;
73+
const { siteId, siteUrl, taskContext = {} } = message;
74+
75+
// ── Input validation ───────────────────────────────────────────────────────
76+
if (!siteId || !siteUrl) {
77+
log.error(`[${TASK_TYPE}] Missing required fields: siteId=${siteId}, siteUrl=${siteUrl}`);
78+
return ok({
79+
message: 'Missing required fields: siteId and siteUrl are required',
80+
found: [],
81+
notFound: [],
82+
dataSources: { ...EMPTY_DATA_SOURCES },
83+
});
84+
}
85+
86+
const { opportunityTypes } = taskContext;
87+
const opportunityTypesToCheck = (Array.isArray(opportunityTypes) && opportunityTypes.length > 0)
88+
? [...new Set(opportunityTypes)]
89+
: getAllOpportunityTypes();
90+
91+
log.info(`[${TASK_TYPE}] Processing site ${siteId} (${siteUrl})`, { opportunityTypesToCheck });
92+
93+
const { Site } = dataAccess;
94+
95+
// ── Fetch site ─────────────────────────────────────────────────────────────
96+
let site;
97+
try {
98+
site = await Site.findById(siteId);
99+
} catch (err) {
100+
log.error(`[${TASK_TYPE}] DB error fetching site ${siteId}: ${err.message}`);
101+
return ok({
102+
message: `Failed to fetch site: ${siteId}`,
103+
siteId,
104+
found: [],
105+
notFound: opportunityTypesToCheck.map((type) => ({ siteId, type })),
106+
dataSources: { ...EMPTY_DATA_SOURCES },
107+
});
108+
}
109+
110+
if (!site) {
111+
log.error(`[${TASK_TYPE}] Site not found for siteId: ${siteId}`);
112+
return ok({
113+
message: `Site not found: ${siteId}`,
114+
siteId,
115+
found: [],
116+
notFound: opportunityTypesToCheck.map((type) => ({ siteId, type })),
117+
dataSources: { ...EMPTY_DATA_SOURCES },
118+
});
119+
}
120+
121+
const baseUrl = site.getBaseURL();
122+
123+
// ── Data source checks ─────────────────────────────────────────────────────
124+
const dataSources = await checkDataSources(siteId, siteUrl, dataAccess, context);
125+
log.info(`[${TASK_TYPE}] Data sources for ${siteId}:`, dataSources);
126+
127+
// ── Fetch opportunities ────────────────────────────────────────────────────
128+
let opportunities = [];
129+
try {
130+
opportunities = await site.getOpportunities();
131+
} catch (err) {
132+
log.error(`[${TASK_TYPE}] Failed to fetch opportunities for site ${siteId}: ${err.message}`);
133+
}
134+
135+
// Group by type for O(1) lookup
136+
const foundByType = {};
137+
for (const opp of opportunities) {
138+
try {
139+
const type = opp.getType();
140+
if (!foundByType[type]) foundByType[type] = [];
141+
foundByType[type].push(opp);
142+
} catch (err) {
143+
log.warn(`[${TASK_TYPE}] Skipping malformed opportunity for site ${siteId}: ${err.message}`);
144+
}
145+
}
146+
147+
// Fetch suggestion counts for all opportunities in a single parallel pass
148+
const suggestionCountMap = new Map();
149+
await Promise.all(
150+
opportunities.map(async (opp) => {
151+
try {
152+
const suggestions = await opp.getSuggestions();
153+
suggestionCountMap.set(opp, suggestions?.length ?? 0);
154+
} catch (err) {
155+
log.warn(`[${TASK_TYPE}] Failed to fetch suggestions for site ${siteId}: ${err.message}`);
156+
suggestionCountMap.set(opp, 0);
157+
}
158+
}),
159+
);
160+
161+
const found = [];
162+
const notFound = [];
163+
164+
for (const type of opportunityTypesToCheck) {
165+
const oppsOfType = foundByType[type] || [];
166+
167+
if (oppsOfType.length === 0) {
168+
notFound.push({ siteId, baseUrl, type });
169+
log.info(`[${TASK_TYPE}] NOT FOUND – type=${type} site=${siteId}`);
170+
} else {
171+
for (const opp of oppsOfType) {
172+
found.push({
173+
siteId,
174+
baseUrl,
175+
type,
176+
updatedAt: opp.getUpdatedAt(),
177+
suggestionCount: suggestionCountMap.get(opp) ?? 0,
178+
});
179+
}
180+
log.info(`[${TASK_TYPE}] FOUND – type=${type} site=${siteId} updatedAt=${oppsOfType[0].getUpdatedAt()}`);
181+
}
182+
}
183+
184+
log.info(`[${TASK_TYPE}] Done – site=${siteId} found=${found.length} notFound=${notFound.length}`);
185+
186+
return ok({
187+
message: `Batch opportunity status scan completed for site ${siteId}`,
188+
siteId,
189+
baseUrl,
190+
opportunityTypesChecked: opportunityTypesToCheck,
191+
dataSources,
192+
found,
193+
notFound,
194+
});
195+
}
196+
197+
/**
198+
* Runs the batch opportunity status job processor.
199+
*
200+
* Designed to be invoked once per site via a Step Functions Map state that fans
201+
* out one Lambda invocation per site. Checks which opportunity types exist for
202+
* the site and also checks data source availability (RUM, AHREFS Import, Scraping).
203+
*
204+
* Message shape (one message per site, fanned out by the Map state):
205+
* {
206+
* type: 'batch-opportunity-status-job',
207+
* siteId: string,
208+
* siteUrl: string,
209+
* taskContext: {
210+
* opportunityTypes?: string[], // explicit list; falls back to all known types
211+
* }
212+
* }
213+
*
214+
* @param {object} message - The Lambda payload
215+
* @param {object} context - The universal serverless context
216+
*/
217+
export async function runBatchOpportunityStatusJob(message, context) {
218+
const { log } = context;
219+
const { siteId, siteUrl } = message;
220+
221+
try {
222+
return await processSite(message, context);
223+
} catch (err) {
224+
log.error(`[${TASK_TYPE}] Unexpected error for site ${siteId} (${siteUrl}): ${err.message}`, err);
225+
return ok({
226+
message: `Unexpected error processing site ${siteId}: ${err.message}`,
227+
siteId,
228+
found: [],
229+
notFound: [],
230+
dataSources: { ...EMPTY_DATA_SOURCES },
231+
});
232+
}
233+
}
234+
235+
export default runBatchOpportunityStatusJob;

0 commit comments

Comments
 (0)