-
Notifications
You must be signed in to change notification settings - Fork 307
[Async Batch Pipeline] Add support for onPoll method #3726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4b5ae79
2df3a1c
f2cd013
4e0a4fd
cc2f3bf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,8 @@ import type { | |
| Deletion, | ||
| DeletionPayload, | ||
| DynamicFieldResponse, | ||
| ResultMultiStatusNode | ||
| ResultMultiStatusNode, | ||
| AsyncPollResponseType | ||
| } from './types' | ||
| import type { AllRequestOptions } from '../request-client' | ||
| import { ErrorCodes, IntegrationError, InvalidAuthenticationError, MultiStatusErrorReporter } from '../errors' | ||
|
|
@@ -44,7 +45,8 @@ export type { | |
| ActionHookType, | ||
| ExecuteInput, | ||
| RequestFn, | ||
| Result | ||
| Result, | ||
| AsyncPollResponseType | ||
| } | ||
| export { hookTypeStrings } | ||
| export type { MinimalInputField } | ||
|
|
@@ -719,6 +721,49 @@ export class Destination<Settings = JSONObject, AudienceSettings = JSONObject> { | |
| return action.executeDynamicField(fieldKey, data, dynamicFn) | ||
| } | ||
|
|
||
| public async executePoll( | ||
| actionSlug: string, | ||
| { | ||
| event, | ||
| mapping, | ||
| subscriptionMetadata, | ||
| settings, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| signal | ||
| }: EventInput<Settings> | ||
| ): Promise<AsyncPollResponseType> { | ||
| const action = this.actions[actionSlug] | ||
| if (!action) { | ||
| throw new IntegrationError(`Action ${actionSlug} not found`, 'NotImplemented', 404) | ||
| } | ||
|
|
||
| let audienceSettings = {} as AudienceSettings | ||
| if (event.context?.personas) { | ||
| audienceSettings = event.context?.personas?.audience_settings as AudienceSettings | ||
| } | ||
| const authData = getAuthData(settings as JSONObject) | ||
| return action.executePoll({ | ||
| mapping, | ||
| data: event as unknown as InputData, | ||
| settings, | ||
|
Comment on lines
+749
to
+753
|
||
| audienceSettings, | ||
| auth: authData, | ||
| features, | ||
| statsContext, | ||
| logger, | ||
| engageDestinationCache, | ||
| transactionContext, | ||
| stateContext, | ||
| subscriptionMetadata, | ||
| signal | ||
| }) | ||
|
Comment on lines
+724
to
+764
|
||
| } | ||
|
Comment on lines
+724
to
+765
|
||
|
|
||
| private async onSubscription( | ||
| subscription: Subscription, | ||
| events: SegmentEvent | SegmentEvent[], | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,15 @@ | ||
| import { ActionDefinition, IntegrationError } from '@segment/actions-core' | ||
| import { ActionDefinition, IntegrationError, JSONLikeObject } from '@segment/actions-core' | ||
| import type { Settings } from '../generated-types' | ||
| import type { Payload } from './generated-types' | ||
| import { keys, values_dataExtensionFields, dataExtensionHook } from '../sfmc-properties' | ||
| import { getDataExtensionFields, asyncUpsertRowsV2 } from '../sfmc-operations' | ||
| import { getDataExtensionFields, asyncUpsertRowsV2, pollAsyncOperation } from '../sfmc-operations' | ||
|
|
||
| const action: ActionDefinition<Settings, Payload> = { | ||
| // Define the minimal payload type for polling operations | ||
| interface PollPayload { | ||
| operationId: string | ||
| } | ||
|
|
||
| const action: ActionDefinition<Settings, Payload, unknown, unknown, unknown, PollPayload> = { | ||
| title: 'Send Event asynchronously to Data Extension', | ||
| description: `Upsert event records asynchronously as rows into a data extension in Salesforce Marketing Cloud. | ||
|
|
||
|
|
@@ -54,6 +59,14 @@ const action: ActionDefinition<Settings, Payload> = { | |
| } | ||
| } | ||
| }, | ||
| pollFields: { | ||
| operationId: { | ||
| label: 'Operation ID', | ||
| description: 'The ID of the asynchronous operation to poll.', | ||
| type: 'string', | ||
| required: true | ||
| } | ||
| }, | ||
| hooks: { | ||
| retlOnMappingSave: { | ||
| ...dataExtensionHook | ||
|
|
@@ -81,6 +94,56 @@ const action: ActionDefinition<Settings, Payload> = { | |
| throw new IntegrationError('No Data Extension Connected', 'INVALID_CONFIGURATION', 400) | ||
| } | ||
| return asyncUpsertRowsV2(request, settings.subdomain, payload, dataExtensionId) | ||
| }, | ||
|
|
||
| pollStatus: async (request, { settings, payload }) => { | ||
| // Get the operation ID from the payload | ||
| const operationId = payload.operationId | ||
|
|
||
| if (!operationId) { | ||
| throw new IntegrationError('Operation ID is required for polling.', 'INVALID_REQUEST', 400) | ||
| } | ||
|
|
||
| // Poll the SFMC async operation status | ||
| const pollResult = await pollAsyncOperation(request, settings.subdomain, operationId) | ||
|
|
||
| // Map SFMC status to framework status | ||
| // Check both requestStatus (Complete/InProcess/etc) and resultStatus (OK/Error) | ||
| let status: 'pending' | 'completed' | 'failed' | ||
| const sfmcStatus = pollResult.results as any | ||
|
|
||
| if (pollResult.status === 'Complete') { | ||
| // Request finished - check if it succeeded or had errors | ||
| if (sfmcStatus?.hasErrors || sfmcStatus?.resultStatus === 'Error') { | ||
| status = 'failed' | ||
| } else { | ||
| status = 'completed' | ||
| } | ||
| } else if (pollResult.status === 'Failed' || pollResult.status === 'Error') { | ||
| status = 'failed' | ||
| } else { | ||
| // InProcess, Queued, etc. | ||
| status = 'pending' | ||
| } | ||
|
Comment on lines
+115
to
+127
|
||
|
|
||
| const results = pollResult.results || {} | ||
| const context: JSONLikeObject = { | ||
| operationId: pollResult.operationId, | ||
| completedAt: pollResult.completedAt | ||
| } | ||
|
|
||
| return { | ||
| results: [ | ||
| { | ||
| status: status, | ||
| message: pollResult.errorMessage || `Operation ${operationId} is ${pollResult.status}`, | ||
| result: results as JSONLikeObject, | ||
| context: context | ||
| } | ||
| ], | ||
| overallStatus: status, | ||
| message: pollResult.errorMessage || `Async operation ${status}` | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Core change adds a new polling execution path (
executePoll) but there are no accompanying unit tests. Add tests covering: poll schema validation (missing required fields), successfulpollStatusinvocation, and error cases (action withoutpollStatus, missingpollFields). This helps prevent regressions across all destinations.