|
8 | 8 | import freezegun |
9 | 9 | import jwt |
10 | 10 | import pytest |
| 11 | +from cryptography.hazmat.backends import default_backend |
| 12 | +from cryptography.hazmat.primitives import serialization |
| 13 | +from cryptography.hazmat.primitives.asymmetric import rsa |
11 | 14 |
|
12 | 15 | from airbyte_cdk.sources.declarative.auth.jwt import JwtAuthenticator |
13 | 16 |
|
@@ -185,3 +188,100 @@ def test_get_header_prefix(self, header_prefix, expected): |
185 | 188 | header_prefix=header_prefix, |
186 | 189 | ) |
187 | 190 | assert authenticator._get_header_prefix() == expected |
| 191 | + |
| 192 | + def test_get_secret_key_with_passphrase(self): |
| 193 | + """Test _get_secret_key method with encrypted private key and passphrase.""" |
| 194 | + # Generate a test RSA private key |
| 195 | + private_key = rsa.generate_private_key( |
| 196 | + public_exponent=65537, key_size=2048, backend=default_backend() |
| 197 | + ) |
| 198 | + |
| 199 | + passphrase = b"test_passphrase" |
| 200 | + encrypted_pem = private_key.private_bytes( |
| 201 | + encoding=serialization.Encoding.PEM, |
| 202 | + format=serialization.PrivateFormat.PKCS8, |
| 203 | + encryption_algorithm=serialization.BestAvailableEncryption(passphrase), |
| 204 | + ) |
| 205 | + |
| 206 | + authenticator = JwtAuthenticator( |
| 207 | + config={}, |
| 208 | + parameters={}, |
| 209 | + secret_key=encrypted_pem.decode(), |
| 210 | + algorithm="RS256", |
| 211 | + token_duration=1200, |
| 212 | + passphrase="test_passphrase", |
| 213 | + ) |
| 214 | + |
| 215 | + result_key = authenticator._get_secret_key() |
| 216 | + |
| 217 | + assert isinstance(result_key, rsa.RSAPrivateKey) |
| 218 | + |
| 219 | + original_public_key = private_key.public_key() |
| 220 | + result_public_key = result_key.public_key() |
| 221 | + |
| 222 | + original_public_numbers = original_public_key.public_numbers() |
| 223 | + result_public_numbers = result_public_key.public_numbers() |
| 224 | + |
| 225 | + assert original_public_numbers.n == result_public_numbers.n |
| 226 | + assert original_public_numbers.e == result_public_numbers.e |
| 227 | + |
| 228 | + def test_get_secret_key_with_wrong_passphrase_raises_error(self): |
| 229 | + """Test that _get_secret_key raises error with wrong passphrase.""" |
| 230 | + private_key = rsa.generate_private_key( |
| 231 | + public_exponent=65537, key_size=2048, backend=default_backend() |
| 232 | + ) |
| 233 | + |
| 234 | + passphrase = b"correct_passphrase" |
| 235 | + encrypted_pem = private_key.private_bytes( |
| 236 | + encoding=serialization.Encoding.PEM, |
| 237 | + format=serialization.PrivateFormat.PKCS8, |
| 238 | + encryption_algorithm=serialization.BestAvailableEncryption(passphrase), |
| 239 | + ) |
| 240 | + |
| 241 | + authenticator = JwtAuthenticator( |
| 242 | + config={}, |
| 243 | + parameters={}, |
| 244 | + secret_key=encrypted_pem.decode(), |
| 245 | + algorithm="RS256", |
| 246 | + token_duration=1200, |
| 247 | + passphrase="wrong_passphrase", |
| 248 | + ) |
| 249 | + |
| 250 | + with pytest.raises(Exception): |
| 251 | + authenticator._get_secret_key() |
| 252 | + |
| 253 | + def test_get_signed_token_with_passphrase_protected_key(self): |
| 254 | + """Test that JWT signing works with passphrase-protected RSA private key.""" |
| 255 | + private_key = rsa.generate_private_key( |
| 256 | + public_exponent=65537, key_size=2048, backend=default_backend() |
| 257 | + ) |
| 258 | + |
| 259 | + passphrase = b"test_passphrase" |
| 260 | + encrypted_pem = private_key.private_bytes( |
| 261 | + encoding=serialization.Encoding.PEM, |
| 262 | + format=serialization.PrivateFormat.PKCS8, |
| 263 | + encryption_algorithm=serialization.BestAvailableEncryption(passphrase), |
| 264 | + ) |
| 265 | + |
| 266 | + authenticator = JwtAuthenticator( |
| 267 | + config={}, |
| 268 | + parameters={}, |
| 269 | + secret_key=encrypted_pem.decode(), |
| 270 | + algorithm="RS256", |
| 271 | + token_duration=1000, |
| 272 | + passphrase="test_passphrase", |
| 273 | + typ="JWT", |
| 274 | + iss="test_issuer", |
| 275 | + ) |
| 276 | + |
| 277 | + signed_token = authenticator._get_signed_token() |
| 278 | + |
| 279 | + assert isinstance(signed_token, str) |
| 280 | + assert len(signed_token.split(".")) == 3 |
| 281 | + |
| 282 | + public_key = private_key.public_key() |
| 283 | + decoded_payload = jwt.decode(signed_token, public_key, algorithms=["RS256"]) |
| 284 | + |
| 285 | + assert decoded_payload["iss"] == "test_issuer" |
| 286 | + assert "iat" in decoded_payload |
| 287 | + assert "exp" in decoded_payload |
0 commit comments