diff --git a/.github/agent.md b/.github/agent.md index 60cfc24..412ad70 100644 --- a/.github/agent.md +++ b/.github/agent.md @@ -1,6 +1,30 @@ # CMS-NBI-Client Agent Configuration -> **Note**: This file is maintained for reference. The active agent initialization file used by GitHub Copilot is located at `.github/agents/init.md`. +> **Source of truth:** This file is the canonical agent definition for the repository. Any downstream Copilot or agent initialization files should be treated as derived views of this document. + +## Upstream Skill Provenance + +- Upstream source: `https://github.com/nullroute-commits/agency-agents` +- Upstream reference reviewed during this integration: `783f6a72bfd7f3135700ac273c619d92821b419a` +- Upstream adoption model: selective import by reference, not verbatim mirroring +- Local override rule: repository-specific correctness, security, and runtime truth take precedence over generic upstream guidance + +## Adopted Upstream Skills + +This repository adopts the following `agency-agents` skill families as upstream guidance: + +- **Engineering:** backend architecture, code review, codebase onboarding, devops automation, technical writing, security engineering +- **Testing:** API testing, reality checking, test-results analysis, workflow optimization +- **Specialized coordination:** agents orchestration for multi-step execution planning + +When an agent works in this repository, it should prefer these upstream skill profiles where they improve quality, but it must still follow this repository's code, tests, and published runtime behavior. + +## Sync Policy + +- Review upstream `agency-agents` updates intentionally; do not auto-sync prompt content into this repository without review +- Record the upstream commit or tag whenever this file is refreshed +- Keep repository-specific sections current even if upstream skills change +- If downstream agent bootstrap files drift from this file, update the downstream copies to match this source of truth This document provides comprehensive context and instructions for AI agents working on the CMS-NBI-Client repository. It includes project overview, architecture, conventions, and best practices. diff --git a/README.md b/README.md index 409e7b6..8c809c3 100644 --- a/README.md +++ b/README.md @@ -4,16 +4,16 @@ [![Python Version](https://img.shields.io/pypi/pyversions/cms-nbi-client.svg)](https://pypi.org/project/cms-nbi-client/) [![License](https://img.shields.io/github/license/somenetworking/CMS-NBI-Client.svg)](https://github.com/somenetworking/CMS-NBI-Client/blob/main/LICENSE) -Modern async Python client for Calix Management System (CMS) Northbound Interface (NBI) with full HTTPS support, connection pooling, circuit breakers, and structured logging. +Modern Python client for Calix Management System (CMS) Northbound Interface (NBI) with an async transport layer, structured logging, and legacy E7 compatibility. **Note:** This package is not owned, supported, or endorsed by Calix. It's an independent implementation for interacting with CMS NBIs. -> **Important:** This library is currently in a transition phase. The modern async CMSClient provides the foundation and configuration management, while the legacy Client class provides the full operational functionality. Both are available and can be used together. See the [Examples](./Examples) folder for working code samples. +> **Important:** This library is currently in a transition phase. `CMSClient` provides configuration, authentication, transport, REST helpers, and legacy E7 compatibility shims. The legacy `Client` class still owns most production E7 behavior. Prefer the [Examples](./Examples) folder for fully working end-to-end E7 samples. ## Features - **Modern Async/Await**: Built on aiohttp for high-performance async operations -- **HTTPS Support**: Full TLS/SSL support with certificate validation +- **HTTPS Support**: Modern transport supports TLS/SSL. Legacy E7 coverage is still being completed - **Connection Pooling**: Reuse connections for better performance - **Circuit Breaker**: Automatic failure detection and recovery - **Structured Logging**: Rich logs with structlog for better debugging @@ -21,7 +21,7 @@ Modern async Python client for Calix Management System (CMS) Northbound Interfac - **Secure Storage**: Encrypted credential storage using system keyring - **XML Security**: Protection against XXE and other XML attacks - **Comprehensive Testing**: High test coverage with pytest -- **Backward Compatible**: Sync wrapper for legacy code +- **Backward Compatible**: Sync wrapper plus legacy E7 compatibility facade ## Quick Start @@ -50,29 +50,18 @@ async def main(): ) async with CMSClient(config) as client: - # Note: High-level methods are available through the legacy client - # For modern async operations, use the underlying E7 modules directly - # Example using legacy operations: - - # Create ONT using E7 operations - create_op = client.e7.create - result = await create_op.ont( - network_nm="NTWK-1", - ont_id="123", - admin_state="enabled" - ) - print(result) + devices = client.rest.query.device(device_type="e7") + print(devices) # Run async code asyncio.run(main()) # Synchronous usage (backward compatible) -# Note: For most operations, use the LegacyClient for now from cmsnbiclient import LegacyClient legacy_client = LegacyClient() -# Configure and use legacy client for production operations -print("Use LegacyClient for full feature compatibility") +# Configure and use LegacyClient for full E7 feature coverage +print("Use LegacyClient for end-to-end E7 workflows") ``` ### Configuration @@ -131,6 +120,12 @@ setup_logging(log_level="DEBUG", json_logs=False) For detailed documentation and examples, see the [/Examples](./Examples) folder. +### Current API Status + +- `CMSClient`: async authentication, transport management, REST helpers, and compatibility access to legacy E7 handlers +- `LegacyClient`: primary path for complete E7 NETCONF workflows +- Documentation under `docs/` may still describe the planned flattened async E7 API; use the examples and current source as the authoritative runtime behavior until that migration is completed + ### Available Operations #### E7 Operations @@ -190,4 +185,4 @@ poetry run mypy . ## Changelog -See [CHANGELOG.md](./CHANGELOG.md) for version history. \ No newline at end of file +See [CHANGELOG.md](./CHANGELOG.md) for version history. diff --git a/src/cmsnbiclient/REST/query.py b/src/cmsnbiclient/REST/query.py index 133e42b..c4f493d 100644 --- a/src/cmsnbiclient/REST/query.py +++ b/src/cmsnbiclient/REST/query.py @@ -1,4 +1,4 @@ -from typing import Any, Union +from typing import Any, Dict, Optional, Tuple, Union import requests @@ -31,13 +31,48 @@ def __init__(self, cms_nbi_connect_object: Union[Client, Any]) -> None: ) self.cms_nbi_connect_object = cms_nbi_connect_object + def _get_legacy_defaults(self) -> Tuple[str, str, str, str, str]: + """Resolve protocol, port, username, password, and host for legacy clients.""" + config = self.cms_nbi_connect_object.cms_nbi_config + default_node = config["cms_nodes"]["default"] + connection = default_node["connection"] + credentials = default_node["cms_creds"] + protocol = connection["protocol"]["http"] + return ( + protocol, + connection["rest_http_port"], + credentials["user_nm"], + credentials["pass_wd"], + connection["cms_node_ip"], + ) + + def _get_modern_defaults(self) -> Tuple[str, str, str, str, str]: + """Resolve protocol, port, username, password, and host for CMSClient.""" + config = self.cms_nbi_connect_object.config + return ( + config.connection.protocol, + str(config.connection.rest_port), + config.credentials.username, + config.credentials.password.get_secret_value(), + config.connection.host, + ) + + def _get_rest_uri(self) -> str: + """Resolve the REST devices URI for the current client type.""" + if ( + hasattr(self.cms_nbi_connect_object, "cms_nbi_config") + and "cms_nodes" in self.cms_nbi_connect_object.cms_nbi_config + ): + return self.cms_nbi_connect_object.cms_nbi_config["cms_rest_uri"]["devices"] + return "/restnbi/devices?deviceType=" + def device( self, - protocol: str = "http", - port: str = "8080", - cms_user_nm: str = "rootgod", - cms_user_pass: str = "root", - cms_node_ip: str = "localhost", + protocol: Optional[str] = None, + port: Optional[str] = None, + cms_user_nm: Optional[str] = None, + cms_user_pass: Optional[str] = None, + cms_node_ip: Optional[str] = None, device_type: str = "", http_timeout: int = 1, ) -> Any: @@ -101,25 +136,42 @@ def device( device_type='c7', http_timeout=5) """ - # Handle both Client and CMSClient types - if hasattr(self.cms_nbi_connect_object, "cms_nbi_config"): - config = self.cms_nbi_connect_object.cms_nbi_config - else: - # Fallback for CMSClient or other types - config = getattr(self.cms_nbi_connect_object, "config", {}).get("cms_rest_uri", {}) - - if isinstance(config, dict) and "cms_rest_uri" in config: - uri = config["cms_rest_uri"]["devices"] + if ( + hasattr(self.cms_nbi_connect_object, "cms_nbi_config") + and "cms_nodes" in self.cms_nbi_connect_object.cms_nbi_config + ): + ( + default_protocol, + default_port, + default_user, + default_password, + default_host, + ) = self._get_legacy_defaults() else: - uri = "/restnbi/devices?deviceType=" # Default fallback - - cms_rest_url = f"""{protocol}://{cms_node_ip}:{port}{uri}{device_type}&limit=9999""" + ( + default_protocol, + default_port, + default_user, + default_password, + default_host, + ) = self._get_modern_defaults() + + resolved_protocol = protocol or default_protocol + resolved_port = port or default_port + resolved_user = cms_user_nm or default_user + resolved_password = cms_user_pass or default_password + resolved_host = cms_node_ip or default_host + uri = self._get_rest_uri() + + cms_rest_url = ( + f"{resolved_protocol}://{resolved_host}:{resolved_port}{uri}{device_type}&limit=9999" + ) payload = "" headers = { "Content-Type": "application/json", - "User-Agent": f"CMS_NBI_CONNECT-{cms_user_nm}", + "User-Agent": f"CMS_NBI_CONNECT-{resolved_user}", } try: @@ -127,13 +179,20 @@ def device( url=cms_rest_url, headers=headers, data=payload, - auth=(cms_user_nm, cms_user_pass), + auth=(resolved_user, resolved_password), timeout=http_timeout, ) except requests.exceptions.Timeout as e: raise e if response.status_code == 200: - return response.json()["data"] + body: Dict[str, Any] = response.json() + # CMS API deployments expose both `data` and `devices` response + # envelopes depending on the endpoint version. + if "data" in body: + return body["data"] + if "devices" in body: + return body["devices"] + return body else: return response diff --git a/src/cmsnbiclient/__init__.py b/src/cmsnbiclient/__init__.py index 2ef06c4..ade3e41 100644 --- a/src/cmsnbiclient/__init__.py +++ b/src/cmsnbiclient/__init__.py @@ -54,6 +54,3 @@ # Legacy API (deprecated) "LegacyClient", ] - -# Default logging setup -setup_logging() diff --git a/src/cmsnbiclient/client.py b/src/cmsnbiclient/client.py index de614e1..c402608 100644 --- a/src/cmsnbiclient/client.py +++ b/src/cmsnbiclient/client.py @@ -185,20 +185,16 @@ def login_netconf( """ - if protocol == "http": - try: - response = requests.post( - url=self.cms_netconf_url, - headers=self.headers, - data=payload, - timeout=http_timeout, - ) - except requests.exceptions.Timeout as e: - # future came and it decided to have raise - raise e - else: - # TODO:Need to implement https handling - pass + try: + response = requests.post( + url=self.cms_netconf_url, + headers=self.headers, + data=payload, + timeout=http_timeout, + ) + except requests.exceptions.Timeout as e: + # future came and it decided to have raise + raise e if response.status_code != 200: # if the response code is not 200 FALSE and the request.post object is returned. @@ -276,20 +272,16 @@ def logout_netconf( """ - if protocol == "http": - try: - response = requests.post( - url=self.cms_netconf_url, - headers=self.headers, - data=payload, - timeout=http_timeout, - ) - except requests.exceptions.Timeout as e: - # debating between exit and raise will update in future - raise e - else: - # will need to research how to implement https connection with request library - pass + try: + response = requests.post( + url=self.cms_netconf_url, + headers=self.headers, + data=payload, + timeout=http_timeout, + ) + except requests.exceptions.Timeout as e: + # debating between exit and raise will update in future + raise e if response.status_code != 200: # if the response code is not 200 response.models.Response is returned. diff --git a/src/cmsnbiclient/client_v2.py b/src/cmsnbiclient/client_v2.py index c9b2a28..dab4abc 100644 --- a/src/cmsnbiclient/client_v2.py +++ b/src/cmsnbiclient/client_v2.py @@ -1,19 +1,66 @@ import asyncio from datetime import datetime, timedelta -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, Optional, Type, cast import aiohttp import structlog +from defusedxml import ElementTree as DefusedET from .core.base import BaseClient from .core.config import Config from .core.transport import AsyncHTTPTransport +from .E7 import Create as E7Create +from .E7 import Delete as E7Delete +from .E7 import Query as E7Query +from .E7 import Update as E7Update +from .exceptions import AuthenticationError from .REST import RESTOperations from .security.credentials import SecureCredentialManager logger = structlog.get_logger() +class _LegacyOperationGroup: + """Compatibility wrapper that lazily instantiates legacy E7 operations.""" + + def __init__(self, client: "CMSClient", factory: Type[Any]): + self._client = client + self._factory = factory + + def __getattr__(self, method_name: str) -> Callable[..., Any]: + def caller(*args: Any, **kwargs: Any) -> Any: + network_name = kwargs.pop("network_name", None) or kwargs.pop("network_nm", "") + http_timeout = kwargs.pop("http_timeout", 1) + operation = self._factory( + self._client, network_nm=network_name, http_timeout=http_timeout + ) + method = getattr(operation, method_name) + return method(*args, **kwargs) + + return caller + + +class LegacyE7Facade: + """Compatibility facade that preserves the documented `client.e7.*` surface.""" + + def __init__(self, client: "CMSClient"): + self.create = _LegacyOperationGroup(client, E7Create) + self.delete = _LegacyOperationGroup(client, E7Delete) + self.query = _LegacyOperationGroup(client, E7Query) + self.update = _LegacyOperationGroup(client, E7Update) + + def __getattr__(self, name: str) -> Callable[..., Any]: + for prefix, group in ( + ("create_", self.create), + ("delete_", self.delete), + ("query_", self.query), + ("update_", self.update), + ): + if name.startswith(prefix): + return cast(Callable[..., Any], getattr(group, name[len(prefix):])) + raise AttributeError(name) + + class CMSClient(BaseClient): """Modern async CMS client with all features""" @@ -23,10 +70,13 @@ def __init__(self, config: Config): self._credential_manager = SecureCredentialManager() self._auth_time: Optional[datetime] = None self._auth_lock = asyncio.Lock() + self.cms_nbi_config = self._build_legacy_compat_config() + self.cms_netconf_url = self._build_netconf_url() + self.session_id: Optional[str] = None + self.cms_user_nm = config.credentials.username # Operation handlers - # Note: E7 operations currently require LegacyClient interface - # self.e7 = E7Operations(self) # Disabled during transition + self.e7 = LegacyE7Facade(self) self.rest = RESTOperations(self) async def authenticate(self) -> None: @@ -60,6 +110,7 @@ async def authenticate(self) -> None: # Parse response result = await self._parse_auth_response(response) self._session_id = result["session_id"] + self.session_id = result["session_id"] self._auth_time = datetime.now() self.logger.info("Authentication successful", session_id=self._session_id) @@ -82,14 +133,17 @@ async def _logout(self) -> None: if self._transport is None: raise RuntimeError("Transport not initialized") - await self._transport.request( + response = await self._transport.request( method="POST", url=url, data=payload, headers={"Content-Type": "text/xml;charset=ISO-8859-1"}, ) + await response.read() + response.release() self._session_id = None + self.session_id = None self.logger.info("Logged out successfully") def _build_netconf_url(self) -> str: @@ -124,17 +178,51 @@ def _build_logout_payload(self) -> str: """ + def _build_legacy_compat_config(self) -> Dict[str, Any]: + """Build minimal legacy-compatible configuration for shared modules.""" + return { + "cms_netconf_uri": { + "e7": "/cmsexc/ex/netconf", + "c7/e3/e5-100": "/cmsweb/nc", + "ae_ont": "/cmsae/ae/netconf", + }, + "cms_rest_uri": { + "devices": "/restnbi/devices?deviceType=", + "region": "/restnbi/region", + "topology": "/restnbi/toplinks", + "profile": "/restnbi/profiles?profileType=", + }, + } + + @staticmethod + def _find_xml_text(root: Any, tag_name: str) -> Optional[str]: + """Find an element value regardless of namespace prefix.""" + for element in root.iter(): + if isinstance(element.tag, str) and element.tag.split("}")[-1] == tag_name: + if isinstance(element.text, str) and element.text: + return element.text.strip() + return None + async def _parse_auth_response(self, response: aiohttp.ClientResponse) -> Dict[str, Any]: """Parse authentication response""" - text = await response.text() - # Simple parsing for now - should use proper XML parser - if "" in text and "" in text: - start = text.find("") + len("") - end = text.find("") - session_id = text[start:end] - return {"session_id": session_id} - else: - raise Exception("Authentication failed - no session ID in response") + try: + text = await response.text() + finally: + response.release() + + try: + root = DefusedET.fromstring(text) + except DefusedET.ParseError as exc: + raise AuthenticationError("Authentication failed: invalid XML response") from exc + + result_code = self._find_xml_text(root, "ResultCode") + session_id = self._find_xml_text(root, "SessionId") + + if result_code and result_code != "0": + raise AuthenticationError(f"Authentication failed with result code {result_code}") + if not session_id: + raise AuthenticationError("Authentication failed: no session ID in response") + return {"session_id": session_id} @classmethod def sync(cls, config: Config) -> "SyncCMSClient": @@ -160,3 +248,15 @@ def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: if self._client and self._loop: self._loop.run_until_complete(self._client.close()) self._loop.close() + + @property + def e7(self) -> LegacyE7Facade: + if self._client is None: + raise RuntimeError("SyncCMSClient is not connected") + return self._client.e7 + + @property + def rest(self) -> RESTOperations: + if self._client is None: + raise RuntimeError("SyncCMSClient is not connected") + return self._client.rest diff --git a/src/cmsnbiclient/core/transport.py b/src/cmsnbiclient/core/transport.py index 98b6dbb..af0f627 100644 --- a/src/cmsnbiclient/core/transport.py +++ b/src/cmsnbiclient/core/transport.py @@ -5,8 +5,9 @@ import aiohttp import certifi import structlog -from aiohttp import ClientTimeout, TCPConnector +from aiohttp import ClientResponse, ClientTimeout, TCPConnector +from ..exceptions import ConnectionError, NetworkError, TimeoutError from .circuit_breaker import CircuitBreaker from .config import Config @@ -70,8 +71,13 @@ async def request( data: Optional[str] = None, headers: Optional[Dict[str, str]] = None, timeout: Optional[float] = None, - ) -> aiohttp.ClientResponse: - """Execute HTTP request with circuit breaker""" + ) -> ClientResponse: + """Execute HTTP request with circuit breaker. + + The caller must either fully consume the body with `response.read()` + or explicitly release the response with `response.release()` to avoid + leaking pooled connections. + """ if not self._session: await self.initialize() @@ -93,16 +99,25 @@ async def _do_request( headers: Optional[Dict[str, str]] = None, auth: Optional[aiohttp.BasicAuth] = None, timeout: Optional[ClientTimeout] = None, - ) -> aiohttp.ClientResponse: + ) -> ClientResponse: """Execute actual HTTP request""" if self._session is None: raise RuntimeError("Transport not initialized. Call initialize() first.") - async with self._session.request( - method=method, url=url, data=data, headers=headers, auth=auth, timeout=timeout - ) as response: + try: + response = await self._session.request( + method=method, url=url, data=data, headers=headers, auth=auth, timeout=timeout + ) response.raise_for_status() return response + except asyncio.TimeoutError as exc: + raise TimeoutError(f"Request timed out for {method} {url}") from exc + except aiohttp.ClientResponseError as exc: + raise ConnectionError( + f"HTTP {exc.status} error for {method} {url}: {exc.message}" + ) from exc + except aiohttp.ClientError as exc: + raise NetworkError(f"Request failed for {method} {url}: {exc}") from exc async def close(self) -> None: """Close transport connections""" diff --git a/src/cmsnbiclient/security/xml.py b/src/cmsnbiclient/security/xml.py index ffdda6a..cfc99d6 100644 --- a/src/cmsnbiclient/security/xml.py +++ b/src/cmsnbiclient/security/xml.py @@ -1,5 +1,6 @@ from typing import Any, Dict, Union from xml.etree.ElementTree import Element +from xml.sax.saxutils import escape, quoteattr import defusedxml.ElementTree as ET import structlog @@ -74,5 +75,43 @@ def _element_to_dict(self, element: Element) -> Union[Dict[str, Any], str]: def build(self, data: Dict[str, Any]) -> str: """Build XML from dictionary using templates""" - # Implementation using lxml builder for safety - raise NotImplementedError("XML building not yet implemented") + if len(data) != 1: + raise ValueError("XML data must contain exactly one root element") + + root_name, root_value = next(iter(data.items())) + return self._build_element(root_name, root_value) + + def _build_element(self, name: str, value: Any) -> str: + """Recursively build a safely escaped XML element string.""" + if isinstance(value, dict): + attributes = value.get("@attributes", {}) + attr_text = "".join( + f" {key}={quoteattr(str(attr_value))}" for key, attr_value in attributes.items() + ) + inner_parts = [] + for child_name, child_value in value.items(): + if child_name in {"@attributes", "#text"}: + continue + + if isinstance(child_value, list): + for item in child_value: + inner_parts.append(self._build_element(child_name, item)) + else: + inner_parts.append(self._build_element(child_name, child_value)) + + if "#text" in value and value["#text"] is not None: + inner_parts.insert(0, escape(str(value["#text"]))) + + return f"<{name}{attr_text}>{''.join(inner_parts)}" + + if value is None: + return f"<{name}>" + return f"<{name}>{escape(str(value))}" + + +_DEFAULT_XML_HANDLER = SecureXMLHandler() + + +def parse_xml_safely(xml_string: str) -> Dict[str, Any]: + """Parse XML with the repository's secure XML handler.""" + return _DEFAULT_XML_HANDLER.parse(xml_string) diff --git a/tests/unit/test_imports.py b/tests/unit/test_imports.py index 9dd491a..57b147b 100644 --- a/tests/unit/test_imports.py +++ b/tests/unit/test_imports.py @@ -93,3 +93,19 @@ def test_client_creation(self): # Just test that the class can be imported and basic config works assert CMSClient is not None assert config is not None + + def test_client_exposes_legacy_compatibility(self): + """Test that CMSClient exposes documented compatibility helpers.""" + from cmsnbiclient import CMSClient, Config + + config = Config( + credentials={"username": "test", "password": "test"}, + connection={"host": "localhost", "verify_ssl": False}, + ) + + client = CMSClient(config) + + assert client.e7 is not None + assert client.rest is not None + assert client.cms_netconf_url.endswith("/cmsexc/ex/netconf") + assert client.cms_nbi_config["cms_netconf_uri"]["e7"] == "/cmsexc/ex/netconf" diff --git a/tests/unit/test_modern_client.py b/tests/unit/test_modern_client.py new file mode 100644 index 0000000..5f10809 --- /dev/null +++ b/tests/unit/test_modern_client.py @@ -0,0 +1,170 @@ +"""Regression tests for the modern client implementation.""" + +from typing import Any, Dict, Optional + +import pytest +from pydantic import SecretStr + +from cmsnbiclient import CMSClient, Config +from cmsnbiclient.core.config import ConnectionConfig, CredentialsConfig +from cmsnbiclient.core.transport import AsyncHTTPTransport +from cmsnbiclient.exceptions import AuthenticationError +from cmsnbiclient.security.xml import SecureXMLHandler, parse_xml_safely + + +class FakeResponse: + """Minimal response stub for auth parsing tests.""" + + def __init__(self, body: str): + self._body = body + self.released = False + + async def text(self) -> str: + return self._body + + async def read(self) -> bytes: + return self._body.encode() + + def release(self) -> None: + self.released = True + + +class FakeSessionResponse: + """Minimal response stub for transport tests.""" + + def __init__(self) -> None: + self.raise_for_status_called = False + + def raise_for_status(self) -> None: + self.raise_for_status_called = True + + +class FakeSession: + """Minimal aiohttp session stub.""" + + def __init__(self, response: FakeSessionResponse) -> None: + self.response = response + self.request_args: Optional[Dict[str, Any]] = None + + async def request(self, **kwargs: Any) -> FakeSessionResponse: + self.request_args = kwargs + return self.response + + +class FakeTransport: + """Minimal transport stub for CMSClient authentication.""" + + def __init__(self, response: FakeResponse) -> None: + self.response = response + self.calls: list[Dict[str, Any]] = [] + + async def request(self, **kwargs: Any) -> FakeResponse: + self.calls.append(kwargs) + return self.response + + async def close(self) -> None: + return None + + +def make_config(**connection_overrides: Any) -> Config: + """Create a test config with optional connection overrides.""" + connection = {"protocol": "http", "host": "cms.example.com", "verify_ssl": False} + connection.update(connection_overrides) + return Config( + credentials=CredentialsConfig(username="test-user", password=SecretStr("test-pass")), + connection=ConnectionConfig.model_validate(connection), + ) + + +@pytest.mark.asyncio +async def test_transport_returns_live_response_without_context_manager() -> None: + """Transport should return the response object directly.""" + transport = AsyncHTTPTransport(make_config()) + response = FakeSessionResponse() + session = FakeSession(response) + transport._session = session # type: ignore[assignment] + + result = await transport._do_request("POST", "http://cms.example.com/test") + + assert result is response + assert response.raise_for_status_called is True + assert session.request_args is not None + assert session.request_args["method"] == "POST" + + +@pytest.mark.asyncio +async def test_authenticate_parses_xml_response_and_updates_session_state() -> None: + """Authentication should parse XML safely and update legacy compatibility fields.""" + client = CMSClient(make_config(host="127.0.0.1", netconf_port=18080)) + response = FakeResponse( + """ + + + + 0 + 12345 + + + """ + ) + client._transport = FakeTransport(response) # type: ignore[assignment] + + await client.authenticate() + + assert client.session_id == "12345" + assert response.released is True + + +@pytest.mark.asyncio +async def test_authenticate_raises_on_invalid_xml() -> None: + """Authentication should raise a typed error for malformed XML.""" + client = CMSClient(make_config()) + response = FakeResponse(" None: + """REST queries should default to CMSClient configuration instead of legacy literals.""" + client = CMSClient(make_config(host="cms.internal", rest_port=8443, protocol=protocol)) + + captured: Dict[str, Any] = {} + + class Response: + status_code = 200 + + @staticmethod + def json() -> Dict[str, Any]: + return {"devices": [{"id": "1"}]} + + def fake_get(**kwargs: Any) -> Response: + captured.update(kwargs) + return Response() + + monkeypatch.setattr("cmsnbiclient.REST.query.requests.get", fake_get) + + result = client.rest.query.device(device_type="e7", http_timeout=5) + + assert result == [{"id": "1"}] + assert captured["url"] == expected_url + assert captured["auth"] == ("test-user", "test-pass") + assert captured["timeout"] == 5 + + +def test_secure_xml_helpers_are_available() -> None: + """Secure XML helpers should parse and build simple XML payloads.""" + parsed = parse_xml_safely("ok") + built = SecureXMLHandler().build({"root": {"child": "value"}}) + + assert parsed == {"value": "ok"} + assert built == "value"