Skip to content

[Async Batch Pipeline] Add support for onPoll method#3726

Draft
sayan-das-in wants to merge 5 commits intomainfrom
async-batch-pipeline
Draft

[Async Batch Pipeline] Add support for onPoll method#3726
sayan-das-in wants to merge 5 commits intomainfrom
async-batch-pipeline

Conversation

@sayan-das-in
Copy link
Copy Markdown
Contributor

This PR adds an onPoll method for Actions with Async Support.

Testing

Include any additional information about the testing you have completed to
ensure your changes behave as expected. For a speedy review, please check
any of the tasks you completed below during your testing.

  • Added unit tests for new functionality
  • Tested end-to-end using the local server
  • [If destination is already live] Tested for backward compatibility of destination. Note: New required fields are a breaking change.
  • [Segmenters] Tested in the staging environment
  • [Segmenters] [If applicable for this change] Tested for regression with Hadron.

Security Review

Please ensure sensitive data is properly protected in your integration.

  • Reviewed all field definitions for sensitive data (API keys, tokens, passwords, client secrets) and confirmed they use type: 'password'

New Destination Checklist

  • Extracted all action API versions to verioning-info.ts file. example

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds first-class polling support for async Actions by extending the core destination-kit execution path and wiring an example implementation for the Salesforce Marketing Cloud (SFMC) async Data Extension action.

Changes:

  • Introduces pollFields + pollStatus to the ActionDefinition/Action execution model and adds Destination.executePoll(...) in core.
  • Adds new async polling response types (AsyncActionResponseType, AsyncPollResponseType) and exports them from @segment/actions-core.
  • Implements SFMC async operation polling (pollAsyncOperation) and exposes it via the SFMC asyncDataExtension action’s pollStatus.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
packages/destination-actions/src/destinations/salesforce-marketing-cloud/sfmc-operations.ts Adds SFMC async status polling helper and returns a normalized poll response.
packages/destination-actions/src/destinations/salesforce-marketing-cloud/asyncDataExtension/index.ts Defines pollFields and implements pollStatus mapping SFMC statuses to framework statuses.
packages/core/src/index.ts Re-exports new async polling-related types from destination-kit.
packages/core/src/destination-kit/types.ts Defines AsyncActionResponseType / AsyncPollResponseType and related result structures.
packages/core/src/destination-kit/index.ts Adds Destination.executePoll(...) entrypoint to invoke an action’s poll handler.
packages/core/src/destination-kit/action.ts Adds poll schema generation, executePoll(...), and pollFields/pollStatus support in ActionDefinition.

