diff --git a/src/videoipath_automation_tool/apps/inventory/app/app.py b/src/videoipath_automation_tool/apps/inventory/app/app.py index e9e8cb5..9061952 100644 --- a/src/videoipath_automation_tool/apps/inventory/app/app.py +++ b/src/videoipath_automation_tool/apps/inventory/app/app.py @@ -10,11 +10,13 @@ from videoipath_automation_tool.apps.inventory.app.get_device import InventoryGetDeviceMixin from videoipath_automation_tool.apps.inventory.inventory_api import InventoryAPI from videoipath_automation_tool.apps.inventory.model.drivers import CustomSettings, CustomSettingsType, DriverLiteral +from videoipath_automation_tool.apps.inventory.model.global_snmp_config import SnmpConfiguration from videoipath_automation_tool.apps.inventory.model.inventory_device import InventoryDevice from videoipath_automation_tool.apps.inventory.model.inventory_device_configuration_compare import ( InventoryDeviceComparison, ) from videoipath_automation_tool.apps.inventory.model.inventory_discovered_device import DiscoveredInventoryDevice +from videoipath_automation_tool.connector.models.response_rpc import ResponseRPC from videoipath_automation_tool.connector.vip_connector import VideoIPathConnector from videoipath_automation_tool.validators.device_id import validate_device_id @@ -349,6 +351,83 @@ def parse_configuration(config: dict) -> InventoryDevice: """ return InventoryDevice.parse_configuration(config) + # --- Global SNMP Configuration Helpers --- + def get_global_snmp_config_id_by_label(self, label: str) -> Optional[str | List[str]]: + """Method to get the global SNMP configuration id by label. + Note: If multiple SNMP configurations with the same label exist, a list of ids is returned. + + Args: + label (str): Label of the SNMP configuration + + Returns: + Optional[str | List[str]]: SNMP configuration id, None if not found, List of ids if multiple configurations with the same label exist + """ + return self._inventory_api.get_global_snmp_config_id_by_label(label=label) + + def get_global_snmp_config_label_by_id(self, snmp_config_id: str) -> Optional[str]: + """Method to get the global SNMP configuration label by id. + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + Optional[str]: SNMP configuration label, None if not found + """ + return self._inventory_api.get_global_snmp_config_label_by_id(snmp_config_id=snmp_config_id) + + def get_all_global_snmp_config_ids(self) -> dict[str, str]: + """Method to list all global SNMP configuration ids with their labels. + + Returns: + dict: {snmp_config_id: snmp_config_label} + """ + return self._inventory_api.get_all_global_snmp_config_ids() + + # --- Global SNMP Configuration CRUD Methods --- + def get_global_snmp_config(self, snmp_config_id: str) -> SnmpConfiguration: + """Method to get a global SNMP configuration by id from VideoIPath-Inventory + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + GlobalSnmpConfig: Global SNMP configuration object + """ + return self._inventory_api.get_global_snmp_config(snmp_config_id=snmp_config_id) + + def add_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration: + """Method to add a new global SNMP configuration + + Args: + snmp_config (SnmpConfiguration): SNMP configuration object to add + + Returns: + SnmpConfiguration: Added SNMP configuration object + """ + return self._inventory_api.add_global_snmp_config(snmp_config=snmp_config) + + def update_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration: + """Method to update a global SNMP configuration + + Args: + snmp_config (SnmpConfiguration): SNMP configuration object to update + + Returns: + SnmpConfiguration: Updated SNMP configuration object + """ + return self._inventory_api.update_global_snmp_config(snmp_config=snmp_config) + + def remove_global_snmp_config(self, snmp_config_id: str) -> ResponseRPC: + """Method to remove a global SNMP configuration by id from VideoIPath-Inventory + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + ResponseRPC: Response object + """ + return self._inventory_api.remove_global_snmp_config(snmp_config_id=snmp_config_id) + # --- Deprecated Methods --- @deprecated( "This method is deprecated and will be removed in future versions.", diff --git a/src/videoipath_automation_tool/apps/inventory/inventory_api.py b/src/videoipath_automation_tool/apps/inventory/inventory_api.py index 5881536..caaccc3 100644 --- a/src/videoipath_automation_tool/apps/inventory/inventory_api.py +++ b/src/videoipath_automation_tool/apps/inventory/inventory_api.py @@ -13,6 +13,8 @@ ) from videoipath_automation_tool.apps.inventory.model.device_status import DeviceStatus from videoipath_automation_tool.apps.inventory.model.drivers import CustomSettingsType, DriverLiteral +from videoipath_automation_tool.apps.inventory.model.global_snmp_config import SnmpConfiguration +from videoipath_automation_tool.apps.inventory.model.global_snmp_request_rpc import SnmpRequestRpc from videoipath_automation_tool.apps.inventory.model.inventory_device import InventoryDevice from videoipath_automation_tool.apps.inventory.model.inventory_discovered_device import DiscoveredInventoryDevice from videoipath_automation_tool.apps.inventory.model.inventory_request_rpc import InventoryRequestRpc @@ -664,6 +666,169 @@ def get_discovered_device(self, discovered_device_id: str) -> DiscoveredInventor response.data["status"]["devman"]["discoveredDevices"]["_items"][0] ) + # --- Global SNMP Configuration Helpers --- + def get_global_snmp_config_id_by_label(self, label: str) -> Optional[str | List[str]]: + """Method to get the global SNMP configuration id by label. + Note: If multiple SNMP configurations with the same label exist, a list of ids is returned. + + Args: + label (str): Label of the SNMP configuration + + Returns: + Optional[str | List[str]]: SNMP configuration id, None if not found, List of ids if multiple configurations with the same label exist + """ + if not label: + raise ValueError("Label must not be empty.") + + escaped_label = urllib.parse.quote(label, safe="") + url = f"/rest/v2/data/config/system/snmp/*/* where descriptor.label='{escaped_label}' /*" + response = self.vip_connector.rest.get(url) + if response.data and response.data["config"]["system"]["snmp"]["session"]: + matches = response.data["config"]["system"]["snmp"]["session"] + if len(matches) == 1: + return list(matches.keys())[0] + elif len(matches) > 1: + self._logger.warning( + f"Multiple SNMP configurations found with label '{label}''. Returning all matching ids." + ) + return list(matches.keys()) + return None + + def get_global_snmp_config_label_by_id(self, snmp_config_id: str) -> Optional[str]: + """Method to get the global SNMP configuration label by id. + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + Optional[str]: SNMP configuration label, None if not found + """ + if not snmp_config_id: + raise ValueError("SNMP configuration id must not be empty.") + + url = f"/rest/v2/data/config/system/snmp/session/{snmp_config_id}/descriptor/label" + response = self.vip_connector.rest.get(url, node_check=False) + if response.data and response.data["config"]["system"]["snmp"]["session"]: + if snmp_config_id in response.data["config"]["system"]["snmp"]["session"]: + return response.data["config"]["system"]["snmp"]["session"][snmp_config_id]["descriptor"]["label"] + return None + + def get_all_global_snmp_config_ids(self) -> dict[str, str]: + """Method to list all global SNMP configuration ids with their labels. + + Returns: + dict: {snmp_config_id: snmp_config_label} + """ + url = "/rest/v2/data/config/system/snmp/*/*/descriptor/label" + response = self.vip_connector.rest.get(url) + if not response.data: + raise ValueError("Response data is empty.") + + snmp_configs = response.data["config"]["system"]["snmp"]["session"] + return { + snmp_config_id: snmp_config["descriptor"]["label"] for snmp_config_id, snmp_config in snmp_configs.items() + } + + # --- Global SNMP Configuration CRUD Methods --- + def get_global_snmp_config(self, snmp_config_id: str) -> SnmpConfiguration: + """Method to get a global SNMP configuration by id from VideoIPath-Inventory + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + GlobalSnmpConfig: Global SNMP configuration object + """ + if not snmp_config_id: + raise ValueError("SNMP configuration id must not be empty.") + + url = f"/rest/v2/data/config/system/snmp/session/{snmp_config_id}/**" + response = self.vip_connector.rest.get(url) + if not response.data: + raise ValueError("Response data is empty.") + + return SnmpConfiguration.parse_from_dict(response.data["config"]["system"]["snmp"]["session"]) + + def add_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration: + """Method to add a new global SNMP configuration + + Args: + snmp_config (SnmpConfiguration): SNMP configuration object to add + + Returns: + SnmpConfiguration: Added SNMP configuration object + """ + if not snmp_config.id: + raise ValueError("SNMP configuration id must be set.") + + self._logger.debug(f"Adding new global SNMP configuration with id '{snmp_config.id}'.") + + existing_configs_label = self.get_global_snmp_config_label_by_id(snmp_config.id) + if existing_configs_label is not None: + raise ValueError(f"SNMP configuration with id '{snmp_config.id}' already exists. Please update it instead.") + + body = SnmpRequestRpc() + body.add(snmp_config) + + response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body) + + if response.header.status != "OK": + raise ValueError(f"Failed to add global SNMP configuration. Error: {response}") + + return self.get_global_snmp_config(snmp_config_id=snmp_config.id) + + def update_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration: + """Method to update a global SNMP configuration + + Args: + snmp_config (SnmpConfiguration): SNMP configuration object to update + + Returns: + SnmpConfiguration: Updated SNMP configuration object + """ + if not snmp_config.id: + raise ValueError("SNMP configuration id must be set.") + + self._logger.debug(f"Updating global SNMP configuration with id '{snmp_config.id}'.") + + existing_configs_label = self.get_global_snmp_config_label_by_id(snmp_config.id) + if existing_configs_label is None: + raise ValueError(f"SNMP configuration with id '{snmp_config.id}' does not exist. Please add it first.") + + body = SnmpRequestRpc() + body.update(snmp_config) + + response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body) + + if response.header.status != "OK": + raise ValueError(f"Failed to update global SNMP configuration. Error: {response}") + + return self.get_global_snmp_config(snmp_config_id=snmp_config.id) + + def remove_global_snmp_config(self, snmp_config_id: str) -> ResponseRPC: + """Method to remove a global SNMP configuration by id from VideoIPath-Inventory + + Args: + snmp_config_id (str): SNMP configuration id + + Returns: + ResponseRPC: Response object + """ + if not snmp_config_id: + raise ValueError("SNMP configuration id must be set.") + + self._logger.debug(f"Removing global SNMP configuration with id '{snmp_config_id}'.") + + body = SnmpRequestRpc() + body.remove(snmp_config_id) + + response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body) + + if response.header.status != "OK": + raise ValueError(f"Failed to remove global SNMP configuration. Error: {response}") + + return response + # --- Deprecated Methods --- @deprecated( "The method `fetch_device_ids_list` is deprecated and will be removed in a future release. ", diff --git a/src/videoipath_automation_tool/apps/inventory/model/global_snmp_config.py b/src/videoipath_automation_tool/apps/inventory/model/global_snmp_config.py new file mode 100644 index 0000000..488746d --- /dev/null +++ b/src/videoipath_automation_tool/apps/inventory/model/global_snmp_config.py @@ -0,0 +1,289 @@ +from enum import Enum +from typing import Literal, cast +from uuid import uuid4 + +from pydantic import BaseModel, Field, field_validator + + +def _validate_engine_id_format(engine_id: str) -> str: + """ + Basic format check for SNMPv3 Engine ID. + - Allows empty string + - Must be valid hex string (after cleaning) + - Length: 5–32 bytes (10–64 hex digits) + - No RFC compliance guaranteed + """ + if not engine_id: + return engine_id # allowed for v1/v2c + + cleaned = engine_id.replace(":", "").replace(" ", "").lower() + + if len(cleaned) % 2 != 0: + raise ValueError("Engine ID must contain full bytes (even number of hex digits)") + + try: + raw = bytes.fromhex(cleaned) + except ValueError: + raise ValueError("Engine ID must be a valid hex string") + + if not (5 <= len(raw) <= 32): + raise ValueError("Engine ID must be 5–32 bytes long") + + return cleaned + + +# --- User Enum Classes --- +class SecurityLevel(int, Enum): + UNDEFINED = 0 + """Undefined security level.""" + NO_AUTH_NO_PRIV = 1 + """NoAuthNoPriv | Without authentication and without privacy.""" + AUTH_NO_PRIV = 2 + """AuthNoPriv | With authentication but without privacy.""" + AUTH_PRIV = 3 + """AuthPriv | With authentication and with privacy.""" + + +class AuthProtocol(int, Enum): + MD5 = 2 + """MD5 | HMAC-MD5-96 digest authentication protocol.""" + SHA = 3 + """SHA | HMAC-SHA-96 digest authentication protocol.""" + + +class PrivProtocol(int, Enum): + DES = 2 + """DES | CBC-DES symmetric encryption protocol.""" + THREE_DES = 3 + """3DES | 3DES-EDE symmetric encryption protocol.""" + AES128 = 4 + """AES127 | CFB128-AES-128 privacy protocol.""" + + +# --- Version Enum Class --- +class SnmpVersion(int, Enum): + V1 = 0 + """SNMP version 1.""" + V2C = 1 + """SNMP version 2 with community security.""" + V3 = 3 + """SNMP version 3.""" + + +# --- Data Model Classes --- +class SnmpUser(BaseModel, validate_assignment=True): + level: SecurityLevel = SecurityLevel.NO_AUTH_NO_PRIV + name: str = "New User" + authProtocol: AuthProtocol = AuthProtocol.MD5 + privProtocol: PrivProtocol = PrivProtocol.DES + engineId: str = "" + privPassword: str = "" + authPassword: str = "" + + @field_validator("engineId") + def validate_engine_id(cls, value: str) -> str: + """ + Validates the SNMPv3 Engine ID format. + - Allows empty string + - Must be valid hex string (after cleaning) + - Length: 5–32 bytes (10–64 hex digits) + """ + return _validate_engine_id_format(value) + + +class SnmpDescriptor(BaseModel, validate_assignment=True): + label: str = "" + desc: str = "" + + +class SnmpSecurityEntry(BaseModel, validate_assignment=True): + user: str = "" # User ID from "Users" section. Must be a valid UUID of an existing user. + community: str = "" # Value from "Protocol Settings => SNMP v1/v2c Security => Write / Read community" + + +class SnmpSecurity(BaseModel, validate_assignment=True): + read: SnmpSecurityEntry = SnmpSecurityEntry(community="public") + write: SnmpSecurityEntry = SnmpSecurityEntry(community="private") + + +class SnmpProtocolSettings(BaseModel, validate_assignment=True): + preferredVersion: SnmpVersion = SnmpVersion.V2C + retries: int = Field(default=1, ge=0) + maxRepetitions: int = Field(default=10, ge=0) + useGetBulk: bool = True + timeout: int = Field(default=5000, ge=0) + localEngineId: str = "" + + @field_validator("localEngineId") + @classmethod + def validate_engine_id(cls, value: str) -> str: + return _validate_engine_id_format(value) + + +class SnmpConfiguration(BaseModel, validate_assignment=True): + id: str = Field(alias="_id") + descriptor: SnmpDescriptor = Field(default_factory=SnmpDescriptor) + users: dict[str, SnmpUser] = Field(default_factory=dict) + security: SnmpSecurity = Field(default_factory=SnmpSecurity) + protocol: SnmpProtocolSettings = Field(default_factory=SnmpProtocolSettings) + + @classmethod + def create(cls): + """ + Creates a new instance of SnmpConfiguration with default values. + + Returns: + SnmpConfiguration: A new instance of SnmpConfiguration. + """ + config_id = str(uuid4()) + return cls( + _id=config_id, + descriptor=SnmpDescriptor(label="New SNMP Configuration", desc=""), + ) + + @classmethod + def parse_from_dict(cls, data: dict) -> "SnmpConfiguration": + """ + Parses a dictionary into a SnmpConfiguration instance. + + Args: + data (dict): The dictionary to parse. + + Returns: + SnmpConfiguration: An instance of SnmpConfiguration. + """ + if len(data.keys()) == 1: + config_id = list(data.keys())[0] + data = data[config_id] + data["_id"] = config_id + else: + raise ValueError("Data dictionary must contain exactly one key/value pair: : ") + return cls(**data) + + # --- Getters and Setters --- + @property + def label(self) -> str: + """Label of the SNMP configuration.""" + return self.descriptor.label + + @label.setter + def label(self, value: str): + """Sets the label of the SNMP configuration.""" + self.descriptor.label = value + + @property + def description(self) -> str: + """Description of the SNMP configuration.""" + return self.descriptor.desc + + @description.setter + def description(self, value: str): + """Sets the description of the SNMP configuration.""" + self.descriptor.desc = value + + @property + def version(self) -> Literal["SNMP v1", "SNMP v2c", "SNMP v3"]: + """Preferred SNMP version.""" + version_map = {SnmpVersion.V1: "SNMP v1", SnmpVersion.V2C: "SNMP v2c", SnmpVersion.V3: "SNMP v3"} + return cast(Literal["SNMP v1", "SNMP v2c", "SNMP v3"], version_map[self.protocol.preferredVersion]) + + @version.setter + def version(self, value: Literal["SNMP v1", "SNMP v2c", "SNMP v3"]): + """Sets the preferred SNMP version.""" + version_map = {"SNMP v1": SnmpVersion.V1, "SNMP v2c": SnmpVersion.V2C, "SNMP v3": SnmpVersion.V3} + if value not in version_map: + raise ValueError(f"Invalid SNMP version: {value}") + self.protocol.preferredVersion = version_map[value] + + @property + def retries(self) -> int: + """Retries""" + return self.protocol.retries + + @retries.setter + def retries(self, value: int): + """Sets the number of retries for SNMP requests.""" + self.protocol.retries = value + + @property + def timeout(self) -> int: + """Timeout in milliseconds.""" + return self.protocol.timeout + + @timeout.setter + def timeout(self, value: int): + """Sets the timeout for SNMP requests in milliseconds.""" + self.protocol.timeout = value + + @property + def local_engine_id(self) -> str: + """Local Engine ID""" + return self.protocol.localEngineId + + @local_engine_id.setter + def local_engine_id(self, value: str): + """Sets the local engine ID for SNMP.""" + self.protocol.localEngineId = value + + @property + def use_get_bulk(self) -> bool: + """Use GetBulk""" + return self.protocol.useGetBulk + + @use_get_bulk.setter + def use_get_bulk(self, value: bool): + """Sets whether to use GetBulk for SNMP requests.""" + self.protocol.useGetBulk = value + + @property + def max_repetitions(self) -> int: + """Max Repetitions""" + return self.protocol.maxRepetitions + + @max_repetitions.setter + def max_repetitions(self, value: int): + """Sets the maximum number of repetitions for SNMP requests.""" + self.protocol.maxRepetitions = value + + @property + def read_community(self) -> str: + """Read Community""" + return self.security.read.community + + @read_community.setter + def read_community(self, value: str): + """Sets the read community for SNMP.""" + self.security.read.community = value + + @property + def write_community(self) -> str: + """Write Community""" + return self.security.write.community + + @write_community.setter + def write_community(self, value: str): + """Sets the write community for SNMP.""" + self.security.write.community = value + + def list_usernames(self) -> list[str]: + """Returns a list of usernames in the SNMP configuration.""" + return list(self.users.keys()) + + def get_user_id_by_username(self, username: str) -> str: + """Returns the user ID for a given username.""" + user_ids = [user_id for user_id, user in self.users.items() if user.name == username] + if not user_ids: + raise ValueError(f"No user found with username: {username}") + if len(user_ids) > 1: + raise ValueError(f"Multiple users found with username: {username}. Please specify a unique user.") + return user_ids[0] + + def set_read_user_by_username(self, username: str): + """Sets the read user by username.""" + user_id = self.get_user_id_by_username(username) + self.security.read.user = user_id + + def set_write_user_by_username(self, username: str): + """Sets the write user by username.""" + user_id = self.get_user_id_by_username(username) + self.security.write.user = user_id diff --git a/src/videoipath_automation_tool/apps/inventory/model/global_snmp_request_rpc.py b/src/videoipath_automation_tool/apps/inventory/model/global_snmp_request_rpc.py new file mode 100644 index 0000000..3601795 --- /dev/null +++ b/src/videoipath_automation_tool/apps/inventory/model/global_snmp_request_rpc.py @@ -0,0 +1,51 @@ +from videoipath_automation_tool.apps.inventory.model.global_snmp_config import SnmpConfiguration +from videoipath_automation_tool.connector.models.request_rpc import RequestRPC +from videoipath_automation_tool.validators.uuid_4 import validate_uuid_4 + + +class SnmpRequestRpc(RequestRPC): + # Wrapper class for RequestRpc + + def add(self, config: SnmpConfiguration): + """Method to add a new global SNMP configuration + + Args: + config (SnmpConfiguration): SNMP configuration to add + """ + try: + validate_uuid_4(config.id) + except ValueError as e: + raise ValueError( + f"To add a new global SNMP configuration, a valid 'id' (UUID 4 format) must be set in the configuration. Error: {e}" + ) + return super().add(config.id, config) + + def update(self, config: SnmpConfiguration): + """Method to update a global SNMP configuration + + Args: + config (SnmpConfiguration): SNMP configuration to update + """ + try: + validate_uuid_4(config.id) + except ValueError as e: + raise ValueError( + f"To update a global SNMP configuration, a valid 'id' (UUID 4 format) must be set in the configuration. Error: {e}" + ) + return super().update(config.id, config) + + def remove(self, config_id: str | list[str]): + """Method to remove a global SNMP configuration + + Args: + config_id (str | list[str]): Id or List of Ids of the configuration + """ + try: + if isinstance(config_id, list): + for c_id in config_id: + validate_uuid_4(c_id) + else: + validate_uuid_4(config_id) + except ValueError as e: + raise ValueError(f"Invalid 'config_id' format. Must be a valid UUID 4 format. Error: {e}") + return super().remove(config_id) diff --git a/src/videoipath_automation_tool/apps/inventory/model/inventory_device_configuration.py b/src/videoipath_automation_tool/apps/inventory/model/inventory_device_configuration.py index 62016de..f43f15d 100644 --- a/src/videoipath_automation_tool/apps/inventory/model/inventory_device_configuration.py +++ b/src/videoipath_automation_tool/apps/inventory/model/inventory_device_configuration.py @@ -4,6 +4,7 @@ from typing_extensions import deprecated from videoipath_automation_tool.apps.inventory.model.drivers import CustomSettingsType +from videoipath_automation_tool.validators.uuid_4 import validate_uuid_4 class CinfoOverridesSNMP(BaseModel, validate_assignment=True): @@ -180,6 +181,55 @@ def metadata(self): def metadata(self, value): self.meta = value + @property + def use_global_snmp_settings(self) -> bool: + """Use (activate) global SNMP settings.""" + return self.cinfoOverrides.snmp.useDefault + + @use_global_snmp_settings.setter + def use_global_snmp_settings(self, value: bool): + """Use (activate) global SNMP settings.""" + self.cinfoOverrides.snmp.useDefault = value + + def get_global_snmp_setting_id(self) -> str: + """ + Returns the ID of the global SNMP setting currently in use. + + Returns: + str: The ID of the global SNMP setting. + - "default" if the system's default configuration is used. + - Otherwise, returns the UUID of the configured global SNMP setting. + Note: Retrieve the Label of the global SNMP setting using: + `app.inventory.get_global_snmp_config_label_by_id(id)` + """ + return self.cinfoOverrides.snmp.id + + def set_global_snmp_setting(self, setting_id: str, activate: bool = True): + """ + Sets the global SNMP setting by ID. + + Args: + setting_id (str): The ID of the global SNMP setting to use. + - Use "default" to apply the system's default configuration. + - For other configurations, retrieve the ID by label using: + `app.inventory.get_global_snmp_config_id_by_label(label)` + + activate (bool): Whether to activate the global SNMP settings (`use_global_snmp_settings = True`). + Defaults to True. Set to False if you only want to store the ID without activating it yet. + + Raises: + ValueError: If the setting ID is empty or not a valid UUID (unless "default"). + """ + if not setting_id: + raise ValueError("Setting ID cannot be empty.") + if setting_id != "default": + setting_id = validate_uuid_4(setting_id) + + self.cinfoOverrides.snmp.id = setting_id + + if activate: + self.cinfoOverrides.snmp.useDefault = True + # --- Deprecated properties --- @property @deprecated("The property `custom` is deprecated, use `custom_settings` instead.") diff --git a/src/videoipath_automation_tool/connector/vip_rpc_connector.py b/src/videoipath_automation_tool/connector/vip_rpc_connector.py index 94c7cb0..5043d58 100644 --- a/src/videoipath_automation_tool/connector/vip_rpc_connector.py +++ b/src/videoipath_automation_tool/connector/vip_rpc_connector.py @@ -15,6 +15,7 @@ class VideoIPathRPCConnector(VideoIPathBaseConnector): "/api/uploadLicense", "/api/activateLicense", "/api/deactivateLicense", + "/api/updateSnmpConfig", } def post(self, url_path: str, body: RequestRPC, url_validation: bool = True) -> ResponseRPC: diff --git a/src/videoipath_automation_tool/validators/uuid_4.py b/src/videoipath_automation_tool/validators/uuid_4.py new file mode 100644 index 0000000..19df1d9 --- /dev/null +++ b/src/videoipath_automation_tool/validators/uuid_4.py @@ -0,0 +1,28 @@ +import uuid + + +def validate_uuid_4(uuid_4: str) -> str: + """ + Validates and normalizes a UUIDv4 string. + + Args: + uuid_4: The input string to validate. + + Returns: + The normalized UUIDv4 string (lowercase, with dashes). + + Raises: + ValueError: If input is not a valid UUIDv4. + """ + if not isinstance(uuid_4, str): + raise ValueError(f"UUID must be a string, got {type(uuid_4).__name__}") + + try: + u = uuid.UUID(uuid_4) + except (ValueError, AttributeError, TypeError): + raise ValueError(f"Invalid UUID format: '{uuid_4}'") + + if u.version != 4: + raise ValueError(f"UUID is not version 4: '{uuid_4}'") + + return str(u) diff --git a/tests/validators/test_uuid_4_validator.py b/tests/validators/test_uuid_4_validator.py new file mode 100644 index 0000000..488135d --- /dev/null +++ b/tests/validators/test_uuid_4_validator.py @@ -0,0 +1,45 @@ +import uuid + +import pytest + +from videoipath_automation_tool.validators.uuid_4 import validate_uuid_4 + + +class TestValidateUUID4: + @pytest.mark.parametrize( + "uuid_str", + [ + str(uuid.uuid4()), # random, valid UUIDv4 + "550e8400-e29b-41d4-a716-446655440000", # valid v4-UUID + "00000000-0000-4000-8000-000000000000", # minimal v4-UUID + "ffffffff-ffff-4fff-bfff-ffffffffffff", # maximal v4-UUID + "550e8400e29b41d4a716446655440000", # valid, no dashes + "550E8400-E29B-41D4-A716-446655440000", # valid, uppercase + ], + ) + def test_valid_uuid_4(self, uuid_str): + normalized = validate_uuid_4(uuid_str) + assert isinstance(normalized, str) + # Standardize to lowercase with dashes + assert uuid.UUID(normalized).version == 4 + + @pytest.mark.parametrize( + "uuid_str", + [ + "550e8400-e29b-11d4-a716-446655440000", # v1 instead of v4 + "550e8400-e29b-21d4-a716-446655440000", # v2 instead of v4 + "550e8400-e29b-51d4-a716-446655440000", # v5 + "this-is-not-a-uuid", + "12345678-1234-1234-1234-1234567890ab", + "zzzzzzzz-zzzz-4zzz-8zzz-zzzzzzzzzzzz", + "", + None, + 1234, + [], + {}, + object(), + ], + ) + def test_invalid_uuid_4(self, uuid_str): + with pytest.raises(ValueError): + validate_uuid_4(uuid_str)