Skip to content

Commit 86d884e

Browse files
authored
Merge pull request #24 from opesci/compressionV2
Compression
2 parents 77dad44 + 2a287df commit 86d884e

15 files changed

Lines changed: 720 additions & 105 deletions

examples/use_modernised.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,25 +61,11 @@ def __init__(self, symbols):
6161
raise Exception("Symbols must be a Mapping, for example a \
6262
dictionary.")
6363

64-
def save(self, ptr):
    """Copy the live data of every symbol in this Checkpoint into ``ptr``.

    The symbols are written back-to-back in the iteration order of
    ``self.symbols``: each one occupies ``symbol.size`` consecutive
    entries of ``ptr``, starting where the previous symbol ended.

    ``ptr`` must support slice assignment; it is modified in place.
    """
    offset = 0
    for symbol in self.symbols.values():
        end = offset + symbol.size
        ptr[offset:end] = symbol.data[:]
        offset = end
73-
74-
def load(self, ptr):
    """Overwrite the live data of every symbol with the contents of ``ptr``.

    ``ptr`` is read back-to-back in the iteration order of
    ``self.symbols``: each symbol receives the next ``symbol.size``
    entries of ``ptr``, which are assigned into ``symbol.data[:]`` in
    place. ``ptr`` itself is not modified.
    """
    offset = 0
    for symbol in self.symbols.values():
        end = offset + symbol.size
        symbol.data[:] = ptr[offset:end]
        offset = end
64+
def get_data_location(self, timestep):
    """Return a list with the ``data`` attribute of every symbol.

    The list follows the iteration order of ``self.symbols``.
    ``timestep`` is accepted but not used by this implementation.
    """
    return [symbol.data for symbol in self.symbols.values()]
66+
67+
def get_data(self, timestep):
    """Return a list with the ``data`` attribute of every symbol.

    The list follows the iteration order of ``self.symbols``.
    ``timestep`` is accepted but not used by this implementation.
    """
    buffers = []
    for symbol in self.symbols.values():
        buffers.append(symbol.data)
    return buffers
8369

8470
@property
8571
def size(self):

pyrevolve/compression.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import blosc
2+
import pyzfp
3+
import numpy as np
4+
from contexttimer import Timer
5+
from functools import partial
6+
import pickle
7+
8+
9+
# Per-scheme default parameters, merged into the user-supplied params for
# any key the user did not set.
DEFAULTS = {
    None: {},
    'blosc': {'chunk_size': 1000000},
    'zfp': {'tolerance': 1e-7, 'parallel': True},
}
11+
12+
13+
def init_compression(params):
    """Build a (compressor, decompressor) pair from a parameter mapping.

    ``params`` is copied, so the caller's mapping is left untouched. The
    ``'scheme'`` entry (default ``None``) selects the implementation:

    * ``'custom'`` -- the callables are taken from the ``'compressor'`` and
      ``'decompressor'`` entries of ``params``.
    * anything else -- the callables are looked up in the module-level
      ``compressors`` / ``decompressors`` tables, and any key of
      ``DEFAULTS[scheme]`` missing from ``params`` is filled in.

    Both returned callables have the remaining parameters bound as their
    first positional argument and therefore take only the data argument.
    """
    options = params.copy()
    scheme = options.pop('scheme', None)

    if scheme != 'custom':
        compressor = compressors[scheme]
        decompressor = decompressors[scheme]
        # User-supplied values win; defaults only fill the gaps.
        for key, value in DEFAULTS[scheme].items():
            options.setdefault(key, value)
    else:
        compressor = options.pop('compressor', None)
        decompressor = options.pop('decompressor', None)

    return partial(compressor, options), partial(decompressor, options)
29+
30+
31+
def no_compression_in(params, indata):
    """Wrap ``indata`` in a CompressedObject without compressing it.

    The array's bytes are copied via ``tobytes()`` and exposed through a
    memoryview; shape and dtype are recorded alongside. ``params`` is
    unused.
    """
    raw = memoryview(indata.tobytes())
    return CompressedObject(raw, shape=indata.shape, dtype=indata.dtype)
34+
35+
36+
def no_compression_out(params, indata):
    """Rebuild an array from an uncompressed object.

    ``indata.data`` is reinterpreted as a flat array of ``indata.dtype``
    and then reshaped to ``indata.shape``. ``params`` is unused.
    """
    flat = np.frombuffer(indata.data, dtype=indata.dtype)
    return flat.reshape(indata.shape)
38+
39+
40+
def blosc_compress(params, indata):
    """Compress an array with blosc, one fixed-size chunk at a time.

    The array's bytes are split into pieces of ``params['chunk_size']``
    bytes, each piece is compressed independently with ``blosc.compress``,
    and the compressed pieces are concatenated. The compressed length of
    every piece is stored under ``'chunks'`` in the metadata so that the
    stream can be split again on decompression.

    Returns a CompressedObject whose metadata holds ``'shape'``,
    ``'dtype'`` and ``'chunks'``.
    """
    # tobytes() replaces the deprecated ndarray.tostring() alias.
    raw = indata.tobytes()
    chunk_size = params.get('chunk_size')

    compressed_chunks = [blosc.compress(raw[start:start + chunk_size])
                         for start in range(0, len(raw), chunk_size)]

    metadata = {'shape': indata.shape, 'dtype': indata.dtype,
                'chunks': [len(chunk) for chunk in compressed_chunks]}
    # A single join avoids re-copying the growing buffer for every chunk.
    return CompressedObject(data=b''.join(compressed_chunks),
                            metadata=metadata)
58+
59+
60+
def blosc_decompress(params, indata):
    """Rebuild an array from a chunked blosc stream.

    ``indata.metadata['chunks']`` lists the compressed length of every
    chunk in order; ``indata.data`` is sliced accordingly, each slice is
    passed to ``blosc.decompress``, and the joined result is reinterpreted
    as ``indata.dtype`` and reshaped to ``indata.shape``. ``params`` is
    unused.
    """
    compressed = indata.data

    pieces = []
    offset = 0
    for length in indata.metadata['chunks']:
        pieces.append(blosc.decompress(compressed[offset:offset + length]))
        offset += length

    # A single join avoids re-copying the growing buffer for every chunk.
    flat = np.frombuffer(b''.join(pieces), dtype=indata.dtype)
    return flat.reshape(indata.shape)
73+
74+
75+
class CompressedObject(object):
    """Container pairing a data buffer with the metadata describing it.

    Either pass ``shape`` and ``dtype`` directly, or pass a ``metadata``
    mapping that contains at least the keys ``'shape'`` and ``'dtype'``;
    the two forms cannot be combined. In the first form a metadata dict
    with just those two keys is created.

    Attributes: ``data``, ``shape``, ``dtype``, ``metadata`` and
    ``pickled_metadata`` (the metadata serialised with ``pickle.dumps``).
    """

    def __init__(self, data, shape=None, dtype=None, metadata=None):
        # metadata and explicit shape/dtype are mutually exclusive
        assert metadata is None or (shape is None and dtype is None)

        if metadata is None:
            metadata = {'shape': shape, 'dtype': dtype}
        else:
            assert 'shape' in metadata and 'dtype' in metadata
            shape, dtype = metadata['shape'], metadata['dtype']

        self.data = data
        self.shape = shape
        self.dtype = dtype
        self.metadata = metadata
        self.pickled_metadata = pickle.dumps(metadata)
89+
90+
91+
def zfp_compress(params, indata):
    """Compress ``indata`` with pyzfp and wrap the result.

    Every entry of ``params`` is forwarded to ``pyzfp.compress`` as a
    keyword argument. The compressed buffer is exposed through a
    memoryview, and the input's shape and dtype are stored with it.
    """
    payload = pyzfp.compress(indata, **params)
    return CompressedObject(memoryview(payload), shape=indata.shape,
                            dtype=indata.dtype)
94+
95+
96+
def zfp_decompress(params, indata):
    """Decompress a CompressedObject with pyzfp.

    ``indata`` must be a CompressedObject (checked with an assertion). Its
    buffer, shape and dtype are handed to ``pyzfp.decompress`` together
    with every entry of ``params`` as keyword arguments.
    """
    assert isinstance(indata, CompressedObject)
    buffer, shape, dtype = indata.data, indata.shape, indata.dtype
    return pyzfp.decompress(buffer, shape, dtype, **params)
100+
101+
102+
# Lookup tables from scheme name to implementation; None means
# "store uncompressed".
compressors = {
    None: no_compression_in,
    'blosc': blosc_compress,
    'zfp': zfp_compress,
}
decompressors = {
    None: no_compression_out,
    'blosc': blosc_decompress,
    'zfp': zfp_decompress,
}
allowed_names = [None, 'blosc', 'zfp']

pyrevolve/crevolve.pyx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ cimport revolve_c
33
from enum import Enum
44
import warnings
55

6+
from tools import OutputGrabber
7+
8+
69
class RevolveError(Exception):
    """Exception type defined by the pyrevolve ``crevolve`` module."""
811
# TODO: the hardcoded limits really should be removed in a future version. This should be as easy as replacing the arrays in the C++ code with an std::vector.
@@ -78,7 +81,8 @@ cdef class CRevolve(object):
7881

7982
def revolve(self):
8083
cdef revolve_c.CACTION action
81-
action = revolve_c.revolve(self.__r)
84+
with OutputGrabber() as og:
85+
action = revolve_c.revolve(self.__r)
8286
if(action == revolve_c.CACTION_ADVANCE):
8387
retAction = Action.advance
8488
elif(action == revolve_c.CACTION_TAKESHOT):

pyrevolve/custom_pickle.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pickle
2+
import numpy as np
3+
4+
5+
def dumps(data):
    """Pickle ``data``, storing NumPy arrays as raw bytes plus metadata.

    An ``np.ndarray`` is replaced by a dict holding its bytes, shape and
    dtype, tagged with ``'creator': 'custom_pickle'``; that dict is what
    gets pickled. Any other object is passed to ``pickle.dumps`` as is.
    """
    if not isinstance(data, np.ndarray):
        return pickle.dumps(data)

    wrapped = {'data': data.tobytes(), 'shape': data.shape,
               'dtype': data.dtype, 'creator': 'custom_pickle'}
    return pickle.dumps(wrapped)
10+
11+
12+
def loads(data):
    """Unpickle ``data``, rebuilding NumPy arrays that were wrapped on dump.

    If the unpickled object is a dict tagged with
    ``'creator': 'custom_pickle'``, its ``'data'`` bytes are reinterpreted
    with the stored ``'dtype'`` and reshaped to the stored ``'shape'``.
    The resulting array is a read-only view over those bytes. Any other
    object is returned exactly as unpickled.

    NOTE: this calls ``pickle.loads`` and must only be used on trusted
    input.
    """
    outdata = pickle.loads(data)
    if isinstance(outdata, dict) \
            and outdata.get('creator') == 'custom_pickle':
        # Keep the unpickled dict in its own name until the array is fully
        # rebuilt: the shape has to be read from the dict, not the array.
        wrapped = outdata
        flat = np.frombuffer(wrapped['data'], dtype=wrapped['dtype'])
        outdata = flat.reshape(wrapped['shape'])
    return outdata

pyrevolve/logger.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import logging
2+
3+
# Package-wide logger: emits everything from DEBUG upwards.
logger = logging.getLogger("pyRevolve")
logger.setLevel(logging.DEBUG)

# Message layout: timestamp - logger name - level - message.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Stream handler using the formatter above, also at DEBUG level.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)

# Attach the handler to the logger.
logger.addHandler(ch)

pyrevolve/profiling.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
from timeit import default_timer
2+
3+
4+
class Timer(object):
    """Context manager that times a block and reports it to a profiler.

    On exit the elapsed wall time is stored on the instance as
    ``elapsed_secs`` (seconds) and ``elapsed`` (milliseconds), and
    ``profiler.increment(section, action, elapsed)`` is called with the
    millisecond value.
    """

    def __init__(self, profiler, section, action):
        self.profiler = profiler
        self.section = section
        self.action = action
        self.timer = default_timer

    def __enter__(self):
        self.start = self.timer()
        return self

    def __exit__(self, *args):
        stop = self.timer()
        self.elapsed_secs = stop - self.start
        # the profiler is fed milliseconds
        self.elapsed = self.elapsed_secs * 1000
        self.profiler.increment(self.section, self.action, self.elapsed)
20+
21+
22+
class Profiler(object):
    """Accumulates elapsed time and call counts per (section, action).

    ``timings`` and ``counts`` are nested dicts keyed first by section and
    then by action.
    """

    def __init__(self):
        self.timings = {}
        self.counts = {}

    def get_timer(self, section, action):
        """Return a Timer that reports to this profiler on exit."""
        return Timer(self, section, action)

    def increment(self, section, action, elapsed):
        """Add ``elapsed`` to the action's total and bump its count by one."""
        # Warning: Not thread safe
        timings = self.timings.setdefault(section, {})
        timings[action] = timings.get(action, 0) + elapsed

        counts = self.counts.setdefault(section, {})
        counts[action] = counts.get(action, 0) + 1

    def summary(self):
        """Return a multi-line report of all totals and counts."""
        parts = ['****************']
        for section, section_timings in self.timings.items():
            parts.append('\nIn section %s:' % section)
            section_counts = self.counts[section]
            for action, total in section_timings.items():
                parts.append('\n\tAction %s: %f (%d)'
                             % (action, total, section_counts[action]))
        parts.append('\n****************')
        return ''.join(parts)

    def get_dict(self):
        """Return a flat dict of ``<section>_<action>_timing`` and
        ``<section>_<action>_counts`` entries."""
        results = {}
        for section, actions in self.timings.items():
            for action, total in actions.items():
                results['%s_%s_timing' % (section, action)] = total

        for section, actions in self.counts.items():
            for action, count in actions.items():
                results['%s_%s_counts' % (section, action)] = count

        return results

0 commit comments

Comments
 (0)