Skip to content

Commit 8949256

Browse files
Implement api methods for global SNMP configuration management (CRUD operations), SNMP config Pydantic Model, uuid validation and test
1 parent 83b107f commit 8949256

6 files changed

Lines changed: 371 additions & 9 deletions

File tree

src/videoipath_automation_tool/apps/inventory/inventory_api.py

Lines changed: 112 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
)
1414
from videoipath_automation_tool.apps.inventory.model.device_status import DeviceStatus
1515
from videoipath_automation_tool.apps.inventory.model.drivers import CustomSettingsType, DriverLiteral
16+
from videoipath_automation_tool.apps.inventory.model.global_snmp_config import SnmpConfiguration
17+
from videoipath_automation_tool.apps.inventory.model.global_snmp_request_rpc import SnmpRequestRpc
1618
from videoipath_automation_tool.apps.inventory.model.inventory_device import InventoryDevice
1719
from videoipath_automation_tool.apps.inventory.model.inventory_discovered_device import DiscoveredInventoryDevice
1820
from videoipath_automation_tool.apps.inventory.model.inventory_request_rpc import InventoryRequestRpc
@@ -664,16 +666,16 @@ def get_discovered_device(self, discovered_device_id: str) -> DiscoveredInventor
664666
response.data["status"]["devman"]["discoveredDevices"]["_items"][0]
665667
)
666668

667-
# --- Global Configuration Helpers ---
668-
def get_global_snmp_config_id_by_label(self, label: str) -> Optional[str]:
669+
# --- Global SNMP Configuration Helpers ---
670+
def get_global_snmp_config_id_by_label(self, label: str) -> Optional[str | List[str]]:
669671
"""Method to get the global SNMP configuration id by label.
670-
Note: If multiple SNMP configurations with the same label exist, the first one is returned.
672+
Note: If multiple SNMP configurations with the same label exist, a list of ids is returned.
671673
672674
Args:
673675
label (str): Label of the SNMP configuration
674676
675677
Returns:
676-
Optional[str]: SNMP configuration id, None if not found
678+
Optional[str | List[str]]: SNMP configuration id, None if not found, List of ids if multiple configurations with the same label exist
677679
"""
678680
if not label:
679681
raise ValueError("Label must not be empty.")
@@ -687,9 +689,9 @@ def get_global_snmp_config_id_by_label(self, label: str) -> Optional[str]:
687689
return list(matches.keys())[0]
688690
elif len(matches) > 1:
689691
self._logger.warning(
690-
f"Multiple SNMP configurations found with label '{label}'. Returning the first one."
692+
f"Multiple SNMP configurations found with label '{label}''. Returning all matching ids."
691693
)
692-
return list(matches.keys())[0]
694+
return list(matches.keys())
693695
return None
694696

695697
def get_global_snmp_config_label_by_id(self, snmp_config_id: str) -> Optional[str]:
@@ -705,9 +707,10 @@ def get_global_snmp_config_label_by_id(self, snmp_config_id: str) -> Optional[st
705707
raise ValueError("SNMP configuration id must not be empty.")
706708

707709
url = f"/rest/v2/data/config/system/snmp/session/{snmp_config_id}/descriptor/label"
708-
response = self.vip_connector.rest.get(url)
709-
if response.data and response.data["config"]["system"]["snmp"]["session"][snmp_config_id]:
710-
return response.data["config"]["system"]["snmp"]["session"][snmp_config_id]["descriptor"]["label"]
710+
response = self.vip_connector.rest.get(url, node_check=False)
711+
if response.data and response.data["config"]["system"]["snmp"]["session"]:
712+
if snmp_config_id in response.data["config"]["system"]["snmp"]["session"]:
713+
return response.data["config"]["system"]["snmp"]["session"][snmp_config_id]["descriptor"]["label"]
711714
return None
712715

713716
def get_all_global_snmp_config_ids(self) -> dict[str, str]:
@@ -726,6 +729,106 @@ def get_all_global_snmp_config_ids(self) -> dict[str, str]:
726729
snmp_config_id: snmp_config["descriptor"]["label"] for snmp_config_id, snmp_config in snmp_configs.items()
727730
}
728731

