|
| 1 | +# Note/disclosure: This file has been (partially or fully) generated by an AI agent. |
| 2 | +import boto3 |
| 3 | +import pytest |
| 4 | +from botocore.exceptions import ClientError |
| 5 | +from localstack.aws.connect import connect_to |
| 6 | +from localstack.utils.strings import short_uid |
| 7 | + |
| 8 | + |
| 9 | +def test_kms_key_operations(start_aws_proxy, cleanups): |
| 10 | + """Test basic KMS key operations with proxy.""" |
| 11 | + key_description = f"test-key-{short_uid()}" |
| 12 | + |
| 13 | + # start proxy - forwarding requests for KMS keys with specific alias pattern |
| 14 | + config = { |
| 15 | + "services": { |
| 16 | + "kms": {"resources": [".*:key/.*"]}, |
| 17 | + } |
| 18 | + } |
| 19 | + start_aws_proxy(config) |
| 20 | + |
| 21 | + # create clients |
| 22 | + kms_client = connect_to().kms |
| 23 | + kms_client_aws = boto3.client("kms") |
| 24 | + |
| 25 | + # create key in AWS |
| 26 | + create_response = kms_client_aws.create_key(Description=key_description) |
| 27 | + key_id_aws = create_response["KeyMetadata"]["KeyId"] |
| 28 | + key_arn_aws = create_response["KeyMetadata"]["Arn"] |
| 29 | + cleanups.append( |
| 30 | + lambda: kms_client_aws.schedule_key_deletion( |
| 31 | + KeyId=key_id_aws, PendingWindowInDays=7 |
| 32 | + ) |
| 33 | + ) |
| 34 | + |
| 35 | + # assert that local call for this key is proxied |
| 36 | + key_aws = kms_client_aws.describe_key(KeyId=key_id_aws) |
| 37 | + key_local = kms_client.describe_key(KeyId=key_id_aws) |
| 38 | + assert key_local["KeyMetadata"]["KeyId"] == key_aws["KeyMetadata"]["KeyId"] |
| 39 | + assert key_local["KeyMetadata"]["Arn"] == key_arn_aws |
| 40 | + |
| 41 | + # test encryption with AWS client |
| 42 | + plaintext = b"test message" |
| 43 | + encrypt_response_aws = kms_client_aws.encrypt(KeyId=key_id_aws, Plaintext=plaintext) |
| 44 | + ciphertext = encrypt_response_aws["CiphertextBlob"] |
| 45 | + |
| 46 | + # decrypt with local client (proxied to AWS) |
| 47 | + decrypt_response_local = kms_client.decrypt(CiphertextBlob=ciphertext) |
| 48 | + assert decrypt_response_local["Plaintext"] == plaintext |
| 49 | + assert decrypt_response_local["KeyId"] == key_arn_aws |
| 50 | + |
| 51 | + # encrypt with local client (proxied to AWS) |
| 52 | + plaintext2 = b"another test message" |
| 53 | + encrypt_response_local = kms_client.encrypt(KeyId=key_id_aws, Plaintext=plaintext2) |
| 54 | + ciphertext2 = encrypt_response_local["CiphertextBlob"] |
| 55 | + |
| 56 | + # decrypt with AWS client |
| 57 | + decrypt_response_aws = kms_client_aws.decrypt(CiphertextBlob=ciphertext2) |
| 58 | + assert decrypt_response_aws["Plaintext"] == plaintext2 |
| 59 | + |
| 60 | + |
| 61 | +def test_kms_key_alias_operations(start_aws_proxy, cleanups): |
| 62 | + """Test KMS key alias operations with proxy.""" |
| 63 | + alias_name = f"alias/test-alias-{short_uid()}" |
| 64 | + key_description = f"test-key-with-alias-{short_uid()}" |
| 65 | + |
| 66 | + # start proxy - forwarding requests for KMS |
| 67 | + config = { |
| 68 | + "services": { |
| 69 | + "kms": {"resources": [".*:key/.*", f".*:{alias_name}"]}, |
| 70 | + } |
| 71 | + } |
| 72 | + start_aws_proxy(config) |
| 73 | + |
| 74 | + # create clients |
| 75 | + kms_client = connect_to().kms |
| 76 | + kms_client_aws = boto3.client("kms") |
| 77 | + |
| 78 | + # create key in AWS |
| 79 | + create_response = kms_client_aws.create_key(Description=key_description) |
| 80 | + key_id_aws = create_response["KeyMetadata"]["KeyId"] |
| 81 | + cleanups.append( |
| 82 | + lambda: kms_client_aws.schedule_key_deletion( |
| 83 | + KeyId=key_id_aws, PendingWindowInDays=7 |
| 84 | + ) |
| 85 | + ) |
| 86 | + |
| 87 | + # create alias in AWS |
| 88 | + kms_client_aws.create_alias(AliasName=alias_name, TargetKeyId=key_id_aws) |
| 89 | + cleanups.append(lambda: kms_client_aws.delete_alias(AliasName=alias_name)) |
| 90 | + |
| 91 | + # assert that local call for alias operations is proxied |
| 92 | + aliases_aws = kms_client_aws.list_aliases(KeyId=key_id_aws)["Aliases"] |
| 93 | + aliases_local = kms_client.list_aliases(KeyId=key_id_aws)["Aliases"] |
| 94 | + |
| 95 | + # filter for our specific alias |
| 96 | + alias_aws = [a for a in aliases_aws if a["AliasName"] == alias_name][0] |
| 97 | + alias_local = [a for a in aliases_local if a["AliasName"] == alias_name][0] |
| 98 | + |
| 99 | + assert alias_local["AliasName"] == alias_aws["AliasName"] |
| 100 | + assert alias_local["TargetKeyId"] == alias_aws["TargetKeyId"] |
| 101 | + |
| 102 | + # test encryption with alias via local client |
| 103 | + plaintext = b"test with alias" |
| 104 | + encrypt_response_local = kms_client.encrypt(KeyId=alias_name, Plaintext=plaintext) |
| 105 | + ciphertext = encrypt_response_local["CiphertextBlob"] |
| 106 | + |
| 107 | + # decrypt with AWS client |
| 108 | + decrypt_response_aws = kms_client_aws.decrypt(CiphertextBlob=ciphertext) |
| 109 | + assert decrypt_response_aws["Plaintext"] == plaintext |
| 110 | + |
| 111 | + |
| 112 | +def test_kms_readonly_operations(start_aws_proxy, cleanups): |
| 113 | + """Test KMS operations in read-only proxy mode.""" |
| 114 | + key_description = f"test-readonly-key-{short_uid()}" |
| 115 | + |
| 116 | + # start proxy - forwarding requests for KMS in read-only mode |
| 117 | + config = { |
| 118 | + "services": { |
| 119 | + "kms": {"resources": [".*:key/.*"], "read_only": True}, |
| 120 | + } |
| 121 | + } |
| 122 | + start_aws_proxy(config) |
| 123 | + |
| 124 | + # create clients |
| 125 | + kms_client = connect_to().kms |
| 126 | + kms_client_aws = boto3.client("kms") |
| 127 | + |
| 128 | + # create key in AWS (this should succeed as it's direct AWS client) |
| 129 | + create_response = kms_client_aws.create_key(Description=key_description) |
| 130 | + key_id_aws = create_response["KeyMetadata"]["KeyId"] |
| 131 | + cleanups.append( |
| 132 | + lambda: kms_client_aws.schedule_key_deletion( |
| 133 | + KeyId=key_id_aws, PendingWindowInDays=7 |
| 134 | + ) |
| 135 | + ) |
| 136 | + |
| 137 | + # assert that local call for describe_key is proxied and results are consistent |
| 138 | + key_aws = kms_client_aws.describe_key(KeyId=key_id_aws) |
| 139 | + key_local = kms_client.describe_key(KeyId=key_id_aws) |
| 140 | + assert key_local["KeyMetadata"]["KeyId"] == key_aws["KeyMetadata"]["KeyId"] |
| 141 | + |
| 142 | + # assert that local call for list_keys is proxied |
| 143 | + keys_local = kms_client.list_keys()["Keys"] |
| 144 | + keys_aws = kms_client_aws.list_keys()["Keys"] |
| 145 | + |
| 146 | + # filter for our specific key |
| 147 | + key_local_filtered = [k for k in keys_local if k["KeyId"] == key_id_aws] |
| 148 | + key_aws_filtered = [k for k in keys_aws if k["KeyId"] == key_id_aws] |
| 149 | + assert key_local_filtered == key_aws_filtered |
| 150 | + |
| 151 | + # Negative test: attempt write operations with proxied client |
| 152 | + # Create a new key using the proxied client (should succeed in LocalStack) |
| 153 | + new_key_description = f"no-proxy-key-{short_uid()}" |
| 154 | + new_key_local = kms_client.create_key(Description=new_key_description) |
| 155 | + new_key_id_local = new_key_local["KeyMetadata"]["KeyId"] |
| 156 | + cleanups.append( |
| 157 | + lambda: kms_client.schedule_key_deletion( |
| 158 | + KeyId=new_key_id_local, PendingWindowInDays=7 |
| 159 | + ) |
| 160 | + ) |
| 161 | + |
| 162 | + # Verify that this new key does NOT exist in real AWS |
| 163 | + keys_aws_after_create = kms_client_aws.list_keys()["Keys"] |
| 164 | + assert not any(k for k in keys_aws_after_create if k["KeyId"] == new_key_id_local) |
| 165 | + |
| 166 | + # Attempt to encrypt data with the AWS key using proxied client (should fail) |
| 167 | + plaintext = b"this should not work" |
| 168 | + with pytest.raises(ClientError) as excinfo: |
| 169 | + kms_client.encrypt(KeyId=key_id_aws, Plaintext=plaintext) |
| 170 | + # In read-only mode, the key exists in AWS but LocalStack doesn't have it locally |
| 171 | + # so encrypt operation should fail |
| 172 | + assert excinfo.value.response["Error"]["Code"] in [ |
| 173 | + "NotFoundException", |
| 174 | + "InvalidKeyId.NotFound", |
| 175 | + ] |
| 176 | + |
| 177 | + |
| 178 | +def test_kms_selective_resource_matching(start_aws_proxy, cleanups): |
| 179 | + """Test that proxy forwards requests for specific KMS keys matching ARN pattern.""" |
| 180 | + key_description_1 = f"test-proxied-key-1-{short_uid()}" |
| 181 | + key_description_2 = f"test-proxied-key-2-{short_uid()}" |
| 182 | + |
| 183 | + # create clients |
| 184 | + kms_client_aws = boto3.client("kms") |
| 185 | + |
| 186 | + # create two keys in AWS |
| 187 | + create_response_1 = kms_client_aws.create_key(Description=key_description_1) |
| 188 | + key_id_1 = create_response_1["KeyMetadata"]["KeyId"] |
| 189 | + key_arn_1 = create_response_1["KeyMetadata"]["Arn"] |
| 190 | + cleanups.append( |
| 191 | + lambda: kms_client_aws.schedule_key_deletion( |
| 192 | + KeyId=key_id_1, PendingWindowInDays=7 |
| 193 | + ) |
| 194 | + ) |
| 195 | + |
| 196 | + create_response_2 = kms_client_aws.create_key(Description=key_description_2) |
| 197 | + key_id_2 = create_response_2["KeyMetadata"]["KeyId"] |
| 198 | + key_arn_2 = create_response_2["KeyMetadata"]["Arn"] |
| 199 | + cleanups.append( |
| 200 | + lambda: kms_client_aws.schedule_key_deletion( |
| 201 | + KeyId=key_id_2, PendingWindowInDays=7 |
| 202 | + ) |
| 203 | + ) |
| 204 | + |
| 205 | + # start proxy - forwarding requests for both keys using wildcard pattern |
| 206 | + config = { |
| 207 | + "services": { |
| 208 | + "kms": {"resources": [".*:key/.*"]}, |
| 209 | + } |
| 210 | + } |
| 211 | + start_aws_proxy(config) |
| 212 | + |
| 213 | + # create LocalStack client after proxy is started |
| 214 | + kms_client = connect_to().kms |
| 215 | + |
| 216 | + # verify that both keys are accessible via local client (proxied) |
| 217 | + key_1_local = kms_client.describe_key(KeyId=key_id_1) |
| 218 | + assert key_1_local["KeyMetadata"]["KeyId"] == key_id_1 |
| 219 | + assert key_1_local["KeyMetadata"]["Arn"] == key_arn_1 |
| 220 | + |
| 221 | + key_2_local = kms_client.describe_key(KeyId=key_id_2) |
| 222 | + assert key_2_local["KeyMetadata"]["KeyId"] == key_id_2 |
| 223 | + assert key_2_local["KeyMetadata"]["Arn"] == key_arn_2 |
| 224 | + |
| 225 | + # verify we can encrypt with both proxied keys |
| 226 | + plaintext = b"test with first key" |
| 227 | + encrypt_response_1 = kms_client.encrypt(KeyId=key_id_1, Plaintext=plaintext) |
| 228 | + ciphertext_1 = encrypt_response_1["CiphertextBlob"] |
| 229 | + |
| 230 | + # decrypt with AWS client to confirm it went through proxy |
| 231 | + decrypt_response_1 = kms_client_aws.decrypt(CiphertextBlob=ciphertext_1) |
| 232 | + assert decrypt_response_1["Plaintext"] == plaintext |
| 233 | + |
| 234 | + # encrypt with second key |
| 235 | + plaintext_2 = b"test with second key" |
| 236 | + encrypt_response_2 = kms_client.encrypt(KeyId=key_id_2, Plaintext=plaintext_2) |
| 237 | + ciphertext_2 = encrypt_response_2["CiphertextBlob"] |
| 238 | + |
| 239 | + # decrypt with AWS client |
| 240 | + decrypt_response_2 = kms_client_aws.decrypt(CiphertextBlob=ciphertext_2) |
| 241 | + assert decrypt_response_2["Plaintext"] == plaintext_2 |
0 commit comments