-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathtest_archive.py
More file actions
179 lines (147 loc) · 7.47 KB
/
test_archive.py
File metadata and controls
179 lines (147 loc) · 7.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# Copyright: Multiple Authors
#
# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python
#
# SPDX-License-Identifier: LGPL-3.0-or-later
"""Tests for SigMFArchive"""
import codecs
import copy
import json
import shutil
import tarfile
import tempfile
import unittest
from pathlib import Path
import jsonschema
import numpy as np
from sigmf import SigMFFile, __specification__, error, fromfile
from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT
from .testdata import TEST_FLOAT32_DATA, TEST_METADATA
class TestSigMFArchive(unittest.TestCase):
"""Tests for SigMF Archive functionality"""
def setUp(self):
"""Create temporary directory and test SigMFFile"""
self.temp_dir = Path(tempfile.mkdtemp())
self.temp_path_data = self.temp_dir / "trash.sigmf-data"
self.temp_path_meta = self.temp_dir / "trash.sigmf-meta"
self.temp_path_archive = self.temp_dir / "test.sigmf"
TEST_FLOAT32_DATA.tofile(self.temp_path_data)
self.sigmf_object = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=self.temp_path_data)
self.sigmf_object.tofile(self.temp_path_meta)
self.sigmf_object.tofile(self.temp_path_archive, toarchive=True)
self.sigmf_tarfile = tarfile.open(self.temp_path_archive, mode="r", format=tarfile.PAX_FORMAT)
def tearDown(self):
"""Clean up temporary directory"""
shutil.rmtree(self.temp_dir)
def test_archive_creation_requires_data_file(self):
"""Test that archiving without data file raises error"""
self.sigmf_object.data_file = None
with self.assertRaises(error.SigMFFileError):
self.sigmf_object.archive(name=self.temp_path_archive)
def test_archive_creation_validates_metadata(self):
"""Test that invalid metadata raises error"""
del self.sigmf_object._metadata["global"]["core:datatype"] # required field
with self.assertRaises(jsonschema.exceptions.ValidationError):
self.sigmf_object.archive(name=self.temp_path_archive)
def test_archive_creation_validates_extension(self):
"""Test that wrong extension raises error"""
wrong_name = self.temp_dir / "temp_archive.zip"
with self.assertRaises(error.SigMFFileError):
self.sigmf_object.archive(name=wrong_name)
def test_fileobj_ignores_extension(self):
"""Test that file object extension is ignored"""
temp_archive_tar = self.temp_dir / "test.sigmf.tar"
with open(temp_archive_tar, "wb") as temp:
self.sigmf_object.archive(fileobj=temp)
def test_custom_name_overrides_fileobj_name(self):
"""Test that name is used in file object"""
with open(self.temp_path_archive, "wb") as temp:
sigmf_archive = self.sigmf_object.archive(name="testarchive", fileobj=temp)
sigmf_tarfile = tarfile.open(sigmf_archive, mode="r")
basedir, file1, file2 = sigmf_tarfile.getmembers()
self.assertEqual(basedir.name, "testarchive")
self.assertEqual(Path(file1.name).stem, "testarchive")
self.assertEqual(Path(file2.name).stem, "testarchive")
def test_fileobj_remains_open_after_archive(self):
"""Test that file object is not closed after archiving"""
with open(self.temp_path_archive, "wb") as temp:
self.sigmf_object.archive(fileobj=temp)
self.assertFalse(temp.closed)
def test_readonly_fileobj_raises_error(self):
"""Test that unwritable file object raises error"""
temp_path = self.temp_dir / "temp_archive.sigmf"
temp_path.touch()
with open(temp_path, "rb") as temp:
with self.assertRaises(error.SigMFFileError):
self.sigmf_object.archive(fileobj=temp)
def test_invalid_path_raises_error(self):
"""Test that unwritable name raises error"""
# Cannot assume /root/ is unwritable (e.g. Docker environment)
# so use invalid filename
unwritable_file = "/bad_name/"
with self.assertRaises(error.SigMFFileError):
self.sigmf_object.archive(name=unwritable_file)
def test_archive_contains_directory_and_files(self):
"""Test archive layout structure"""
basedir, file1, file2 = self.sigmf_tarfile.getmembers()
self.assertTrue(tarfile.TarInfo.isdir(basedir))
self.assertTrue(tarfile.TarInfo.isfile(file1))
self.assertTrue(tarfile.TarInfo.isfile(file2))
def test_archive_files_have_correct_names_and_extensions(self):
"""Test tarfile names and extensions"""
basedir, file1, file2 = self.sigmf_tarfile.getmembers()
archive_name = basedir.name
self.assertEqual(archive_name, Path(self.temp_path_archive).stem)
file_extensions = {SIGMF_DATASET_EXT, SIGMF_METADATA_EXT}
file1_name, file1_ext = Path(file1.name).stem, Path(file1.name).suffix
self.assertEqual(file1_name, archive_name)
self.assertIn(file1_ext, file_extensions)
file_extensions.remove(file1_ext)
file2_name, file2_ext = Path(file2.name).stem, Path(file2.name).suffix
self.assertEqual(file2_name, archive_name)
self.assertIn(file2_ext, file_extensions)
def test_archive_files_have_correct_permissions(self):
"""Test tarfile permissions"""
basedir, file1, file2 = self.sigmf_tarfile.getmembers()
self.assertEqual(basedir.mode, 0o755)
self.assertEqual(file1.mode, 0o644)
self.assertEqual(file2.mode, 0o644)
def test_archive_contents_match_original_data(self):
"""Test archive contents"""
_, file1, file2 = self.sigmf_tarfile.getmembers()
if file1.name.endswith(SIGMF_METADATA_EXT):
mdfile = file1
datfile = file2
else:
mdfile = file2
datfile = file1
bytestream_reader = codecs.getreader("utf-8") # bytes -> str
mdfile_reader = bytestream_reader(self.sigmf_tarfile.extractfile(mdfile))
self.assertEqual(json.load(mdfile_reader), TEST_METADATA)
datfile_reader = self.sigmf_tarfile.extractfile(datfile)
# calling `fileno` on `tarfile.ExFileObject` throws error (?), but
# np.fromfile requires it, so we need this extra step
data = np.frombuffer(datfile_reader.read(), dtype=np.float32)
np.testing.assert_array_equal(data, TEST_FLOAT32_DATA)
def test_tarfile_format(self):
"""Tar file format is PAX"""
self.assertEqual(self.sigmf_tarfile.format, tarfile.PAX_FORMAT)
def test_archive_read_samples(self):
"""test that read_samples works correctly with archived data"""
# load from archive
archive_mdfile = fromfile(self.temp_path_archive)
# verify sample count matches
expected_sample_count = len(self.sigmf_object)
self.assertEqual(archive_mdfile.sample_count, expected_sample_count)
# verify read_samples returns same as slice
samples_orig = TEST_FLOAT32_DATA[3:13]
samples_read = archive_mdfile.read_samples(start_index=3, count=10)
samples_sliced = archive_mdfile[3:13]
np.testing.assert_array_equal(samples_orig, samples_sliced)
np.testing.assert_array_equal(samples_orig, samples_read)
def test_archive_read_samples_beyond_end(self):
"""test that read_samples beyond end of data raises error"""
meta = fromfile(self.temp_path_archive)
# FIXME: Should this raise a SigMFFileError instead?
with self.assertRaises(OSError):
meta.read_samples(start_index=meta.sample_count + 10, count=5)