1+
2+ import logging
3+ import os
4+ import sys
5+ import io
6+
7+ from cryptography .hazmat .primitives .ciphers .aead import AESGCM
8+ from base64 import b64decode
9+ from urllib .parse import urlparse , parse_qsl
10+
11+ from dotenv .main import DotEnv , find_dotenv , load_dotenv
12+
13+
14+ logging .basicConfig (level = logging .INFO )
15+
16+ logger = logging .getLogger (__name__ )
17+
18+
19+ class DotEnvVaultError (Exception ):
20+ pass
21+
22+
23+ class DotEnvVault (): #vault stuff
24+ def __init__ (self ) -> None :
25+ logger .info ('initializing DotEnvVault' )
26+ self .dotenv_key = os .environ .get ('DOTENV_KEY' )
27+
28+
29+ def parsed_vault (self ) -> bytes :
30+ """
31+ Parse information from DOTENV_KEY, and decrypt vault key.
32+ """
33+ if self .dotenv_key is None : raise DotEnvVaultError ("NOT_FOUND_DOTENV_KEY: Cannot find ENV['DOTENV_KEY']" )
34+
35+ # .env.vault needs to be present.
36+ env_vault_path = find_dotenv (filename = '.env.vault' )
37+ if env_vault_path == '' :
38+ raise DotEnvVaultError ("ENV_VAULT_NOT_FOUND: .env.vault is not present." )
39+
40+ # parse DOTENV_KEY, format is a URI
41+ uri = urlparse (self .dotenv_key )
42+ # Get encrypted key
43+ key = uri .password
44+ # Get environment from query params.
45+ params = dict (parse_qsl (uri .query ))
46+ vault_environment = params .get ('environment' ).upper ()
47+
48+ if vault_environment is None or vault_environment not in ['PRODUCTION' , 'DEVELOPMENT' , 'CI' , 'STAGING' ]:
49+ raise DotEnvVaultError ('Incorrect Vault Environment.' )
50+
51+ # Getting ciphertext from correct environment in .env.vault
52+ environment_key = f'DOTENV_VAULT_{ vault_environment } '
53+ logging .info (f'Getting { environment_key } .' )
54+
55+ # use python-dotenv library class.
56+ dotenv = DotEnv (dotenv_path = env_vault_path )
57+ ciphertext = dotenv .dict ().get (environment_key )
58+
59+ decrypted = self ._decrypt (ciphertext = ciphertext , key = key )
60+ return self ._to_text_stream (decrypted )
61+
62+
63+ def _decrypt (self , ciphertext : str , key : str ) -> bytes :
64+ """
65+ decrypt method will decrypt via AES-GCM
66+ return: decrypted keys in bytes
67+ """
68+ _key = key [4 :]
69+ if len (_key ) < 64 : raise DotEnvVault ('INVALID_DOTENV_KEY: Key part must be 64 characters long (or more)' )
70+
71+ _key = bytes .fromhex (_key )
72+ ciphertext = b64decode (ciphertext )
73+
74+ aesgcm = AESGCM (_key )
75+ return aesgcm .decrypt (ciphertext [:12 ], ciphertext [12 :], b'' )
76+
77+ def _to_text_stream (self , decrypted_obj : bytes ) -> io .StringIO :
78+ """
79+ convert decrypted object (in bytes) to io.StringIO format.
80+ Python-dotenv is expecting stream to be text stream (such as `io.StringIO`).
81+ return: io.StringIO
82+ """
83+ decoded_str = decrypted_obj .decode ('utf-8' )
84+ return io .StringIO (decoded_str )
0 commit comments