forked from meshtastic/python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_stream_interface.py
More file actions
190 lines (151 loc) · 7.59 KB
/
Copy pathtest_stream_interface.py
File metadata and controls
190 lines (151 loc) · 7.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Meshtastic unit tests for stream_interface.py"""
import logging
from unittest.mock import MagicMock
import pytest
from ..stream_interface import StreamInterface
# import re
@pytest.mark.unit
def test_StreamInterface():
"""Test that we cannot instantiate a StreamInterface based on noProto"""
with pytest.raises(Exception) as pytest_wrapped_e:
StreamInterface()
assert pytest_wrapped_e.type == RuntimeError
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_close_safe_when_thread_never_started():
"""close() must not raise RuntimeError when called before connect() has started the reader.
Hits the cleanup path used by __init__ when the handshake raises before the
reader thread is started.
"""
iface = StreamInterface(noProto=True, connectNow=False)
iface.stream = MagicMock()
# _rxThread was created in __init__ but never .start()'d. close() should
# detect that and skip join() instead of raising RuntimeError.
iface.close()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_close_when_thread_never_started_closes_stream():
"""If no reader thread was started, close() should still close the stream."""
iface = StreamInterface(noProto=True, connectNow=False)
stream = MagicMock()
iface.stream = stream
iface.close()
stream.close.assert_called_once()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_connect_raises():
"""If connect() raises during __init__, close() runs and the original exception propagates."""
cleanup_calls = []
class FailingConnectStream(StreamInterface):
"""Subclass whose connect() raises, to exercise the __init__ cleanup path."""
def __init__(self):
self.stream = MagicMock() # bypass StreamInterface abstract check
super().__init__(noProto=False, connectNow=True)
def connect(self):
raise RuntimeError("simulated handshake failure")
def close(self):
cleanup_calls.append("close")
super().close()
with pytest.raises(RuntimeError, match="simulated handshake failure"):
FailingConnectStream()
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake failure"
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleans_up_when_waitForConfig_raises():
"""If waitForConfig() raises after a successful connect(), close() runs and exception propagates."""
cleanup_calls = []
class FailingWaitStream(StreamInterface):
"""Subclass whose waitForConfig() raises, to exercise the second leg of cleanup."""
def __init__(self):
self.stream = MagicMock()
super().__init__(noProto=False, connectNow=True)
def connect(self):
# No-op connect — we are simulating handshake-stage failure, not connect-stage.
pass
def waitForConfig(self):
raise TimeoutError("simulated config-handshake timeout")
def close(self):
cleanup_calls.append("close")
super().close()
with pytest.raises(TimeoutError, match="simulated config-handshake timeout"):
FailingWaitStream()
assert cleanup_calls == ["close"], "close() should be invoked exactly once on handshake timeout"
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_init_cleanup_does_not_shadow_original_exception():
"""If close() itself raises during __init__ cleanup, the original exception still propagates.
The cleanup uses contextlib.suppress(Exception) so that a secondary failure
in close() doesn't replace the real reason for the failed handshake.
"""
class CleanupRaisesStream(StreamInterface):
def __init__(self):
self.stream = MagicMock()
super().__init__(noProto=False, connectNow=True)
def connect(self):
raise RuntimeError("original handshake failure")
def close(self):
raise RuntimeError("secondary close failure — should be suppressed")
with pytest.raises(RuntimeError, match="original handshake failure"):
CleanupRaisesStream()
# Note: This takes a bit, so moving from unit to slow
@pytest.mark.unitslow
@pytest.mark.usefixtures("reset_mt_config")
def test_StreamInterface_with_noProto(caplog):
"""Test that we can instantiate a StreamInterface based on nonProto
and we can read/write bytes from a mocked stream
"""
stream = MagicMock()
test_data = b"hello"
stream.read.return_value = test_data
with caplog.at_level(logging.DEBUG):
iface = StreamInterface(noProto=True, connectNow=False)
iface.stream = stream
iface._writeBytes(test_data)
data = iface._readBytes(len(test_data))
assert data == test_data
# TODO
### Note: This takes a bit, so moving from unit to slow
### Tip: If you want to see the print output, run with '-s' flag:
### pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
# @pytest.mark.unitslow
# @pytest.mark.usefixtures("reset_mt_config")
# def test_sendToRadioImpl(caplog):
# """Test _sendToRadioImpl()"""
#
## def add_header(b):
## """Add header stuffs for radio"""
## bufLen = len(b)
## header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
## return header + b
#
# # captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio")
# raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
# raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a'
# # pylint: disable=C0301
# raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@'
# raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e'
# # pylint: disable=C0301
# raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F'
# # pylint: disable=C0301
# raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F'
# # pylint: disable=C0301
# raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F'
# raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F'
# raw_blank = b''
#
# test_data = b'hello'
# stream = MagicMock()
# #stream.read.return_value = add_header(test_data)
# stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete,
# raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2,
# raw_blank, raw_blank]
# toRadio = MagicMock()
# toRadio.SerializeToString.return_value = test_data
# with caplog.at_level(logging.DEBUG):
# iface = StreamInterface(noProto=True, connectNow=False)
# iface.stream = stream
# iface.connect()
# iface._sendToRadioImpl(toRadio)
# assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
# assert re.search(r'reading character', caplog.text, re.MULTILINE)
# assert re.search(r'In reader loop', caplog.text, re.MULTILINE)