-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
132 lines (114 loc) · 4.64 KB
/
Copy pathindex.js
File metadata and controls
132 lines (114 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* Copyright 2025 Adobe. All rights reserved.
* This file is licensed to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS
* OF ANY KIND, either express or implied. See the License for the specific language
* governing permissions and limitations under the License.
*/
import wrap from '@adobe/helix-shared-wrap';
import { helixStatus } from '@adobe/helix-status';
import secrets from '@adobe/helix-shared-secrets';
import dataAccess from '@adobe/spacecat-shared-data-access';
import {
internalServerError,
notFound,
ok,
badRequest,
} from '@adobe/spacecat-shared-http-utils';
import { imsClientWrapper } from '@adobe/spacecat-shared-ims-client';
import { isNonEmptyObject, sqsEventAdapter, sqsWrapper } from '@adobe/spacecat-shared-utils';
import { runOpportunityStatusProcessor as opportunityStatusProcessor } from './tasks/opportunity-status-processor/handler.js';
import { runDisableImportAuditProcessor as disableImportAuditProcessor } from './tasks/disable-import-audit-processor/handler.js';
import { runDemoUrlProcessor as demoUrlProcessor } from './tasks/demo-url-processor/handler.js';
import { runCwvDemoSuggestionsProcessor as cwvDemoSuggestionsProcessor } from './tasks/cwv-demo-suggestions-processor/handler.js';
import { runAgentExecutor as agentExecutor } from './tasks/agent-executor/handler.js';
import { runSlackNotify as slackNotify } from './tasks/slack-notify/handler.js';
// Lookup table from the `type` field of an incoming message to the function that
// processes it. Every entry is called as handler(message, context).
const HANDLERS = {
  // Task processors implemented under ./tasks
  'agent-executor': agentExecutor,
  'cwv-demo-suggestions-processor': cwvDemoSuggestionsProcessor,
  'demo-url-processor': demoUrlProcessor,
  'disable-import-audit-processor': disableImportAuditProcessor,
  'opportunity-status-processor': opportunityStatusProcessor,
  'slack-notify': slackNotify,

  // Test-only handler: answers ok() with the message it was given.
  dummy: (message) => ok(message),
};
/**
 * Custom secret name resolver: always yields the fixed secret path of the
 * task-manager service instead of a derived name.
 *
 * @returns {string} the secret path
 */
function getSecretName() {
  const secretPath = '/helix-deploy/spacecat-services/task-manager/latest';
  return secretPath;
}
// Export for testing
export { getSecretName };
/**
 * Formats the time elapsed since `startTime` as seconds with two decimals.
 *
 * @param {[number, number]} startTime a `[seconds, nanoseconds]` tuple previously
 *   obtained from `process.hrtime()`
 * @returns {string} the elapsed seconds, e.g. "0.42"
 */
function getElapsedSeconds(startTime) {
  // process.hrtime(previous) yields the difference as [seconds, nanoseconds].
  const [wholeSeconds, nanoseconds] = process.hrtime(startTime);
  return (wholeSeconds + nanoseconds / 1e9).toFixed(2);
}
/**
 * Dispatches a single task message to the handler registered for its type.
 *
 * @param {object} message the task message; `type` selects the handler and
 *   `siteId` is only used for logging here
 * @param {UniversalContext} context the context of the universal serverless
 *   function; must provide `log`
 * @returns {Promise<Response>} the handler's own result, `notFound()` when no
 *   handler is registered for `type`, or `internalServerError()` when the
 *   handler throws
 */
async function processTask(message, context) {
  const { log } = context;
  const { type, siteId } = message;
  log.info(`Received message with type: ${type} for site: ${siteId}`);

  // Only accept keys defined directly on HANDLERS. A plain HANDLERS[type] lookup
  // also resolves inherited Object.prototype members (type "toString",
  // "constructor", ...) and would invoke them as if they were task handlers.
  const handler = Object.prototype.hasOwnProperty.call(HANDLERS, type)
    ? HANDLERS[type]
    : undefined;
  if (typeof handler !== 'function') {
    const msg = `no such task type: ${type}`;
    log.error(msg);
    return notFound();
  }

  const startTime = process.hrtime();
  try {
    const result = await handler(message, context);
    log.info(`${type} task for ${siteId} completed in ${getElapsedSeconds(startTime)} seconds`);
    return result;
  } catch (e) {
    // Handler failures are logged and mapped to a 500 response rather than rethrown.
    log.error(`${type} task for ${siteId} failed after ${getElapsedSeconds(startTime)} seconds. `, e);
    return internalServerError();
  }
}
// Entry point used by main() for SQS-triggered invocations: processTask wrapped,
// in this order, with dataAccess, sqsWrapper, sqsEventAdapter, imsClientWrapper,
// secrets (given getSecretName as its `name` option) and helixStatus.
// NOTE(review): sqsEventAdapter presumably turns the SQS event into the `message`
// object processTask receives — confirm against @adobe/spacecat-shared-utils.
const runSQS = wrap(processTask)
.with(dataAccess)
.with(sqsWrapper)
.with(sqsEventAdapter)
.with(imsClientWrapper)
.with(secrets, { name: getSecretName })
.with(helixStatus);
// Entry point used by main() for direct (non-SQS) invocations: the same wrapper
// chain as runSQS except that sqsEventAdapter is omitted, so processTask is handed
// the payload main() passes in as its message.
// NOTE(review): sqsWrapper is kept here, presumably so handlers can still use SQS
// through the context — confirm against @adobe/spacecat-shared-utils.
const runDirect = wrap(processTask)
.with(dataAccess)
.with(sqsWrapper)
.with(imsClientWrapper)
.with(secrets, { name: getSecretName })
.with(helixStatus);
/**
 * Tells whether an invocation was triggered by SQS rather than called directly.
 *
 * @param {object} [event] the event passed to the function
 * @param {object} [context] the invocation context; `context.invocation.event`
 *   is inspected for wrapped SQS events
 * @returns {boolean} true for SQS events, false for direct invocations
 */
function isSqsEvent(event, context) {
  // Unwrapped SQS events carry a Records array at the top level.
  const hasTopLevelRecords = Array.isArray(event?.Records);
  if (hasTopLevelRecords) {
    return true;
  }

  // Wrapped SQS events carry Records under context.invocation.event; the first
  // record must have a messageId. Anything else counts as a direct invocation.
  const wrappedRecords = context?.invocation?.event?.Records;
  return Array.isArray(wrappedRecords) && Boolean(wrappedRecords[0]?.messageId);
}
/**
 * Function entry point. SQS-triggered invocations go through runSQS; anything
 * else is treated as a direct invocation whose payload is read from
 * `context.invocation.event` and passed to runDirect.
 *
 * @param {object} event the raw event passed to the function
 * @param {object} context the invocation context
 * @returns {Promise<Response>} the result of the selected runner, or a
 *   bad-request response when a direct invocation has no usable payload
 */
export const main = async (event, context) => {
  const fromQueue = isSqsEvent(event, context);
  if (fromQueue) {
    return runSQS(event, context);
  }

  const directPayload = context?.invocation?.event;
  if (isNonEmptyObject(directPayload)) {
    return runDirect(directPayload, context);
  }

  // The logger is optional here because no wrapper has run yet at this point.
  context?.log?.warn?.('Direct invocation missing payload');
  return badRequest('Event does not contain a valid message body');
};