Skip to content

Commit 9fc65c7

Browse files
author
Lukas Pühringer
authored
Merge pull request #609 from ianhundere/add-aws-support
feat: Adds AWS KMS signing.
2 parents 59a91eb + e616b12 commit 9fc65c7

6 files changed

Lines changed: 312 additions & 1 deletion

File tree

mypy.ini

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,9 @@ ignore_missing_imports = True
3232

3333
[mypy-azure.*]
3434
ignore_missing_imports = True
35+
36+
[mypy-boto3.*]
37+
ignore_missing_imports = True
38+
39+
[mypy-botocore.*]
40+
ignore_missing_imports = True

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Issues = "https://github.com/secure-systems-lab/securesystemslib/issues"
4646
crypto = ["cryptography>=40.0.0"]
4747
gcpkms = ["google-cloud-kms", "cryptography>=40.0.0"]
4848
azurekms = ["azure-identity", "azure-keyvault-keys", "cryptography>=40.0.0"]
49+
awskms = ["boto3", "botocore", "cryptography>=40.0.0"]
4950
hsm = ["asn1crypto", "cryptography>=40.0.0", "PyKCS11"]
5051
pynacl = ["pynacl>1.2.0"]
5152
PySPX = ["PySPX>=0.5.0"]

securesystemslib/signer/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
This module provides extensible interfaces for public keys and signers:
55
Some implementations are provided by default but more can be added by users.
66
"""
7+
from securesystemslib.signer._aws_signer import AWSSigner
78
from securesystemslib.signer._azure_signer import AzureSigner
89
from securesystemslib.signer._gcp_signer import GCPSigner
910
from securesystemslib.signer._gpg_signer import GPGKey, GPGSigner
@@ -32,6 +33,7 @@
3233
HSMSigner.SCHEME: HSMSigner,
3334
GPGSigner.SCHEME: GPGSigner,
3435
AzureSigner.SCHEME: AzureSigner,
36+
AWSSigner.SCHEME: AWSSigner,
3537
}
3638
)
3739

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
"""Signer implementation for AWS Key Management Service"""
2+
3+
import logging
4+
from typing import Optional, Tuple
5+
from urllib import parse
6+
7+
import securesystemslib.hash as sslib_hash
8+
from securesystemslib import exceptions
9+
from securesystemslib.exceptions import UnsupportedLibraryError
10+
from securesystemslib.signer._key import Key
11+
from securesystemslib.signer._signer import (
12+
SecretsHandler,
13+
Signature,
14+
Signer,
15+
SSlibKey,
16+
)
17+
18+
logger = logging.getLogger(__name__)
19+
20+
AWS_IMPORT_ERROR = None
21+
try:
22+
import boto3
23+
from botocore.exceptions import BotoCoreError, ClientError
24+
from cryptography.hazmat.primitives import serialization
25+
except ImportError:
26+
AWS_IMPORT_ERROR = "Signing with AWS KMS requires aws-kms and cryptography."
27+
28+
29+
class AWSSigner(Signer):
30+
"""AWS Key Management Service Signer
31+
32+
This Signer uses AWS KMS to sign and supports signing with RSA/EC keys and
33+
uses "ambient" credentials typically environment variables such as
34+
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN. These will
35+
be recognized by the boto3 SDK, which underlies the aws_kms Python module.
36+
37+
For more details on AWS authentication, refer to the AWS Command Line
38+
Interface User Guide:
39+
https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html
40+
41+
Some practical authentication options include:
42+
AWS CLI: https://aws.amazon.com/cli/
43+
AWS SDKs: https://aws.amazon.com/tools/
44+
45+
The specific permissions that AWS KMS signer needs are:
46+
kms:Sign for sign()
47+
kms:GetPublicKey for import()
48+
49+
Arguments:
50+
aws_key_id (str): AWS KMS key ID or alias.
51+
public_key (Key): The related public key instance.
52+
53+
Returns:
54+
AWSSigner: An instance of the AWSSigner class.
55+
56+
Raises:
57+
UnsupportedAlgorithmError: If the payload hash algorithm is unsupported.
58+
BotoCoreError: Errors from the botocore.exceptions library.
59+
ClientError: Errors related to AWS KMS client.
60+
UnsupportedLibraryError: If necessary libraries for AWS KMS are not available.
61+
"""
62+
63+
SCHEME = "awskms"
64+
65+
def __init__(self, aws_key_id: str, public_key: Key):
66+
if AWS_IMPORT_ERROR:
67+
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)
68+
69+
self.hash_algorithm = self._get_hash_algorithm(public_key)
70+
self.aws_key_id = aws_key_id
71+
self.public_key = public_key
72+
self.client = boto3.client("kms")
73+
self.aws_algo = self._get_aws_signing_algo(self.public_key.scheme)
74+
75+
@classmethod
76+
def from_priv_key_uri(
77+
cls,
78+
priv_key_uri: str,
79+
public_key: Key,
80+
secrets_handler: Optional[SecretsHandler] = None,
81+
) -> "AWSSigner":
82+
uri = parse.urlparse(priv_key_uri)
83+
84+
if uri.scheme != cls.SCHEME:
85+
raise ValueError(f"AWSSigner does not support {priv_key_uri}")
86+
87+
return cls(uri.path, public_key)
88+
89+
@classmethod
90+
def import_(cls, aws_key_id: str, local_scheme: str) -> Tuple[str, Key]:
91+
"""Loads a key and signer details from AWS KMS.
92+
93+
Returns the private key uri and the public key. This method should only
94+
be called once per key: the uri and Key should be stored for later use.
95+
96+
Arguments:
97+
aws_key_id (str): AWS KMS key ID.
98+
local_scheme (str): Local scheme to use.
99+
100+
Returns:
101+
Tuple[str, Key]: A tuple where the first element is a string
102+
representing the private key URI, and the second element is an
103+
instance of the public key.
104+
105+
Raises:
106+
UnsupportedAlgorithmError: If the AWS KMS signing algorithm is
107+
unsupported.
108+
BotoCoreError: Errors from the botocore.exceptions library.
109+
ClientError: Errors related to AWS KMS client.
110+
"""
111+
if AWS_IMPORT_ERROR:
112+
raise UnsupportedLibraryError(AWS_IMPORT_ERROR)
113+
114+
client = boto3.client("kms")
115+
request = client.get_public_key(KeyId=aws_key_id)
116+
kms_pubkey = serialization.load_der_public_key(request["PublicKey"])
117+
118+
public_key_pem = kms_pubkey.public_bytes(
119+
encoding=serialization.Encoding.PEM,
120+
format=serialization.PublicFormat.SubjectPublicKeyInfo,
121+
).decode("utf-8")
122+
try:
123+
keytype = cls._get_keytype_for_scheme(local_scheme)
124+
except KeyError as e:
125+
raise exceptions.UnsupportedAlgorithmError(
126+
f"{local_scheme} is not a supported signing algorithm"
127+
) from e
128+
129+
keyval = {"public": public_key_pem}
130+
keyid = cls._get_keyid(keytype, local_scheme, keyval)
131+
public_key = SSlibKey(keyid, keytype, local_scheme, keyval)
132+
return f"{cls.SCHEME}:{aws_key_id}", public_key
133+
134+
@staticmethod
135+
def _get_keytype_for_scheme(
136+
scheme: str,
137+
) -> str:
138+
"""Returns the Secure Systems Library key type.
139+
140+
Arguments:
141+
(str): The Secure Systems Library scheme.
142+
143+
Returns:
144+
str: The Secure Systems Library key type.
145+
"""
146+
keytype_for_scheme = {
147+
"ecdsa-sha2-nistp256": "ecdsa",
148+
"ecdsa-sha2-nistp384": "ecdsa",
149+
"ecdsa-sha2-nistp512": "ecdsa",
150+
"rsassa-pss-sha256": "rsa",
151+
"rsassa-pss-sha384": "rsa",
152+
"rsassa-pss-sha512": "rsa",
153+
"rsa-pkcs1v15-sha256": "rsa",
154+
"rsa-pkcs1v15-sha384": "rsa",
155+
"rsa-pkcs1v15-sha512": "rsa",
156+
}
157+
return keytype_for_scheme[scheme]
158+
159+
@staticmethod
160+
def _get_aws_signing_algo(
161+
scheme: str,
162+
) -> str:
163+
"""Returns AWS signing algorithm
164+
165+
Arguments:
166+
scheme (str): The Secure Systems Library signing scheme.
167+
168+
Returns:
169+
str: AWS signing scheme.
170+
"""
171+
aws_signing_algorithms = {
172+
"ecdsa-sha2-nistp256": "ECDSA_SHA_256",
173+
"ecdsa-sha2-nistp384": "ECDSA_SHA_384",
174+
"ecdsa-sha2-nistp512": "ECDSA_SHA_512",
175+
"rsassa-pss-sha256": "RSASSA_PSS_SHA_256",
176+
"rsassa-pss-sha384": "RSASSA_PSS_SHA_384",
177+
"rsassa-pss-sha512": "RSASSA_PSS_SHA_512",
178+
"rsa-pkcs1v15-sha256": "RSASSA_PKCS1_V1_5_SHA_256",
179+
"rsa-pkcs1v15-sha384": "RSASSA_PKCS1_V1_5_SHA_384",
180+
"rsa-pkcs1v15-sha512": "RSASSA_PKCS1_V1_5_SHA_512",
181+
}
182+
return aws_signing_algorithms[scheme]
183+
184+
@staticmethod
185+
def _get_hash_algorithm(public_key: Key) -> str:
186+
"""Helper function to return payload hash algorithm used for this key
187+
188+
Arguments:
189+
public_key (Key): Public key object
190+
191+
Returns:
192+
str: Hash algorithm
193+
"""
194+
if public_key.keytype == "rsa":
195+
# hash algorithm is encoded as last scheme portion
196+
algo = public_key.scheme.split("-")[-1]
197+
if public_key.keytype in [
198+
"ecdsa",
199+
"ecdsa-sha2-nistp256",
200+
"ecdsa-sha2-nistp384",
201+
]:
202+
# nistp256 uses sha-256, nistp384 uses sha-384
203+
bits = public_key.scheme.split("-nistp")[-1]
204+
algo = f"sha{bits}"
205+
206+
# trigger UnsupportedAlgorithm if appropriate
207+
_ = sslib_hash.digest(algo)
208+
return algo
209+
210+
def sign(self, payload: bytes) -> Signature:
211+
"""Sign the payload with the AWS KMS key
212+
213+
Arguments:
214+
payload: bytes to be signed.
215+
216+
Raises:
217+
BotoCoreError: Errors from the botocore.exceptions library.
218+
ClientError: Errors related to AWS KMS client.
219+
220+
Returns:
221+
Signature.
222+
"""
223+
try:
224+
request = self.client.sign(
225+
KeyId=self.aws_key_id,
226+
Message=payload,
227+
MessageType="RAW",
228+
SigningAlgorithm=self.aws_algo,
229+
)
230+
231+
hasher = sslib_hash.digest(self.hash_algorithm)
232+
hasher.update(payload)
233+
logger.debug("signing response %s", request)
234+
response = request["Signature"]
235+
logger.debug("signing response %s", response)
236+
237+
return Signature(self.public_key.keyid, response.hex())
238+
except (BotoCoreError, ClientError) as e:
239+
logger.error("Failed to sign with AWS KMS: %s", str(e))
240+
raise e

tests/check_aws_signer.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
"""This module confirms that signing using AWS KMS keys works.
2+
3+
The purpose is to do a smoke test, not to exhaustively test every possible key
4+
and environment combination.
5+
6+
For AWS, the requirements to successfully test are:
7+
* AWS authentication details
8+
have to be available in the environment
9+
* The key defined in the test has to be
10+
available to the authenticated user
11+
12+
Remember to replace the REDACTED fields to include the necessary values:
13+
* keyid: Hash of the public key
14+
* public: The public key, refer to other KMS tests to see the format
15+
* aws_id: AWS KMS ID or alias
16+
"""
17+
18+
import unittest
19+
20+
from securesystemslib.exceptions import UnverifiedSignatureError
21+
from securesystemslib.signer import AWSSigner, Key, Signer
22+
23+
24+
class TestAWSKMSKeys(unittest.TestCase):
25+
"""Test that AWS KMS keys can be used to sign."""
26+
27+
pubkey = Key.from_dict(
28+
"REDACTED",
29+
{
30+
"keytype": "rsa",
31+
"scheme": "rsassa-pss-sha256",
32+
"keyval": {
33+
"public": "-----BEGIN PUBLIC KEY-----\nREDACTED\n-----END PUBLIC KEY-----\n"
34+
},
35+
},
36+
)
37+
aws_key_id = "REDACTED"
38+
39+
def test_aws_sign(self):
40+
"""Test that AWS KMS key works for signing"""
41+
42+
data = "data".encode("utf-8")
43+
44+
signer = Signer.from_priv_key_uri(
45+
f"awskms:{self.aws_key_id}", self.pubkey
46+
)
47+
sig = signer.sign(data)
48+
49+
self.pubkey.verify_signature(sig, data)
50+
with self.assertRaises(UnverifiedSignatureError):
51+
self.pubkey.verify_signature(sig, b"NOT DATA")
52+
53+
def test_aws_import(self):
54+
"""Test that AWS KMS key can be imported"""
55+
56+
uri, key = AWSSigner.import_(self.aws_key_id, self.pubkey.scheme)
57+
self.assertEqual(key.keytype, self.pubkey.keytype)
58+
self.assertEqual(uri, f"awskms:{self.aws_key_id}")
59+
60+
61+
if __name__ == "__main__":
62+
unittest.main(verbosity=1)

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ deps =
2121
commands =
2222
python -m tests.check_gpg_available
2323
coverage run tests/aggregate_tests.py
24-
coverage report -m --fail-under 85
24+
coverage report -m --fail-under 83
2525

2626
[testenv:purepy311]
2727
deps =

0 commit comments

Comments
 (0)