732+
# --- Global SNMP Configuration CRUD Methods ---
733+
def get_global_snmp_config(self, snmp_config_id: str) -> SnmpConfiguration:
734+
"""Method to get a global SNMP configuration by id from VideoIPath-Inventory
735+
736+
Args:
737+
snmp_config_id (str): SNMP configuration id
738+
739+
Returns:
740+
GlobalSnmpConfig: Global SNMP configuration object
741+
"""
742+
if not snmp_config_id:
743+
raise ValueError("SNMP configuration id must not be empty.")
744+
745+
url = f"/rest/v2/data/config/system/snmp/session/{snmp_config_id}/**"
746+
response = self.vip_connector.rest.get(url)
747+
if not response.data:
748+
raise ValueError("Response data is empty.")
749+
750+
return SnmpConfiguration.parse_from_dict(response.data["config"]["system"]["snmp"]["session"])
751+
752+
def add_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration:
753+
"""Method to add a new global SNMP configuration
754+
755+
Args:
756+
snmp_config (SnmpConfiguration): SNMP configuration object to add
757+
758+
Returns:
759+
SnmpConfiguration: Added SNMP configuration object
760+
"""
761+
if not snmp_config.id:
762+
raise ValueError("SNMP configuration id must be set.")
763+
764+
self._logger.debug(f"Adding new global SNMP configuration with id '{snmp_config.id}'.")
765+
766+
existing_configs_label = self.get_global_snmp_config_label_by_id(snmp_config.id)
767+
if existing_configs_label is not None:
768+
raise ValueError(f"SNMP configuration with id '{snmp_config.id}' already exists. Please update it instead.")
769+
770+
body = SnmpRequestRpc()
771+
body.add(snmp_config)
772+
773+
response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body)
774+
775+
if response.header.status != "OK":
776+
raise ValueError(f"Failed to add global SNMP configuration. Error: {response}")
777+
778+
return self.get_global_snmp_config(snmp_config_id=snmp_config.id)
779+
780+
def update_global_snmp_config(self, snmp_config: SnmpConfiguration) -> SnmpConfiguration:
781+
"""Method to update a global SNMP configuration
782+
783+
Args:
784+
snmp_config (SnmpConfiguration): SNMP configuration object to update
785+
786+
Returns:
787+
SnmpConfiguration: Updated SNMP configuration object
788+
"""
789+
if not snmp_config.id:
790+
raise ValueError("SNMP configuration id must be set.")
791+
792+
self._logger.debug(f"Updating global SNMP configuration with id '{snmp_config.id}'.")
793+
794+
existing_configs_label = self.get_global_snmp_config_label_by_id(snmp_config.id)
795+
if existing_configs_label is None:
796+
raise ValueError(f"SNMP configuration with id '{snmp_config.id}' does not exist. Please add it first.")
797+
798+
body = SnmpRequestRpc()
799+
body.update(snmp_config)
800+
801+
response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body)
802+
803+
if response.header.status != "OK":
804+
raise ValueError(f"Failed to update global SNMP configuration. Error: {response}")
805+
806+
return self.get_global_snmp_config(snmp_config_id=snmp_config.id)
807+
808+
def remove_global_snmp_config(self, snmp_config_id: str) -> ResponseRPC:
809+
"""Method to remove a global SNMP configuration by id from VideoIPath-Inventory
810+
811+
Args:
812+
snmp_config_id (str): SNMP configuration id
813+
814+
Returns:
815+
ResponseRPC: Response object
816+
"""
817+
if not snmp_config_id:
818+
raise ValueError("SNMP configuration id must be set.")
819+
820+
self._logger.debug(f"Removing global SNMP configuration with id '{snmp_config_id}'.")
821+
822+
body = SnmpRequestRpc()
823+
body.remove(snmp_config_id)
824+
825+
response = self.vip_connector.rpc.post("/api/updateSnmpConfig", body=body)
826+
827+
if response.header.status != "OK":
828+
raise ValueError(f"Failed to remove global SNMP configuration. Error: {response}")
829+
830+
return response
831+
729832
# --- Deprecated Methods ---
730833
@deprecated(
731834
"The method `fetch_device_ids_list` is deprecated and will be removed in a future release. ",
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from enum import Enum
2+
from uuid import uuid4
3+
4+
from pydantic import BaseModel, Field
5+
6+
7+
# --- User Enum Classes ---
8+
class SecurityLevel(int, Enum):
9+
UNDEFINED = 0
10+
"""Undefined security level."""
11+
NO_AUTH_NO_PRIV = 1
12+
"""NoAuthNoPriv | Without authentication and without privacy."""
13+
AUTH_NO_PRIV = 2
14+
"""AuthNoPriv | With authentication but without privacy."""
15+
AUTH_PRIV = 3
16+
"""AuthPriv | With authentication and with privacy."""
17+
18+
19+
class AuthProtocol(int, Enum):
20+
MD5 = 2
21+
"""MD5 | HMAC-MD5-96 digest authentication protocol."""
22+
SHA = 3
23+
"""SHA | HMAC-SHA-96 digest authentication protocol."""
24+
25+
26+
class PrivProtocol(int, Enum):
27+
DES = 2
28+
"""DES | CBC-DES symmetric encryption protocol."""
29+
THREE_DES = 3
30+
"""3DES | 3DES-EDE symmetric encryption protocol."""
31+
AES128 = 4
32+
"""AES127 | CFB128-AES-128 privacy protocol."""
33+
34+
35+
# --- Version Enum Class ---
36+
class SnmpVersion(int, Enum):
37+
V1 = 0
38+
"""SNMP version 1."""
39+
V2C = 1
40+
"""SNMP version 2 with community security."""
41+
V3 = 3
42+
"""SNMP version 3."""
43+
44+
45+
# --- Data Model Classes ---
46+
class SnmpUser(BaseModel):
47+
level: SecurityLevel = SecurityLevel.NO_AUTH_NO_PRIV
48+
name: str = "New User"
49+
authProtocol: AuthProtocol = AuthProtocol.MD5
50+
privProtocol: PrivProtocol = PrivProtocol.DES
51+
engineId: str = ""
52+
privPassword: str = ""
53+
authPassword: str = ""
54+
55+
56+
class SnmpDescriptor(BaseModel):
57+
label: str = ""
58+
desc: str = ""
59+
60+
61+
class SnmpSecurityEntry(BaseModel):
62+
user: str = "" # User ID from "Users" section. Must be a valid UUID of an existing user.
63+
community: str = "" # Value from "Protocol Settings => SNMP v1/v2c Security => Write / Read community"
64+
65+
66+
class SnmpSecurity(BaseModel):
67+
read: SnmpSecurityEntry = SnmpSecurityEntry(community="public")
68+
write: SnmpSecurityEntry = SnmpSecurityEntry(community="private")
69+
70+
71+
class SnmpProtocolSettings(BaseModel):
72+
preferredVersion: SnmpVersion = SnmpVersion.V2C
73+
retries: int = 1
74+
maxRepetitions: int = 10
75+
useGetBulk: bool = True
76+
timeout: int = 5000
77+
localEngineId: str = ""
78+
79+
80+
class SnmpConfiguration(BaseModel):
81+
id: str = Field(alias="_id")
82+
descriptor: SnmpDescriptor = Field(default_factory=SnmpDescriptor)
83+
users: dict[str, SnmpUser] = Field(default_factory=dict)
84+
security: SnmpSecurity = Field(default_factory=SnmpSecurity)
85+
protocol: SnmpProtocolSettings = Field(default_factory=SnmpProtocolSettings)
86+
87+
@classmethod
88+
def create(cls):
89+
"""
90+
Creates a new instance of SnmpConfiguration with default values.
91+
92+
Returns:
93+
SnmpConfiguration: A new instance of SnmpConfiguration.
94+
"""
95+
config_id = str(uuid4())
96+
return cls(
97+
_id=config_id,
98+
descriptor=SnmpDescriptor(label="New SNMP Configuration", desc=""),
99+
)
100+
101+
@classmethod
102+
def parse_from_dict(cls, data: dict) -> "SnmpConfiguration":
103+
"""
104+
Parses a dictionary into a SnmpConfiguration instance.
105+
106+
Args:
107+
data (dict): The dictionary to parse.
108+
109+
Returns:
110+
SnmpConfiguration: An instance of SnmpConfiguration.
111+
"""
112+
if len(data.keys()) == 1:
113+
config_id = list(data.keys())[0]
114+
data = data[config_id]
115+
data["_id"] = config_id
116+
else:
117+
raise ValueError("Data dictionary must contain exactly one key/value pair: <id>: <configuration>")
118+
return cls(**data)
119+
120+
# def dump_to_dict(self) -> dict:
121+
# """
122+
# Dumps the SnmpConfiguration instance to a dictionary.
123+
124+
# Returns:
125+
# dict: A dictionary representation of the SnmpConfiguration instance.
126+
# """
127+
# config_id = self.id
128+
# data = self.model_dump(mode="json", exclude={"id"})
129+
# return {config_id: data}
130+
131+
132+
# class SnmpConfig(BaseModel):
133+
# id: str
134+
# configuration: SnmpConfiguration = Field(default_factory=SnmpConfiguration)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
from videoipath_automation_tool.apps.inventory.model.global_snmp_config import SnmpConfiguration
2+
from videoipath_automation_tool.connector.models.request_rpc import RequestRPC
3+
from videoipath_automation_tool.validators.uuid_4 import validate_uuid_4
4+
5+
6+
class SnmpRequestRpc(RequestRPC):
7+
# Wrapper class for RequestRpc
8+
9+
def add(self, config: SnmpConfiguration):
10+
"""Method to add a new global SNMP configuration
11+
12+
Args:
13+
config (SnmpConfiguration): SNMP configuration to add
14+
"""
15+
try:
16+
validate_uuid_4(config.id)
17+
except ValueError as e:
18+
raise ValueError(
19+
f"To add a new global SNMP configuration, a valid 'id' (UUID 4 format) must be set in the configuration. Error: {e}"
20+
)
21+
return super().add(config.id, config)
22+
23+
def update(self, config: SnmpConfiguration):
24+
"""Method to update a global SNMP configuration
25+
26+
Args:
27+
config (SnmpConfiguration): SNMP configuration to update
28+
"""
29+
try:
30+
validate_uuid_4(config.id)
31+
except ValueError as e:
32+
raise ValueError(
33+
f"To update a global SNMP configuration, a valid 'id' (UUID 4 format) must be set in the configuration. Error: {e}"
34+
)
35+
return super().update(config.id, config)
36+
37+
def remove(self, config_id: str | list[str]):
38+
"""Method to remove a global SNMP configuration
39+
40+
Args:
41+
config_id (str | list[str]): Id or List of Ids of the configuration
42+
"""
43+
try:
44+
if isinstance(config_id, list):
45+
for c_id in config_id:
46+
validate_uuid_4(c_id)
47+
else:
48+
validate_uuid_4(config_id)
49+
except ValueError as e:
50+
raise ValueError(f"Invalid 'config_id' format. Must be a valid UUID 4 format. Error: {e}")
51+
return super().remove(config_id)

src/videoipath_automation_tool/connector/vip_rpc_connector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ class VideoIPathRPCConnector(VideoIPathBaseConnector):
1515
"/api/uploadLicense",
1616
"/api/activateLicense",
1717
"/api/deactivateLicense",
18+
"/api/updateSnmpConfig",
1819
}
1920

2021
def post(self, url_path: str, body: RequestRPC, url_validation: bool = True) -> ResponseRPC:
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import uuid
2+
3+
4+
def validate_uuid_4(uuid_4: str) -> str:
5+
"""
6+
Validates and normalizes a UUIDv4 string.
7+
8+
Args:
9+
uuid_4: The input string to validate.
10+
11+
Returns:
12+
The normalized UUIDv4 string (lowercase, with dashes).
13+
14+
Raises:
15+
ValueError: If input is not a valid UUIDv4.
16+
"""
17+
if not isinstance(uuid_4, str):
18+
raise ValueError(f"UUID must be a string, got {type(uuid_4).__name__}")
19+
20+
try:
21+
u = uuid.UUID(uuid_4)
22+
except (ValueError, AttributeError, TypeError):
23+
raise ValueError(f"Invalid UUID format: '{uuid_4}'")
24+
25+
if u.version != 4:
26+
raise ValueError(f"UUID is not version 4: '{uuid_4}'")
27+
28+
return str(u)

0 commit comments

Comments
 (0)