Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 90 additions & 6 deletions packages/core/src/destination-kit/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import type {
ActionDestinationSuccessResponseType,
ActionDestinationErrorResponseType,
ResultMultiStatusNode,
AudienceMembership
AudienceMembership,
AsyncPollResponseType
} from './types'
import { syncModeTypes } from './types'
import { HTTPError, NormalizedOptions } from '../request-client'
Expand Down Expand Up @@ -92,6 +93,13 @@ export interface BaseActionDefinition {
* The fields used to perform the action. These fields should match what the partner API expects.
*/
fields: ActionFields

/**
* The fields used specifically for polling async operations. These are typically minimal fields
* containing only identifiers needed to check operation status (e.g., operationId).
* REQUIRED when defining a poll method - ensures security and performance by validating only essential polling data.
*/
pollFields?: ActionFields
}

type HookValueTypes = string | boolean | number | Array<string | boolean | number>
Expand All @@ -113,7 +121,9 @@ export interface ActionDefinition<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookInputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
GeneratedActionHookOutputs = any
GeneratedActionHookOutputs = any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
PollPayload = any
> extends BaseActionDefinition {
/**
* A way to "register" dynamic fields.
Expand Down Expand Up @@ -149,6 +159,9 @@ export interface ActionDefinition<
/** The operation to perform when this action is triggered for a batch of events */
performBatch?: RequestFn<Settings, Payload[], PerformBatchResponse, AudienceSettings, any, AudienceMembership[]>

/** The operation to poll the status of asynchronous actions */
pollStatus?: RequestFn<Settings, PollPayload, AsyncPollResponseType, AudienceSettings>

/** Hooks are triggered at some point in a mappings lifecycle. They may perform a request with the
* destination using the provided inputs and return a response. The response may then optionally be stored
* in the mapping for later use in the action.
Expand Down Expand Up @@ -264,20 +277,27 @@ const isSyncMode = (value: unknown): value is SyncMode => {
* Action is the beginning step for all partner actions. Entrypoints always start with the
* MapAndValidateInput step.
*/
export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings = any> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings>
export class Action<
Settings,
Payload extends JSONLikeObject,
AudienceSettings = any,
PollPayload = unknown
> extends EventEmitter {
readonly definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>
readonly destinationName: string
readonly schema?: JSONSchema4
readonly pollSchema?: JSONSchema4
readonly hookSchemas?: Record<string, JSONSchema4>
readonly hasBatchSupport: boolean
readonly hasHookSupport: boolean
readonly hasPollSupport: boolean
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private extendRequest: RequestExtension<Settings, any> | undefined

constructor(
destinationName: string,
definition: ActionDefinition<Settings, Payload, AudienceSettings>,
definition: ActionDefinition<Settings, Payload, AudienceSettings, unknown, unknown, PollPayload>,
// Payloads may be any type so we use `any` explicitly here.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
extendRequest?: RequestExtension<Settings, any>
Expand All @@ -288,10 +308,17 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
this.extendRequest = extendRequest
this.hasBatchSupport = typeof definition.performBatch === 'function'
this.hasHookSupport = definition.hooks !== undefined
this.hasPollSupport = typeof definition.pollStatus === 'function'
// Generate json schema based on the field definitions
if (Object.keys(definition.fields ?? {}).length) {
this.schema = fieldsToJsonSchema(definition.fields)
}

// Generate json schema for poll fields if they are defined
if (Object.keys(definition.pollFields ?? {}).length) {
this.pollSchema = fieldsToJsonSchema(definition.pollFields)
}

// Generate a json schema for each defined hook based on the field definitions
if (definition.hooks) {
for (const hookName in definition.hooks) {
Expand Down Expand Up @@ -604,6 +631,55 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
return multiStatusResponse
}

async executePoll(
bundle: ExecuteBundle<Settings, InputData | undefined, AudienceSettings>
): Promise<AsyncPollResponseType> {
if (!this.hasPollSupport || !this.definition.pollStatus) {
throw new IntegrationError('This action does not support polling.', 'NotImplemented', 501)
}

const payload = bundle.data as PollPayload
// Remove empty values and validate using poll schema (required for polling operations)
if (!this.pollSchema) {
throw new IntegrationError('Poll fields must be defined for polling operations.', 'NotImplemented', 501)
}
const validationSchema = this.pollSchema
// Cast to PollPayload as the removeEmptyValues pipeline produces a valid poll payload
// This represents the PollPayload type defined in the ActionDefinition (e.g., { operationId: string })
const pollPayload = removeEmptyValues(payload, validationSchema, true) as PollPayload
// Validate the resolved payload against the poll schema
const schemaKey = `${this.destinationName}:${this.definition.title}:poll`
validateSchema(pollPayload, validationSchema, {
schemaKey,
statsContext: bundle.statsContext,
exempt: ['dynamicAuthSettings']
})

// Construct the data bundle to send to the poll action
const dataBundle = {
rawData: bundle.data,
rawMapping: bundle.mapping,
settings: bundle.settings,
payload: pollPayload,
auth: bundle.auth,
features: bundle.features,
statsContext: bundle.statsContext,
logger: bundle.logger,
engageDestinationCache: bundle.engageDestinationCache,
transactionContext: bundle.transactionContext,
stateContext: bundle.stateContext,
audienceSettings: bundle.audienceSettings,
subscriptionMetadata: bundle.subscriptionMetadata,
signal: bundle?.signal
}

// Construct the request client and perform the poll operation
const requestClient = this.createRequestClient(dataBundle)
const pollResponse = await this.definition.pollStatus(requestClient, dataBundle)

return pollResponse
}
Comment on lines +634 to +681
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Core change adds a new polling execution path (executePoll) but there are no accompanying unit tests. Add tests covering: poll schema validation (missing required fields), successful pollStatus invocation, and error cases (action without pollStatus, missing pollFields). This helps prevent regressions across all destinations.

Copilot generated this review using guidance from repository custom instructions.

/*
* Extract the dynamic field context and handler path from a field string. Examples:
* - "structured.first_name" => { dynamicHandlerPath: "structured.first_name" }
Expand Down Expand Up @@ -738,11 +814,19 @@ export class Action<Settings, Payload extends JSONLikeObject, AudienceSettings =
* Try to use the parsed response `.data` or `.content` string
* @see {@link ../middleware/after-response/prepare-response.ts}
*/
console.log('[parseResponse] response type:', typeof response)
console.log('[parseResponse] response instanceof Response:', response instanceof Response)
console.log('[parseResponse] response value:', JSON.stringify(response))
console.log('[parseResponse] response.data:', (response as ModifiedResponse).data)

Comment on lines +817 to +821
if (response instanceof Response) {
return (response as ModifiedResponse).data ?? (response as ModifiedResponse).content
const result = (response as ModifiedResponse).data ?? (response as ModifiedResponse).content
console.log('[parseResponse] extracted from Response — .data/.content:', JSON.stringify(result))
return result
}

// otherwise, we don't really know what this is, so return as-is
console.log('[parseResponse] returning as-is (not a Response instance)')
return response
}

Expand Down
49 changes: 47 additions & 2 deletions packages/core/src/destination-kit/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import type {
Deletion,
DeletionPayload,
DynamicFieldResponse,
ResultMultiStatusNode
ResultMultiStatusNode,
AsyncPollResponseType
} from './types'
import type { AllRequestOptions } from '../request-client'
import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors'
Expand All @@ -44,7 +45,8 @@ export type {
ActionHookType,
ExecuteInput,
RequestFn,
Result
Result,
AsyncPollResponseType
}
export { hookTypeStrings }
export type { MinimalInputField }
Expand Down Expand Up @@ -719,6 +721,49 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> {
return action.executeDynamicField(fieldKey, data, dynamicFn)
}

public async executePoll(
actionSlug: string,
{
event,
mapping,
subscriptionMetadata,
settings,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
signal
}: EventInput<Settings>
): Promise<AsyncPollResponseType> {
const action = this.actions[actionSlug]
if (!action) {
throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404)
}

let audienceSettings = {} as AudienceSettings
if (event.context?.personas) {
audienceSettings = event.context?.personas?.audience_settings as AudienceSettings
}
const authData = getAuthData(settings as JSONObject)
return action.executePoll({
mapping,
data: event as unknown as InputData,
settings,
Comment on lines +749 to +753
audienceSettings,
auth: authData,
features,
statsContext,
logger,
engageDestinationCache,
transactionContext,
stateContext,
subscriptionMetadata,
signal
})
Comment on lines +724 to +764
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executePoll currently ignores the auth value from EventInput and always recomputes auth via getAuthData(settings). This is inconsistent with executeAction/executeBatch (which pass through the provided auth) and can break callers that supply runtime auth tokens (e.g., oauth-managed refresh flows or tests). Include auth in the destructuring and pass it through (falling back to getAuthData(settings) only when auth is not provided).

Copilot uses AI. Check for mistakes.
}
Comment on lines +724 to +765

private async onSubscription(
subscription: Subscription,
events: SegmentEvent | SegmentEvent[],
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/destination-kit/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,31 @@ export type ActionDestinationErrorResponseType = {
body?: JSONLikeObject | string
}

export type AsyncOperationResult = {
/** The current status of this operation */
status: 'pending' | 'completed' | 'failed'
/** Message about current state */
message?: string
/** Final result data when status is 'completed' */
result?: JSONLikeObject
/** Error information when status is 'failed' */
error?: {
code: string
message: string
}
/** Original context for this operation */
context?: JSONLikeObject
}

export type AsyncPollResponseType = {
/** Array of operation results - single element for individual operations, multiple for batch */
results: AsyncOperationResult[]
/** Overall status - completed when all operations are done */
overallStatus: 'pending' | 'completed' | 'failed' | 'partial'
/** Summary message */
message?: string
}

export type ResultMultiStatusNode =
| ActionDestinationSuccessResponseType
| (ActionDestinationErrorResponseType & {
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export type {
StatsContext,
Logger,
Preset,
Result
Result,
AsyncPollResponseType
} from './destination-kit'

export type {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { ActionDefinition, IntegrationError } from '@segment/actions-core'
import { ActionDefinition, IntegrationError, JSONLikeObject } from '@segment/actions-core'
import type { Settings } from '../generated-types'
import type { Payload } from './generated-types'
import { keys, values_dataExtensionFields, dataExtensionHook } from '../sfmc-properties'
import { getDataExtensionFields, asyncUpsertRowsV2 } from '../sfmc-operations'
import { getDataExtensionFields, asyncUpsertRowsV2, pollAsyncOperation } from '../sfmc-operations'

const action: ActionDefinition<Settings, Payload> = {
// Define the minimal payload type for polling operations
interface PollPayload {
operationId: string
}

const action: ActionDefinition<Settings, Payload, unknown, unknown, unknown, PollPayload> = {
title: 'Send Event asynchronously to Data Extension',
description: `Upsert event records asynchronously as rows into a data extension in Salesforce Marketing Cloud.

Expand Down Expand Up @@ -54,6 +59,14 @@ const action: ActionDefinition<Settings, Payload> = {
}
}
},
pollFields: {
operationId: {
label: 'Operation ID',
description: 'The ID of the asynchronous operation to poll.',
type: 'string',
required: true
}
},
hooks: {
retlOnMappingSave: {
...dataExtensionHook
Expand Down Expand Up @@ -81,6 +94,56 @@ const action: ActionDefinition<Settings, Payload> = {
throw new IntegrationError('No Data Extension Connected', 'INVALID_CONFIGURATION', 400)
}
return asyncUpsertRowsV2(request, settings.subdomain, payload, dataExtensionId)
},

pollStatus: async (request, { settings, payload }) => {
// Get the operation ID from the payload
const operationId = payload.operationId

if (!operationId) {
throw new IntegrationError('Operation ID is required for polling.', 'INVALID_REQUEST', 400)
}

// Poll the SFMC async operation status
const pollResult = await pollAsyncOperation(request, settings.subdomain, operationId)

// Map SFMC status to framework status
// Check both requestStatus (Complete/InProcess/etc) and resultStatus (OK/Error)
let status: 'pending' | 'completed' | 'failed'
const sfmcStatus = pollResult.results as any

if (pollResult.status === 'Complete') {
// Request finished - check if it succeeded or had errors
if (sfmcStatus?.hasErrors || sfmcStatus?.resultStatus === 'Error') {
status = 'failed'
} else {
status = 'completed'
}
} else if (pollResult.status === 'Failed' || pollResult.status === 'Error') {
status = 'failed'
} else {
// InProcess, Queued, etc.
status = 'pending'
}
Comment on lines +115 to +127
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollAsyncOperation can return status: 'NotFound' with an errorMessage, but this mapping treats any non-Complete/Failed/Error status as pending. This will keep polling forever on 404. Handle the NotFound (and other terminal error) statuses explicitly by mapping them to failed (or another terminal outcome) so polling stops.

Copilot uses AI. Check for mistakes.

const results = pollResult.results || {}
const context: JSONLikeObject = {
operationId: pollResult.operationId,
completedAt: pollResult.completedAt
}

return {
results: [
{
status: status,
message: pollResult.errorMessage || `Operation ${operationId} is ${pollResult.status}`,
result: results as JSONLikeObject,
context: context
}
],
overallStatus: status,
message: pollResult.errorMessage || `Async operation ${status}`
}
}
}

Expand Down
Loading
Loading