Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/destination-actions/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@amplitude/ua-parser-js": "^0.7.25",
"@aws-sdk/client-eventbridge": "^3.894.0",
"@aws-sdk/client-kinesis": "3.974.0",
"@aws-sdk/client-sqs": "3.974.0",
"@aws-sdk/client-s3": "^3.894.0",
"@aws-sdk/client-sts": "^3.894.0",
"@bufbuild/protobuf": "^2.2.3",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { createTestIntegration } from '@segment/actions-core'
import Definition from '../index'

const testDestination = createTestIntegration(Definition)

describe('AWS SQS', () => {
describe('destination definition', () => {
it('has the correct name', () => {
expect(Definition.name).toBe('AWS SQS')
})

it('has the correct slug', () => {
expect(Definition.slug).toBe('actions-aws-sqs')
})

it('has cloud mode', () => {
expect(Definition.mode).toBe('cloud')
})

it('has the send action', () => {
expect(Definition.actions.send).toBeDefined()
})

it('has correct authentication fields', () => {
const authFields = Definition.authentication?.fields
expect(authFields?.iamRoleArn).toBeDefined()
expect(authFields?.iamRoleArn.required).toBe(true)
expect(authFields?.iamExternalId).toBeDefined()
expect(authFields?.iamExternalId.type).toBe('password')
})
})

describe('testAuthentication', () => {
it('destination is defined', () => {
expect(testDestination).toBeDefined()
})
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions packages/destination-actions/src/destinations/aws-sqs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { DestinationDefinition } from '@segment/actions-core'
import type { Settings } from './generated-types'
import { DEFAULT_REQUEST_TIMEOUT } from '@segment/actions-core'
import { assumeRole } from '../../lib/AWS/sts'
import { APP_AWS_REGION } from '../../lib/AWS/utils'
import send from './send'

const destination: DestinationDefinition<Settings> = {
name: 'AWS SQS',
slug: 'actions-aws-sqs',
mode: 'cloud',
description:
'Amazon Simple Queue Service (SQS) is a fully managed message queuing service. This destination enables delivery of Segment events to SQS queues for asynchronous processing.',
authentication: {
scheme: 'custom',
fields: {
iamRoleArn: {
type: 'string',
label: 'IAM Role ARN',
description:
'The ARN of the IAM role to assume for SQS access. Format: arn:aws:iam::<account-id>:role/<role-name>. Must have sqs:SendMessage and sqs:SendMessageBatch permissions.',
required: true
},
iamExternalId: {
type: 'password',
label: 'External ID',
description:
"The external ID for cross-account role assumption. Used as a shared secret between Segment and the customer's IAM trust policy.",
required: true
}
},
testAuthentication: async (_, { settings }) => {
await assumeRole(settings.iamRoleArn, settings.iamExternalId, APP_AWS_REGION)
return true
}
},
extendRequest() {
return {
timeout: Math.max(30_000, DEFAULT_REQUEST_TIMEOUT)
}
},
actions: {
send
}
}

export default destination
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import { createTestEvent, createTestIntegration, SegmentEvent } from '@segment/actions-core'
import Definition from '../../index'
import type { Settings } from '../../generated-types'

let testDestination = createTestIntegration(Definition)

const mockSend = jest.fn()
const mockAssumeRole = jest.fn()

jest.mock('@aws-sdk/client-sqs', () => ({
SQSClient: jest.fn(() => ({
send: mockSend
})),
SendMessageBatchCommand: class {
constructor(public input: any) {}
}
}))

jest.mock('../../../../lib/AWS/sts', () => ({
assumeRole: (...args: unknown[]) => mockAssumeRole(...args)
}))

const settings: Settings = {
iamRoleArn: 'arn:aws:iam::123456789012:role/test-role',
iamExternalId: 'test-external-id'
}

const mapping = {
payload: { '@path': '$.' },
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/test-queue',
awsRegion: 'us-east-1',
messageDeduplicationId: { '@path': '$.messageId' },
enable_batching: true,
batch_size: 10
}

const basePayload: Partial<SegmentEvent> = {
userId: 'user-123',
anonymousId: 'anon-456',
event: 'Test Event',
type: 'track',
timestamp: '2026-04-06T10:00:00.000Z',
properties: {
product_id: 'prod-789',
price: 99.99
},
messageId: 'msg-abc-123'
}

describe('AWS SQS Send', () => {
beforeEach(() => {
testDestination = createTestIntegration(Definition)
jest.clearAllMocks()
mockAssumeRole.mockResolvedValue({
accessKeyId: 'AKIA...',
secretAccessKey: 'SECRET...',
sessionToken: 'TOKEN...'
})
})

describe('successful send', () => {
it('sends a single event with correct payload', async () => {
mockSend.mockResolvedValueOnce({
Successful: [{ Id: '0', MessageId: 'sqs-msg-001', MD5OfMessageBody: 'abc123' }],
Failed: []
})

const event = createTestEvent(basePayload)

const response = await testDestination.testAction('send', {
event,
settings,
useDefaultMappings: true,
mapping
})

expect(response).toBeDefined()
expect(mockAssumeRole).toHaveBeenCalledWith(
'arn:aws:iam::123456789012:role/test-role',
'test-external-id',
'us-west-2'
)
expect(mockSend).toHaveBeenCalledTimes(1)

const command = mockSend.mock.calls[0][0]
expect(command.input.QueueUrl).toBe('https://sqs.us-east-1.amazonaws.com/123456789012/test-queue')
expect(command.input.Entries).toHaveLength(1)
expect(command.input.Entries[0].Id).toBe('0')

const messageBody = JSON.parse(command.input.Entries[0].MessageBody)
expect(messageBody.userId).toBe('user-123')
expect(messageBody.event).toBe('Test Event')
expect(messageBody.properties.product_id).toBe('prod-789')
})

it('sends a batch of events', async () => {
mockSend.mockResolvedValueOnce({
Successful: [
{ Id: '0', MessageId: 'sqs-msg-001', MD5OfMessageBody: 'abc123' },
{ Id: '1', MessageId: 'sqs-msg-002', MD5OfMessageBody: 'def456' }
],
Failed: []
})

const events = [
createTestEvent({ ...basePayload, messageId: 'msg-1' }),
createTestEvent({ ...basePayload, messageId: 'msg-2', event: 'Second Event' })
]

const response = await testDestination.testBatchAction('send', {
events,
settings,
useDefaultMappings: true,
mapping
})

expect(response).toBeDefined()
expect(mockSend).toHaveBeenCalledTimes(1)

const command = mockSend.mock.calls[0][0]
expect(command.input.Entries).toHaveLength(2)
expect(command.input.Entries[0].Id).toBe('0')
expect(command.input.Entries[1].Id).toBe('1')
})
})

describe('FIFO queue support', () => {
it('includes messageGroupId when provided', async () => {
mockSend.mockResolvedValueOnce({
Successful: [{ Id: '0', MessageId: 'sqs-msg-001', MD5OfMessageBody: 'abc123' }],
Failed: []
})

const event = createTestEvent(basePayload)

await testDestination.testAction('send', {
event,
settings,
useDefaultMappings: true,
mapping: {
...mapping,
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/test-queue.fifo',
messageGroupId: 'user-123'
}
})

const command = mockSend.mock.calls[0][0]
expect(command.input.Entries[0].MessageGroupId).toBe('user-123')
})
})

describe('error handling', () => {
it('handles partial batch failure with MultiStatusResponse', async () => {
mockSend.mockResolvedValueOnce({
Successful: [{ Id: '0', MessageId: 'sqs-msg-001', MD5OfMessageBody: 'abc123' }],
Failed: [{ Id: '1', Code: 'InternalError', Message: 'Internal service error', SenderFault: false }]
})

const events = [
createTestEvent({ ...basePayload, messageId: 'msg-1' }),
createTestEvent({ ...basePayload, messageId: 'msg-2' })
]

const response = await testDestination.testBatchAction('send', {
events,
settings,
useDefaultMappings: true,
mapping
})

expect(response).toBeDefined()
expect(mockSend).toHaveBeenCalledTimes(1)
})

it('throws RetryableError on RequestThrottled', async () => {
const throttleError = new Error('Rate exceeded')
throttleError.name = 'RequestThrottled'
mockSend.mockRejectedValueOnce(throttleError)

const event = createTestEvent(basePayload)

await expect(
testDestination.testAction('send', {
event,
settings,
useDefaultMappings: true,
mapping
})
).rejects.toThrowError(/Retryable error RequestThrottled/)
})

it('throws IntegrationError on QueueDoesNotExist', async () => {
const notFoundError = new Error('Queue not found')
notFoundError.name = 'QueueDoesNotExist'
mockSend.mockRejectedValueOnce(notFoundError)

const event = createTestEvent(basePayload)

await expect(
testDestination.testAction('send', {
event,
settings,
useDefaultMappings: true,
mapping
})
).rejects.toThrowError(/Non-retryable error QueueDoesNotExist/)
})

it('throws IntegrationError on InvalidSecurity', async () => {
const securityError = new Error('Invalid security token')
securityError.name = 'InvalidSecurity'
mockSend.mockRejectedValueOnce(securityError)

const event = createTestEvent(basePayload)

await expect(
testDestination.testAction('send', {
event,
settings,
useDefaultMappings: true,
mapping
})
).rejects.toThrowError(/Non-retryable error InvalidSecurity/)
})
})
})

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading