|
| 1 | +import argparse |
| 2 | +import importlib.util |
| 3 | +import json |
| 4 | + |
| 5 | + |
| 6 | +def load_pydantic_model_builder(): |
| 7 | + spec = importlib.util.spec_from_file_location( |
| 8 | + "pydantic_model_builder", "src/videoipath_automation_tool/utils/pydantic_model_builder.py" |
| 9 | + ) |
| 10 | + |
| 11 | + if spec is None or spec.loader is None: |
| 12 | + raise ValueError("Failed to load pydantic_model_builder module") |
| 13 | + |
| 14 | + module = importlib.util.module_from_spec(spec) |
| 15 | + spec.loader.exec_module(module) |
| 16 | + return module |
| 17 | + |
| 18 | + |
| 19 | +parser = argparse.ArgumentParser(description="Generate Pydantic models from driver schema") |
| 20 | +parser.add_argument( |
| 21 | + "schema_file", |
| 22 | + nargs="?", |
| 23 | + default="src/videoipath_automation_tool/apps/inventory/model/driver_schema/2024.3.3.json", |
| 24 | + help="Path to the driver schema JSON file", |
| 25 | +) |
| 26 | +parser.add_argument( |
| 27 | + "output_file", |
| 28 | + nargs="?", |
| 29 | + default="src/videoipath_automation_tool/apps/inventory/model/drivers.py", |
| 30 | + help="Path where the generated Python file will be saved", |
| 31 | +) |
| 32 | + |
| 33 | + |
| 34 | +def _generate_driver_model(driver_schema: dict) -> str: |
| 35 | + pmb_module = load_pydantic_model_builder() |
| 36 | + PydanticModelBuilder = pmb_module.PydanticModelBuilder |
| 37 | + PydanticModelField = pmb_module.PydanticModelField |
| 38 | + |
| 39 | + driver_id = driver_schema["_id"] |
| 40 | + builder = PydanticModelBuilder( |
| 41 | + name=_get_custom_settings_class_name(driver_id), parent_classes=["DriverCustomSettings"] |
| 42 | + ) |
| 43 | + |
| 44 | + builder.add_field( |
| 45 | + PydanticModelField( |
| 46 | + name="driver_id", |
| 47 | + type=f'Literal["{driver_id}"]', |
| 48 | + default=driver_id, |
| 49 | + ) |
| 50 | + ) |
| 51 | + |
| 52 | + if "values" in driver_schema["customSettings"]["_schema"]: |
| 53 | + for field_id, field in driver_schema["customSettings"]["_schema"]["values"].items(): |
| 54 | + default = field["_schema"]["default"] |
| 55 | + min_value, max_value = _get_attribute_range(field) |
| 56 | + type_, literal_options = _get_attribute_type(field) |
| 57 | + |
| 58 | + builder.add_field( |
| 59 | + PydanticModelField( |
| 60 | + name=field_id.split(".")[-1], |
| 61 | + type=type_, |
| 62 | + default=default, |
| 63 | + alias=field_id, |
| 64 | + label=field["_schema"]["descriptor"]["label"], |
| 65 | + description=field["_schema"]["descriptor"]["desc"], |
| 66 | + is_optional=field["_schema"]["isNullable"], |
| 67 | + min_value=min_value, |
| 68 | + max_value=max_value, |
| 69 | + literal_options=literal_options, |
| 70 | + ) |
| 71 | + ) |
| 72 | + |
| 73 | + return builder.build() |
| 74 | + |
| 75 | + |
| 76 | +def _generate_driver_id_custom_settings_mapping(drivers: list[dict]) -> str: |
| 77 | + mapping = ",\n\t".join( |
| 78 | + [f'"{driver["_id"]}": {_get_custom_settings_class_name(driver["_id"])}' for driver in drivers] |
| 79 | + ) |
| 80 | + return f"DRIVER_ID_TO_CUSTOM_SETTINGS: Dict[str, Type[DriverCustomSettings]] = {{\n\t{mapping}\n}}" |
| 81 | + |
| 82 | + |
| 83 | +def _generate_driver_literal(drivers: list[dict]) -> str: |
| 84 | + return "DriverLiteral = Literal[\n\t" + ",\n\t".join([f'"{driver["_id"]}"' for driver in drivers]) + "\n]" |
| 85 | + |
| 86 | + |
| 87 | +def _generate_custom_settings_type(drivers: list[dict]) -> str: |
| 88 | + custom_settings_classes = ",\n\t".join([_get_custom_settings_class_name(driver["_id"]) for driver in drivers]) |
| 89 | + return f"CustomSettings = Union[\n\t{custom_settings_classes}\n]" |
| 90 | + |
| 91 | + |
| 92 | +def _get_custom_settings_class_name(driver_id: str) -> str: |
| 93 | + return f"CustomSettings_{driver_id.replace('.', '_').replace('-', '_')}" |
| 94 | + |
| 95 | + |
| 96 | +def _get_attribute_range(field: dict) -> tuple[int | None, int | None]: |
| 97 | + if "ranges" not in field["_schema"] or len(field["_schema"]["ranges"]) == 0: |
| 98 | + return None, None |
| 99 | + |
| 100 | + min_value = field["_schema"]["default"] |
| 101 | + max_value = field["_schema"]["default"] |
| 102 | + |
| 103 | + for range in field["_schema"]["ranges"]: |
| 104 | + range_start, range_end, _step = range |
| 105 | + |
| 106 | + min_value = min(min_value, range_start) |
| 107 | + max_value = max(max_value, range_end) |
| 108 | + |
| 109 | + return min_value, max_value |
| 110 | + |
| 111 | + |
| 112 | +def _get_attribute_type(field: dict) -> tuple[str, list[tuple[str | int | float, str, bool]] | None]: |
| 113 | + if "options" in field["_schema"] and len(field["_schema"]["options"]) > 0: |
| 114 | + |
| 115 | + def format_value(value: str | int | float) -> str: |
| 116 | + if isinstance(value, str): |
| 117 | + return f'"{value}"' |
| 118 | + return str(value) |
| 119 | + |
| 120 | + return ( |
| 121 | + "Literal[" + ", ".join([format_value(option["value"]) for option in field["_schema"]["options"]]) + "]", |
| 122 | + [ |
| 123 | + (option["value"], option["descriptor"]["label"], option["value"] == field["_schema"]["default"]) |
| 124 | + for option in field["_schema"]["options"] |
| 125 | + ], |
| 126 | + ) |
| 127 | + return field["_schema"]["type"], None |
| 128 | + |
| 129 | + |
| 130 | +if __name__ == "__main__": |
| 131 | + args = parser.parse_args() |
| 132 | + schema = json.load(open(args.schema_file)) |
| 133 | + |
| 134 | + drivers = schema["data"]["status"]["system"]["drivers"]["_items"] |
| 135 | + driver_models = "\n\n".join([_generate_driver_model(driver) for driver in drivers]) |
| 136 | + |
| 137 | + code = f"""from abc import ABC |
| 138 | +from typing import Dict, Literal, Type, TypeVar, Union, Optional |
| 139 | +
|
| 140 | +from pydantic import BaseModel, Field |
| 141 | +
|
| 142 | +# Notes: |
| 143 | +# - The name of the custom settings model follows the naming convention: CustomSettings_<driver_organization>_<driver_name>_<driver_version> => "." and "-" are replaced by "_"! |
| 144 | +# - {args.schema_file} is used as reference to define the custom settings model! |
| 145 | +# - The "driver_id" attribute is necessary for the discriminator, which is used to determine the correct model for the custom settings in DeviceConfiguration! |
| 146 | +# - The "alias" attribute is used to map the attribute to the correct key (with driver organization & name) in the JSON payload for the API! |
| 147 | +# - "DriverLiteral" is used to provide a list of all possible drivers in the IDEs IntelliSense! |
| 148 | +
|
| 149 | +
|
| 150 | +class DriverCustomSettings(ABC, BaseModel, validate_assignment=True): ... |
| 151 | +
|
| 152 | +
|
| 153 | +{driver_models} |
| 154 | +
|
| 155 | +{_generate_driver_id_custom_settings_mapping(drivers)} |
| 156 | +
|
| 157 | +{_generate_driver_literal(drivers)} |
| 158 | +
|
| 159 | +# Important: |
| 160 | +# To make the discriminator work properly, the custom settings model must be included in the Union type! |
| 161 | +# This must be statically typed in order to make intellisense work, we can't reuse DRIVER_ID_TO_CUSTOM_SETTINGS here |
| 162 | +{_generate_custom_settings_type(drivers)} |
| 163 | +
|
| 164 | +# used for generic typing to ensure intellisense and correct typing |
| 165 | +CustomSettingsType = TypeVar("CustomSettingsType", bound=CustomSettings) |
| 166 | +
|
| 167 | +""" |
| 168 | + print("Drivers generated successfully!") |
| 169 | + |
| 170 | + with open(args.output_file, "w") as f: |
| 171 | + f.write(code) |
| 172 | + print(f"Updated {args.output_file}") |
0 commit comments