Comment on lines +113 to +125
if (pollResult.status === 'Complete') {
// Request finished - check if it succeeded or had errors
if (sfmcStatus?.hasErrors || sfmcStatus?.resultStatus === 'Error') {
status = 'failed'
} else {
status = 'completed'
}
} else if (pollResult.status === 'Failed' || pollResult.status === 'Error') {
status = 'failed'
} else {
// InProcess, Queued, etc.
status = 'pending'
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollAsyncOperation can return status: 'NotFound' with an errorMessage, but this mapping treats any non-Complete/Failed/Error status as pending. This will keep polling forever on 404. Handle the NotFound (and other terminal error) statuses explicitly by mapping them to failed (or another terminal outcome) so polling stops.

Copilot uses AI. Check for mistakes.
Comment on lines +630 to +680
async executePoll(
bundle: ExecuteBundle<Settings, InputData | undefined, AudienceSettings>
): Promise<AsyncPollResponseType> {
if (!this.hasPollSupport || !this.definition.pollStatus) {
throw new IntegrationError('This action does not support polling.', 'NotImplemented', 501)
}

const payload = bundle.data as PollPayload
// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload
// Validate the resolved payload against the poll schema
const schemaKey = `${this.destinationName}:${this.definition.title}:poll`
validateSchema(pollPayload, validationSchema, {
schemaKey,
statsContext: bundle.statsContext,
exempt: ['dynamicAuthSettings']
})

// Construct the data bundle to send to the poll action
const dataBundle = {
rawData: bundle.data,
rawMapping: bundle.mapping,
settings: bundle.settings,
payload: pollPayload,
auth: bundle.auth,
features: bundle.features,
statsContext: bundle.statsContext,
logger: bundle.logger,
engageDestinationCache: bundle.engageDestinationCache,
transactionContext: bundle.transactionContext,
stateContext: bundle.stateContext,
audienceSettings: bundle.audienceSettings,
subscriptionMetadata: bundle.subscriptionMetadata,
signal: bundle?.signal
}

// Construct the request client and perform the poll operation
const requestClient = this.createRequestClient(dataBundle)
if (!this.definition.pollStatus) {
throw new IntegrationError('Poll method is not defined.', 'NotImplemented', 501)
}
const pollResponse = await this.definition.pollStatus(requestClient, dataBundle)

return pollResponse
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Core change adds a new polling execution path (executePoll) but there are no accompanying unit tests. Add tests covering: poll schema validation (missing required fields), successful pollStatus invocation, and error cases (action without pollStatus, missing pollFields). This helps prevent regressions across all destinations.

Copilot generated this review using guidance from repository custom instructions.
Comment on lines +723 to +763
public async executePoll(
actionSlug: string,
{
event,
mapping,
subscriptionMetadata,
settings,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
signal
}: EventInput<Settings>
): Promise<AsyncPollResponseType> {
const action = this.actions[actionSlug]
if (!action) {
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
}

let audienceSettings = {} as AudienceSettings
if (event.context?.personas) {
audienceSettings = event.context?.personas?.audience_settings as AudienceSettings
}
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
mapping,
data: event as unknown as InputData,
settings,
audienceSettings,
auth: authData,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
subscriptionMetadata,
signal
})
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executePoll currently ignores the auth value from EventInput and always recomputes auth via getAuthData(settings). This is inconsistent with executeAction/executeBatch (which pass through the provided auth) and can break callers that supply runtime auth tokens (e.g., oauth-managed refresh flows or tests). Include auth in the destructuring and pass it through (falling back to getAuthData(settings) only when auth is not provided).

Copilot uses AI. Check for mistakes.
Comment on lines +94 to +107
console.log('subdomain', subdomain, 'operationId', operationId)
const response = await request<SFMCAsyncStatus>(
`https://${subdomain}.rest.marketingcloudapis.com/data/v1/async/${operationId}/status`,
{
method: 'GET',
headers: {
'Content-Type': 'application/json'
}
}
)

const data = response.data
console.log('poll data', data)

Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the debug console.log calls in pollAsyncOperation. These logs can leak operational data to customer logs and add noise in production; prefer the framework logger (if available in this layer) or rely on existing request/response instrumentation.

Copilot uses AI. Check for mistakes.
}
}
} catch (error) {
console.log('poll error', error)
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid logging raw errors with console.log('poll error', error). The error object can include request/response details and headers; instead rethrow or wrap the error (as you already do below) and let centralized error/reporting handle it.

Suggested change
console.log('poll error', error)

Copilot uses AI. Check for mistakes.
Comment on lines +131 to +137
// If operation not found, it might be completed and cleaned up
if (err?.response?.status === 404) {
return {
status: 'NotFound',
operationId: operationId,
errorMessage: 'Operation not found - may have been completed and cleaned up'
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning status: 'NotFound' on 404 will be treated as pending by the current SFMC pollStatus mapping, which can cause infinite polling. Consider mapping 404 to a terminal state (e.g., throw an IntegrationError, or return a status that the caller treats as failed/completed) and include a clear message/context.

Suggested change
// If operation not found, it might be completed and cleaned up
if (err?.response?.status === 404) {
return {
status: 'NotFound',
operationId: operationId,
errorMessage: 'Operation not found - may have been completed and cleaned up'
}
// Treat missing operations as terminal to avoid infinite polling.
if (err?.response?.status === 404) {
throw new IntegrationError(
`Async operation ${operationId} was not found while polling SFMC status. The operation may have been removed or is no longer available.`,
'ASYNC_OPERATION_NOT_FOUND',
404
)

Copilot uses AI. Check for mistakes.
}
)

console.log('no multistatsus')
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the stray console.log('no multistatsus') debug log. If the intent is to track async upsert outcomes, use structured logging/metrics via existing framework hooks instead.

Suggested change
console.log('no multistatsus')

Copilot uses AI. Check for mistakes.
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 15, 2026

Codecov Report

❌ Patch coverage is 13.88889% with 62 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.73%. Comparing base (7a44526) to head (427d632).

Files with missing lines Patch % Lines
packages/core/src/destination-kit/action.ts 21.73% 18 Missing ⚠️
...ions/salesforce-marketing-cloud/sfmc-operations.ts 10.00% 18 Missing ⚠️
...sforce-marketing-cloud/asyncDataExtension/index.ts 15.00% 17 Missing ⚠️
packages/core/src/destination-kit/index.ts 0.00% 9 Missing ⚠️

❌ Your patch check has failed because the patch coverage (13.88%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3726      +/-   ##
==========================================
- Coverage   80.91%   80.73%   -0.18%     
==========================================
  Files        1386     1386              
  Lines       27903    27970      +67     
  Branches     6026     6025       -1     
==========================================
+ Hits        22577    22582       +5     
- Misses       4350     4428      +78     
+ Partials      976      960      -16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copilot AI review requested due to automatic review settings April 29, 2026 18:11
@sayan-das-in sayan-das-in force-pushed the async-batch-pipeline branch from 52ae376 to cc2f3bf Compare April 29, 2026 18:11
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Comment on lines +749 to +753
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
mapping,
data: event as unknown as InputData,
settings,
Comment on lines +817 to +821
console.log('[parseResponse] response type:', typeof response)
console.log('[parseResponse] response instanceof Response:', response instanceof Response)
console.log('[parseResponse] response value:', JSON.stringify(response))
console.log('[parseResponse] response.data:', (response as ModifiedResponse).data)

Comment on lines +724 to +765
public async executePoll(
actionSlug: string,
{
event,
mapping,
subscriptionMetadata,
settings,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
signal
}: EventInput<Settings>
): Promise<AsyncPollResponseType> {
const action = this.actions[actionSlug]
if (!action) {
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
}

let audienceSettings = {} as AudienceSettings
if (event.context?.personas) {
audienceSettings = event.context?.personas?.audience_settings as AudienceSettings
}
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
mapping,
data: event as unknown as InputData,
settings,
audienceSettings,
auth: authData,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
subscriptionMetadata,
signal
})
}
Comment on lines +94 to +96
console.log('subdomain', subdomain, 'operationId', operationId)
const response = await request<SFMCAsyncStatus>(
`https://${subdomain}.rest.marketingcloudapis.com/data/v1/async/${operationId}/status`,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants