Skip to content

Commit 1ab895f

Browse files
authored
Fix thread safety issues (#24)
* fix: make `packed.load` and `binarywave.load` thread-safe
* test: add tests
* style: format code
1 parent f435904 commit 1ab895f

3 files changed

Lines changed: 69 additions & 35 deletions

File tree

igor2/binarywave.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -793,18 +793,23 @@ def post_unpack(self, parents, data):
793793
count=0, array=True),
794794
])
795795

796-
Wave = _DynamicStructure(
797-
name='Wave',
798-
fields=[
799-
DynamicVersionField(
800-
'h',
801-
'version',
802-
help='Version number for backwards compatibility.'),
803-
DynamicWaveField(
804-
Wave1,
805-
'wave',
806-
help='The rest of the wave data.'),
807-
])
796+
797+
def setup_wave(byte_order='='):
    """Build and return a freshly initialised ``Wave`` structure.

    A new ``_DynamicStructure`` is constructed on every call (rather
    than mutating a shared module-level instance), so concurrent loads
    cannot race on the structure's ``byte_order``/``setup`` state.

    Parameters
    ----------
    byte_order : str
        ``struct``-style byte-order character (e.g. ``'='``, ``'<'``,
        ``'>'``) applied to the structure before ``setup()``.

    Returns
    -------
    _DynamicStructure
        The initialised wave structure, ready to unpack a stream.
    """
    version_field = DynamicVersionField(
        'h',
        'version',
        help='Version number for backwards compatibility.')
    data_field = DynamicWaveField(
        Wave1,
        'wave',
        help='The rest of the wave data.')
    wave = _DynamicStructure(
        name='Wave',
        fields=[version_field, data_field],
        byte_order=byte_order)
    wave.setup()
    return wave
808813

809814

810815
def load(filename):
@@ -813,9 +818,8 @@ def load(filename):
813818
else:
814819
f = open(filename, 'rb')
815820
try:
816-
Wave.byte_order = '='
817-
Wave.setup()
818-
data = Wave.unpack_stream(f)
821+
wave = setup_wave()
822+
data = wave.unpack_stream(f)
819823
finally:
820824
if not hasattr(filename, 'read'):
821825
f.close()

igor2/packed.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,30 @@
2525
# files, you must skip any record with a record type that is not
2626
# listed above.
2727

28-
PackedFileRecordHeader = _Structure(
29-
name='PackedFileRecordHeader',
30-
fields=[
31-
_Field('H', 'recordType', help='Record type plus superceded flag.'),
32-
_Field('h', 'version',
33-
help='Version information depends on the type of record.'),
34-
_Field('l', 'numDataBytes',
35-
help='Number of data bytes in the record following this'
36-
'record header.'),
37-
])
38-
3928
# CR_STR = '\x15' (\r)
4029

4130
PACKEDRECTYPE_MASK = 0x7FFF # Record type = (recordType & PACKEDREC_TYPE_MASK)
4231
SUPERCEDED_MASK = 0x8000 # Bit is set if the record is superceded by
4332
# a later record in the packed file.
4433

4534

35+
def setup_packed_file_record_header(byte_order='@'):
    """Build and return a freshly initialised PackedFileRecordHeader.

    A new ``_Structure`` is constructed on every call (rather than
    mutating a shared module-level instance), so concurrent ``load``
    calls cannot race on the structure's byte order.

    Parameters
    ----------
    byte_order : str
        ``struct``-style byte-order character (e.g. ``'@'``, ``'='``,
        ``'<'``, ``'>'``) applied before ``setup()``.

    Returns
    -------
    _Structure
        The initialised record-header structure.
    """
    record_header = _Structure(
        name='PackedFileRecordHeader',
        fields=[
            _Field('H', 'recordType',
                   help='Record type plus superceded flag.'),
            _Field('h', 'version',
                   help='Version information depends on the type of record.'),
            _Field('l', 'numDataBytes',
                   # Trailing space keeps the implicitly concatenated help
                   # text readable ("...following this record header.");
                   # without it the two literals fused into "thisrecord".
                   help='Number of data bytes in the record following this '
                        'record header.'),
        ],
        byte_order=byte_order)
    record_header.setup()
    return record_header
50+
51+
4652
def load(filename, strict=True, ignore_unknown=True, initial_byte_order=None):
4753
"""Load a packed experiment file.
4854
@@ -77,27 +83,27 @@ def load(filename, strict=True, ignore_unknown=True, initial_byte_order=None):
7783
initial_byte_order = '='
7884
try:
7985
while True:
80-
PackedFileRecordHeader.byte_order = initial_byte_order
81-
PackedFileRecordHeader.setup()
82-
b = bytes(f.read(PackedFileRecordHeader.size))
86+
header_struct = setup_packed_file_record_header(
87+
byte_order=initial_byte_order)
88+
b = bytes(f.read(header_struct.size))
8389
if not b:
8490
break
85-
if len(b) < PackedFileRecordHeader.size:
91+
if len(b) < header_struct.size:
8692
raise ValueError(
8793
('not enough data for the next record header ({} < {})'
88-
).format(len(b), PackedFileRecordHeader.size))
94+
).format(len(b), header_struct.size))
8995
logger.debug('reading a new packed experiment file record')
90-
header = PackedFileRecordHeader.unpack_from(b)
96+
header = header_struct.unpack_from(b)
9197
if header['version'] and not byte_order:
9298
need_to_reorder = _need_to_reorder_bytes(header['version'])
9399
byte_order = initial_byte_order = _byte_order(need_to_reorder)
94100
logger.debug(
95101
'get byte order from version: %s (reorder? %s)',
96102
byte_order, need_to_reorder)
97103
if need_to_reorder:
98-
PackedFileRecordHeader.byte_order = byte_order
99-
PackedFileRecordHeader.setup()
100-
header = PackedFileRecordHeader.unpack_from(b)
104+
header_struct = setup_packed_file_record_header(
105+
byte_order=byte_order)
106+
header = header_struct.unpack_from(b)
101107
logger.debug(
102108
'reordered version: %s', header['version'])
103109
data = bytes(f.read(header['numDataBytes']))

tests/test_pxp.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import threading
2+
13
import numpy as np
24

35
from igor2.packed import load as loadpxp
@@ -152,3 +154,25 @@ def test_pxt():
152154
12647., 14242., 14470., 13913., 14158., 14754., 14462., 14346.,
153155
14219., 13467., 13595., 14331., 13960., 12934., 12897., 13557.,
154156
13105., 12797., 13234., 13053., 13455., 12825.], dtype='>f8'))
157+
158+
159+
def test_thread_safe():
    """Load the same packed file from 100 threads concurrently.

    Worker failures are collected into a shared list and asserted after
    joining, because an exception raised inside a ``threading.Thread``
    target does NOT propagate to (or fail) the enclosing pytest test on
    its own -- the original re-raise inside ``worker`` was silently
    swallowed by the thread machinery.
    """
    errors = []

    def worker(path, thread_id):
        expt = None
        # The fixture's byte order is not known up front, so try both
        # and keep whichever load succeeds.
        for byte_order in ('<', '>'):
            try:
                _, expt = loadpxp(path, initial_byte_order=byte_order)
            except ValueError:
                pass
        if expt is None:
            errors.append(f"No experiment loaded for thread {thread_id}")

    threads = []
    for i in range(100):
        t = threading.Thread(
            target=worker,
            args=(data_dir / 'packed-byteorder.pxt', i))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    # Fail the test if any worker could not load the experiment.
    assert not errors, errors

0 commit comments

Comments
 (0)