Skip to content

Commit 5c49e31

Browse files
Copilot777arc
andauthored
Add comprehensive tests for SigMFCollection
Agent-Logs-Url: https://github.com/sigmf/sigmf-python/sessions/a19ae54f-6f9b-46e2-acd0-72987bf4d76c Co-authored-by: 777arc <5722532+777arc@users.noreply.github.com>
1 parent 2690ea2 commit 5c49e31

1 file changed

Lines changed: 331 additions & 0 deletions

File tree

tests/test_collection.py

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,21 @@
77
"""Tests for collections"""
88

99
import copy
10+
import json
1011
import os
1112
import shutil
1213
import tempfile
1314
import unittest
1415
from pathlib import Path
1516

17+
import jsonschema
1618
import numpy as np
1719
from hypothesis import given
1820
from hypothesis import strategies as st
1921

22+
from sigmf import schema
2023
from sigmf.archive import SIGMF_COLLECTION_EXT, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
24+
from sigmf.error import SigMFFileError, SigMFFileExistsError
2125
from sigmf.sigmffile import SigMFCollection, SigMFFile, fromfile
2226

2327
from .testdata import TEST_FLOAT32_DATA, TEST_METADATA
@@ -74,3 +78,330 @@ def test_load_collection(self, subdir: str) -> None:
7478

7579
self.assertTrue(np.array_equal(TEST_FLOAT32_DATA, meta1_loopback.read_samples()))
7680
self.assertTrue(np.array_equal(TEST_FLOAT32_DATA, meta2_loopback[:]))
81+
82+
83+
class TestCollectionConstructor(unittest.TestCase):
84+
"""tests for SigMFCollection constructor"""
85+
86+
def test_empty_constructor(self):
87+
"""test that a collection can be created with no arguments"""
88+
collection = SigMFCollection(skip_checksums=True)
89+
self.assertIsInstance(collection, SigMFCollection)
90+
self.assertEqual(len(collection), 0)
91+
self.assertEqual(collection.get_stream_names(), [])
92+
93+
def test_constructor_with_metadata(self):
94+
"""test that a collection can be created with a metadata dict"""
95+
from sigmf import __specification__
96+
97+
metadata = {
98+
SigMFCollection.COLLECTION_KEY: {
99+
SigMFCollection.VERSION_KEY: __specification__,
100+
SigMFCollection.STREAMS_KEY: [],
101+
}
102+
}
103+
collection = SigMFCollection(metadata=metadata, skip_checksums=True)
104+
self.assertIsInstance(collection, SigMFCollection)
105+
self.assertEqual(len(collection), 0)
106+
107+
108+
class TestCollectionRoundTrip(unittest.TestCase):
109+
"""tests for SigMFCollection round-trip write/read"""
110+
111+
def setUp(self):
112+
"""create temporary directory and populate with SigMF files"""
113+
self.temp_dir = Path(tempfile.mkdtemp())
114+
# create two SigMF recordings
115+
meta_name1 = "stream0" + SIGMF_METADATA_EXT
116+
meta_name2 = "stream1" + SIGMF_METADATA_EXT
117+
data_path1 = self.temp_dir / ("stream0" + SIGMF_DATASET_EXT)
118+
data_path2 = self.temp_dir / ("stream1" + SIGMF_DATASET_EXT)
119+
TEST_FLOAT32_DATA.tofile(data_path1)
120+
TEST_FLOAT32_DATA.tofile(data_path2)
121+
meta1 = SigMFFile(metadata=copy.deepcopy(TEST_METADATA), data_file=data_path1)
122+
meta2 = SigMFFile(metadata=copy.deepcopy(TEST_METADATA), data_file=data_path2)
123+
meta1.tofile(self.temp_dir / meta_name1, overwrite=True)
124+
meta2.tofile(self.temp_dir / meta_name2, overwrite=True)
125+
self.meta_name1 = meta_name1
126+
self.meta_name2 = meta_name2
127+
self.collection_path = self.temp_dir / ("mycollection" + SIGMF_COLLECTION_EXT)
128+
129+
def tearDown(self):
130+
"""remove temporary directory"""
131+
shutil.rmtree(self.temp_dir)
132+
133+
def test_round_trip_metadata(self):
134+
"""test that collection metadata survives a write/read round-trip"""
135+
collection = SigMFCollection(
136+
metafiles=[self.meta_name1, self.meta_name2],
137+
base_path=str(self.temp_dir),
138+
)
139+
collection.set_collection_field(SigMFCollection.AUTHOR_KEY, "Round Trip Tester")
140+
collection.set_collection_field(SigMFCollection.DESCRIPTION_KEY, "A round-trip test collection")
141+
collection.set_collection_field(SigMFCollection.LICENSE_KEY, "https://creativecommons.org/licenses/by-sa/4.0/")
142+
143+
collection.tofile(self.collection_path)
144+
145+
# read back
146+
collection_rt = fromfile(self.collection_path)
147+
148+
self.assertIsInstance(collection_rt, SigMFCollection)
149+
self.assertEqual(len(collection_rt), 2)
150+
self.assertEqual(collection_rt.get_stream_names(), ["stream0", "stream1"])
151+
self.assertEqual(collection_rt.get_collection_field(SigMFCollection.AUTHOR_KEY), "Round Trip Tester")
152+
self.assertEqual(
153+
collection_rt.get_collection_field(SigMFCollection.DESCRIPTION_KEY), "A round-trip test collection"
154+
)
155+
self.assertEqual(
156+
collection_rt.get_collection_field(SigMFCollection.LICENSE_KEY),
157+
"https://creativecommons.org/licenses/by-sa/4.0/",
158+
)
159+
160+
def test_round_trip_collection_info(self):
161+
"""test that get_collection_info returns a dict matching what was set"""
162+
collection = SigMFCollection(
163+
metafiles=[self.meta_name1, self.meta_name2],
164+
base_path=str(self.temp_dir),
165+
)
166+
collection.set_collection_field(SigMFCollection.AUTHOR_KEY, "Test Author")
167+
collection.tofile(self.collection_path)
168+
169+
collection_rt = fromfile(self.collection_path)
170+
info = collection_rt.get_collection_info()
171+
self.assertIsInstance(info, dict)
172+
self.assertIn(SigMFCollection.AUTHOR_KEY, info)
173+
self.assertEqual(info[SigMFCollection.AUTHOR_KEY], "Test Author")
174+
self.assertIn(SigMFCollection.VERSION_KEY, info)
175+
self.assertIn(SigMFCollection.STREAMS_KEY, info)
176+
177+
def test_round_trip_json_content(self):
178+
"""test that the written collection file is valid JSON with expected structure"""
179+
collection = SigMFCollection(
180+
metafiles=[self.meta_name1, self.meta_name2],
181+
base_path=str(self.temp_dir),
182+
)
183+
collection.tofile(self.collection_path)
184+
185+
with open(self.collection_path, "r") as f:
186+
data = json.load(f)
187+
188+
self.assertIn(SigMFCollection.COLLECTION_KEY, data)
189+
self.assertIn(SigMFCollection.STREAMS_KEY, data[SigMFCollection.COLLECTION_KEY])
190+
self.assertIn(SigMFCollection.VERSION_KEY, data[SigMFCollection.COLLECTION_KEY])
191+
streams = data[SigMFCollection.COLLECTION_KEY][SigMFCollection.STREAMS_KEY]
192+
self.assertEqual(len(streams), 2)
193+
for stream in streams:
194+
self.assertIn("name", stream)
195+
self.assertIn("hash", stream)
196+
197+
198+
class TestCollectionValidation(unittest.TestCase):
199+
"""tests for SigMFCollection validation against the JSON schema"""
200+
201+
def _validate(self, metadata):
202+
"""helper: validate collection metadata against the collection schema"""
203+
col_schema = schema.get_schema(schema_file=schema.SCHEMA_COLLECTION)
204+
jsonschema.validators.validate(instance=metadata, schema=col_schema)
205+
206+
def test_valid_empty_collection(self):
207+
"""a minimal collection with only core:version should be schema-valid"""
208+
collection = SigMFCollection(skip_checksums=True)
209+
self._validate(collection._metadata)
210+
211+
def test_valid_collection_with_optional_fields(self):
212+
"""a collection with optional fields set should be schema-valid"""
213+
collection = SigMFCollection(skip_checksums=True)
214+
collection.set_collection_field(SigMFCollection.AUTHOR_KEY, "Test Author")
215+
collection.set_collection_field(SigMFCollection.DESCRIPTION_KEY, "Test description")
216+
collection.set_collection_field(SigMFCollection.LICENSE_KEY, "https://example.com/license")
217+
collection.set_collection_field(SigMFCollection.COLLECTION_DOI_KEY, "10.1000/xyz123")
218+
self._validate(collection._metadata)
219+
220+
def test_invalid_collection_missing_version(self):
221+
"""a collection missing core:version should fail schema validation"""
222+
metadata = {SigMFCollection.COLLECTION_KEY: {}}
223+
col_schema = schema.get_schema(schema_file=schema.SCHEMA_COLLECTION)
224+
with self.assertRaises(jsonschema.exceptions.ValidationError):
225+
jsonschema.validators.validate(instance=metadata, schema=col_schema)
226+
227+
def test_invalid_collection_missing_collection_key(self):
228+
"""a metadata dict without the top-level 'collection' key should fail"""
229+
metadata = {}
230+
col_schema = schema.get_schema(schema_file=schema.SCHEMA_COLLECTION)
231+
with self.assertRaises(jsonschema.exceptions.ValidationError):
232+
jsonschema.validators.validate(instance=metadata, schema=col_schema)
233+
234+
def test_valid_collection_with_extensions(self):
235+
"""a collection with a valid extensions array should be schema-valid"""
236+
collection = SigMFCollection(skip_checksums=True)
237+
collection.set_collection_field(
238+
SigMFCollection.EXTENSIONS_KEY,
239+
[{"name": "antenna", "version": "1.0.0", "optional": True}],
240+
)
241+
self._validate(collection._metadata)
242+
243+
244+
class TestCollectionCommonUseCases(unittest.TestCase):
245+
"""tests for common SigMFCollection use cases"""
246+
247+
def setUp(self):
248+
"""create temporary directory and two SigMF recordings"""
249+
self.temp_dir = Path(tempfile.mkdtemp())
250+
for name in ("rec0", "rec1", "rec2"):
251+
data_path = self.temp_dir / (name + SIGMF_DATASET_EXT)
252+
meta_path = self.temp_dir / (name + SIGMF_METADATA_EXT)
253+
TEST_FLOAT32_DATA.tofile(data_path)
254+
meta = SigMFFile(metadata=copy.deepcopy(TEST_METADATA), data_file=data_path)
255+
meta.tofile(meta_path, overwrite=True)
256+
self.metafiles = [f"{name}{SIGMF_METADATA_EXT}" for name in ("rec0", "rec1", "rec2")]
257+
258+
def tearDown(self):
259+
shutil.rmtree(self.temp_dir)
260+
261+
def _make_collection(self, metafiles=None):
262+
"""helper: create a SigMFCollection using the temp dir"""
263+
if metafiles is None:
264+
metafiles = self.metafiles
265+
return SigMFCollection(metafiles=metafiles, base_path=str(self.temp_dir))
266+
267+
def test_len(self):
268+
"""__len__ should return the number of streams"""
269+
collection = self._make_collection()
270+
self.assertEqual(len(collection), 3)
271+
272+
def test_len_empty(self):
273+
"""an empty collection should have length 0"""
274+
collection = SigMFCollection(skip_checksums=True)
275+
self.assertEqual(len(collection), 0)
276+
277+
def test_get_stream_names(self):
278+
"""get_stream_names should return base names in order"""
279+
collection = self._make_collection()
280+
names = collection.get_stream_names()
281+
self.assertEqual(names, ["rec0", "rec1", "rec2"])
282+
283+
def test_get_sigmffile_by_index(self):
284+
"""get_SigMFFile with stream_index should return correct SigMFFile"""
285+
collection = self._make_collection()
286+
sf = collection.get_SigMFFile(stream_index=0)
287+
self.assertIsInstance(sf, SigMFFile)
288+
self.assertTrue(np.array_equal(TEST_FLOAT32_DATA, sf.read_samples()))
289+
290+
def test_get_sigmffile_by_name(self):
291+
"""get_SigMFFile with stream_name should return correct SigMFFile"""
292+
collection = self._make_collection()
293+
sf = collection.get_SigMFFile(stream_name="rec1")
294+
self.assertIsInstance(sf, SigMFFile)
295+
self.assertTrue(np.array_equal(TEST_FLOAT32_DATA, sf.read_samples()))
296+
297+
def test_get_sigmffile_invalid_name(self):
298+
"""get_SigMFFile with an unknown stream_name should return None"""
299+
collection = self._make_collection()
300+
result = collection.get_SigMFFile(stream_name="nonexistent")
301+
self.assertIsNone(result)
302+
303+
def test_set_get_collection_field(self):
304+
"""set_collection_field and get_collection_field should round-trip values"""
305+
collection = self._make_collection()
306+
collection.set_collection_field(SigMFCollection.AUTHOR_KEY, "Jane Doe")
307+
self.assertEqual(collection.get_collection_field(SigMFCollection.AUTHOR_KEY), "Jane Doe")
308+
309+
def test_get_collection_field_default(self):
310+
"""get_collection_field should return default when key is absent"""
311+
collection = self._make_collection()
312+
result = collection.get_collection_field("core:nonexistent_key", default="fallback")
313+
self.assertEqual(result, "fallback")
314+
315+
def test_set_get_collection_info(self):
316+
"""set_collection_info and get_collection_info should round-trip a dict"""
317+
from sigmf import __specification__
318+
319+
collection = self._make_collection()
320+
new_info = {
321+
SigMFCollection.VERSION_KEY: __specification__,
322+
SigMFCollection.AUTHOR_KEY: "Info Author",
323+
SigMFCollection.STREAMS_KEY: collection.get_collection_field(SigMFCollection.STREAMS_KEY),
324+
}
325+
collection.set_collection_info(new_info)
326+
info = collection.get_collection_info()
327+
self.assertEqual(info[SigMFCollection.AUTHOR_KEY], "Info Author")
328+
329+
def test_overwrite_protection(self):
330+
"""writing a collection to an existing file without overwrite=True should raise"""
331+
collection_path = self.temp_dir / ("test" + SIGMF_COLLECTION_EXT)
332+
collection = self._make_collection()
333+
collection.tofile(collection_path)
334+
with self.assertRaises(SigMFFileExistsError):
335+
collection.tofile(collection_path)
336+
337+
def test_overwrite_allowed(self):
338+
"""writing with overwrite=True should succeed even if file exists"""
339+
collection_path = self.temp_dir / ("test" + SIGMF_COLLECTION_EXT)
340+
collection = self._make_collection()
341+
collection.tofile(collection_path)
342+
collection.tofile(collection_path, overwrite=True)
343+
self.assertTrue(collection_path.exists())
344+
345+
def test_skip_checksums(self):
346+
"""skip_checksums=True should allow creating a collection without verifying hashes"""
347+
collection_path = self.temp_dir / ("test" + SIGMF_COLLECTION_EXT)
348+
collection = self._make_collection()
349+
collection.tofile(collection_path)
350+
# test via SigMFCollection constructor with skip_checksums=True
351+
with open(collection_path, "r") as f:
352+
metadata = json.load(f)
353+
collection_loaded = SigMFCollection(metadata=metadata, base_path=str(self.temp_dir), skip_checksums=True)
354+
self.assertIsInstance(collection_loaded, SigMFCollection)
355+
self.assertEqual(len(collection_loaded), 3)
356+
357+
def test_verify_stream_hashes_valid(self):
358+
"""verify_stream_hashes should not raise when hashes are correct"""
359+
collection = self._make_collection()
360+
# should not raise
361+
collection.verify_stream_hashes()
362+
363+
def test_verify_stream_hashes_invalid(self):
364+
"""verify_stream_hashes should raise when a stream hash is wrong"""
365+
collection = self._make_collection()
366+
# corrupt the hash of the first stream
367+
streams = collection.get_collection_field(SigMFCollection.STREAMS_KEY)
368+
streams[0]["hash"] = "badhash"
369+
collection.set_collection_field(SigMFCollection.STREAMS_KEY, streams)
370+
with self.assertRaises(SigMFFileError):
371+
collection.verify_stream_hashes()
372+
373+
def test_error_on_nonexistent_metafile(self):
374+
"""constructing a collection with a non-existent file should raise SigMFFileError"""
375+
with self.assertRaises(SigMFFileError):
376+
SigMFCollection(
377+
metafiles=["does_not_exist" + SIGMF_METADATA_EXT],
378+
base_path=str(self.temp_dir),
379+
)
380+
381+
def test_error_on_non_meta_extension(self):
382+
"""constructing a collection with a file lacking .sigmf-meta extension should raise"""
383+
with self.assertRaises(SigMFFileError):
384+
SigMFCollection(
385+
metafiles=["rec0" + SIGMF_DATASET_EXT],
386+
base_path=str(self.temp_dir),
387+
)
388+
389+
def test_set_streams_updates_hashes(self):
390+
"""set_streams should recompute hashes for the specified metafiles"""
391+
collection = self._make_collection(metafiles=["rec0" + SIGMF_METADATA_EXT])
392+
self.assertEqual(len(collection), 1)
393+
# add more streams
394+
collection.set_streams(["rec0" + SIGMF_METADATA_EXT, "rec1" + SIGMF_METADATA_EXT])
395+
self.assertEqual(len(collection), 2)
396+
names = collection.get_stream_names()
397+
self.assertIn("rec0", names)
398+
self.assertIn("rec1", names)
399+
400+
def test_collection_dumps_is_valid_json(self):
401+
"""dumps() should produce valid JSON containing collection data"""
402+
collection = self._make_collection()
403+
s = collection.dumps()
404+
data = json.loads(s)
405+
self.assertIn(SigMFCollection.COLLECTION_KEY, data)
406+
self.assertIn(SigMFCollection.STREAMS_KEY, data[SigMFCollection.COLLECTION_KEY])
407+
self.assertEqual(len(data[SigMFCollection.COLLECTION_KEY][SigMFCollection.STREAMS_KEY]), 3)

0 commit comments

Comments
 (0)