[Async Batch Pipeline] Add support for onPoll method#3726
[Async Batch Pipeline] Add support for onPoll method#3726sayan-das-in wants to merge 5 commits intomainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds first-class polling support for async Actions by extending the core destination-kit execution path and wiring an example implementation for the Salesforce Marketing Cloud (SFMC) async Data Extension action.
Changes:
- Introduces
pollFields+pollStatusto the ActionDefinition/Action execution model and addsDestination.executePoll(...)in core. - Adds new async polling response types (
AsyncActionResponseType,AsyncPollResponseType) and exports them from@segment/actions-core. - Implements SFMC async operation polling (
pollAsyncOperation) and exposes it via the SFMCasyncDataExtensionaction’spollStatus.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts | Adds SFMC async status polling helper and returns a normalized poll response. |
| packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts | Defines pollFields and implements pollStatus mapping SFMC statuses to framework statuses. |
| packages/core/src/index.ts | Re-exports new async polling-related types from destination-kit. |
| packages/core/src/destination-kit/types.ts | Defines AsyncActionResponseType / AsyncPollResponseType and related result structures. |
| packages/core/src/destination-kit/index.ts | Adds Destination.executePoll(...) entrypoint to invoke an action’s poll handler. |
| packages/core/src/destination-kit/action.ts | Adds poll schema generation, executePoll(...), and pollFields/pollStatus support in ActionDefinition. |
| if (pollResult.status === 'Complete') { | ||
| // Request finished - check if it succeeded or had errors | ||
| if (sfmcStatus?.hasErrors || sfmcStatus?.resultStatus === 'Error') { | ||
| status = 'failed' | ||
| } else { | ||
| status = 'completed' | ||
| } | ||
| } else if (pollResult.status === 'Failed' || pollResult.status === 'Error') { | ||
| status = 'failed' | ||
| } else { | ||
| // InProcess, Queued, etc. | ||
| status = 'pending' | ||
| } |
There was a problem hiding this comment.
pollAsyncOperation can return status: 'NotFound' with an errorMessage, but this mapping treats any non-Complete/Failed/Error status as pending. This will keep polling forever on 404. Handle the NotFound (and other terminal error) statuses explicitly by mapping them to failed (or another terminal outcome) so polling stops.
| async executePoll( | ||
| bundle: ExecuteBundle<Settings, InputData | undefined, AudienceSettings> | ||
| ): Promise<AsyncPollResponseType> { | ||
| if (!this.hasPollSupport || !this.definition.pollStatus) { | ||
| throw new IntegrationError('This action does not support polling.', 'NotImplemented', 501) | ||
| } | ||
|
|
||
| const payload = bundle.data as PollPayload | ||
| // Remove empty values and validate using poll schema (required for polling operations) | ||
| if (!this.pollSchema) { | ||
| throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501) | ||
| } | ||
| const validationSchema = this.pollSchema | ||
| // Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload | ||
| // This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string }) | ||
| const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload | ||
| // Validate the resolved payload against the poll schema | ||
| const schemaKey = `${this.destinationName}:${this.definition.title}:poll` | ||
| validateSchema(pollPayload, validationSchema, { | ||
| schemaKey, | ||
| statsContext: bundle.statsContext, | ||
| exempt: ['dynamicAuthSettings'] | ||
| }) | ||
|
|
||
| // Construct the data bundle to send to the poll action | ||
| const dataBundle = { | ||
| rawData: bundle.data, | ||
| rawMapping: bundle.mapping, | ||
| settings: bundle.settings, | ||
| payload: pollPayload, | ||
| auth: bundle.auth, | ||
| features: bundle.features, | ||
| statsContext: bundle.statsContext, | ||
| logger: bundle.logger, | ||
| engageDestinationCache: bundle.engageDestinationCache, | ||
| transactionContext: bundle.transactionContext, | ||
| stateContext: bundle.stateContext, | ||
| audienceSettings: bundle.audienceSettings, | ||
| subscriptionMetadata: bundle.subscriptionMetadata, | ||
| signal: bundle?.signal | ||
| } | ||
|
|
||
| // Construct the request client and perform the poll operation | ||
| const requestClient = this.createRequestClient(dataBundle) | ||
| if (!this.definition.pollStatus) { | ||
| throw new IntegrationError('Poll method is not defined.', 'NotImplemented', 501) | ||
| } | ||
| const pollResponse = await this.definition.pollStatus(requestClient, dataBundle) | ||
|
|
||
| return pollResponse | ||
| } |
There was a problem hiding this comment.
Core change adds a new polling execution path (executePoll) but there are no accompanying unit tests. Add tests covering: poll schema validation (missing required fields), successful pollStatus invocation, and error cases (action without pollStatus, missing pollFields). This helps prevent regressions across all destinations.
| public async executePoll( | ||
| actionSlug: string, | ||
| { | ||
| event, | ||
| mapping, | ||
| subscriptionMetadata, | ||
| settings, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| signal | ||
| }: EventInput<Settings> | ||
| ): Promise<AsyncPollResponseType> { | ||
| const action = this.actions[actionSlug] | ||
| if (!action) { | ||
| throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404) | ||
| } | ||
|
|
||
| let audienceSettings = {} as AudienceSettings | ||
| if (event.context?.personas) { | ||
| audienceSettings = event.context?.personas?.audience_settings as AudienceSettings | ||
| } | ||
| const authData = getAuthData(settings as JSONObject) | ||
| return action.executePoll({ | ||
| mapping, | ||
| data: event as unknown as InputData, | ||
| settings, | ||
| audienceSettings, | ||
| auth: authData, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| subscriptionMetadata, | ||
| signal | ||
| }) |
There was a problem hiding this comment.
executePoll currently ignores the auth value from EventInput and always recomputes auth via getAuthData(settings). This is inconsistent with executeAction/executeBatch (which pass through the provided auth) and can break callers that supply runtime auth tokens (e.g., oauth-managed refresh flows or tests). Include auth in the destructuring and pass it through (falling back to getAuthData(settings) only when auth is not provided).
| console.log('subdomain', subdomain, 'operationId', operationId) | ||
| const response = await request<SFMCAsyncStatus>( | ||
| `https://${subdomain}.rest.marketingcloudapis.com/data/v1/async/${operationId}/status`, | ||
| { | ||
| method: 'GET', | ||
| headers: { | ||
| 'Content-Type': 'application/json' | ||
| } | ||
| } | ||
| ) | ||
|
|
||
| const data = response.data | ||
| console.log('poll data', data) | ||
|
|
There was a problem hiding this comment.
Remove the debug console.log calls in pollAsyncOperation. These logs can leak operational data to customer logs and add noise in production; prefer the framework logger (if available in this layer) or rely on existing request/response instrumentation.
| } | ||
| } | ||
| } catch (error) { | ||
| console.log('poll error', error) |
There was a problem hiding this comment.
Avoid logging raw errors with console.log('poll error', error). The error object can include request/response details and headers; instead rethrow or wrap the error (as you already do below) and let centralized error/reporting handle it.
| console.log('poll error', error) |
| // If operation not found, it might be completed and cleaned up | ||
| if (err?.response?.status === 404) { | ||
| return { | ||
| status: 'NotFound', | ||
| operationId: operationId, | ||
| errorMessage: 'Operation not found - may have been completed and cleaned up' | ||
| } |
There was a problem hiding this comment.
Returning status: 'NotFound' on 404 will be treated as pending by the current SFMC pollStatus mapping, which can cause infinite polling. Consider mapping 404 to a terminal state (e.g., throw an IntegrationError, or return a status that the caller treats as failed/completed) and include a clear message/context.
| // If operation not found, it might be completed and cleaned up | |
| if (err?.response?.status === 404) { | |
| return { | |
| status: 'NotFound', | |
| operationId: operationId, | |
| errorMessage: 'Operation not found - may have been completed and cleaned up' | |
| } | |
| // Treat missing operations as terminal to avoid infinite polling. | |
| if (err?.response?.status === 404) { | |
| throw new IntegrationError( | |
| `Async operation ${operationId} was not found while polling SFMC status. The operation may have been removed or is no longer available.`, | |
| 'ASYNC_OPERATION_NOT_FOUND', | |
| 404 | |
| ) |
| } | ||
| ) | ||
|
|
||
| console.log('no multistatsus') |
There was a problem hiding this comment.
Remove the stray console.log('no multistatsus') debug log. If the intent is to track async upsert outcomes, use structured logging/metrics via existing framework hooks instead.
| console.log('no multistatsus') |
Codecov Report❌ Patch coverage is ❌ Your patch check has failed because the patch coverage (13.88%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #3726 +/- ##
==========================================
- Coverage 80.91% 80.73% -0.18%
==========================================
Files 1386 1386
Lines 27903 27970 +67
Branches 6026 6025 -1
==========================================
+ Hits 22577 22582 +5
- Misses 4350 4428 +78
+ Partials 976 960 -16 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
52ae376 to
cc2f3bf
Compare
| const authData = getAuthData(settings as JSONObject) | ||
| return action.executePoll({ | ||
| mapping, | ||
| data: event as unknown as InputData, | ||
| settings, |
| console.log('[parseResponse] response type:', typeof response) | ||
| console.log('[parseResponse] response instanceof Response:', response instanceof Response) | ||
| console.log('[parseResponse] response value:', JSON.stringify(response)) | ||
| console.log('[parseResponse] response.data:', (response as ModifiedResponse).data) | ||
|
|
| public async executePoll( | ||
| actionSlug: string, | ||
| { | ||
| event, | ||
| mapping, | ||
| subscriptionMetadata, | ||
| settings, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| signal | ||
| }: EventInput<Settings> | ||
| ): Promise<AsyncPollResponseType> { | ||
| const action = this.actions[actionSlug] | ||
| if (!action) { | ||
| throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404) | ||
| } | ||
|
|
||
| let audienceSettings = {} as AudienceSettings | ||
| if (event.context?.personas) { | ||
| audienceSettings = event.context?.personas?.audience_settings as AudienceSettings | ||
| } | ||
| const authData = getAuthData(settings as JSONObject) | ||
| return action.executePoll({ | ||
| mapping, | ||
| data: event as unknown as InputData, | ||
| settings, | ||
| audienceSettings, | ||
| auth: authData, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| subscriptionMetadata, | ||
| signal | ||
| }) | ||
| } |
| console.log('subdomain', subdomain, 'operationId', operationId) | ||
| const response = await request<SFMCAsyncStatus>( | ||
| `https://${subdomain}.rest.marketingcloudapis.com/data/v1/async/${operationId}/status`, |
This PR adds an onPoll method for Actions with Async Support.
Testing
Include any additional information about the testing you have completed to
ensure your changes behave as expected. For a speedy review, please check
any of the tasks you completed below during your testing.
Security Review
Please ensure sensitive data is properly protected in your integration.
type: 'password'New Destination Checklist
verioning-info.tsfile. example