Commit 061e11b

Code (deepdiff/_multiprocessing.py)
- New helpers _extract_worker_stats and _aggregate_worker_stats.
- _distance_worker and _subtree_diff_worker now return a stats delta as a third tuple element.
- compute_distances_parallel and compute_subtree_diffs_parallel now return (result, aggregated_stats) instead of bare result.

Code (deepdiff/diff.py)
- New stats keys WORKER_DIFF_COUNT, WORKER_PASSES_COUNT, WORKER_DISTANCE_CACHE_HIT_COUNT, WORKER_BATCH_COUNT added to _stats init.
- New helper _merge_worker_stats (sums counters, OR-merges limit flags).
- _maybe_compute_pair_distances_parallel and _dispatch_subtree_jobs unpack the new orchestrator return shape and merge.

Tests
- New classes TestWorkerStatsUnit, TestStatsKeys, TestWorkerStatsAggregationSlow (8 tests).
- Updated TestSubtreeParallelHelper.test_empty_jobs_returns_empty_list for new return shape.
- Updated expected_stats dicts in tests/test_cache.py (3 tests) and tests/test_ignore_order.py (2 tests) with the four new zeroed keys.
- Full suite: 1148 pass, 35 multiprocessing pass with --runslow.

Doc (docs/multi_processing.md)
- Phase 4 implementation status, code locations, test summary, and Subticket #5 removed from "Not yet implemented".
1 parent dd2c678 commit 061e11b

6 files changed

Lines changed: 330 additions & 31 deletions

deepdiff/_multiprocessing.py

Lines changed: 78 additions & 21 deletions
@@ -23,6 +23,46 @@
 DEFAULT_MAX_WORKERS = 4
 DEFAULT_THRESHOLD = 64

+# Keys we lift out of a worker's internal _stats and ship back to the parent.
+# These mirror the same string constants used by ``deepdiff/diff.py``; we keep
+# string literals here to avoid importing diff.py at module load (which would
+# create an import cycle under spawn).
+_WORKER_STATS_COUNTER_KEYS = ('DIFF COUNT', 'PASSES COUNT', 'DISTANCE CACHE HIT COUNT')
+_WORKER_STATS_FLAG_KEYS = ('MAX PASS LIMIT REACHED', 'MAX DIFF LIMIT REACHED')
+
+
+def _extract_worker_stats(diff_instance: Any) -> Dict[str, Any]:
+    """Pull a small, picklable stats snapshot off a worker-local DeepDiff.
+
+    Returns a dict with integer counters plus boolean limit flags. Missing keys
+    are tolerated so this stays robust if ``_stats`` shrinks at the end of
+    ``__init__`` (it currently deletes ``DISTANCE CACHE ENABLED`` and the
+    ``PREVIOUS *`` bookkeeping keys before we get here).
+    """
+    stats = getattr(diff_instance, '_stats', None) or {}
+    delta: Dict[str, Any] = {}
+    for key in _WORKER_STATS_COUNTER_KEYS:
+        delta[key] = int(stats.get(key, 0) or 0)
+    for key in _WORKER_STATS_FLAG_KEYS:
+        delta[key] = bool(stats.get(key, False))
+    return delta
+
+
+def _aggregate_worker_stats(deltas: List[Dict[str, Any]]) -> Dict[str, Any]:
+    """Sum counter keys and OR-merge limit flags across worker deltas."""
+    out: Dict[str, Any] = {key: 0 for key in _WORKER_STATS_COUNTER_KEYS}
+    for key in _WORKER_STATS_FLAG_KEYS:
+        out[key] = False
+    for delta in deltas:
+        if not delta:
+            continue
+        for key in _WORKER_STATS_COUNTER_KEYS:
+            out[key] += int(delta.get(key, 0) or 0)
+        for key in _WORKER_STATS_FLAG_KEYS:
+            if delta.get(key):
+                out[key] = True
+    return out
+

 @dataclass(frozen=True)
 class MPConfig:
@@ -114,7 +154,9 @@ def _sanitize_parameters_for_worker(parameters: Dict[str, Any]) -> Dict[str, Any
     return sanitized


-def _distance_worker(job: Tuple[int, Dict[str, Any], Any, Any, Any, Any]) -> Tuple[int, float]:
+def _distance_worker(
+    job: Tuple[int, Dict[str, Any], Any, Any, Any, Any],
+) -> Tuple[int, float, Dict[str, Any]]:
     """Compute the rough distance between two items in a worker process.

     ``job`` layout matches what ``compute_distances_parallel`` ships:
@@ -123,7 +165,9 @@ def _distance_worker(job: Tuple[int, Dict[str, Any], Any, Any, Any, Any]) -> Tup

     The worker constructs a fresh root ``DeepDiff`` (no shared parent state),
     requests the DELTA_VIEW so we hit the same code path as the serial call in
-    ``_get_rough_distance_of_hashed_objs``, and returns the resulting float.
+    ``_get_rough_distance_of_hashed_objs``, and returns the resulting float
+    plus a ``_extract_worker_stats`` snapshot so the parent can aggregate
+    diff/pass/cache-hit counts into its WORKER_* stats keys.
     """
     # Imported here to keep module import cheap and to dodge any circular
     # import surprises under spawn.
@@ -144,7 +188,7 @@ def _distance_worker(job: Tuple[int, Dict[str, Any], Any, Any, Any, Any]) -> Tup
         # call below, hence cache_purge_level=0.
         cache_purge_level=0,
     )
-    return job_index, cast(float, diff._get_rough_distance())
+    return job_index, cast(float, diff._get_rough_distance()), _extract_worker_stats(diff)


 def compute_distances_parallel(
@@ -153,25 +197,28 @@ def compute_distances_parallel(
     original_type: Any,
     iterable_compare_func: Optional[Callable],
     config: MPConfig,
-) -> Optional[Dict[Tuple[Any, Any], float]]:
+) -> Optional[Tuple[Dict[Tuple[Any, Any], float], Dict[str, Any]]]:
     """Run ``_distance_worker`` over ``jobs`` and return distances by pair.

     ``jobs`` is a list of ``(added_hash, removed_hash, added_item, removed_item)``
     tuples in the exact order the serial nested loop visits them. The parent
     is responsible for that ordering; this helper does not reorder anything.

     Returns:
-        A dict ``{(added_hash, removed_hash): distance}``, or ``None`` if the
-        section is unsafe to parallelize (unpickleable inputs/parameters,
-        worker import error, etc.). On ``None`` the caller MUST fall back to
-        the serial path so correctness is preserved.
+        ``(distances_by_pair, aggregated_worker_stats)`` where the first item
+        is a dict ``{(added_hash, removed_hash): distance}`` and the second is
+        the aggregated ``_extract_worker_stats`` snapshot summed across all
+        workers (counter keys summed, limit flags OR-merged). Returns
+        ``None`` if the section is unsafe to parallelize (unpickleable
+        inputs/parameters, worker import error, etc.). On ``None`` the caller
+        MUST fall back to the serial path so correctness is preserved.

     Workers may finish out of order; we collect results into a dict keyed by
     the original job index, so callers see the same result regardless of
     completion order.
     """
     if not jobs:
-        return {}
+        return {}, _aggregate_worker_stats([])

     sanitized_params = _sanitize_parameters_for_worker(parameters)

@@ -200,14 +247,16 @@ def compute_distances_parallel(
         )

     results_by_index: Dict[int, float] = {}
+    stats_deltas: List[Dict[str, Any]] = []
     try:
         with ProcessPoolExecutor(max_workers=config.workers) as executor:
             futures = [executor.submit(_distance_worker, payload) for payload in payloads]
             for future in as_completed(futures):
                 # Re-raise worker exceptions in the parent so they surface as
                 # normal DeepDiff exceptions instead of being swallowed.
-                idx, distance = future.result()
+                idx, distance, stats_delta = future.result()
                 results_by_index[idx] = distance
+                stats_deltas.append(stats_delta)
     except (pickle.PicklingError, AttributeError, TypeError):
         # Pickling/spawn-related failures: surface as a serial fallback rather
         # than crashing the diff. Other exceptions (worker logic bugs, user
@@ -217,7 +266,7 @@ def compute_distances_parallel(
     out: Dict[Tuple[Any, Any], float] = {}
     for i, job in enumerate(jobs):
         out[(job[0], job[1])] = results_by_index[i]
-    return out
+    return out, _aggregate_worker_stats(stats_deltas)


 def _hash_worker(job: Tuple[int, Any, str, Dict[str, Any]]) -> Tuple[int, Optional[str]]:
@@ -256,7 +305,7 @@ def _hash_worker(job: Tuple[int, Any, str, Dict[str, Any]]) -> Tuple[int, Option

 def _subtree_diff_worker(
     job: Tuple[int, Dict[str, Any], Any, Any, Any],
-) -> Tuple[int, List[Tuple[str, Any]]]:
+) -> Tuple[int, List[Tuple[str, Any]], Dict[str, Any]]:
     """Run one paired-item subtree diff in a worker process.

     ``job`` layout: ``(job_index, sanitized_parameters, t1, t2, _original_type)``.
@@ -290,30 +339,33 @@ def _subtree_diff_worker(
             continue
         for leaf in levels:
             entries.append((report_type, leaf))
-    return job_index, entries
+    return job_index, entries, _extract_worker_stats(diff)


 def compute_subtree_diffs_parallel(
     jobs: List[Tuple[Any, Any]],
     parameters: Dict[str, Any],
     original_type: Any,
     config: MPConfig,
-) -> Optional[List[List[Tuple[str, Any]]]]:
+) -> Optional[Tuple[List[List[Tuple[str, Any]]], Dict[str, Any]]]:
     """Run ``_subtree_diff_worker`` over ``jobs`` and return per-job entries.

     ``jobs`` is a list of ``(t1_item, t2_item)`` tuples in the exact order
-    the serial paired-iteration code visits them. Returns a list aligned to
-    that order; each element is ``[(report_type, leaf_difflevel), ...]``
-    suitable for the parent to rebase and merge into its tree. Returns
-    ``None`` when the section is unsafe to parallelize (unpickleable
+    the serial paired-iteration code visits them. Returns
+    ``(entries_by_job, aggregated_worker_stats)`` where ``entries_by_job`` is
+    a list aligned to job order — each element is ``[(report_type,
+    leaf_difflevel), ...]`` suitable for the parent to rebase and merge into
+    its tree — and ``aggregated_worker_stats`` is the per-batch ``_stats``
+    deltas summed across workers (counters summed, limit flags OR-merged).
+    Returns ``None`` when the section is unsafe to parallelize (unpickleable
     parameters/items, worker import error). On ``None`` the caller MUST run
     the same jobs serially so correctness is preserved.

     Workers may finish out of order; results are collected by their original
     job index so the merge order is identical regardless of completion order.
     """
     if not jobs:
-        return []
+        return [], _aggregate_worker_stats([])

     sanitized_params = _sanitize_parameters_for_worker(parameters)

@@ -332,16 +384,21 @@ def compute_subtree_diffs_parallel(
     ]

     results_by_index: Dict[int, List[Tuple[str, Any]]] = {}
+    stats_deltas: List[Dict[str, Any]] = []
     try:
         with ProcessPoolExecutor(max_workers=config.workers) as executor:
             futures = [executor.submit(_subtree_diff_worker, payload) for payload in payloads]
             for future in as_completed(futures):
-                idx, entries = future.result()
+                idx, entries, stats_delta = future.result()
                 results_by_index[idx] = entries
+                stats_deltas.append(stats_delta)
     except (pickle.PicklingError, AttributeError, TypeError):
         return None

-    return [results_by_index[i] for i in range(len(jobs))]
+    return (
+        [results_by_index[i] for i in range(len(jobs))],
+        _aggregate_worker_stats(stats_deltas),
+    )


 def compute_hashes_parallel(
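
The aggregation rule introduced in this file (counters are summed, limit flags are OR-merged, missing keys and empty deltas are tolerated) can be tried in isolation. The following is a minimal standalone sketch, not the code from the commit: `aggregate`, `COUNTER_KEYS`, `FLAG_KEYS`, and the two sample worker dicts are illustrative names and values, chosen to mirror the helper shown in the diff.

```python
from typing import Any, Dict, List

# Same key strings as the diff above; the constant names here are illustrative.
COUNTER_KEYS = ('DIFF COUNT', 'PASSES COUNT', 'DISTANCE CACHE HIT COUNT')
FLAG_KEYS = ('MAX PASS LIMIT REACHED', 'MAX DIFF LIMIT REACHED')


def aggregate(deltas: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Sum counters and OR-merge flags across per-worker stats deltas."""
    out: Dict[str, Any] = {key: 0 for key in COUNTER_KEYS}
    out.update({key: False for key in FLAG_KEYS})
    for delta in deltas:
        if not delta:
            continue  # an empty or None delta contributes nothing
        for key in COUNTER_KEYS:
            out[key] += int(delta.get(key, 0) or 0)
        for key in FLAG_KEYS:
            out[key] = out[key] or bool(delta.get(key))
    return out


# Made-up sample deltas; worker_b deliberately omits some keys.
worker_a = {'DIFF COUNT': 3, 'PASSES COUNT': 1, 'DISTANCE CACHE HIT COUNT': 0,
            'MAX PASS LIMIT REACHED': False, 'MAX DIFF LIMIT REACHED': False}
worker_b = {'DIFF COUNT': 5, 'PASSES COUNT': 2, 'MAX DIFF LIMIT REACHED': True}

combined = aggregate([worker_a, {}, worker_b])
print(combined)
```

With these inputs the counters come out as 8, 3, and 0, and only `MAX DIFF LIMIT REACHED` is set. An empty list of deltas yields all-zero counters and `False` flags, which is the shape the empty-jobs early return hands back.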

deepdiff/diff.py

Lines changed: 41 additions & 2 deletions
@@ -86,6 +86,10 @@ def _report_progress(_stats: Dict[str, Any], progress_logger: Callable[[str], No
 DISTANCE_CACHE_ENABLED = 'DISTANCE CACHE ENABLED'
 PREVIOUS_DIFF_COUNT = 'PREVIOUS DIFF COUNT'
 PREVIOUS_DISTANCE_CACHE_HIT_COUNT = 'PREVIOUS DISTANCE CACHE HIT COUNT'
+WORKER_DIFF_COUNT = 'WORKER DIFF COUNT'
+WORKER_PASSES_COUNT = 'WORKER PASSES COUNT'
+WORKER_DISTANCE_CACHE_HIT_COUNT = 'WORKER DISTANCE CACHE HIT COUNT'
+WORKER_BATCH_COUNT = 'WORKER BATCH COUNT'
 CANT_FIND_NUMPY_MSG = 'Unable to import numpy. This must be a bug in DeepDiff since a numpy array is detected.'
 INVALID_VIEW_MSG = "view parameter must be one of 'text', 'tree', 'delta', 'colored' or 'colored_compact'. But {} was passed."
 CUTOFF_RANGE_ERROR_MSG = 'cutoff_distance_for_pairs needs to be a positive float max 1.'
@@ -340,6 +344,13 @@ def _group_by_sort_key(x):
                 MAX_PASS_LIMIT_REACHED: False,
                 MAX_DIFF_LIMIT_REACHED: False,
                 DISTANCE_CACHE_ENABLED: bool(cache_size),
+                # Multiprocessing aggregates: each parallel batch sums per-worker
+                # _stats deltas into these keys. Parent-side counters above stay
+                # comparable to a serial run so existing tests are unaffected.
+                WORKER_DIFF_COUNT: 0,
+                WORKER_PASSES_COUNT: 0,
+                WORKER_DISTANCE_CACHE_HIT_COUNT: 0,
+                WORKER_BATCH_COUNT: 0,
             }
             self.hashes = dict_() if hashes is None else hashes
             self._numpy_paths = dict_()  # if _numpy_paths is None else _numpy_paths
@@ -1350,13 +1361,38 @@ def _maybe_compute_pair_distances_parallel(
         if not mp_config.should_parallelize(len(jobs)):
             return None

-        return compute_distances_parallel(
+        result = compute_distances_parallel(
             jobs=jobs,
             parameters=self._parameters,
             original_type=_original_type,
             iterable_compare_func=self.iterable_compare_func,
             config=mp_config,
         )
+        if result is None:
+            return None
+        distances, worker_stats = result
+        self._merge_worker_stats(worker_stats)
+        return distances
+
+    def _merge_worker_stats(self, worker_stats):
+        """Aggregate one parallel-batch's worker ``_stats`` delta into self._stats.
+
+        Counters (DIFF / PASSES / DISTANCE CACHE HIT) sum into the matching
+        ``WORKER_*`` keys; limit flags OR-merge into the parent's existing
+        MAX_*_LIMIT_REACHED flags so any worker hitting a guard surfaces the
+        same warning state on the public ``get_stats()`` output.
+        """
+        if not worker_stats:
+            return
+        self._stats[WORKER_DIFF_COUNT] += int(worker_stats.get('DIFF COUNT', 0) or 0)
+        self._stats[WORKER_PASSES_COUNT] += int(worker_stats.get('PASSES COUNT', 0) or 0)
+        self._stats[WORKER_DISTANCE_CACHE_HIT_COUNT] += int(
+            worker_stats.get('DISTANCE CACHE HIT COUNT', 0) or 0)
+        self._stats[WORKER_BATCH_COUNT] += 1
+        if worker_stats.get(MAX_PASS_LIMIT_REACHED):
+            self._stats[MAX_PASS_LIMIT_REACHED] = True
+        if worker_stats.get(MAX_DIFF_LIMIT_REACHED):
+            self._stats[MAX_DIFF_LIMIT_REACHED] = True

     def _get_most_in_common_pairs_in_iterables(
             self, hashes_added, hashes_removed, t1_hashtable, t2_hashtable, parents_ids, _original_type):
@@ -1578,12 +1614,15 @@ def _dispatch_subtree_jobs(self, pending_jobs, _original_type, local_tree):
         if (mp_config is not None and mp_config.enabled
                 and mp_config.should_parallelize(len(pending_jobs))):
             jobs_payload = [(t1_item, t2_item) for (_, t1_item, t2_item, _) in pending_jobs]
-            parallel_results = compute_subtree_diffs_parallel(
+            outcome = compute_subtree_diffs_parallel(
                 jobs=jobs_payload,
                 parameters=self._parameters,
                 original_type=_original_type,
                 config=mp_config,
             )
+            if outcome is not None:
+                parallel_results, worker_stats = outcome
+                self._merge_worker_stats(worker_stats)

         if parallel_results is None:
             # Below threshold or unsafe inputs — run inline-equivalent serial.
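
Both call sites in this file follow the same pattern: the orchestrator returns either `None` (fall back to serial) or a `(result, aggregated_stats)` pair that must be unpacked before the stats are merged. The sketch below illustrates that contract in isolation. `fake_orchestrator` and `run_batch` are hypothetical stand-ins, not functions from the commit, and the "work" is a trivial doubling so the serial and parallel paths are easy to compare.

```python
from typing import Any, Dict, List, Optional, Tuple

WORKER_DIFF_COUNT = 'WORKER DIFF COUNT'
WORKER_BATCH_COUNT = 'WORKER BATCH COUNT'


def fake_orchestrator(
    jobs: List[int], fail: bool = False,
) -> Optional[Tuple[List[int], Dict[str, Any]]]:
    """Hypothetical stand-in for a compute_*_parallel helper."""
    if fail:
        return None  # signals "unsafe to parallelize"
    return [job * 2 for job in jobs], {'DIFF COUNT': len(jobs)}


def run_batch(jobs: List[int], parent_stats: Dict[str, Any], fail: bool = False) -> List[int]:
    """Unpack the (result, stats) pair, or fall back to serial on None."""
    outcome = fake_orchestrator(jobs, fail=fail)
    if outcome is None:
        # Serial fallback: same results, no worker stats to merge.
        return [job * 2 for job in jobs]
    results, worker_stats = outcome
    parent_stats[WORKER_DIFF_COUNT] += int(worker_stats.get('DIFF COUNT', 0) or 0)
    parent_stats[WORKER_BATCH_COUNT] += 1  # one increment per merged batch
    return results


stats = {WORKER_DIFF_COUNT: 0, WORKER_BATCH_COUNT: 0}
first = run_batch([1, 2, 3], stats)
second = run_batch([4], stats, fail=True)
print(first, second, stats)
```

The point of the sketch is that the `None` check has to happen before tuple unpacking, and that a fallback batch leaves the `WORKER_*` counters untouched.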

docs/multi_processing.md

Lines changed: 49 additions & 3 deletions
@@ -33,7 +33,28 @@ Without this, identity checks like `change.t2 is not notpresent` (used by
 `TextResult._from_tree_default` to decide t1-vs-t2 reporting) break on any
 DiffLevel that travels through `pickle`, which is exactly the Phase 3 path.

-Subtickets #5, #6 (extended matrix), and #7 are still open.
+**Phase 4 — landed (2026-04-27).** Subticket #5 (multiprocessing-aware stats)
+is implemented. Workers now return their internal `_stats` snapshot alongside
+their primary result; the parent aggregates those deltas into four new keys on
+its own `_stats` dict — `WORKER DIFF COUNT`, `WORKER PASSES COUNT`,
+`WORKER DISTANCE CACHE HIT COUNT`, and `WORKER BATCH COUNT` — and OR-merges
+worker `MAX PASS LIMIT REACHED` / `MAX DIFF LIMIT REACHED` flags into the
+parent's existing flags so any worker hitting a guard surfaces the same
+warning state on the public `get_stats()` output. Parent counters
+(`DIFF COUNT`, `PASSES COUNT`, `DISTANCE CACHE HIT COUNT`) stay scoped to the
+parent process so they remain comparable to a serial run; this is what lets
+existing stats-asserting tests pass with multiprocessing on.
+
+`max_diffs` and `max_passes` continue to act as approximate stop guards.
+Workers run their own `DeepDiff` with the same constructor params, so they
+trip the limit locally; the OR-merge means the parent's
+`MAX_*_LIMIT_REACHED` flags reflect "any worker hit it" without requiring
+exact serial-equivalent counts (which the doc explicitly does not require).
+`get_stats()` always exposes the new `WORKER_*` keys, even on serial runs,
+so consumers can read them unconditionally — they just stay zero when
+multiprocessing is off or below threshold.
+
+Subtickets #6 (extended matrix) and #7 (benchmarks) are still open.

 What works today:

@@ -61,6 +82,15 @@ What works today:
   fallback, `exclude_obj_callback` fallback, plus direct unit tests for
   `compute_subtree_diffs_parallel`). All other test files still pass
   unchanged.
+- Phase 4 adds 8 stats-aggregation tests in `tests/test_multiprocessing.py`
+  (`TestWorkerStatsUnit` for `_extract_worker_stats` / `_aggregate_worker_stats`,
+  `TestStatsKeys` for the always-present `WORKER_*` keys on serial runs, and
+  `TestWorkerStatsAggregationSlow` covering paired-subtree aggregation,
+  distance-loop aggregation, and the no-double-counting invariant). The
+  pre-existing stats-asserting tests in `tests/test_cache.py` and
+  `tests/test_ignore_order.py` were updated to include the four new zeroed
+  keys in their `expected_stats` dicts; all of them continue to pass with
+  unchanged primary counter values.

 Code locations:

@@ -89,6 +119,24 @@ Code locations:
 - `deepdiff/helper.py``NotPresent` / `Unprocessed` / `Skipped` /
   `NotHashed` gained `__reduce__` so the singleton sentinels survive
   `spawn`-based pickle round-trips.
+- `deepdiff/_multiprocessing.py::_extract_worker_stats`,
+  `_aggregate_worker_stats` — Phase 4 helpers. Each worker dispatch returns
+  a small picklable stats dict (`DIFF COUNT`, `PASSES COUNT`,
+  `DISTANCE CACHE HIT COUNT`, plus the two limit flags); the orchestrator
+  sums counters and OR-merges flags before handing them back.
+- `deepdiff/_multiprocessing.py::compute_distances_parallel`,
+  `compute_subtree_diffs_parallel` — both now return
+  `(primary_result, aggregated_worker_stats)` instead of just
+  `primary_result` (the `None` failure-case sentinel is unchanged).
+- `deepdiff/diff.py::DeepDiff._merge_worker_stats` — Phase 4 helper that
+  takes one orchestrator's aggregated stats dict and folds it into the
+  parent's `self._stats`. Called by both
+  `_maybe_compute_pair_distances_parallel` and `_dispatch_subtree_jobs`.
+- `deepdiff/diff.py` — four new module-level constants
+  (`WORKER_DIFF_COUNT`, `WORKER_PASSES_COUNT`,
+  `WORKER_DISTANCE_CACHE_HIT_COUNT`, `WORKER_BATCH_COUNT`) plus
+  initialization in `__init__` so the keys are always present in
+  `get_stats()`.

 Not yet implemented (deferred, intentional):

@@ -105,8 +153,6 @@ Not yet implemented (deferred, intentional):
   the current tests don't cover. Worker-side `_iterable_opcodes` are also
   not propagated, so `DELTA_VIEW` of a paired subtree containing ordered
   iterables is not yet covered by Phase 3.
-- **Subticket #5** — multiprocessing-aware stats semantics. Parent-only stats
-  remain meaningful in Phase 1, but no aggregation across workers.
 - **Subticket #6** — extended test matrix (numpy, pydantic, namedtuple, group_by,
   large-mixed structures, worker exception propagation tests). Phase 1 ships
   the core determinism harness; the rest is additive.
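
Since the docs state that the `WORKER_*` keys are always present in `get_stats()` and stay zero on serial runs, a consumer can inspect them without branching on whether multiprocessing was enabled. The sketch below works on a plain dict of the shape described above. `summarize_worker_stats` is a hypothetical helper, and both sample dicts are hand-written illustrations rather than output captured from DeepDiff. It reads keys with `.get` so it would also tolerate a stats dict from a version that predates this commit.

```python
from typing import Any, Dict

WORKER_KEYS = (
    'WORKER DIFF COUNT',
    'WORKER PASSES COUNT',
    'WORKER DISTANCE CACHE HIT COUNT',
    'WORKER BATCH COUNT',
)


def summarize_worker_stats(stats: Dict[str, Any]) -> Dict[str, Any]:
    """Condense a get_stats()-style dict into a few worker-related facts."""
    worker = {key: int(stats.get(key, 0) or 0) for key in WORKER_KEYS}
    return {
        # Zero batches means the run was serial or stayed below threshold.
        'parallel_batches_ran': worker['WORKER BATCH COUNT'] > 0,
        'worker_diff_count': worker['WORKER DIFF COUNT'],
        # Flags are OR-merged, so True means the parent or any worker hit a limit.
        'limit_reached': bool(
            stats.get('MAX PASS LIMIT REACHED') or stats.get('MAX DIFF LIMIT REACHED')
        ),
    }


# Hand-written sample dicts (illustrative values only).
serial_stats = {
    'PASSES COUNT': 1, 'DIFF COUNT': 10, 'DISTANCE CACHE HIT COUNT': 0,
    'MAX PASS LIMIT REACHED': False, 'MAX DIFF LIMIT REACHED': False,
    'WORKER DIFF COUNT': 0, 'WORKER PASSES COUNT': 0,
    'WORKER DISTANCE CACHE HIT COUNT': 0, 'WORKER BATCH COUNT': 0,
}
parallel_stats = dict(serial_stats, **{
    'WORKER DIFF COUNT': 42, 'WORKER BATCH COUNT': 2,
    'MAX DIFF LIMIT REACHED': True,
})

serial_summary = summarize_worker_stats(serial_stats)
parallel_summary = summarize_worker_stats(parallel_stats)
print(serial_summary)
print(parallel_summary)
```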
