-
Notifications
You must be signed in to change notification settings - Fork 132
Expand file tree
/
Copy pathfixture.py
More file actions
307 lines (261 loc) · 11.4 KB
/
fixture.py
File metadata and controls
307 lines (261 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
from __future__ import division
from __future__ import print_function
import cProfile
import gc
import pstats
import sys
import time
import traceback
from math import ceil
from .compat import INT
from .compat import XRANGE
from .timers import compute_timer_precision
from .utils import NameWrapper
from .utils import format_time
try:
import statistics
except (ImportError, SyntaxError):
statistics_error = traceback.format_exc()
statistics = None
else:
statistics_error = None
from .stats import Metadata
class FixtureAlreadyUsed(Exception):
pass
class BenchmarkFixture(object):
_precisions = {}
def __init__(self, node, disable_gc, timer, min_rounds, min_time, max_time, warmup, warmup_iterations,
calibration_precision, add_stats, logger, warner, disabled, cprofile, group=None, baseline=True):
self.name = node.name
self.baseline = baseline
self.fullname = node._nodeid
self.disabled = disabled
if hasattr(node, 'callspec'):
self.param = node.callspec.id
self.params = node.callspec.params
else:
self.param = None
self.params = None
self.group = group
self.has_error = False
self.extra_info = {}
self.skipped = False
self._disable_gc = disable_gc
self._timer = timer.target
self._min_rounds = min_rounds
self._max_time = float(max_time)
self._min_time = float(min_time)
self._add_stats = add_stats
self._calibration_precision = calibration_precision
self._warmup = warmup and warmup_iterations
self._logger = logger
self._warner = warner
self._cleanup_callbacks = []
self._mode = None
self.cprofile = cprofile
self.cprofile_stats = None
self.stats = None
@property
def enabled(self):
return not self.disabled
def _get_precision(self, timer):
if timer in self._precisions:
timer_precision = self._precisions[timer]
else:
timer_precision = self._precisions[timer] = compute_timer_precision(timer)
self._logger.debug("")
self._logger.debug("Computing precision for %s ... %ss." % (
NameWrapper(timer), format_time(timer_precision)), blue=True, bold=True)
return timer_precision
def _make_runner(self, function_to_benchmark, args, kwargs):
def runner(loops_range, timer=self._timer):
gc_enabled = gc.isenabled()
if self._disable_gc:
gc.disable()
tracer = sys.gettrace()
sys.settrace(None)
try:
if loops_range:
start = timer()
for _ in loops_range:
function_to_benchmark(*args, **kwargs)
end = timer()
return end - start
else:
start = timer()
result = function_to_benchmark(*args, **kwargs)
end = timer()
return end - start, result
finally:
sys.settrace(tracer)
if gc_enabled:
gc.enable()
return runner
def _make_stats(self, iterations):
bench_stats = Metadata(self, iterations=iterations, options={
"disable_gc": self._disable_gc,
"timer": self._timer,
"min_rounds": self._min_rounds,
"max_time": self._max_time,
"min_time": self._min_time,
"warmup": self._warmup,
})
self._add_stats(bench_stats)
self.stats = bench_stats
return bench_stats
def __call__(self, function_to_benchmark, *args, **kwargs):
if self._mode:
self.has_error = True
raise FixtureAlreadyUsed(
"Fixture can only be used once. Previously it was used in %s mode." % self._mode)
try:
self._mode = 'benchmark(...)'
return self._raw(function_to_benchmark, *args, **kwargs)
except Exception:
self.has_error = True
raise
def pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
if self._mode:
self.has_error = True
raise FixtureAlreadyUsed(
"Fixture can only be used once. Previously it was used in %s mode." % self._mode)
try:
self._mode = 'benchmark.pedantic(...)'
return self._raw_pedantic(target, args=args, kwargs=kwargs, setup=setup, rounds=rounds,
warmup_rounds=warmup_rounds, iterations=iterations)
except Exception:
self.has_error = True
raise
def _raw(self, function_to_benchmark, *args, **kwargs):
if self.enabled:
runner = self._make_runner(function_to_benchmark, args, kwargs)
duration, iterations, loops_range = self._calibrate_timer(runner)
# Choose how many time we must repeat the test
rounds = int(ceil(self._max_time / duration))
rounds = max(rounds, self._min_rounds)
rounds = min(rounds, sys.maxsize)
stats = self._make_stats(iterations)
self._logger.debug(" Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)
run_start = time.time()
if self._warmup:
warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))
self._logger.debug(" Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))
for _ in XRANGE(warmup_rounds):
runner(loops_range)
for _ in XRANGE(rounds):
stats.update(runner(loops_range))
self._logger.debug(" Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)
if self.enabled and self.cprofile:
profile = cProfile.Profile()
function_result = profile.runcall(function_to_benchmark, *args, **kwargs)
self.stats.cprofile_stats = pstats.Stats(profile)
else:
function_result = function_to_benchmark(*args, **kwargs)
return function_result
def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
if kwargs is None:
kwargs = {}
has_args = bool(args or kwargs)
if not isinstance(iterations, INT) or iterations < 1:
raise ValueError("Must have positive int for `iterations`.")
if not isinstance(rounds, INT) or rounds < 1:
raise ValueError("Must have positive int for `rounds`.")
if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:
raise ValueError("Must have positive int for `warmup_rounds`.")
if iterations > 1 and setup:
raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")
def make_arguments(args=args, kwargs=kwargs):
if setup:
maybe_args = setup()
if maybe_args:
if has_args:
raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")
args, kwargs = maybe_args
return args, kwargs
if self.disabled:
args, kwargs = make_arguments()
return target(*args, **kwargs)
stats = self._make_stats(iterations)
loops_range = XRANGE(iterations) if iterations > 1 else None
for _ in XRANGE(warmup_rounds):
args, kwargs = make_arguments()
runner = self._make_runner(target, args, kwargs)
runner(loops_range)
for _ in XRANGE(rounds):
args, kwargs = make_arguments()
runner = self._make_runner(target, args, kwargs)
if loops_range:
duration = runner(loops_range)
else:
duration, result = runner(loops_range)
stats.update(duration)
if loops_range:
args, kwargs = make_arguments()
result = target(*args, **kwargs)
if self.cprofile:
profile = cProfile.Profile()
args, kwargs = make_arguments()
profile.runcall(target, *args, **kwargs)
self.stats.cprofile_stats = pstats.Stats(profile)
return result
def weave(self, target, **kwargs):
try:
import aspectlib
except ImportError as exc:
raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")
def aspect(function):
def wrapper(*args, **kwargs):
return self(function, *args, **kwargs)
return wrapper
self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)
patch = weave
def _cleanup(self):
while self._cleanup_callbacks:
callback = self._cleanup_callbacks.pop()
callback()
if not self._mode and not self.skipped:
self._logger.warn("Benchmark fixture was not used at all in this test!",
warner=self._warner, suspend=True)
def _calibrate_timer(self, runner):
timer_precision = self._get_precision(self._timer)
min_time = max(self._min_time, timer_precision * self._calibration_precision)
min_time_estimate = min_time * 5 / self._calibration_precision
self._logger.debug("")
self._logger.debug(" Calibrating to target round %ss; will estimate when reaching %ss "
"(using: %s, precision: %ss)." % (
format_time(min_time),
format_time(min_time_estimate),
NameWrapper(self._timer),
format_time(timer_precision)
), yellow=True, bold=True)
loops = 1
while True:
loops_range = XRANGE(loops)
duration = runner(loops_range)
if self._warmup:
warmup_start = time.time()
warmup_iterations = 0
warmup_rounds = 0
while time.time() - warmup_start < self._max_time and warmup_iterations < self._warmup:
duration = min(duration, runner(loops_range))
warmup_rounds += 1
warmup_iterations += loops
self._logger.debug(" Warmup: %ss (%s x %s iterations)." % (
format_time(time.time() - warmup_start),
warmup_rounds, loops
))
self._logger.debug(" Measured %s iterations: %ss." % (loops, format_time(duration)), yellow=True)
if duration >= min_time:
break
if duration >= min_time_estimate:
# coarse estimation of the number of loops
loops = int(ceil(min_time * loops / duration))
self._logger.debug(" Estimating %s iterations." % loops, green=True)
if loops == 1:
# If we got a single loop then bail early - nothing to calibrate if the the
# test function is 100 times slower than the timer resolution.
loops_range = XRANGE(loops)
break
else:
loops *= 10
return duration, loops, loops_range