-
Notifications
You must be signed in to change notification settings - Fork 16
Add proxy tests for the SNS service #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| # Note/disclosure: This file has been (partially or fully) generated by an AI agent. | ||
| import json | ||
| import boto3 | ||
| import pytest | ||
| from botocore.exceptions import ClientError | ||
| from localstack.aws.connect import connect_to | ||
| from localstack.utils.strings import short_uid | ||
| from localstack.utils.sync import retry | ||
|
|
||
|
|
||
| def test_sns_requests(start_aws_proxy, cleanups): | ||
| topic_name = f"test-topic-{short_uid()}" | ||
| queue_name_sub = f"test-queue-sub-{short_uid()}" | ||
|
|
||
| # start proxy - forwarding requests for both SQS and SNS | ||
| config = { | ||
| "services": { | ||
| "sns": {"resources": [f".*:{topic_name}"]}, | ||
| "sqs": {"resources": [f".*:{queue_name_sub}"]}, | ||
| } | ||
| } | ||
| start_aws_proxy(config) | ||
|
|
||
| # create clients | ||
| sns_client = connect_to().sns | ||
| sns_client_aws = boto3.client("sns") | ||
| sqs_client_aws = boto3.client("sqs") | ||
|
|
||
| # create topic in AWS | ||
| topic_arn_aws = sns_client_aws.create_topic(Name=topic_name)["TopicArn"] | ||
| cleanups.append(lambda: sns_client_aws.delete_topic(TopicArn=topic_arn_aws)) | ||
|
|
||
| # assert that local call for this topic is proxied | ||
| topic_aws = sns_client_aws.get_topic_attributes(TopicArn=topic_arn_aws) | ||
| topic_local = sns_client.get_topic_attributes(TopicArn=topic_arn_aws) | ||
| assert topic_local["Attributes"] == topic_aws["Attributes"] | ||
|
|
||
| # publish message to AWS, receive locally (requires subscription) | ||
| queue_url_sub_aws = sqs_client_aws.create_queue(QueueName=queue_name_sub)[ | ||
| "QueueUrl" | ||
| ] | ||
| cleanups.append(lambda: sqs_client_aws.delete_queue(QueueUrl=queue_url_sub_aws)) | ||
| queue_arn_sub = sqs_client_aws.get_queue_attributes( | ||
| QueueUrl=queue_url_sub_aws, AttributeNames=["QueueArn"] | ||
| )["Attributes"]["QueueArn"] | ||
|
|
||
| # grant SNS permission to send messages to SQS queue | ||
| policy = { | ||
| "Statement": [ | ||
| { | ||
| "Effect": "Allow", | ||
| "Principal": {"Service": "sns.amazonaws.com"}, | ||
| "Action": "sqs:SendMessage", | ||
| "Resource": queue_arn_sub, | ||
| "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn_aws}}, | ||
| } | ||
| ] | ||
| } | ||
| sqs_client_aws.set_queue_attributes( | ||
| QueueUrl=queue_url_sub_aws, Attributes={"Policy": json.dumps(policy)} | ||
| ) | ||
|
|
||
| sub_arn = sns_client_aws.subscribe( | ||
| TopicArn=topic_arn_aws, Protocol="sqs", Endpoint=queue_arn_sub | ||
| )["SubscriptionArn"] | ||
| if "pending confirmation" not in sub_arn: | ||
| cleanups.append(lambda: sns_client_aws.unsubscribe(SubscriptionArn=sub_arn)) | ||
|
|
||
| # wait for subscription to be confirmed | ||
| def _wait_for_subscription_confirmation(): | ||
| nonlocal sub_arn | ||
| subs = sns_client_aws.list_subscriptions_by_topic(TopicArn=topic_arn_aws)[ | ||
| "Subscriptions" | ||
| ] | ||
| if not (subs and subs[0]["SubscriptionArn"] != "PendingConfirmation"): | ||
| raise AssertionError("Subscription not confirmed yet") | ||
| sub_arn = subs[0]["SubscriptionArn"] | ||
|
|
||
| retry(_wait_for_subscription_confirmation, retries=10, sleep=1) | ||
|
|
||
| # publish message to AWS | ||
| sns_client_aws.publish(TopicArn=topic_arn_aws, Message="message 1") | ||
|
|
||
| # receive message from local queue | ||
| received = sqs_client_aws.receive_message( | ||
| QueueUrl=queue_url_sub_aws, WaitTimeSeconds=5 | ||
| ).get("Messages", []) | ||
| assert len(received) == 1 | ||
| assert "message 1" in received[0]["Body"] | ||
| sqs_client_aws.delete_message( | ||
| QueueUrl=queue_url_sub_aws, ReceiptHandle=received[0]["ReceiptHandle"] | ||
| ) | ||
|
|
||
| # publish message locally, receive with AWS client | ||
| sns_client.publish(TopicArn=topic_arn_aws, Message="message 2") | ||
| received = sqs_client_aws.receive_message( | ||
| QueueUrl=queue_url_sub_aws, WaitTimeSeconds=5 | ||
| ).get("Messages", []) | ||
| assert len(received) == 1 | ||
| assert "message 2" in received[0]["Body"] | ||
|
|
||
|
|
||
| def test_sns_readonly_operations(start_aws_proxy, cleanups): | ||
| topic_name = f"test-readonly-topic-{short_uid()}" | ||
|
|
||
| # start proxy - forwarding requests for SNS in read-only mode | ||
| config = { | ||
| "services": { | ||
| "sns": {"resources": [f".*:{topic_name}"], "read_only": True}, | ||
| } | ||
| } | ||
| start_aws_proxy(config) | ||
|
|
||
| # create clients | ||
| sns_client = connect_to().sns | ||
| sns_client_aws = boto3.client("sns") | ||
|
|
||
| # create topic in AWS (this should succeed as it's direct AWS client) | ||
| topic_arn_aws = sns_client_aws.create_topic(Name=topic_name)["TopicArn"] | ||
| cleanups.append(lambda: sns_client_aws.delete_topic(TopicArn=topic_arn_aws)) | ||
|
|
||
| # assert that local call for list_topics is proxied and results are consistent | ||
| topics_local = sns_client.list_topics()["Topics"] | ||
| topics_aws = sns_client_aws.list_topics()["Topics"] | ||
|
|
||
| # filter for our specific topic to ensure consistency | ||
| topic_local_filtered = [t for t in topics_local if t["TopicArn"] == topic_arn_aws] | ||
| topic_aws_filtered = [t for t in topics_aws if t["TopicArn"] == topic_arn_aws] | ||
|
|
||
| # assert that results are equal | ||
| assert topic_local_filtered == topic_aws_filtered | ||
|
|
||
| # assert that local call for get_topic_attributes is proxied and results are consistent | ||
| attributes_local = sns_client.get_topic_attributes(TopicArn=topic_arn_aws)[ | ||
| "Attributes" | ||
| ] | ||
| attributes_aws = sns_client_aws.get_topic_attributes(TopicArn=topic_arn_aws)[ | ||
| "Attributes" | ||
| ] | ||
| assert attributes_local == attributes_aws | ||
|
|
||
| # Negative test cases: Attempt write operations with proxied client and assert they do not affect real AWS | ||
| # Create a new topic using the proxied client (this should succeed as LocalStack processes it) | ||
| new_topic_name = f"no-proxy-topic-{short_uid()}" | ||
| new_topic_arn_local = sns_client.create_topic(Name=new_topic_name)["TopicArn"] | ||
| cleanups.append( | ||
| lambda: sns_client.delete_topic(TopicArn=new_topic_arn_local) | ||
| ) # Cleanup localstack resource | ||
|
|
||
| # Verify that this new topic does NOT exist in real AWS | ||
| topics_aws_after_create = sns_client_aws.list_topics()["Topics"] | ||
| assert not any( | ||
| t for t in topics_aws_after_create if t["TopicArn"] == new_topic_arn_local | ||
| ) | ||
|
|
||
| # Attempt to publish a message using the proxied client, expecting it to fail from LocalStack's perspective | ||
| with pytest.raises(ClientError) as excinfo: | ||
| sns_client.publish( | ||
| TopicArn=topic_arn_aws, Message="this should not reach real AWS" | ||
| ) | ||
| assert "NotFound" in str(excinfo.value) | ||
| assert "Topic does not exist" in str(excinfo.value) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.