Skip to content

Commit 27f4220

Browse files
committed
Add API mode support with legacy/rest auto-detection
1 parent 1cbde70 commit 27f4220

2 files changed

Lines changed: 186 additions & 6 deletions

File tree

sagemcom_api/client.py

Lines changed: 177 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ClientOSError,
1818
ClientSession,
1919
ClientTimeout,
20+
ContentTypeError,
2021
ServerDisconnectedError,
2122
TCPConnector,
2223
)
@@ -39,7 +40,7 @@
3940
XMO_REQUEST_NO_ERR,
4041
XMO_UNKNOWN_PATH_ERR,
4142
)
42-
from .enums import EncryptionMethod
43+
from .enums import ApiMode, EncryptionMethod
4344
from .exceptions import (
4445
AccessRestrictionException,
4546
AuthenticationException,
@@ -75,6 +76,7 @@ def __init__(
7576
username: str,
7677
password: str,
7778
authentication_method: EncryptionMethod | None = None,
79+
api_mode: ApiMode | str = ApiMode.AUTO,
7880
session: ClientSession | None = None,
7981
ssl: bool | None = False,
8082
verify_ssl: bool | None = True,
@@ -86,11 +88,16 @@ def __init__(
8688
:param username: the username for your Sagemcom router
8789
:param password: the password for your Sagemcom router
8890
:param authentication_method: the auth method of your Sagemcom router
91+
:param api_mode: one of auto, legacy or rest
8992
:param session: use a custom session, for example to configure the timeout
9093
"""
9194
self.host = host
9295
self.username = username
9396
self.authentication_method = authentication_method
97+
self.api_mode = ApiMode(api_mode)
98+
self._active_api_mode: ApiMode = (
99+
self.api_mode if self.api_mode != ApiMode.AUTO else ApiMode.LEGACY
100+
)
94101
self.password = password
95102
self._current_nonce = None
96103
self._password_hash = self.__generate_hash(password)
@@ -112,6 +119,11 @@ def __init__(
112119
)
113120
)
114121

122+
@property
123+
def active_api_mode(self) -> ApiMode:
124+
"""Return the API mode that is currently active."""
125+
return self._active_api_mode
126+
115127
async def __aenter__(self) -> SagemcomClient:
116128
"""TODO."""
117129
return self
@@ -315,8 +327,8 @@ async def __api_request_async(self, actions, priority=False):
315327
) as exception:
316328
raise ConnectionError(str(exception)) from exception
317329

318-
async def login(self):
319-
"""Login to the SagemCom F@st router using a username and password."""
330+
async def __legacy_login(self):
331+
"""Login to the legacy JSON-REQ API."""
320332

321333
actions = {
322334
"id": 0,
@@ -358,18 +370,108 @@ async def login(self):
358370

359371
raise UnauthorizedException(data)
360372

373+
@backoff.on_exception(
374+
backoff.expo,
375+
(ClientConnectorError, ClientOSError, ServerDisconnectedError),
376+
max_tries=5,
377+
)
378+
async def __rest_request(
379+
self, method: str, endpoint: str, data: dict[str, Any] | None = None
380+
):
381+
"""Call the REST API using form-encoded payloads."""
382+
url = f"{self.protocol}://{self.host}{endpoint}"
383+
payload = urllib.parse.urlencode(data or {})
384+
request_headers = {"Content-Type": "application/x-www-form-urlencoded"}
385+
386+
async with self.session.request(
387+
method, url, data=payload, headers=request_headers
388+
) as response:
389+
if response.status in (200, 204):
390+
if response.status == 204:
391+
return None
392+
try:
393+
return await response.json()
394+
except (json.JSONDecodeError, ContentTypeError):
395+
return await response.text()
396+
397+
result = await response.text()
398+
if response.status in (401, 403):
399+
raise UnauthorizedException(result)
400+
401+
if response.status == 404:
402+
raise UnsupportedHostException(result)
403+
404+
if response.status == 400:
405+
raise AuthenticationException(result)
406+
407+
raise UnknownException(result)
408+
409+
async def __rest_login(self):
410+
"""Login to routers exposing the newer REST API."""
411+
await self.__rest_request(
412+
"POST",
413+
"/api/v1/login",
414+
data={"login": self.username, "password": self.password},
415+
)
416+
return True
417+
418+
def __should_fallback_to_rest(self, exception: Exception) -> bool:
419+
"""Return True when legacy API failure indicates a REST-only router."""
420+
if isinstance(exception, UnsupportedHostException):
421+
return True
422+
423+
if isinstance(exception, (UnknownException, BadRequestException)):
424+
content = str(exception).lower()
425+
return "service unavailable" in content or "<html" in content
426+
427+
return False
428+
429+
async def login(self):
430+
"""Login to the router using configured API mode."""
431+
if self.api_mode == ApiMode.REST:
432+
self._active_api_mode = ApiMode.REST
433+
return await self.__rest_login()
434+
435+
if self.api_mode == ApiMode.LEGACY:
436+
self._active_api_mode = ApiMode.LEGACY
437+
return await self.__legacy_login()
438+
439+
# Auto-detect mode: try legacy first, then fall back to REST for newer firmwares.
440+
try:
441+
self._active_api_mode = ApiMode.LEGACY
442+
return await self.__legacy_login()
443+
except Exception as exception: # pylint: disable=broad-except
444+
if not self.__should_fallback_to_rest(exception):
445+
raise
446+
447+
self._active_api_mode = ApiMode.REST
448+
return await self.__rest_login()
449+
361450
async def logout(self):
362451
"""Log out of the Sagemcom F@st device."""
363-
actions = {"id": 0, "method": "logOut"}
364-
365-
await self.__api_request_async([actions], False)
452+
if self._active_api_mode == ApiMode.REST:
453+
await self.__rest_request("POST", "/api/v1/logout")
454+
else:
455+
actions = {"id": 0, "method": "logOut"}
456+
await self.__api_request_async([actions], False)
366457

367458
self._session_id = -1
368459
self._server_nonce = ""
369460
self._request_id = -1
370461

462+
def __ensure_legacy_api(self):
463+
"""Raise when a method is only available on legacy JSON-REQ API."""
464+
if self._active_api_mode == ApiMode.REST:
465+
raise NotImplementedError(
466+
"This method is not available with REST API mode. "
467+
"Use helper methods supported for REST firmware instead."
468+
)
469+
371470
async def get_encryption_method(self):
372471
"""Determine which encryption method to use for authentication and set it directly."""
472+
if self.api_mode == ApiMode.REST:
473+
return None
474+
373475
for encryption_method in EncryptionMethod:
374476
try:
375477
self.authentication_method = encryption_method
@@ -411,6 +513,8 @@ async def get_value_by_xpath(self, xpath: str, options: dict | None = None) -> d
411513
:param xpath: path expression
412514
:param options: optional options
413515
"""
516+
self.__ensure_legacy_api()
517+
414518
actions = {
415519
"id": 0,
416520
"method": "getValue",
@@ -441,6 +545,8 @@ async def get_values_by_xpaths(self, xpaths, options: dict | None = None) -> dic
441545
:param xpaths: Dict of key to xpath expression
442546
:param options: optional options
443547
"""
548+
self.__ensure_legacy_api()
549+
444550
actions = [
445551
{
446552
"id": i,
@@ -478,6 +584,8 @@ async def set_value_by_xpath(
478584
:param value: value
479585
:param options: optional options
480586
"""
587+
self.__ensure_legacy_api()
588+
481589
actions = {
482590
"id": 0,
483591
"method": "setValue",
@@ -503,6 +611,26 @@ async def set_value_by_xpath(
503611
)
504612
async def get_device_info(self) -> DeviceInfo:
505613
"""Retrieve information about Sagemcom F@st device."""
614+
if self._active_api_mode == ApiMode.REST:
615+
data = await self.__rest_request("GET", "/api/v1/device")
616+
if not data or not isinstance(data, list):
617+
raise UnknownException("Invalid response from /api/v1/device")
618+
619+
device = data[0].get("device", {})
620+
return DeviceInfo(
621+
mac_address=device.get("wan_mac_address"),
622+
serial_number=device.get("serialnumber"),
623+
model_name=device.get("modelname"),
624+
model_number=device.get("modelname"),
625+
product_class=device.get("modelname"),
626+
software_version=device.get("running", {}).get("version"),
627+
hardware_version=device.get("hardware_version"),
628+
manufacturer="Sagemcom",
629+
up_time=device.get("uptime"),
630+
first_use_date=device.get("firstusedate"),
631+
reboot_count=device.get("numberofboots"),
632+
)
633+
506634
try:
507635
data = await self.get_value_by_xpath("Device/DeviceInfo")
508636
return DeviceInfo(**data["device_info"])
@@ -534,6 +662,47 @@ async def get_device_info(self) -> DeviceInfo:
534662
)
535663
async def get_hosts(self, only_active: bool | None = False) -> list[Device]:
536664
"""Retrieve hosts connected to Sagemcom F@st device."""
665+
if self._active_api_mode == ApiMode.REST:
666+
data = await self.__rest_request("GET", "/api/v1/home")
667+
if not data or not isinstance(data, list):
668+
raise UnknownException("Invalid response from /api/v1/home")
669+
670+
home = data[0]
671+
devices: list[Device] = []
672+
673+
for entry in home.get("wirelessListDevice", []):
674+
devices.append(
675+
Device(
676+
uid=entry.get("id"),
677+
phys_address=entry.get("macAddress"),
678+
ip_address=entry.get("ipAddress"),
679+
host_name=entry.get("hostname"),
680+
user_host_name=entry.get("friendlyname"),
681+
active=entry.get("active", True),
682+
interface_type="wifi",
683+
detected_device_type=entry.get("devicetype"),
684+
)
685+
)
686+
687+
for entry in home.get("ethernetListDevice", []):
688+
devices.append(
689+
Device(
690+
uid=entry.get("id"),
691+
phys_address=entry.get("macAddress"),
692+
ip_address=entry.get("ipAddress"),
693+
host_name=entry.get("hostname"),
694+
user_host_name=entry.get("friendlyname"),
695+
active=entry.get("active", True),
696+
interface_type="ethernet",
697+
detected_device_type=entry.get("devicetype"),
698+
)
699+
)
700+
701+
if only_active:
702+
return [d for d in devices if d.active is True]
703+
704+
return devices
705+
537706
data = await self.get_value_by_xpath(
538707
"Device/Hosts/Hosts", options={"capability-flags": {"interface": True}}
539708
)
@@ -558,6 +727,7 @@ async def get_hosts(self, only_active: bool | None = False) -> list[Device]:
558727
)
559728
async def get_port_mappings(self) -> list[PortMapping]:
560729
"""Retrieve configured Port Mappings on Sagemcom F@st device."""
730+
self.__ensure_legacy_api()
561731
data = await self.get_value_by_xpath("Device/NAT/PortMappings")
562732
port_mappings = [PortMapping(**p) for p in data]
563733

@@ -576,6 +746,7 @@ async def get_port_mappings(self) -> list[PortMapping]:
576746
)
577747
async def reboot(self):
578748
"""Reboot Sagemcom F@st device."""
749+
self.__ensure_legacy_api()
579750
action = {
580751
"id": 0,
581752
"method": "reboot",

sagemcom_api/enums.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,12 @@ class EncryptionMethod(StrEnum):
1818
MD5 = "MD5"
1919
MD5_NONCE = "MD5_NONCE"
2020
SHA512 = "SHA512"
21+
22+
23+
@unique
24+
class ApiMode(StrEnum):
25+
"""API mode to use when communicating with the router."""
26+
27+
AUTO = "auto"
28+
LEGACY = "legacy"
29+
REST = "rest"

0 commit comments

Comments
 (0)