Skip to content

Commit f4b4626

Browse files
authored
add proxy tests for the Kinesis service (#117)
1 parent 60d8366 commit f4b4626

1 file changed

Lines changed: 251 additions & 0 deletions

File tree

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
# Note/disclosure: This file has been (partially or fully) generated by an AI agent.
2+
import boto3
3+
import pytest
4+
from botocore.exceptions import ClientError
5+
from localstack.aws.connect import connect_to
6+
from localstack.utils.strings import short_uid
7+
from localstack.utils.sync import retry
8+
9+
from aws_proxy.shared.models import ProxyConfig
10+
11+
12+
def test_kinesis_requests(start_aws_proxy, cleanups):
13+
stream_name_aws = f"test-stream-aws-{short_uid()}"
14+
stream_name_local = f"test-stream-local-{short_uid()}"
15+
16+
# start proxy - only forwarding requests for stream name matching `test-stream-aws-*`
17+
config = ProxyConfig(services={"kinesis": {"resources": f".*:{stream_name_aws}"}})
18+
start_aws_proxy(config)
19+
20+
# create clients
21+
region_name = "us-east-1"
22+
kinesis_client = connect_to(region_name=region_name).kinesis
23+
kinesis_client_aws = boto3.client("kinesis", region_name=region_name)
24+
25+
# create stream in AWS
26+
kinesis_client_aws.create_stream(StreamName=stream_name_aws, ShardCount=1)
27+
cleanups.append(
28+
lambda: kinesis_client_aws.delete_stream(
29+
StreamName=stream_name_aws, EnforceConsumerDeletion=True
30+
)
31+
)
32+
33+
# wait for stream to become active
34+
def _wait_for_stream_active():
35+
response = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
36+
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
37+
raise AssertionError("Stream not active yet")
38+
39+
retry(_wait_for_stream_active, retries=30, sleep=2)
40+
41+
# assert that local call for this stream is proxied
42+
stream_local = kinesis_client.describe_stream(StreamName=stream_name_aws)
43+
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_aws)
44+
assert (
45+
stream_local["StreamDescription"]["StreamName"]
46+
== stream_aws["StreamDescription"]["StreamName"]
47+
)
48+
assert (
49+
stream_local["StreamDescription"]["StreamARN"]
50+
== stream_aws["StreamDescription"]["StreamARN"]
51+
)
52+
53+
# verify that requesting a non-existent stream with LocalStack client
54+
# does not create it in AWS (negative test)
55+
with pytest.raises(ClientError) as ctx:
56+
kinesis_client_aws.describe_stream(StreamName=stream_name_local)
57+
assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
58+
59+
# put record to AWS stream, get it back locally
60+
kinesis_client_aws.put_record(
61+
StreamName=stream_name_aws, Data=b"test data 1", PartitionKey="partition-1"
62+
)
63+
64+
# get shard iterator
65+
shards = kinesis_client.describe_stream(StreamName=stream_name_aws)[
66+
"StreamDescription"
67+
]["Shards"]
68+
shard_id = shards[0]["ShardId"]
69+
shard_iterator_response = kinesis_client.get_shard_iterator(
70+
StreamName=stream_name_aws,
71+
ShardId=shard_id,
72+
ShardIteratorType="TRIM_HORIZON",
73+
)
74+
shard_iterator = shard_iterator_response["ShardIterator"]
75+
76+
# get records
77+
records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
78+
records = records_response["Records"]
79+
assert len(records) == 1
80+
assert records[0]["Data"] == b"test data 1"
81+
assert records[0]["PartitionKey"] == "partition-1"
82+
83+
# put record locally, get it back with AWS client
84+
kinesis_client.put_record(
85+
StreamName=stream_name_aws, Data=b"test data 2", PartitionKey="partition-2"
86+
)
87+
88+
# get shard iterator from AWS
89+
shard_iterator_response_aws = kinesis_client_aws.get_shard_iterator(
90+
StreamName=stream_name_aws,
91+
ShardId=shard_id,
92+
ShardIteratorType="TRIM_HORIZON",
93+
)
94+
shard_iterator_aws = shard_iterator_response_aws["ShardIterator"]
95+
96+
# get all records from AWS
97+
records_response_aws = kinesis_client_aws.get_records(
98+
ShardIterator=shard_iterator_aws
99+
)
100+
records_aws = records_response_aws["Records"]
101+
assert len(records_aws) == 2 # both records should be present
102+
assert records_aws[0]["Data"] == b"test data 1"
103+
assert records_aws[1]["Data"] == b"test data 2"
104+
105+
# test list_streams - should include proxied stream
106+
streams_local = kinesis_client.list_streams()["StreamNames"]
107+
assert stream_name_aws in streams_local
108+
109+
streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
110+
assert stream_name_aws in streams_aws
111+
112+
113+
def test_kinesis_readonly_operations(start_aws_proxy, cleanups):
114+
stream_name = f"test-readonly-stream-{short_uid()}"
115+
116+
# start proxy - forwarding requests for Kinesis in read-only mode
117+
config = ProxyConfig(
118+
services={"kinesis": {"resources": [f".*:{stream_name}"], "read_only": True}}
119+
)
120+
start_aws_proxy(config)
121+
122+
# create clients
123+
kinesis_client = connect_to().kinesis
124+
kinesis_client_aws = boto3.client("kinesis")
125+
126+
# create stream in AWS (this should succeed as it's direct AWS client)
127+
kinesis_client_aws.create_stream(StreamName=stream_name, ShardCount=1)
128+
cleanups.append(
129+
lambda: kinesis_client_aws.delete_stream(
130+
StreamName=stream_name, EnforceConsumerDeletion=True
131+
)
132+
)
133+
134+
# wait for stream to become active
135+
def _wait_for_stream_active():
136+
response = kinesis_client_aws.describe_stream(StreamName=stream_name)
137+
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
138+
raise AssertionError("Stream not active yet")
139+
140+
retry(_wait_for_stream_active, retries=30, sleep=2)
141+
142+
# assert that local call for describe_stream is proxied and results are consistent
143+
stream_local = kinesis_client.describe_stream(StreamName=stream_name)
144+
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name)
145+
assert (
146+
stream_local["StreamDescription"]["StreamName"]
147+
== stream_aws["StreamDescription"]["StreamName"]
148+
)
149+
assert (
150+
stream_local["StreamDescription"]["StreamARN"]
151+
== stream_aws["StreamDescription"]["StreamARN"]
152+
)
153+
154+
# assert that local call for list_streams is proxied
155+
streams_local = kinesis_client.list_streams()["StreamNames"]
156+
streams_aws = kinesis_client_aws.list_streams()["StreamNames"]
157+
assert stream_name in streams_local
158+
assert stream_name in streams_aws
159+
160+
# Put record to AWS stream using direct AWS client
161+
kinesis_client_aws.put_record(
162+
StreamName=stream_name, Data=b"test data aws", PartitionKey="partition-1"
163+
)
164+
165+
# Get shard iterator and verify data can be read through proxy
166+
shards = kinesis_client.describe_stream(StreamName=stream_name)[
167+
"StreamDescription"
168+
]["Shards"]
169+
shard_id = shards[0]["ShardId"]
170+
shard_iterator_response = kinesis_client.get_shard_iterator(
171+
StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
172+
)
173+
shard_iterator = shard_iterator_response["ShardIterator"]
174+
175+
# Get records - this should work in read-only mode
176+
records_response = kinesis_client.get_records(ShardIterator=shard_iterator)
177+
records = records_response["Records"]
178+
assert len(records) == 1
179+
assert records[0]["Data"] == b"test data aws"
180+
181+
# Attempt to put record using the proxied client in read-only mode
182+
# This should fail because LocalStack doesn't have the stream (it's in AWS)
183+
with pytest.raises(ClientError) as excinfo:
184+
kinesis_client.put_record(
185+
StreamName=stream_name, Data=b"should not reach AWS", PartitionKey="p1"
186+
)
187+
assert excinfo.value.response["Error"]["Code"] == "ResourceNotFoundException"
188+
189+
190+
def test_kinesis_resource_name_matching(start_aws_proxy, cleanups):
191+
stream_name_match = f"proxy-stream-{short_uid()}"
192+
stream_name_nomatch = f"local-stream-{short_uid()}"
193+
194+
# start proxy - only forwarding requests for streams starting with "proxy-"
195+
config = ProxyConfig(services={"kinesis": {"resources": ".*:proxy-.*"}})
196+
start_aws_proxy(config)
197+
198+
# create clients
199+
kinesis_client = connect_to().kinesis
200+
kinesis_client_aws = boto3.client("kinesis")
201+
202+
# create stream in AWS that matches the pattern
203+
kinesis_client_aws.create_stream(StreamName=stream_name_match, ShardCount=1)
204+
cleanups.append(
205+
lambda: kinesis_client_aws.delete_stream(
206+
StreamName=stream_name_match, EnforceConsumerDeletion=True
207+
)
208+
)
209+
210+
# wait for AWS stream to become active
211+
def _wait_for_aws_stream_active():
212+
response = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
213+
if response["StreamDescription"]["StreamStatus"] != "ACTIVE":
214+
raise AssertionError("AWS stream not active yet")
215+
216+
retry(_wait_for_aws_stream_active, retries=30, sleep=2)
217+
218+
# assert that the matching stream is proxied
219+
stream_local = kinesis_client.describe_stream(StreamName=stream_name_match)
220+
stream_aws = kinesis_client_aws.describe_stream(StreamName=stream_name_match)
221+
assert (
222+
stream_local["StreamDescription"]["StreamARN"]
223+
== stream_aws["StreamDescription"]["StreamARN"]
224+
)
225+
226+
# verify that a stream name that doesn't match the pattern and doesn't exist
227+
# is not found in AWS
228+
with pytest.raises(ClientError) as ctx:
229+
kinesis_client_aws.describe_stream(StreamName=stream_name_nomatch)
230+
assert ctx.value.response["Error"]["Code"] == "ResourceNotFoundException"
231+
232+
# Put and get records through the proxied stream
233+
kinesis_client.put_record(
234+
StreamName=stream_name_match, Data=b"test data", PartitionKey="partition-1"
235+
)
236+
237+
# Get shard iterator
238+
shards = kinesis_client_aws.describe_stream(StreamName=stream_name_match)[
239+
"StreamDescription"
240+
]["Shards"]
241+
shard_id = shards[0]["ShardId"]
242+
shard_iterator_response = kinesis_client_aws.get_shard_iterator(
243+
StreamName=stream_name_match, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"
244+
)
245+
shard_iterator = shard_iterator_response["ShardIterator"]
246+
247+
# Get records from AWS - should see the record we put through LocalStack
248+
records_response = kinesis_client_aws.get_records(ShardIterator=shard_iterator)
249+
records = records_response["Records"]
250+
assert len(records) == 1
251+
assert records[0]["Data"] == b"test data"

0 commit comments

Comments
 (0)