|
1 | 1 | from enum import Enum |
| 2 | +from typing import Literal, cast |
2 | 3 | from uuid import uuid4 |
3 | 4 |
|
4 | 5 | from pydantic import BaseModel, Field |
@@ -117,18 +118,144 @@ def parse_from_dict(cls, data: dict) -> "SnmpConfiguration": |
117 | 118 | raise ValueError("Data dictionary must contain exactly one key/value pair: <id>: <configuration>") |
118 | 119 | return cls(**data) |
119 | 120 |
|
120 | | - # def dump_to_dict(self) -> dict: |
121 | | - # """ |
122 | | - # Dumps the SnmpConfiguration instance to a dictionary. |
| 121 | + # --- Getters and Setters --- |
| 122 | + @property |
| 123 | + def label(self) -> str: |
| 124 | + """Label of the SNMP configuration.""" |
| 125 | + return self.descriptor.label |
123 | 126 |
|
124 | | - # Returns: |
125 | | - # dict: A dictionary representation of the SnmpConfiguration instance. |
126 | | - # """ |
127 | | - # config_id = self.id |
128 | | - # data = self.model_dump(mode="json", exclude={"id"}) |
129 | | - # return {config_id: data} |
| 127 | + @label.setter |
| 128 | + def label(self, value: str): |
| 129 | + """Sets the label of the SNMP configuration.""" |
| 130 | + self.descriptor.label = value |
130 | 131 |
|
| 132 | + @property |
| 133 | + def description(self) -> str: |
| 134 | + """Description of the SNMP configuration.""" |
| 135 | + return self.descriptor.desc |
131 | 136 |
|
132 | | -# class SnmpConfig(BaseModel): |
133 | | -# id: str |
134 | | -# configuration: SnmpConfiguration = Field(default_factory=SnmpConfiguration) |
| 137 | + @description.setter |
| 138 | + def description(self, value: str): |
| 139 | + """Sets the description of the SNMP configuration.""" |
| 140 | + self.descriptor.desc = value |
| 141 | + |
| 142 | + @property |
| 143 | + def version(self) -> Literal["SNMP v1", "SNMP v2c", "SNMP v3"]: |
| 144 | + """Preferred SNMP version.""" |
| 145 | + version_map = {SnmpVersion.V1: "SNMP v1", SnmpVersion.V2C: "SNMP v2c", SnmpVersion.V3: "SNMP v3"} |
| 146 | + return cast(Literal["SNMP v1", "SNMP v2c", "SNMP v3"], version_map[self.protocol.preferredVersion]) |
| 147 | + |
| 148 | + @version.setter |
| 149 | + def version(self, value: Literal["SNMP v1", "SNMP v2c", "SNMP v3"]): |
| 150 | + """Sets the preferred SNMP version.""" |
| 151 | + version_map = {"SNMP v1": SnmpVersion.V1, "SNMP v2c": SnmpVersion.V2C, "SNMP v3": SnmpVersion.V3} |
| 152 | + if value not in version_map: |
| 153 | + raise ValueError(f"Invalid SNMP version: {value}") |
| 154 | + self.protocol.preferredVersion = version_map[value] |
| 155 | + |
| 156 | + @property |
| 157 | + def retries(self) -> int: |
| 158 | + """Retries""" |
| 159 | + return self.protocol.retries |
| 160 | + |
| 161 | + @retries.setter |
| 162 | + def retries(self, value: int): |
| 163 | + """Sets the number of retries for SNMP requests.""" |
| 164 | + if not isinstance(value, int) or value < 0: |
| 165 | + raise ValueError("Retries must be a non-negative integer.") |
| 166 | + self.protocol.retries = value |
| 167 | + |
| 168 | + @property |
| 169 | + def timeout(self) -> int: |
| 170 | + """Timeout in milliseconds.""" |
| 171 | + return self.protocol.timeout |
| 172 | + |
| 173 | + @timeout.setter |
| 174 | + def timeout(self, value: int): |
| 175 | + """Sets the timeout for SNMP requests in milliseconds.""" |
| 176 | + if not isinstance(value, int) or value < 0: |
| 177 | + raise ValueError("Timeout must be a non-negative integer.") |
| 178 | + self.protocol.timeout = value |
| 179 | + |
| 180 | + @property |
| 181 | + def local_engine_id(self) -> str: |
| 182 | + """Local Engine ID""" |
| 183 | + return self.protocol.localEngineId |
| 184 | + |
| 185 | + @local_engine_id.setter |
| 186 | + def local_engine_id(self, value: str): |
| 187 | + """Sets the local engine ID for SNMP.""" |
| 188 | + if not isinstance(value, str) or not value: |
| 189 | + raise ValueError("Local Engine ID must be a non-empty string.") |
| 190 | + self.protocol.localEngineId = value |
| 191 | + |
| 192 | + @property |
| 193 | + def use_get_bulk(self) -> bool: |
| 194 | + """Use GetBulk""" |
| 195 | + return self.protocol.useGetBulk |
| 196 | + |
| 197 | + @use_get_bulk.setter |
| 198 | + def use_get_bulk(self, value: bool): |
| 199 | + """Sets whether to use GetBulk for SNMP requests.""" |
| 200 | + if not isinstance(value, bool): |
| 201 | + raise ValueError("Use GetBulk must be a boolean value.") |
| 202 | + self.protocol.useGetBulk = value |
| 203 | + |
| 204 | + @property |
| 205 | + def max_repetitions(self) -> int: |
| 206 | + """Max Repetitions""" |
| 207 | + return self.protocol.maxRepetitions |
| 208 | + |
| 209 | + @max_repetitions.setter |
| 210 | + def max_repetitions(self, value: int): |
| 211 | + """Sets the maximum number of repetitions for SNMP requests.""" |
| 212 | + if not isinstance(value, int) or value < 0: |
| 213 | + raise ValueError("Max Repetitions must be a non-negative integer.") |
| 214 | + self.protocol.maxRepetitions = value |
| 215 | + |
| 216 | + @property |
| 217 | + def read_community(self) -> str: |
| 218 | + """Read Community""" |
| 219 | + return self.security.read.community |
| 220 | + |
| 221 | + @read_community.setter |
| 222 | + def read_community(self, value: str): |
| 223 | + """Sets the read community for SNMP.""" |
| 224 | + if not isinstance(value, str) or not value: |
| 225 | + raise ValueError("Read Community must be a non-empty string.") |
| 226 | + self.security.read.community = value |
| 227 | + |
| 228 | + @property |
| 229 | + def write_community(self) -> str: |
| 230 | + """Write Community""" |
| 231 | + return self.security.write.community |
| 232 | + |
| 233 | + @write_community.setter |
| 234 | + def write_community(self, value: str): |
| 235 | + """Sets the write community for SNMP.""" |
| 236 | + if not isinstance(value, str) or not value: |
| 237 | + raise ValueError("Write Community must be a non-empty string.") |
| 238 | + self.security.write.community = value |
| 239 | + |
| 240 | + def list_usernames(self) -> list[str]: |
| 241 | + """Returns a list of usernames in the SNMP configuration.""" |
| 242 | + return list(self.users.keys()) |
| 243 | + |
| 244 | + def get_user_id_by_username(self, username: str) -> str: |
| 245 | + """Returns the user ID for a given username.""" |
| 246 | + user_ids = [user_id for user_id, user in self.users.items() if user.name == username] |
| 247 | + if not user_ids: |
| 248 | + raise ValueError(f"No user found with username: {username}") |
| 249 | + if len(user_ids) > 1: |
| 250 | + raise ValueError(f"Multiple users found with username: {username}. Please specify a unique user.") |
| 251 | + return user_ids[0] |
| 252 | + |
| 253 | + def set_read_user_by_username(self, username: str): |
| 254 | + """Sets the read user by username.""" |
| 255 | + user_id = self.get_user_id_by_username(username) |
| 256 | + self.security.read.user = user_id |
| 257 | + |
| 258 | + def set_write_user_by_username(self, username: str): |
| 259 | + """Sets the write user by username.""" |
| 260 | + user_id = self.get_user_id_by_username(username) |
| 261 | + self.security.write.user = user_id |
0 commit comments