Skip to content

Commit 0788ce0

Browse files
committed
Added Telemetry parsing
This patch adds telemetry packet parsing and unit tests.
1 parent c2a0f18 commit 0788ce0

3 files changed

Lines changed: 327 additions & 1 deletion

File tree

aprslib/parsing/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ def detect(x):
5757
'.':'reserved',
5858
'<':'station capabilities',
5959
'?':'general query format',
60-
'T':'telemetry report',
6160
'[':'maidenhead locator beacon',
6261
'\\':'unused',
6362
']':'unused',
@@ -202,6 +201,12 @@ def _try_toparse_body(packet_type, body, parsed):
202201

203202
body, result = parse_weather(body)
204203

204+
# Telemetry report
205+
elif packet_type == 'T':
206+
logger.debug("Attempting to parse as telemetry report")
207+
208+
body, result = parse_telemetry_report(body)
209+
205210
# postion report (regular or compressed)
206211
elif (packet_type in '!=/@;' or
207212
0 <= body.find('!') < 40): # page 28 of spec (PDF)

aprslib/parsing/telemetry.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
__all__ = [
77
'parse_comment_telemetry',
88
'parse_telemetry_config',
9+
'parse_telemetry_report',
910
]
1011

1112

@@ -98,3 +99,76 @@ def parse_telemetry_config(body):
9899

99100
return (body, parsed)
100101

102+
103+
def parse_telemetry_report(body):
104+
"""
105+
Parses APRS 1.2 telemetry report format: T#sss,aaa,bbb,ccc,ddd,eee,bbbbbbbb,comment
106+
107+
Format:
108+
- T# indicates telemetry report
109+
- sss is sequence number (000-999)
110+
- aaa to eee are 5 analog values (000-999)
111+
- bbbbbbbb is 8 binary digits (digital I/O)
112+
- comment is optional text
113+
114+
Returns (remaining_body, parsed_dict)
115+
"""
116+
parsed = {}
117+
118+
# Check if body starts with '#'
119+
if not body.startswith('#'):
120+
raise ParseError("telemetry report must start with '#'")
121+
122+
# Remove the '#' prefix
123+
body = body[1:]
124+
125+
# Split by comma - we need at least 7 fields (seq + 5 analog + 1 digital)
126+
parts = body.split(',', 7)
127+
128+
if len(parts) < 7:
129+
raise ParseError("telemetry report must have at least 7 comma-separated fields")
130+
131+
seq_str = parts[0]
132+
analog_strs = parts[1:6]
133+
digital_str = parts[6]
134+
comment = parts[7] if len(parts) > 7 else ''
135+
136+
# Validate and parse sequence number (3 digits, 000-999)
137+
if not re.match(r'^\d{1,3}$', seq_str):
138+
raise ParseError("telemetry sequence number must be 1-3 digits")
139+
seq = int(seq_str)
140+
if seq > 999:
141+
raise ParseError("telemetry sequence number must be 000-999")
142+
143+
# Parse analog values (can be 000-999, allow decimals and negatives per APRS 1.2)
144+
analog_vals = []
145+
for i, val_str in enumerate(analog_strs):
146+
# Allow integers, decimals, and negative numbers
147+
if not re.match(r'^-?\d+\.?\d*$', val_str):
148+
raise ParseError("telemetry analog value %d has invalid format" % (i+1))
149+
try:
150+
val = float(val_str)
151+
except ValueError:
152+
raise ParseError("telemetry analog value %d is not a valid number" % (i+1))
153+
analog_vals.append(val)
154+
155+
# Validate digital I/O (must be exactly 8 binary digits)
156+
if not re.match(r'^[01]{8}$', digital_str):
157+
raise ParseError("telemetry digital I/O must be exactly 8 binary digits")
158+
159+
parsed.update({
160+
'format': 'telemetry',
161+
'telemetry': {
162+
'seq': seq,
163+
'vals': analog_vals,
164+
'bits': digital_str
165+
}
166+
})
167+
168+
# Add comment if present
169+
if comment:
170+
parsed['comment'] = comment.strip(' ')
171+
172+
# Return empty remaining body since we consumed everything
173+
return ('', parsed)
174+

tests/test_parse_telemetry.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
import unittest
2+
3+
from aprslib import parse
4+
from aprslib.parsing.telemetry import parse_telemetry_report
5+
from aprslib.exceptions import ParseError
6+
7+
8+
class ParseTelemetryReport(unittest.TestCase):
9+
def setUp(self):
10+
self.maxDiff = None
11+
12+
def test_valid_telemetry_basic(self):
13+
"""Test basic valid telemetry packet with integers"""
14+
packet = "TESTCALL>APRS:T#123,456,789,012,345,678,11001010"
15+
result = parse(packet)
16+
17+
self.assertEqual(result['format'], 'telemetry')
18+
self.assertEqual(result['telemetry']['seq'], 123)
19+
self.assertEqual(result['telemetry']['vals'], [456, 789, 12, 345, 678])
20+
self.assertEqual(result['telemetry']['bits'], '11001010')
21+
self.assertNotIn('comment', result)
22+
23+
def test_valid_telemetry_with_comment(self):
24+
"""Test telemetry packet with comment"""
25+
packet = "TESTCALL>APRS:T#123,456,789,012,345,678,11001010,Test comment"
26+
result = parse(packet)
27+
28+
self.assertEqual(result['format'], 'telemetry')
29+
self.assertEqual(result['telemetry']['seq'], 123)
30+
self.assertEqual(result['telemetry']['vals'], [456, 789, 12, 345, 678])
31+
self.assertEqual(result['telemetry']['bits'], '11001010')
32+
self.assertEqual(result['comment'], 'Test comment')
33+
34+
def test_valid_telemetry_with_decimal_values(self):
35+
"""Test telemetry packet with decimal analog values (APRS 1.2)"""
36+
packet = "EL-PS7AD>RXTLM-1,TCPIP,qAR,PS7AD:T#121,0.00,0.00,0,0,0.0,00000000,SimplexLogic"
37+
result = parse(packet)
38+
39+
self.assertEqual(result['format'], 'telemetry')
40+
self.assertEqual(result['from'], 'EL-PS7AD')
41+
self.assertEqual(result['to'], 'RXTLM-1')
42+
self.assertEqual(result['telemetry']['seq'], 121)
43+
self.assertEqual(result['telemetry']['vals'], [0.0, 0.0, 0.0, 0.0, 0.0])
44+
self.assertEqual(result['telemetry']['bits'], '00000000')
45+
self.assertEqual(result['comment'], 'SimplexLogic')
46+
47+
def test_valid_telemetry_with_negative_values(self):
48+
"""Test telemetry packet with negative analog values"""
49+
packet = "TEST>APRS:T#123,-45.67,123.456,999,0,0.0,11111111"
50+
result = parse(packet)
51+
52+
self.assertEqual(result['format'], 'telemetry')
53+
self.assertEqual(result['telemetry']['seq'], 123)
54+
self.assertEqual(result['telemetry']['vals'], [-45.67, 123.456, 999.0, 0.0, 0.0])
55+
self.assertEqual(result['telemetry']['bits'], '11111111')
56+
57+
def test_valid_telemetry_mixed_integer_decimal(self):
58+
"""Test telemetry packet with mix of integer and decimal values"""
59+
packet = "TEST>APRS:T#001,100,200.5,300,400.25,500,01010101"
60+
result = parse(packet)
61+
62+
self.assertEqual(result['format'], 'telemetry')
63+
self.assertEqual(result['telemetry']['seq'], 1)
64+
self.assertEqual(result['telemetry']['vals'], [100.0, 200.5, 300.0, 400.25, 500.0])
65+
self.assertEqual(result['telemetry']['bits'], '01010101')
66+
67+
def test_valid_telemetry_min_values(self):
68+
"""Test telemetry packet with minimum values"""
69+
packet = "TEST>APRS:T#000,0,0,0,0,0,00000000"
70+
result = parse(packet)
71+
72+
self.assertEqual(result['format'], 'telemetry')
73+
self.assertEqual(result['telemetry']['seq'], 0)
74+
self.assertEqual(result['telemetry']['vals'], [0.0, 0.0, 0.0, 0.0, 0.0])
75+
self.assertEqual(result['telemetry']['bits'], '00000000')
76+
77+
def test_valid_telemetry_max_values(self):
78+
"""Test telemetry packet with maximum sequence number"""
79+
packet = "TEST>APRS:T#999,999,999,999,999,999,11111111"
80+
result = parse(packet)
81+
82+
self.assertEqual(result['format'], 'telemetry')
83+
self.assertEqual(result['telemetry']['seq'], 999)
84+
self.assertEqual(result['telemetry']['vals'], [999.0, 999.0, 999.0, 999.0, 999.0])
85+
self.assertEqual(result['telemetry']['bits'], '11111111')
86+
87+
def test_valid_telemetry_comment_with_commas(self):
88+
"""Test telemetry packet with comment containing commas"""
89+
packet = "TEST>APRS:T#123,456,789,012,345,678,11001010,Comment with, commas"
90+
result = parse(packet)
91+
92+
self.assertEqual(result['format'], 'telemetry')
93+
self.assertEqual(result['comment'], 'Comment with, commas')
94+
95+
def test_valid_telemetry_all_bits_set(self):
96+
"""Test telemetry packet with all digital bits set"""
97+
packet = "TEST>APRS:T#123,456,789,012,345,678,11111111"
98+
result = parse(packet)
99+
100+
self.assertEqual(result['telemetry']['bits'], '11111111')
101+
102+
def test_valid_telemetry_no_bits_set(self):
103+
"""Test telemetry packet with no digital bits set"""
104+
packet = "TEST>APRS:T#123,456,789,012,345,678,00000000"
105+
result = parse(packet)
106+
107+
self.assertEqual(result['telemetry']['bits'], '00000000')
108+
109+
def test_valid_telemetry_leading_zeros(self):
110+
"""Test telemetry packet with leading zeros in sequence and values"""
111+
packet = "TEST>APRS:T#001,002,003,004,005,006,11001010"
112+
result = parse(packet)
113+
114+
self.assertEqual(result['telemetry']['seq'], 1)
115+
self.assertEqual(result['telemetry']['vals'], [2.0, 3.0, 4.0, 5.0, 6.0])
116+
117+
def test_invalid_telemetry_missing_hash(self):
118+
"""Test that telemetry packet without '#' raises error"""
119+
packet = "TEST>APRS:T123,456,789,012,345,678,11001010"
120+
121+
with self.assertRaises(ParseError) as context:
122+
parse(packet)
123+
124+
self.assertIn("telemetry report must start with '#'", str(context.exception))
125+
126+
def test_invalid_telemetry_too_few_fields(self):
127+
"""Test that telemetry packet with too few fields raises error"""
128+
packet = "TEST>APRS:T#123,456"
129+
130+
with self.assertRaises(ParseError) as context:
131+
parse(packet)
132+
133+
self.assertIn("telemetry report must have at least 7 comma-separated fields", str(context.exception))
134+
135+
def test_invalid_telemetry_invalid_sequence(self):
136+
"""Test that invalid sequence number raises error"""
137+
packet = "TEST>APRS:T#abc,456,789,012,345,678,11001010"
138+
139+
with self.assertRaises(ParseError) as context:
140+
parse(packet)
141+
142+
self.assertIn("telemetry sequence number must be 1-3 digits", str(context.exception))
143+
144+
def test_invalid_telemetry_sequence_too_many_digits(self):
145+
"""Test that sequence number with too many digits raises error"""
146+
packet = "TEST>APRS:T#1000,456,789,012,345,678,11001010"
147+
148+
with self.assertRaises(ParseError) as context:
149+
parse(packet)
150+
151+
# "1000" has 4 digits, so it fails the digit count check first
152+
self.assertIn("telemetry sequence number must be 1-3 digits", str(context.exception))
153+
154+
def test_invalid_telemetry_sequence_range_check(self):
155+
"""Test that sequence number range validation works (999 is max)"""
156+
# Test that 999 is valid (boundary case)
157+
packet = "TEST>APRS:T#999,456,789,012,345,678,11001010"
158+
result = parse(packet)
159+
self.assertEqual(result['telemetry']['seq'], 999)
160+
161+
def test_invalid_telemetry_invalid_analog_value(self):
162+
"""Test that invalid analog value raises error"""
163+
packet = "TEST>APRS:T#123,abc,789,012,345,678,11001010"
164+
165+
with self.assertRaises(ParseError) as context:
166+
parse(packet)
167+
168+
self.assertIn("telemetry analog value", str(context.exception))
169+
self.assertIn("invalid format", str(context.exception))
170+
171+
def test_invalid_telemetry_invalid_digital_bits(self):
172+
"""Test that invalid digital I/O format raises error"""
173+
packet = "TEST>APRS:T#123,456,789,012,345,678,123"
174+
175+
with self.assertRaises(ParseError) as context:
176+
parse(packet)
177+
178+
self.assertIn("telemetry digital I/O must be exactly 8 binary digits", str(context.exception))
179+
180+
def test_invalid_telemetry_digital_bits_wrong_length(self):
181+
"""Test that digital I/O with wrong length raises error"""
182+
packet = "TEST>APRS:T#123,456,789,012,345,678,1100101"
183+
184+
with self.assertRaises(ParseError) as context:
185+
parse(packet)
186+
187+
self.assertIn("telemetry digital I/O must be exactly 8 binary digits", str(context.exception))
188+
189+
def test_invalid_telemetry_digital_bits_non_binary(self):
190+
"""Test that digital I/O with non-binary characters raises error"""
191+
packet = "TEST>APRS:T#123,456,789,012,345,678,11001012"
192+
193+
with self.assertRaises(ParseError) as context:
194+
parse(packet)
195+
196+
self.assertIn("telemetry digital I/O must be exactly 8 binary digits", str(context.exception))
197+
198+
def test_parse_telemetry_report_function_direct(self):
199+
"""Test parse_telemetry_report function directly"""
200+
body = "#123,456,789,012,345,678,11001010,Test"
201+
remaining, result = parse_telemetry_report(body)
202+
203+
self.assertEqual(remaining, '')
204+
self.assertEqual(result['format'], 'telemetry')
205+
self.assertEqual(result['telemetry']['seq'], 123)
206+
self.assertEqual(result['telemetry']['vals'], [456, 789, 12, 345, 678])
207+
self.assertEqual(result['telemetry']['bits'], '11001010')
208+
self.assertEqual(result['comment'], 'Test')
209+
210+
def test_parse_telemetry_report_function_no_hash(self):
211+
"""Test parse_telemetry_report function with body not starting with '#'"""
212+
body = "123,456,789,012,345,678,11001010"
213+
214+
with self.assertRaises(ParseError) as context:
215+
parse_telemetry_report(body)
216+
217+
self.assertIn("telemetry report must start with '#'", str(context.exception))
218+
219+
def test_telemetry_output_format(self):
220+
"""Test that parsed telemetry has correct output format"""
221+
packet = "TEST>APRS:T#123,456,789,012,345,678,11001010,Comment"
222+
result = parse(packet)
223+
224+
# Check structure
225+
self.assertIn('format', result)
226+
self.assertIn('telemetry', result)
227+
self.assertIn('seq', result['telemetry'])
228+
self.assertIn('vals', result['telemetry'])
229+
self.assertIn('bits', result['telemetry'])
230+
231+
# Check types
232+
self.assertIsInstance(result['telemetry']['seq'], int)
233+
self.assertIsInstance(result['telemetry']['vals'], list)
234+
self.assertEqual(len(result['telemetry']['vals']), 5)
235+
self.assertIsInstance(result['telemetry']['bits'], str)
236+
self.assertEqual(len(result['telemetry']['bits']), 8)
237+
238+
# Check that all vals are numbers (int or float)
239+
for val in result['telemetry']['vals']:
240+
self.assertIsInstance(val, (int, float))
241+
242+
# Check that bits are binary
243+
self.assertTrue(all(c in '01' for c in result['telemetry']['bits']))
244+
245+
246+
if __name__ == '__main__':
247+
unittest.main()

0 commit comments

Comments
 (0)