Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 91 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,35 +637,114 @@ with mistapi.websockets.sites.DeviceStatsEvents(apisession, site_ids=["<site_id>

### Device Utilities Usage

```python
from mistapi.device_utils import ap, ex
All device utility functions are **non-blocking**: they trigger the REST API call, start a WebSocket stream in the background, and return a `UtilResponse` immediately. Your script can continue processing while data streams in.

#### Callback style

# Ping from an AP
result = ap.ping(apisession, site_id, device_id, host="8.8.8.8")
print(result.ws_data)
Pass an `on_message` callback to process each result as it arrives:

# Retrieve ARP table from a switch
result = ex.retrieveArpTable(apisession, site_id, device_id)
print(result.ws_data)
```python
from mistapi.device_utils import ex

# With real-time callback
def handle(msg):
print("got:", msg)
print("Live:", msg)

response = ex.retrieveArpTable(apisession, site_id, device_id, on_message=handle)
# returns immediately — on_message fires for each message in the background

do_other_work()

response.wait() # block until streaming is complete
print(response.ws_data) # all collected data
```

#### Generator style

Iterate over processed messages as they arrive, similar to `_MistWebsocket.receive()`:

result = ex.cableTest(apisession, site_id, device_id, port="ge-0/0/0", on_message=handle)
```python
response = ex.retrieveMacTable(apisession, site_id, device_id)
for msg in response.receive(): # blocking generator, yields each message
print(msg)
# loop ends when the WebSocket closes
print(response.ws_data)
```

#### Context manager

`disconnect()` is called automatically when the context exits:

```python
with ex.cableTest(apisession, site_id, device_id, port_id="ge-0/0/0") as response:
for msg in response.receive():
print(msg)
# WebSocket disconnected, data ready
print(response.ws_data)
```

#### Polling

Check `response.done` to avoid blocking:

```python
response = ex.retrieveBgpSummary(apisession, site_id, device_id)
while not response.done:
do_other_work()
print(response.ws_data)
```

#### Cancel early

Stop a long-running stream before it completes:

```python
response = ex.monitorTraffic(apisession, site_id, device_id, port_id="ge-0/0/0")
do_some_work()
response.disconnect() # stop the WebSocket
print(response.ws_data) # data collected so far
```

#### Async await

Works in `asyncio` contexts without blocking the event loop:

```python
import asyncio
from mistapi.device_utils import ex

async def main():
response = ex.traceroute(apisession, site_id, device_id, host="8.8.8.8")
await response # non-blocking await
print(response.ws_data)

asyncio.run(main())
```

### UtilResponse Object

All device utility functions return a `UtilResponse` object:

#### Attributes

| Attribute | Type | Description |
|-----------|------|-------------|
| `trigger_api_response` | `APIResponse` | The initial REST API response that triggered the device command. Contains `status_code`, `data`, and `headers` from the trigger request. |
| `ws_required` | `bool` | `True` if the command required a WebSocket connection to stream results (most diagnostic commands do). `False` if the REST response alone was sufficient. |
| `ws_data` | `list[str]` | Parsed result data extracted from the WebSocket stream. Each entry is a processed output line from the device (e.g., a line of ping output or an ARP table row). |
| `ws_data` | `list[str]` | Parsed result data extracted from the WebSocket stream. This list is **live** — it grows as messages arrive in the background, even before `wait()` is called. |
| `ws_raw_events` | `list[str]` | Raw, unprocessed WebSocket event payloads as received from the Mist API. Useful for debugging or custom parsing. |

#### Properties and Methods

| Method / Property | Returns | Description |
|-------------------|---------|-------------|
| `done` | `bool` | `True` if data collection is complete (or no WS was needed). |
| `wait(timeout=None)` | `UtilResponse` | Block until data collection is complete. Returns `self`. |
| `receive()` | `Generator` | Blocking generator that yields each processed message as it arrives. Exits when the WebSocket closes. |
| `disconnect()` | `None` | Stop the WebSocket connection early. |
| `await response` | `UtilResponse` | Non-blocking await for `asyncio` contexts. |

`UtilResponse` also supports the context manager protocol (`with` statement).

### Enums

- `ap.TracerouteProtocol` — `ICMP`, `UDP` (for `ap.traceroute()`)
Comment thread
tmunzer-AIDE marked this conversation as resolved.
Expand Down
114 changes: 104 additions & 10 deletions src/mistapi/device_utils/__tools/__ws_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import queue
import threading
from collections.abc import Callable
from collections.abc import Callable, Generator
from enum import Enum

from mistapi import APISession
Expand Down Expand Up @@ -30,19 +31,102 @@ class Timer(Enum):

class UtilResponse:
"""
A simple class to encapsulate the response from utility WebSocket functions.
This class can be extended in the future to include additional metadata or helper methods.
Encapsulates the response from device utility functions.

Returned immediately by tool functions. When a WebSocket stream is
involved, data is collected in the background. Use ``receive()``,
``wait()``, or the ``on_message`` callback to consume results.

USAGE PATTERNS
-----------
Callback style (on_message passed at call time)::

response = ex.ping(session, site_id, device_id, host="8.8.8.8",
on_message=lambda msg: print(msg))
do_other_work()
response.wait()
print(response.ws_data)

Generator style::

response = ex.ping(session, site_id, device_id, host="8.8.8.8")
for msg in response.receive():
print(msg)

Context manager::

with ex.ping(session, site_id, device_id, host="8.8.8.8") as response:
for msg in response.receive():
print(msg)

Async await::

response = ex.ping(session, site_id, device_id, host="8.8.8.8")
await response
print(response.ws_data)
"""

def __init__(
self,
api_response: _APIResponse,
) -> None:
self.trigger_api_response = api_response
# This can be set to True if the WebSocket connection was successfully initiated
self.ws_required: bool = False
self.ws_data: list[str] = []
self.ws_raw_events: list[str] = []
self._queue: queue.Queue[str | None] = queue.Queue()
self._closed = threading.Event()
self._closed.set() # default: done (no WS to wait for)
self._disconnect_fn: Callable[[], None] | None = None

@property
def done(self) -> bool:
"""True if data collection is complete (or no WS was needed)."""
return self._closed.is_set()

def wait(self, timeout: float | None = None) -> "UtilResponse":
"""Block until data collection is complete. Returns self."""
self._closed.wait(timeout=timeout)
return self

def receive(self) -> Generator[str, None, None]:
"""
Blocking generator that yields each processed message as it arrives.

Mirrors ``_MistWebsocket.receive()``. Exits cleanly when the
WebSocket connection closes or ``disconnect()`` is called.
"""
while True:
try:
item = self._queue.get(timeout=1)
except queue.Empty:
if self._closed.is_set() and self._queue.empty():
break
continue
if item is None:
break
yield item

def disconnect(self) -> None:
"""Stop the WebSocket connection early."""
if self._disconnect_fn:
self._disconnect_fn()

def __enter__(self) -> "UtilResponse":
return self

def __exit__(self, *args) -> None:
self.disconnect()

def __await__(self):
"""Allow ``result = await response`` in async contexts."""
import asyncio

async def _await_impl():
await asyncio.to_thread(self._closed.wait)
return self

return _await_impl().__await__()


class WebSocketWrapper:
Expand Down Expand Up @@ -83,7 +167,6 @@ def __init__(
self.session_id: str | None = None
self.capture_id: str | None = None
self._on_message_cb = on_message
self._closed = threading.Event()

LOGGER.debug(
"trigger response: %s", self.util_response.trigger_api_response.data
Expand All @@ -107,7 +190,9 @@ def _on_open(self):

def _on_close(self, code, msg):
LOGGER.info("WebSocket closed: %s - %s", code, msg)
self._closed.set()
self._stop_all_timers()
self.util_response._queue.put(None) # sentinel for receive()
self.util_response._closed.set() # signal completion

##########################################################################
## Helper methods for managing timers
Expand Down Expand Up @@ -158,6 +243,7 @@ def _handle_message(self, msg):
raw = self._extract_raw(msg)
if raw:
self.data.append(raw)
self.util_response._queue.put(raw) # feed receive() generator
if self._on_message_cb:
self._on_message_cb(raw)
self._timeout_handler(Timer.TIMEOUT, TimerAction.RESET)
Expand Down Expand Up @@ -234,7 +320,11 @@ def _extract_raw(self, message):
## WebSocket connection management
def start(self, ws) -> UtilResponse:
"""
Start the WS connection, block until closed, return UtilResponse.
Start the WS connection in the background and return immediately.

The returned ``UtilResponse`` collects data as it streams in. Use
``response.receive()``, ``response.wait()``, or the ``on_message``
callback to consume results.

PARAMS
-----------
Expand All @@ -246,9 +336,13 @@ def start(self, ws) -> UtilResponse:
ws.on_error(lambda error: LOGGER.error("Error: %s", error))
ws.on_close(self._on_close)
ws.on_open(self._on_open)
ws.connect(run_in_background=False) # blocks until _on_close fires
self._stop_all_timers()

# Wire up UtilResponse before starting WS
self.util_response.ws_required = True
self.util_response.ws_data = self.data
self.util_response.ws_data = self.data # live list reference
self.util_response.ws_raw_events = self.raw_events
self.util_response._closed.clear() # mark as "in progress"
self.util_response._disconnect_fn = ws.disconnect

ws.connect(run_in_background=True) # non-blocking
return self.util_response
2 changes: 1 addition & 1 deletion src/mistapi/device_utils/__tools/bpdu.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mistapi.device_utils.__tools.__ws_wrapper import UtilResponse


async def clear_error(
def clear_error(
apissession: _APISession,
site_id: str,
device_id: str,
Expand Down
2 changes: 1 addition & 1 deletion src/mistapi/device_utils/__tools/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class Node(Enum):
# LOGGER.info(trigger.data)
# print(f"SSR DNS resolution command triggered for device {device_id}")
# ws = DeviceCmdEvents(apissession, site_id=site_id, device_ids=[device_id])
# util_response = WebSocketWrapper(
# util_response = await WebSocketWrapper(
Comment thread
tmunzer-AIDE marked this conversation as resolved.
Outdated
Comment thread
tmunzer-AIDE marked this conversation as resolved.
Outdated
# apissession, util_response, timeout=timeout, on_message=on_message
# ).start(ws)
# else:
Expand Down
2 changes: 1 addition & 1 deletion src/mistapi/device_utils/__tools/dot1x.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mistapi.device_utils.__tools.__ws_wrapper import UtilResponse


async def clear_sessions(
def clear_sessions(
apissession: _APISession,
site_id: str,
device_id: str,
Expand Down
2 changes: 1 addition & 1 deletion src/mistapi/device_utils/__tools/mac.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def clear_mac_table(
if trigger.status_code == 200:
LOGGER.info(trigger.data)
print(f"Clear MAC Table command triggered for device {device_id}")
# util_response = WebSocketWrapper(
# util_response = await WebSocketWrapper(
Comment thread
tmunzer-AIDE marked this conversation as resolved.
Outdated
# apissession, util_response, timeout=timeout, on_message=on_message
# ).start(ws)
else:
Expand Down
13 changes: 9 additions & 4 deletions src/mistapi/device_utils/__tools/miscellaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,15 @@ def monitor_traffic(
if trigger.status_code == 200:
LOGGER.info(trigger.data)
print(f"Monitor traffic command triggered for device {device_id}")
ws = SessionWithUrl(apissession, url=trigger.data.get("url", ""))
util_response = WebSocketWrapper(
apissession, util_response, timeout=timeout, on_message=on_message
).start(ws)
if isinstance(trigger.data, dict) and "url" in trigger.data:
ws = SessionWithUrl(apissession, url=trigger.data.get("url", ""))
util_response = WebSocketWrapper(
apissession, util_response, timeout=timeout, on_message=on_message
).start(ws)
else:
LOGGER.error(
f"Monitor traffic command did not return a valid URL: {trigger.data}"
)
else:
LOGGER.error(
f"Failed to trigger monitor traffic command: {trigger.status_code} - {trigger.data}"
Expand Down
2 changes: 1 addition & 1 deletion src/mistapi/device_utils/__tools/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from mistapi.device_utils.__tools.__ws_wrapper import UtilResponse


async def clear_hit_count(
def clear_hit_count(
apissession: _APISession,
site_id: str,
device_id: str,
Expand Down
2 changes: 0 additions & 2 deletions src/mistapi/device_utils/ap.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
# Re-export shared classes and types
from mistapi.device_utils.__tools.arp import retrieve_ap_arp_table as retrieveArpTable
from mistapi.device_utils.__tools.miscellaneous import (
TracerouteProtocol,
ping,
traceroute,
)
Comment thread
tmunzer-AIDE marked this conversation as resolved.

__all__ = [
"ping",
"traceroute",
"TracerouteProtocol",
"retrieveArpTable",
]
Loading
Loading