Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions fixtures/notifications/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,22 @@ stepFunctions:
SUCCEEDED:
- sqs:
Fn::GetAtt: [NotificationQueue, Arn]

# Second state machine sharing the same SNS topic and SQS queue (fan-in).
# Reproduces issue #275: without the fix, this machine's TopicPolicy and
# QueuePolicy overwrite the first machine's, leaving it unable to publish.
notificationMachine2:
name: integration-notifications-2-${opt:stage, 'test'}
definition:
StartAt: PassThrough
States:
PassThrough:
Type: Pass
End: true
notifications:
FAILED:
- sns:
Ref: NotificationTopic
SUCCEEDED:
- sqs:
Fn::GetAtt: [NotificationQueue, Arn]
74 changes: 74 additions & 0 deletions fixtures/notifications/verify.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'use strict';

const fs = require('node:fs');
const path = require('node:path');
const expect = require('chai').expect;

const templatePath = path.join(__dirname, '.serverless', 'cloudformation-template-update-stack.json');

describe('notifications fixture — CloudFormation template', () => {
let resources;

before(() => {
const template = JSON.parse(fs.readFileSync(templatePath, 'utf8'));
resources = template.Resources;
});

it('should produce exactly one AWS::SNS::TopicPolicy per unique topic', () => {
const snsPolicies = Object.values(resources).filter(
(r) => r.Type === 'AWS::SNS::TopicPolicy',
);

const topicKeys = snsPolicies.map((p) => JSON.stringify(p.Properties.Topics));
const uniqueTopics = new Set(topicKeys);

expect(snsPolicies.length).to.equal(
uniqueTopics.size,
'Multiple AWS::SNS::TopicPolicy resources for the same topic — the second would '
+ 'overwrite the first in CloudFormation, silently breaking the first machine\'s notifications',
);
});

it('should produce exactly one AWS::SQS::QueuePolicy per unique queue', () => {
const sqsPolicies = Object.values(resources).filter(
(r) => r.Type === 'AWS::SQS::QueuePolicy',
);

const queueKeys = sqsPolicies.map((p) => JSON.stringify(p.Properties.Queues));
const uniqueQueues = new Set(queueKeys);

expect(sqsPolicies.length).to.equal(
uniqueQueues.size,
'Multiple AWS::SQS::QueuePolicy resources for the same queue — the second would '
+ 'overwrite the first in CloudFormation, silently breaking the first machine\'s notifications',
);
});

it('should include statements from all state machines in the merged SNS topic policy', () => {
const snsPolicies = Object.values(resources).filter(
(r) => r.Type === 'AWS::SNS::TopicPolicy',
);

for (const policy of snsPolicies) {
const statements = [].concat(policy.Properties.PolicyDocument.Statement);
expect(statements.length).to.be.greaterThan(
1,
'SNS topic policy should have statements from both notificationMachine and notificationMachine2',
);
}
});

it('should include statements from all state machines in the merged SQS queue policy', () => {
const sqsPolicies = Object.values(resources).filter(
(r) => r.Type === 'AWS::SQS::QueuePolicy',
);

for (const policy of sqsPolicies) {
const statements = [].concat(policy.Properties.PolicyDocument.Statement);
expect(statements.length).to.be.greaterThan(
1,
'SQS queue policy should have statements from both notificationMachine and notificationMachine2',
);
}
});
});
50 changes: 49 additions & 1 deletion lib/deploy/stepFunctions/compileNotifications.js
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,54 @@ function validateConfig(serverless, stateMachineName, stateMachineObj, notificat
return true;
}

// Merge AWS::SNS::TopicPolicy and AWS::SQS::QueuePolicy resources that target
// the same topic/queue into a single resource with combined statements.
//
// CloudFormation treats these resource types as full policy replacements: if two
// separate resources target the same topic/queue, the second one overwrites the
// first. When multiple state machines share a notification topic/queue (fan-in),
// this destroys the first machine's permissions. Merging into one resource with
// all statements ensures CloudFormation sets the policy exactly once, correctly.
function mergeResourcePolicies(resourcePairs) {
const snsMap = new Map(); // JSON-serialised Topics array → { logicalId, resource }
const sqsMap = new Map(); // JSON-serialised Queues array → { logicalId, resource }
const result = {};

for (const [logicalId, resource] of resourcePairs) {
if (resource.Type === 'AWS::SNS::TopicPolicy') {
const key = JSON.stringify(resource.Properties.Topics);
if (snsMap.has(key)) {
const incoming = [].concat(resource.Properties.PolicyDocument.Statement);
snsMap.get(key).resource.Properties.PolicyDocument.Statement.push(...incoming);
} else {
const merged = _.cloneDeep(resource);
merged.Properties.PolicyDocument.Statement = [].concat(
merged.Properties.PolicyDocument.Statement,
);
snsMap.set(key, { logicalId, resource: merged });
result[logicalId] = merged;
}
} else if (resource.Type === 'AWS::SQS::QueuePolicy') {
const key = JSON.stringify(resource.Properties.Queues);
if (sqsMap.has(key)) {
const incoming = [].concat(resource.Properties.PolicyDocument.Statement);
sqsMap.get(key).resource.Properties.PolicyDocument.Statement.push(...incoming);
} else {
const merged = _.cloneDeep(resource);
merged.Properties.PolicyDocument.Statement = [].concat(
merged.Properties.PolicyDocument.Statement,
);
sqsMap.set(key, { logicalId, resource: merged });
result[logicalId] = merged;
}
} else {
result[logicalId] = resource;
}
}

return result;
}

