|
| 1 | +""" |
| 2 | +Cache for OpenAlchemy. |
| 3 | +
|
| 4 | +The name of the file is: |
| 5 | +__open_alchemy_<sha256 of spec filename>_cache__ |
| 6 | +
|
| 7 | +The structure of the file is: |
| 8 | +
|
| 9 | +{ |
| 10 | + "hash": "<sha256 hash of the file contents>", |
| 11 | + "data": { |
| 12 | + "schemas": { |
| 13 | + "valid": true/false |
| 14 | + } |
| 15 | + } |
| 16 | +} |
| 17 | +""" |
| 18 | + |
| 19 | +import hashlib |
| 20 | +import json |
| 21 | +import pathlib |
| 22 | +import shutil |
| 23 | + |
| 24 | +from . import exceptions |
| 25 | + |
| 26 | + |
| 27 | +def calculate_hash(value: str) -> str: |
| 28 | + """Create hash of a value.""" |
| 29 | + sha256 = hashlib.sha256() |
| 30 | + sha256.update(value.encode()) |
| 31 | + return sha256.hexdigest() |
| 32 | + |
| 33 | + |
| 34 | +def calculate_cache_path(path: pathlib.Path) -> pathlib.Path: |
| 35 | + """ |
| 36 | + Calculate the name of the cache file. |
| 37 | +
|
| 38 | + Args: |
| 39 | + path: The path to the spec file. |
| 40 | +
|
| 41 | + Returns: |
| 42 | + The path to the cache file. |
| 43 | +
|
| 44 | + """ |
| 45 | + return path.parent / f"__open_alchemy_{calculate_hash(path.name)}_cache__" |
| 46 | + |
| 47 | + |
| 48 | +_HASH_KEY = "hash" |
| 49 | +_DATA_KEY = "data" |
| 50 | +_DATA_SCHEMAS_KEY = "schemas" |
| 51 | +_DATA_SCHEMAS_VALID_KEY = "valid" |
| 52 | + |
| 53 | + |
| 54 | +def schemas_valid(filename: str) -> bool: |
| 55 | + """ |
| 56 | + Calculate whether the cache indicates that the schemas in the file are valid. |
| 57 | +
|
| 58 | + Algorithm: |
| 59 | + 1. If the file does not exist, return False. |
| 60 | + 2. If the file is actually a folder, return False. |
| 61 | + 3. If the spec file is actually a folder, return False. |
| 62 | + 4. If the spec file does not exist, return False. |
| 63 | + 5. Calculate the hash of the spec file contents. |
| 64 | + 6. Try to load the cache, if it fails or it is not a dictionary, return False. |
| 65 | + 7. Try to retrieve the hash key, if it does not exist, return False. |
| 66 | + 8. If the value of the hash key is different to the hash of the file, return False. |
| 67 | + 9. Look for the data.schemas.valid key, if it does not exist, return False. |
| 68 | + 12. If the value of data.schemas.valid is True return True, otherwise return False. |
| 69 | +
|
| 70 | + Args: |
| 71 | + filename: The name of the OpenAPI specification file. |
| 72 | +
|
| 73 | + Returns: |
| 74 | + Whether the cache indicates that the schemas in the file are valid. |
| 75 | +
|
| 76 | + """ |
| 77 | + path = pathlib.Path(filename) |
| 78 | + cache_path = calculate_cache_path(path) |
| 79 | + |
| 80 | + # Check that both file and cache exists and are files |
| 81 | + if ( |
| 82 | + not path.exists() |
| 83 | + or not path.is_file() |
| 84 | + or not cache_path.exists() |
| 85 | + or not cache_path.is_file() |
| 86 | + ): |
| 87 | + return False |
| 88 | + |
| 89 | + file_hash = calculate_hash(path.read_text()) |
| 90 | + |
| 91 | + try: |
| 92 | + cache = json.loads(cache_path.read_text()) |
| 93 | + except json.JSONDecodeError: |
| 94 | + return False |
| 95 | + |
| 96 | + cache_valid = ( |
| 97 | + isinstance(cache, dict) |
| 98 | + and _HASH_KEY in cache |
| 99 | + and _DATA_KEY in cache |
| 100 | + and isinstance(cache[_DATA_KEY], dict) |
| 101 | + and _DATA_SCHEMAS_KEY in cache[_DATA_KEY] |
| 102 | + and isinstance(cache[_DATA_KEY][_DATA_SCHEMAS_KEY], dict) |
| 103 | + and _DATA_SCHEMAS_VALID_KEY in cache[_DATA_KEY][_DATA_SCHEMAS_KEY] |
| 104 | + ) |
| 105 | + if not cache_valid: |
| 106 | + return False |
| 107 | + |
| 108 | + cache_file_hash = cache[_HASH_KEY] |
| 109 | + if file_hash != cache_file_hash: |
| 110 | + return False |
| 111 | + |
| 112 | + return cache[_DATA_KEY][_DATA_SCHEMAS_KEY][_DATA_SCHEMAS_VALID_KEY] is True |
| 113 | + |
| 114 | + |
| 115 | +def schemas_are_valid(filename: str) -> None: |
| 116 | + """ |
| 117 | + Update the cache to indicate that the filename is valid. |
| 118 | +
|
| 119 | + Algorithm: |
| 120 | + 1. If the spec filename is actually a folder, raise a CacheError. |
| 121 | + 2. If the spec filename does not exist, raise a CacheError. |
| 122 | + 3. Calculate the hash of the spec file contents. |
| 123 | + 4. If the chache is actually a folder, delete the folder. |
| 124 | + 5. If the cache does not exist, create the cache. |
| 125 | + 6. Read the contents of the cache. If it is not a dictionary, throw the contents |
| 126 | + away and create an empty dictionary. |
| 127 | + 7. Create or update the hash key in the cache dictionary to be the calculated value. |
| 128 | + 8. Look for the data key in the cache dictionary. If it does not exist or is not a |
| 129 | + dictionary, make it an empty dictionary. |
| 130 | + 9. Look for the schemas key under data in the cache dictionary. If it does not exist |
| 131 | + or is not a dictionary, set it to be an empty dictionary. |
| 132 | + 10. Create or update the valid key under data.schemas and set it to True. |
| 133 | + 11. Write the dictionary to the file as JSON. |
| 134 | +
|
| 135 | + Args: |
| 136 | + filename: The name of the spec file. |
| 137 | +
|
| 138 | + """ |
| 139 | + path = pathlib.Path(filename) |
| 140 | + if not path.exists(): |
| 141 | + raise exceptions.CacheError( |
| 142 | + f"the spec file does not exists, filename={filename}" |
| 143 | + ) |
| 144 | + if not path.is_file(): |
| 145 | + raise exceptions.CacheError(f"the spec file is not a file, filename={filename}") |
| 146 | + file_hash = calculate_hash(path.read_text()) |
| 147 | + |
| 148 | + cache_path = calculate_cache_path(path) |
| 149 | + if cache_path.exists() and not cache_path.is_file(): |
| 150 | + shutil.rmtree(cache_path) |
| 151 | + if not cache_path.exists(): |
| 152 | + cache_path.write_text("", encoding="utf-8") |
| 153 | + |
| 154 | + try: |
| 155 | + cache = json.loads(cache_path.read_text()) |
| 156 | + except json.JSONDecodeError: |
| 157 | + cache = {} |
| 158 | + if not isinstance(cache, dict): |
| 159 | + cache = {} |
| 160 | + |
| 161 | + cache[_HASH_KEY] = file_hash |
| 162 | + |
| 163 | + if _DATA_KEY not in cache or not isinstance(cache[_DATA_KEY], dict): |
| 164 | + cache[_DATA_KEY] = {} |
| 165 | + cache_data = cache[_DATA_KEY] |
| 166 | + if _DATA_SCHEMAS_KEY not in cache_data or not isinstance( |
| 167 | + cache_data[_DATA_SCHEMAS_KEY], dict |
| 168 | + ): |
| 169 | + cache_data[_DATA_SCHEMAS_KEY] = {} |
| 170 | + cache_data_schemas = cache_data[_DATA_SCHEMAS_KEY] |
| 171 | + cache_data_schemas[_DATA_SCHEMAS_VALID_KEY] = True |
| 172 | + |
| 173 | + cache_path.write_text(json.dumps(cache), encoding="utf-8") |
0 commit comments