forked from meshtastic/python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_tcp_interface.py
More file actions
78 lines (62 loc) · 2.7 KB
/
Copy pathtest_tcp_interface.py
File metadata and controls
78 lines (62 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Meshtastic unit tests for tcp_interface.py"""
import re
from unittest.mock import MagicMock, patch
import pytest
from ..protobuf import config_pb2
from ..tcp_interface import TCPInterface
@pytest.mark.unit
def test_TCPInterface(capsys):
    """Test that a TCPInterface can be instantiated and can print its info.

    ``socket.socket`` is patched for the whole test, so the interface is
    built against a mock socket. The captured stdout must contain the
    expected section headings and stderr must stay empty.

    The interface is closed in a ``finally`` clause so that a failing
    assertion does not leave it open for the tests that run afterwards.
    """
    expected_patterns = (
        r"Owner: None \(None\)",
        r"Nodes",
        r"Preferences",
        r"Channels",
        r"Primary channel URL",
    )
    with patch("socket.socket") as mock_socket:
        iface = TCPInterface(hostname="localhost", noProto=True)
        try:
            # Populate the local node's LoRa config with protobuf defaults
            # before any info is printed.
            iface.localNode.localConfig.lora.CopyFrom(config_pb2.Config.LoRaConfig())
            iface.myConnect()
            iface.showInfo()
            iface.localNode.showInfo()
            out, err = capsys.readouterr()
            for pattern in expected_patterns:
                assert re.search(
                    pattern, out, re.MULTILINE
                ), f"pattern {pattern!r} not found in captured stdout"
            assert err == ""
            assert mock_socket.called
        finally:
            # Always release the interface, even when an assertion above failed.
            iface.close()
@pytest.mark.unit
def test_TCPInterface_exception():
    """Test that close() copes with _socket_shutdown raising an exception.

    ``TCPInterface._socket_shutdown`` is replaced by a mock that raises
    ``ValueError`` whenever it is called, and ``socket.socket`` is patched as
    well. ``close()`` is invoked without any ``pytest.raises`` guard, so the
    test passes only if the exception does not propagate out of ``close()``.
    Afterwards both the socket factory and the shutdown hook must have been
    called.
    """
    with patch(
        "meshtastic.tcp_interface.TCPInterface._socket_shutdown"
    ) as mock_shutdown, patch("socket.socket") as mock_socket:
        # A Mock raises an exception instance assigned to side_effect each
        # time it is called, so no helper function is needed.
        mock_shutdown.side_effect = ValueError("Fake exception.")
        iface = TCPInterface(hostname="localhost", noProto=True)
        iface.myConnect()
        iface.close()
        assert mock_socket.called
        assert mock_shutdown.called
@pytest.mark.unit
def test_TCPInterface_without_connecting():
    """Test that a TCPInterface built with connectNow=False has no socket."""
    # socket.socket stays patched while the interface is constructed and checked.
    with patch("socket.socket"):
        unconnected = TCPInterface(
            hostname="localhost",
            noProto=True,
            connectNow=False,
        )
        assert unconnected.socket is None
@pytest.mark.unit
def test_TCPInterface_close_shutdowns_socket_before_super_close():
    """Check that close() runs _socket_shutdown ahead of StreamInterface.close().

    Both methods are swapped for autospec'd mocks that record their
    invocation, and the recorded order is compared once close() returns.
    The stand-in socket must also have been closed exactly once and the
    interface must no longer reference it.
    """
    events = []

    def record_shutdown(_self):
        events.append("shutdown")

    def record_super_close(_self):
        events.append("super_close")

    fake_socket = MagicMock()
    iface = TCPInterface(hostname="localhost", noProto=True, connectNow=False)
    # Attach a mock socket by hand, since connectNow=False was requested.
    iface.socket = fake_socket

    with patch.object(
        TCPInterface, "_socket_shutdown", autospec=True
    ) as shutdown_mock, patch(
        "meshtastic.stream_interface.StreamInterface.close", autospec=True
    ) as super_close_mock:
        shutdown_mock.side_effect = record_shutdown
        super_close_mock.side_effect = record_super_close
        iface.close()

    assert events == ["shutdown", "super_close"]
    fake_socket.close.assert_called_once()
    assert iface.socket is None