diff --git a/packages/core/src/destination-kit/action.ts b/packages/core/src/destination-kit/action.ts index 75268caa5f0..b866af933b1 100644 --- a/packages/core/src/destination-kit/action.ts +++ b/packages/core/src/destination-kit/action.ts @@ -17,7 +17,8 @@ import type { ActionDestinationSuccessResponseType, ActionDestinationErrorResponseType, ResultMultiStatusNode, - AudienceMembership + AudienceMembership, + AsyncPollResponseType } from './types' import { syncModeTypes } from './types' import { HTTPError, NormalizedOptions } from '../request-client' @@ -92,6 +93,13 @@ export interface BaseActionDefinition { * The fields used to perform the action. These fields should match what the partner API expects. */ fields: ActionFields + + /** + * The fields used specifically for polling async operations. These are typically minimal fields + * containing only identifiers needed to check operation status (e.g., operationId). + * REQUIRED when defining a poll method - ensures security and performance by validating only essential polling data. + */ + pollFields?: ActionFields } type HookValueTypes = string | boolean | number | Array @@ -113,7 +121,9 @@ export interface ActionDefinition< // eslint-disable-next-line @typescript-eslint/no-explicit-any GeneratedActionHookInputs = any, // eslint-disable-next-line @typescript-eslint/no-explicit-any - GeneratedActionHookOutputs = any + GeneratedActionHookOutputs = any, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + PollPayload = any > extends BaseActionDefinition { /** * A way to "register" dynamic fields. @@ -149,6 +159,9 @@ export interface ActionDefinition< /** The operation to perform when this action is triggered for a batch of events */ performBatch?: RequestFn + /** The operation to poll the status of asynchronous actions */ + pollStatus?: RequestFn + /** Hooks are triggered at some point in a mappings lifecycle. They may perform a request with the * destination using the provided inputs and return a response. The response may then optionally be stored * in the mapping for later use in the action. @@ -264,20 +277,27 @@ const isSyncMode = (value: unknown): value is SyncMode => { * Action is the beginning step for all partner actions. Entrypoints always start with the * MapAndValidateInput step. */ -export class Action extends EventEmitter { - readonly definition: ActionDefinition +export class Action< + Settings, + Payload extends JSONLikeObject, + AudienceSettings = any, + PollPayload = unknown +> extends EventEmitter { + readonly definition: ActionDefinition readonly destinationName: string readonly schema?: JSONSchema4 + readonly pollSchema?: JSONSchema4 readonly hookSchemas?: Record readonly hasBatchSupport: boolean readonly hasHookSupport: boolean + readonly hasPollSupport: boolean // Payloads may be any type so we use `any` explicitly here. // eslint-disable-next-line @typescript-eslint/no-explicit-any private extendRequest: RequestExtension | undefined constructor( destinationName: string, - definition: ActionDefinition, + definition: ActionDefinition, // Payloads may be any type so we use `any` explicitly here. // eslint-disable-next-line @typescript-eslint/no-explicit-any extendRequest?: RequestExtension @@ -288,10 +308,17 @@ export class Action + ): Promise { + if (!this.hasPollSupport || !this.definition.pollStatus) { + throw new IntegrationError('This action does not support polling.', 'NotImplemented', 501) + } + + const payload = bundle.data as PollPayload + // Remove empty values and validate using poll schema (required for polling operations) + if (!this.pollSchema) { + throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501) + } + const validationSchema = this.pollSchema + // Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload + // This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string }) + const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload + // Validate the resolved payload against the poll schema + const schemaKey = `${this.destinationName}:${this.definition.title}:poll` + validateSchema(pollPayload, validationSchema, { + schemaKey, + statsContext: bundle.statsContext, + exempt: ['dynamicAuthSettings'] + }) + + // Construct the data bundle to send to the poll action + const dataBundle = { + rawData: bundle.data, + rawMapping: bundle.mapping, + settings: bundle.settings, + payload: pollPayload, + auth: bundle.auth, + features: bundle.features, + statsContext: bundle.statsContext, + logger: bundle.logger, + engageDestinationCache: bundle.engageDestinationCache, + transactionContext: bundle.transactionContext, + stateContext: bundle.stateContext, + audienceSettings: bundle.audienceSettings, + subscriptionMetadata: bundle.subscriptionMetadata, + signal: bundle?.signal + } + + // Construct the request client and perform the poll operation + const requestClient = this.createRequestClient(dataBundle) + const pollResponse = await this.definition.pollStatus(requestClient, dataBundle) + + return pollResponse + } + /* * Extract the dynamic field context and handler path from a field string. Examples: * - "structured.first_name" => { dynamicHandlerPath: "structured.first_name" } @@ -738,11 +814,19 @@ export class Action { return action.executeDynamicField(fieldKey, data, dynamicFn) } + public async executePoll( + actionSlug: string, + { + event, + mapping, + subscriptionMetadata, + settings, + features, + statsContext, + logger, + engageDestinationCache, + transactionContext, + stateContext, + signal + }: EventInput + ): Promise { + const action = this.actions[actionSlug] + if (!action) { + throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404) + } + + let audienceSettings = {} as AudienceSettings + if (event.context?.personas) { + audienceSettings = event.context?.personas?.audience_settings as AudienceSettings + } + const authData = getAuthData(settings as JSONObject) + return action.executePoll({ + mapping, + data: event as unknown as InputData, + settings, + audienceSettings, + auth: authData, + features, + statsContext, + logger, + engageDestinationCache, + transactionContext, + stateContext, + subscriptionMetadata, + signal + }) + } + private async onSubscription( subscription: Subscription, events: SegmentEvent | SegmentEvent[], diff --git a/packages/core/src/destination-kit/types.ts b/packages/core/src/destination-kit/types.ts index dfe20ffb60b..a81d00d0df5 100644 --- a/packages/core/src/destination-kit/types.ts +++ b/packages/core/src/destination-kit/types.ts @@ -405,6 +405,31 @@ export type ActionDestinationErrorResponseType = { body?: JSONLikeObject | string } +export type AsyncOperationResult = { + /** The current status of this operation */ + status: 'pending' | 'completed' | 'failed' + /** Message about current state */ + message?: string + /** Final result data when status is 'completed' */ + result?: JSONLikeObject + /** Error information when status is 'failed' */ + error?: { + code: string + message: string + } + /** Original context for this operation */ + context?: JSONLikeObject +} + +export type AsyncPollResponseType = { + /** Array of operation results - single element for individual operations, multiple for batch */ + results: AsyncOperationResult[] + /** Overall status - completed when all operations are done */ + overallStatus: 'pending' | 'completed' | 'failed' | 'partial' + /** Summary message */ + message?: string +} + export type ResultMultiStatusNode = | ActionDestinationSuccessResponseType | (ActionDestinationErrorResponseType & { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ce662378f41..3690a1b09c9 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -82,7 +82,8 @@ export type { StatsContext, Logger, Preset, - Result + Result, + AsyncPollResponseType } from './destination-kit' export type { diff --git a/packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts b/packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts index 821016821a6..42fdb44b7fc 100644 --- a/packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts +++ b/packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts @@ -1,10 +1,15 @@ -import { ActionDefinition, IntegrationError } from '@segment/actions-core' +import { ActionDefinition, IntegrationError, JSONLikeObject } from '@segment/actions-core' import type { Settings } from '../generated-types' import type { Payload } from './generated-types' import { keys, values_dataExtensionFields, dataExtensionHook } from '../sfmc-properties' -import { getDataExtensionFields, asyncUpsertRowsV2 } from '../sfmc-operations' +import { getDataExtensionFields, asyncUpsertRowsV2, pollAsyncOperation } from '../sfmc-operations' -const action: ActionDefinition = { +// Define the minimal payload type for polling operations +interface PollPayload { + operationId: string +} + +const action: ActionDefinition = { title: 'Send Event asynchronously to Data Extension', description: `Upsert event records asynchronously as rows into a data extension in Salesforce Marketing Cloud. @@ -54,6 +59,14 @@ const action: ActionDefinition = { } } }, + pollFields: { + operationId: { + label: 'Operation ID', + description: 'The ID of the asynchronous operation to poll.', + type: 'string', + required: true + } + }, hooks: { retlOnMappingSave: { ...dataExtensionHook @@ -81,6 +94,56 @@ const action: ActionDefinition = { throw new IntegrationError('No Data Extension Connected', 'INVALID_CONFIGURATION', 400) } return asyncUpsertRowsV2(request, settings.subdomain, payload, dataExtensionId) + }, + + pollStatus: async (request, { settings, payload }) => { + // Get the operation ID from the payload + const operationId = payload.operationId + + if (!operationId) { + throw new IntegrationError('Operation ID is required for polling.', 'INVALID_REQUEST', 400) + } + + // Poll the SFMC async operation status + const pollResult = await pollAsyncOperation(request, settings.subdomain, operationId) + + // Map SFMC status to framework status + // Check both requestStatus (Complete/InProcess/etc) and resultStatus (OK/Error) + let status: 'pending' | 'completed' | 'failed' + const sfmcStatus = pollResult.results as any + + if (pollResult.status === 'Complete') { + // Request finished - check if it succeeded or had errors + if (sfmcStatus?.hasErrors || sfmcStatus?.resultStatus === 'Error') { + status = 'failed' + } else { + status = 'completed' + } + } else if (pollResult.status === 'Failed' || pollResult.status === 'Error') { + status = 'failed' + } else { + // InProcess, Queued, etc. + status = 'pending' + } + + const results = pollResult.results || {} + const context: JSONLikeObject = { + operationId: pollResult.operationId, + completedAt: pollResult.completedAt + } + + return { + results: [ + { + status: status, + message: pollResult.errorMessage || `Operation ${operationId} is ${pollResult.status}`, + result: results as JSONLikeObject, + context: context + } + ], + overallStatus: status, + message: pollResult.errorMessage || `Async operation ${status}` + } } } diff --git a/packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts b/packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts index 4efdfb34316..a23ea3b2022 100644 --- a/packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts +++ b/packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts @@ -22,6 +22,34 @@ import { SALESFORCE_MARKETING_CLOUD_DATA_API_VERSION } from './versioning-info' +/** + * Interface for SFMC async operation status response from /status endpoint + */ +interface SFMCAsyncStatus { + status: { + callDateTime: string + completionDateTime?: string + hasErrors: boolean + pickupDateTime?: string + requestStatus: string + resultStatus: string + requestId: string + } + requestId: string + resultMessages: string[] +} + +/** + * Interface for poll response + */ +interface PollResponse { + status: string + operationId: string + completedAt?: string + errorMessage?: string + results?: Record +} + function generateRows(payloads: payload_dataExtension[] | payload_contactDataExtension[]): Record[] { const rows: Record[] = [] payloads.forEach((payload: payload_dataExtension | payload_contactDataExtension) => { @@ -57,6 +85,70 @@ function isRetryableError(errData: ErrorData, status: number): boolean { ) } +export async function pollAsyncOperation( + request: RequestClient, + subdomain: string, + operationId: string +): Promise { + try { + console.log('subdomain', subdomain, 'operationId', operationId) + const response = await request( + `https://${subdomain}.rest.marketingcloudapis.com/data/v1/async/${operationId}/status`, + { + method: 'GET', + headers: { + 'Content-Type': 'application/json' + } + } + ) + + const data = response.data + console.log('poll data', data) + + return { + status: data.status.requestStatus, + operationId: operationId, + completedAt: data.status.completionDateTime, + errorMessage: data.status.hasErrors ? 'Operation completed with errors' : undefined, + results: { + requestStatus: data.status.requestStatus, + resultStatus: data.status.resultStatus, + hasErrors: data.status.hasErrors, + resultMessages: data.resultMessages || [], + callDateTime: data.status.callDateTime, + pickupDateTime: data.status.pickupDateTime, + completionDateTime: data.status.completionDateTime + } + } + } catch (error) { + console.log('poll error', error) + const err = error as ErrorResponse + + if (err?.response?.status === 401) { + throw error + } + + // If operation not found, it might be completed and cleaned up + if (err?.response?.status === 404) { + return { + status: 'NotFound', + operationId: operationId, + errorMessage: 'Operation not found - may have been completed and cleaned up' + } + } + + const errorMessage = err?.response?.data + ? (err.response.data as { message?: string }).message || 'Unknown error' + : 'Unknown error' + + throw new IntegrationError( + `Failed to poll async operation: ${errorMessage}`, + 'ASYNC_POLL_FAILED', + err?.response?.status || 500 + ) + } +} + export async function asyncUpsertRowsV2( request: RequestClient, subdomain: String, @@ -79,7 +171,7 @@ export async function asyncUpsertRowsV2( json: { items: rows } } ) - + console.log('no multistatsus') return response }