|
| 1 | +"""Benchmarks for the internal multiprocessing mode (Subticket #7). |
| 2 | +
|
| 3 | +Goal: provide a reproducible "is multiprocessing actually faster?" check for |
| 4 | +the workloads multi_processing.md flags as the primary targets — the |
| 5 | +``ignore_order=True`` distance loop, paired-subtree diffs, and large lists of |
| 6 | +nested dicts. Each workload runs serial first, then parallel at a few worker |
| 7 | +counts; we print a single results table. |
| 8 | +
|
| 9 | +Usage:: |
| 10 | +
|
| 11 | + source ~/.venvs/deep/bin/activate |
| 12 | + python -m benchmarks.multiprocessing_bench |
| 13 | +
|
| 14 | + # Smaller, faster sweep: |
| 15 | + python -m benchmarks.multiprocessing_bench --quick |
| 16 | +
|
| 17 | + # Just one workload: |
| 18 | + python -m benchmarks.multiprocessing_bench --only paired_subtree |
| 19 | +
|
| 20 | +The script also asserts that the parallel result equals the serial result for |
| 21 | +every workload — a benchmark that produces wrong answers is worse than no |
| 22 | +benchmark at all. If any pair diverges the script exits non-zero. |
| 23 | +
|
| 24 | +The numbers here are not committed; they're meant to inform threshold tuning |
| 25 | +(see DEFAULT_THRESHOLD in deepdiff/_multiprocessing.py) and to expose |
| 26 | +regressions when the hot path changes. Re-run on your hardware before drawing |
| 27 | +conclusions — process spawn overhead and IPC pickle cost vary wildly across |
| 28 | +machines. |
| 29 | +""" |
| 30 | + |
| 31 | +import argparse |
| 32 | +import os |
| 33 | +import sys |
| 34 | +import time |
| 35 | +from typing import Any, Callable, Dict, List, Tuple |
| 36 | + |
| 37 | +# Make the package importable when the script is run from a checkout. |
| 38 | +HERE = os.path.dirname(os.path.abspath(__file__)) |
| 39 | +ROOT = os.path.dirname(HERE) |
| 40 | +if ROOT not in sys.path: |
| 41 | + sys.path.insert(0, ROOT) |
| 42 | + |
| 43 | +from deepdiff import DeepDiff # noqa: E402 |
| 44 | + |
| 45 | + |
| 46 | +# --------------------------------------------------------------------------- |
| 47 | +# Workloads. |
| 48 | +# |
| 49 | +# Each builder returns ``(t1, t2, kwargs)`` where ``kwargs`` is the DeepDiff |
| 50 | +# constructor arguments common to both the serial and parallel runs. |
| 51 | +# Multiprocessing parameters are added by the runner; workloads should not set |
| 52 | +# them. |
| 53 | +# --------------------------------------------------------------------------- |
| 54 | + |
| 55 | + |
| 56 | +def workload_paired_subtree(scale: int) -> Tuple[Any, Any, Dict[str, Any]]: |
| 57 | + """Heavy paired-subtree diff path. |
| 58 | +
|
| 59 | + Each item is a small dict whose nested ``data`` differs by one element; |
| 60 | + pairing kicks in for every item, so the subtree-parallel path runs. |
| 61 | + """ |
| 62 | + n = scale |
| 63 | + t1 = [{"id": i, "data": {"x": i, "y": [i, i + 1, i + 2]}} for i in range(n)] |
| 64 | + t2 = [{"id": i, "data": {"x": i, "y": [i, i + 1, i + 3]}} for i in range(n)] |
| 65 | + return t1, t2, {"ignore_order": True, "cutoff_intersection_for_pairs": 1} |
| 66 | + |
| 67 | + |
| 68 | +def workload_distance_loop(scale: int) -> Tuple[Any, Any, Dict[str, Any]]: |
| 69 | + """Heavy added-vs-removed distance grid. |
| 70 | +
|
| 71 | + All ids are disjoint between t1 and t2, so every t2 item is "added" and |
| 72 | + every t1 item is "removed". The candidate distance grid is N*N, which is |
| 73 | + where the distance worker pool earns its keep. |
| 74 | + """ |
| 75 | + n = scale |
| 76 | + t1 = [{"id": i, "v": [i, i, i]} for i in range(n)] |
| 77 | + t2 = [{"id": i + 10_000, "v": [i, i, i + 1]} for i in range(n)] |
| 78 | + return t1, t2, {"ignore_order": True, "cutoff_intersection_for_pairs": 1} |
| 79 | + |
| 80 | + |
| 81 | +def workload_large_nested_dicts(scale: int) -> Tuple[Any, Any, Dict[str, Any]]: |
| 82 | + """Large list of moderately-deep dicts with one mutation each. |
| 83 | +
|
| 84 | + The shape mirrors the JSON-like blobs the doc calls out: each item is |
| 85 | + several layers deep with a mix of strings, ints, and nested lists. |
| 86 | + """ |
| 87 | + n = scale |
| 88 | + |
| 89 | + def make(i: int, mutate: int) -> Dict[str, Any]: |
| 90 | + return { |
| 91 | + "id": i, |
| 92 | + "name": "name-%d" % i, |
| 93 | + "tags": ["t%d" % (i + j) for j in range(5)], |
| 94 | + "details": { |
| 95 | + "score": i + mutate, |
| 96 | + "history": [{"step": j, "value": j * 2 + mutate} for j in range(4)], |
| 97 | + "meta": {"created_at": "2024-01-%02d" % ((i % 28) + 1), |
| 98 | + "owner": "user-%d" % (i % 17)}, |
| 99 | + }, |
| 100 | + } |
| 101 | + |
| 102 | + t1 = [make(i, 0) for i in range(n)] |
| 103 | + t2 = [make(i, 1 if i % 7 == 0 else 0) for i in range(n)] |
| 104 | + return t1, t2, {"ignore_order": True, "cutoff_intersection_for_pairs": 1} |
| 105 | + |
| 106 | + |
| 107 | +WORKLOADS: Dict[str, Callable[[int], Tuple[Any, Any, Dict[str, Any]]]] = { |
| 108 | + "paired_subtree": workload_paired_subtree, |
| 109 | + "distance_loop": workload_distance_loop, |
| 110 | + "large_nested_dicts": workload_large_nested_dicts, |
| 111 | +} |
| 112 | + |
| 113 | + |
| 114 | +# --------------------------------------------------------------------------- |
| 115 | +# Runner. |
| 116 | +# --------------------------------------------------------------------------- |
| 117 | + |
| 118 | + |
| 119 | +def _time(fn: Callable[[], Any]) -> Tuple[float, Any]: |
| 120 | + start = time.perf_counter() |
| 121 | + result = fn() |
| 122 | + return time.perf_counter() - start, result |
| 123 | + |
| 124 | + |
| 125 | +def run_one(name: str, scale: int, worker_counts: List[int]) -> List[Dict[str, Any]]: |
| 126 | + """Run one workload serial + parallel and return one row per worker count. |
| 127 | +
|
| 128 | + The serial result is computed once and reused as the correctness reference |
| 129 | + for every parallel run. |
| 130 | + """ |
| 131 | + t1, t2, kwargs = WORKLOADS[name](scale) |
| 132 | + print(f"\n=== {name} (scale={scale}) ===") |
| 133 | + print(f"input shape: t1 has {len(t1)} items, t2 has {len(t2)} items") |
| 134 | + |
| 135 | + serial_time, serial_result = _time(lambda: DeepDiff(t1, t2, **kwargs)) |
| 136 | + print(f"serial: {serial_time:.3f}s") |
| 137 | + |
| 138 | + rows: List[Dict[str, Any]] = [{ |
| 139 | + "workload": name, "scale": scale, |
| 140 | + "mode": "serial", "workers": 1, |
| 141 | + "time_s": serial_time, "speedup": 1.0, |
| 142 | + "ok": True, |
| 143 | + }] |
| 144 | + |
| 145 | + for workers in worker_counts: |
| 146 | + parallel_time, parallel_result = _time(lambda: DeepDiff( |
| 147 | + t1, t2, |
| 148 | + multiprocessing=True, |
| 149 | + multiprocessing_workers=workers, |
| 150 | + multiprocessing_threshold=0, |
| 151 | + **kwargs, |
| 152 | + )) |
| 153 | + ok = parallel_result == serial_result |
| 154 | + speedup = serial_time / parallel_time if parallel_time > 0 else float("inf") |
| 155 | + marker = "" if ok else " !! RESULT MISMATCH !!" |
| 156 | + print(f"parallel(workers={workers}): {parallel_time:.3f}s " |
| 157 | + f"speedup={speedup:.2f}x{marker}") |
| 158 | + rows.append({ |
| 159 | + "workload": name, "scale": scale, |
| 160 | + "mode": "parallel", "workers": workers, |
| 161 | + "time_s": parallel_time, "speedup": speedup, |
| 162 | + "ok": ok, |
| 163 | + }) |
| 164 | + return rows |
| 165 | + |
| 166 | + |
| 167 | +def print_table(rows: List[Dict[str, Any]]) -> None: |
| 168 | + """Compact summary table at the end of the run.""" |
| 169 | + print("\n=== summary ===") |
| 170 | + header = ("workload", "scale", "mode", "workers", "time_s", "speedup", "ok") |
| 171 | + print("%-22s %6s %-9s %7s %10s %9s %4s" % header) |
| 172 | + print("-" * 72) |
| 173 | + for r in rows: |
| 174 | + print("%-22s %6d %-9s %7d %10.3f %9.2f %4s" % ( |
| 175 | + r["workload"], r["scale"], r["mode"], |
| 176 | + r["workers"], r["time_s"], r["speedup"], |
| 177 | + "yes" if r["ok"] else "NO", |
| 178 | + )) |
| 179 | + |
| 180 | + |
| 181 | +def main() -> int: |
| 182 | + parser = argparse.ArgumentParser(description=__doc__, |
| 183 | + formatter_class=argparse.RawDescriptionHelpFormatter) |
| 184 | + parser.add_argument( |
| 185 | + "--only", choices=list(WORKLOADS), action="append", default=None, |
| 186 | + help="run only the named workload(s); may be repeated. Default: all.", |
| 187 | + ) |
| 188 | + parser.add_argument( |
| 189 | + "--workers", type=int, action="append", default=None, |
| 190 | + help="explicit worker count to test; may be repeated. " |
| 191 | + "Default: 2 and min(4, cpu_count).", |
| 192 | + ) |
| 193 | + parser.add_argument( |
| 194 | + "--scale", type=int, default=None, |
| 195 | + help="override per-workload scale (number of items). Larger = more " |
| 196 | + "wall time. Default: a per-workload value below.", |
| 197 | + ) |
| 198 | + parser.add_argument( |
| 199 | + "--quick", action="store_true", |
| 200 | + help="use small scales for a fast sanity-check run.", |
| 201 | + ) |
| 202 | + args = parser.parse_args() |
| 203 | + |
| 204 | + workloads = args.only or list(WORKLOADS) |
| 205 | + cpu = os.cpu_count() or 1 |
| 206 | + workers_list = args.workers or [2, min(4, cpu)] |
| 207 | + # Deduplicate while preserving order — repeated --workers flags shouldn't |
| 208 | + # cause duplicate rows. |
| 209 | + workers_list = list(dict.fromkeys(workers_list)) |
| 210 | + |
| 211 | + # Default scales tuned so each row takes a few seconds serially. Override |
| 212 | + # via --scale or --quick. These are starting points, not gospel. |
| 213 | + default_scales = { |
| 214 | + "paired_subtree": 200, |
| 215 | + "distance_loop": 120, |
| 216 | + "large_nested_dicts": 200, |
| 217 | + } |
| 218 | + quick_scales = { |
| 219 | + "paired_subtree": 60, |
| 220 | + "distance_loop": 40, |
| 221 | + "large_nested_dicts": 60, |
| 222 | + } |
| 223 | + scales = quick_scales if args.quick else default_scales |
| 224 | + if args.scale is not None: |
| 225 | + scales = {name: args.scale for name in workloads} |
| 226 | + |
| 227 | + print("DeepDiff multiprocessing benchmark") |
| 228 | + print(f"cpu_count={cpu} workers tested={workers_list}") |
| 229 | + |
| 230 | + all_rows: List[Dict[str, Any]] = [] |
| 231 | + for name in workloads: |
| 232 | + all_rows.extend(run_one(name, scales[name], workers_list)) |
| 233 | + |
| 234 | + print_table(all_rows) |
| 235 | + |
| 236 | + # Non-zero exit if any parallel run produced a different result than its |
| 237 | + # serial reference — that's the one regression mode this script must catch. |
| 238 | + if any(not r["ok"] for r in all_rows): |
| 239 | + print("\nFAIL: at least one parallel run did not match its serial reference.") |
| 240 | + return 1 |
| 241 | + return 0 |
| 242 | + |
| 243 | + |
| 244 | +if __name__ == "__main__": |
| 245 | + sys.exit(main()) |
0 commit comments