Skip to content
48 changes: 42 additions & 6 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,69 @@
"version": "2.0.0",
"tasks": [
{
"label": "Generate Overloads",
"label": "Generate Data Model",
"problemMatcher": [],
"dependsOn": [
"Generate Overloads without Formatting",
"Generate Driver Models",
"Generate Overloads",
"Format",
],
"isBackground": true,
"promptOnClose": false,
"dependsOrder": "sequence",
},
{
"label": "Generate Overloads without Formatting",
"label": "Generate Overloads",
"type": "shell",
"command": "python",
"command": "poetry",
"args": [
"${workspaceFolder}/src/generate_overloads.py"
"run",
"python",
"${workspaceFolder}/src/scripts/generate_overloads.py"
],
"hide": true
},
{
"label": "Format",
"type": "shell",
"command": "ruff",
"command": "poetry",
"args": [
"run",
"ruff",
"format",
"."
]
},
{
"label": "Generate Driver Models",
"type": "shell",
"command": "poetry",
"args": [
"run",
"python",
"${workspaceFolder}/src/scripts/generate_driver_models.py",
"${input:driversSchemaFile}",
"${input:driverModelFile}"
],
"problemMatcher": [],
"presentation": {
"reveal": "always",
"panel": "new"
}
}
],
"inputs": [
{
"id": "driversSchemaFile",
"type": "promptString",
"description": "Path to drivers schema JSON file (optional, use latest version as default)",
"default": ""
},
{
"id": "driverModelFile",
"type": "promptString",
"description": "Path to driver models Python file (optional)",
"default": ""
}
]
}
172 changes: 172 additions & 0 deletions src/scripts/generate_driver_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import argparse
import importlib.util
import json


def load_pydantic_model_builder():
spec = importlib.util.spec_from_file_location(
"pydantic_model_builder", "src/videoipath_automation_tool/utils/pydantic_model_builder.py"
)

if spec is None or spec.loader is None:
raise ValueError("Failed to load pydantic_model_builder module")

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module


parser = argparse.ArgumentParser(description="Generate Pydantic models from driver schema")
parser.add_argument(
"schema_file",
nargs="?",
default="src/videoipath_automation_tool/apps/inventory/model/driver_schema/2024.3.3.json",
help="Path to the driver schema JSON file",
)
parser.add_argument(
"output_file",
nargs="?",
default="src/videoipath_automation_tool/apps/inventory/model/drivers.py",
help="Path where the generated Python file will be saved",
)


def _generate_driver_model(driver_schema: dict) -> str:
pmb_module = load_pydantic_model_builder()
PydanticModelBuilder = pmb_module.PydanticModelBuilder
PydanticModelField = pmb_module.PydanticModelField

driver_id = driver_schema["_id"]
builder = PydanticModelBuilder(
name=_get_custom_settings_class_name(driver_id), parent_classes=["DriverCustomSettings"]
)

builder.add_field(
PydanticModelField(
name="driver_id",
type=f'Literal["{driver_id}"]',
default=driver_id,
)
)

if "values" in driver_schema["customSettings"]["_schema"]:
for field_id, field in driver_schema["customSettings"]["_schema"]["values"].items():
default = field["_schema"]["default"]
min_value, max_value = _get_attribute_range(field)
type_, literal_options = _get_attribute_type(field)

builder.add_field(
PydanticModelField(
name=field_id.split(".")[-1],
type=type_,
default=default,
alias=field_id,
label=field["_schema"]["descriptor"]["label"],
description=field["_schema"]["descriptor"]["desc"],
is_optional=field["_schema"]["isNullable"],
min_value=min_value,
max_value=max_value,
literal_options=literal_options,
)
)

return builder.build()


def _generate_driver_id_custom_settings_mapping(drivers: list[dict]) -> str:
mapping = ",\n\t".join(
[f'"{driver["_id"]}": {_get_custom_settings_class_name(driver["_id"])}' for driver in drivers]
)
return f"DRIVER_ID_TO_CUSTOM_SETTINGS: Dict[str, Type[DriverCustomSettings]] = {{\n\t{mapping}\n}}"


def _generate_driver_literal(drivers: list[dict]) -> str:
return "DriverLiteral = Literal[\n\t" + ",\n\t".join([f'"{driver["_id"]}"' for driver in drivers]) + "\n]"


def _generate_custom_settings_type(drivers: list[dict]) -> str:
custom_settings_classes = ",\n\t".join([_get_custom_settings_class_name(driver["_id"]) for driver in drivers])
return f"CustomSettings = Union[\n\t{custom_settings_classes}\n]"