module.exports = {
compileNotifications() {
logger.config(this.serverless, this.v3Api);
Expand All @@ -412,7 +460,7 @@ module.exports = {

return Array.from(resourcesIterator);
});
const newResources = _.fromPairs(newResourcePairs);
const newResources = mergeResourcePolicies(newResourcePairs);

_.merge(
this.serverless.service.provider.compiledCloudFormationTemplate.Resources,
Expand Down
75 changes: 75 additions & 0 deletions lib/deploy/stepFunctions/compileNotifications.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -623,4 +623,79 @@ describe('#compileNotifications', () => {

expect(resources.Beta1NotificationsIamRole.Properties.Path).to.equal('/teamA/');
});

it('should produce a single SNS topic policy when multiple state machines share the same topic', () => {
// Reproduces issue #275: each state machine generates its own AWS::SNS::TopicPolicy
// for the shared topic. CloudFormation applies them sequentially — the second
// overwrites the first, leaving the first machine's notifications unauthorised.
// The fix merges policies for the same topic into a single resource with combined
// statements so CloudFormation only sets the policy once, correctly.
serverless.service.stepFunctions = {
stateMachines: {
machine1: genStateMachineWithTargets('Machine1', [{ sns: 'arn:aws:sns:us-east-1:123:shared-topic' }]),
machine2: genStateMachineWithTargets('Machine2', [{ sns: 'arn:aws:sns:us-east-1:123:shared-topic' }]),
},
};

serverlessStepFunctions.compileNotifications();
const resources = serverlessStepFunctions.serverless.service
.provider.compiledCloudFormationTemplate.Resources;

const snsPolicies = _.values(resources).filter((r) => r.Type === 'AWS::SNS::TopicPolicy');
const policiesForSharedTopic = snsPolicies.filter((p) => _.isEqual(
p.Properties.Topics[0],
'arn:aws:sns:us-east-1:123:shared-topic',
));

expect(policiesForSharedTopic).to.have.lengthOf(1);

// Both machines × 5 statuses = 10 statements in the merged policy
const statements = [].concat(policiesForSharedTopic[0].Properties.PolicyDocument.Statement);
expect(statements.length).to.equal(10);
});

it('should produce a single SQS queue policy when multiple state machines share the same queue', () => {
serverless.service.stepFunctions = {
stateMachines: {
machine1: genStateMachineWithTargets('Machine1', [{ sqs: 'arn:aws:sqs:us-east-1:123:shared-queue' }]),
machine2: genStateMachineWithTargets('Machine2', [{ sqs: 'arn:aws:sqs:us-east-1:123:shared-queue' }]),
},
};

serverlessStepFunctions.compileNotifications();
const resources = serverlessStepFunctions.serverless.service
.provider.compiledCloudFormationTemplate.Resources;

const sqsPolicies = _.values(resources).filter((r) => r.Type === 'AWS::SQS::QueuePolicy');
expect(sqsPolicies).to.have.lengthOf(1);

const statements = [].concat(sqsPolicies[0].Properties.PolicyDocument.Statement);
expect(statements.length).to.equal(10);
});

it('should merge SNS policies when same topic is used for multiple statuses in one state machine', () => {
serverless.service.stepFunctions = {
stateMachines: {
machine1: {
id: 'Machine1',
name: 'Machine1',
definition: { StartAt: 'A', States: { A: { Type: 'Pass', End: true } } },
notifications: {
SUCCEEDED: [{ sns: 'arn:aws:sns:us-east-1:123:shared-topic' }],
FAILED: [{ sns: 'arn:aws:sns:us-east-1:123:shared-topic' }],
},
},
},
};

serverlessStepFunctions.compileNotifications();
const resources = serverlessStepFunctions.serverless.service
.provider.compiledCloudFormationTemplate.Resources;

const snsPolicies = _.values(resources).filter((r) => r.Type === 'AWS::SNS::TopicPolicy');
expect(snsPolicies).to.have.lengthOf(1);

const statements = [].concat(snsPolicies[0].Properties.PolicyDocument.Statement);
expect(statements.length).to.equal(2);
});
});
Loading