def _get_custom_settings_class_name(driver_id: str) -> str:
return f"CustomSettings_{driver_id.replace('.', '_').replace('-', '_')}"


def _get_attribute_range(field: dict) -> tuple[int | None, int | None]:
if "ranges" not in field["_schema"] or len(field["_schema"]["ranges"]) == 0:
return None, None

min_value = field["_schema"]["default"]
max_value = field["_schema"]["default"]

for range in field["_schema"]["ranges"]:
range_start, range_end, _step = range

min_value = min(min_value, range_start)
max_value = max(max_value, range_end)

return min_value, max_value


def _get_attribute_type(field: dict) -> tuple[str, list[tuple[str | int | float, str, bool]] | None]:
if "options" in field["_schema"] and len(field["_schema"]["options"]) > 0:

def format_value(value: str | int | float) -> str:
if isinstance(value, str):
return f'"{value}"'
return str(value)

return (
"Literal[" + ", ".join([format_value(option["value"]) for option in field["_schema"]["options"]]) + "]",
[
(option["value"], option["descriptor"]["label"], option["value"] == field["_schema"]["default"])
for option in field["_schema"]["options"]
],
)
return field["_schema"]["type"], None


if __name__ == "__main__":
args = parser.parse_args()
schema = json.load(open(args.schema_file))

drivers = schema["data"]["status"]["system"]["drivers"]["_items"]
driver_models = "\n\n".join([_generate_driver_model(driver) for driver in drivers])

code = f"""from abc import ABC
from typing import Dict, Literal, Type, TypeVar, Union, Optional

from pydantic import BaseModel, Field

# Notes:
# - The name of the custom settings model follows the naming convention: CustomSettings_<driver_organization>_<driver_name>_<driver_version> => "." and "-" are replaced by "_"!
# - {args.schema_file} is used as reference to define the custom settings model!
# - The "driver_id" attribute is necessary for the discriminator, which is used to determine the correct model for the custom settings in DeviceConfiguration!
# - The "alias" attribute is used to map the attribute to the correct key (with driver organization & name) in the JSON payload for the API!
# - "DriverLiteral" is used to provide a list of all possible drivers in the IDEs IntelliSense!


class DriverCustomSettings(ABC, BaseModel, validate_assignment=True): ...


{driver_models}

{_generate_driver_id_custom_settings_mapping(drivers)}

{_generate_driver_literal(drivers)}

# Important:
# To make the discriminator work properly, the custom settings model must be included in the Union type!
# This must be statically typed in order to make intellisense work, we can't reuse DRIVER_ID_TO_CUSTOM_SETTINGS here
{_generate_custom_settings_type(drivers)}

# used for generic typing to ensure intellisense and correct typing
CustomSettingsType = TypeVar("CustomSettingsType", bound=CustomSettings)

"""
print("Drivers generated successfully!")

with open(args.output_file, "w") as f:
f.write(code)
print(f"Updated {args.output_file}")
17 changes: 16 additions & 1 deletion src/generate_overloads.py → src/scripts/generate_overloads.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
import importlib.util
import re
from typing import Callable

from videoipath_automation_tool.apps.inventory.model.drivers import DRIVER_ID_TO_CUSTOM_SETTINGS

def load_driver_settings():
spec = importlib.util.spec_from_file_location(
"drivers_module", "src/videoipath_automation_tool/apps/inventory/model/drivers.py"
)

if spec is None or spec.loader is None:
raise ValueError("Failed to load drivers module")

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module, "DRIVER_ID_TO_CUSTOM_SETTINGS", {})


DRIVER_ID_TO_CUSTOM_SETTINGS = load_driver_settings()


def generate_create_device_overloads() -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ def create_device_from_discovered_device(

if suggested_config_index >= count_of_suggested_configs or suggested_config_index < 0:
raise ValueError(
f"suggested_config_index is out of range. {f'Please provide a index between 0 and {count_of_suggested_configs - 1}' if (count_of_suggested_configs-1) > 0 else 'Please provide 0 as index.'}"
f"suggested_config_index is out of range. {f'Please provide a index between 0 and {count_of_suggested_configs - 1}' if (count_of_suggested_configs - 1) > 0 else 'Please provide 0 as index.'}"
)

suggested_config = discovered_device.suggestedConfigs[suggested_config_index]
Expand Down
Loading