Skip to content

Commit 4d12c2e

Browse files
jucorclaude
andcommitted
Add session-scoped conversation cache and disable parallelization
Introduce a session-scoped pytest fixture in conftest.py that caches computed Conversations across test files. Combined with test reordering, this eliminates redundant computation when multiple test files need the same datasets. Key changes: 1. Session-scoped cache in conftest.py with single-dataset eviction 2. pytest_collection_modifyitems hook reorders tests to group by dataset 3. Disable pytest-xdist parallelization (-n0 in pyproject.toml) How it works: - Tests are reordered: all tests for dataset1 run first (across all files), then all tests for dataset2, etc. - Cache holds ONE dataset at a time, evicting when switching datasets - Peak memory: O(1 dataset) instead of O(N datasets) Performance improvement for running legacy + discrepancy tests: - Before: ~23s (each file computed conversations independently) - After: ~2.2s (compute once per dataset, reuse across files) - Speedup: ~10x Why disable parallelization: - With parallelization: ~28s (each worker has separate process/cache) - Without parallelization: ~2.2s (single process shares cache) - Parallel workers can't share the session cache (separate processes) Note: If PYTEST_ADDOPTS is set in the environment (e.g., "-n auto"), it will override pyproject.toml's -n0. Either unset it or set to "". Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent e4fecb7 commit 4d12c2e

4 files changed

Lines changed: 132 additions & 135 deletions

File tree

delphi/pyproject.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,12 @@ include = [
124124

125125
# Pytest configuration
126126
[tool.pytest.ini_options]
127-
# When using pytest-xdist (-n), group tests by xdist_group marker for efficient fixture sharing
128-
addopts = "--dist=loadgroup"
127+
# Force sequential execution (-n0) to leverage the session-scoped conversation cache.
128+
# The cache shares computed conversations across test files, but each xdist worker
129+
# has its own process with a separate cache. With only 2 datasets (biodiversity, vw),
130+
# sequential execution with caching is ~6x faster than parallel execution where each
131+
# worker recomputes the conversations independently.
132+
addopts = "-n0"
129133
filterwarnings = [
130134
# Ignore python_multipart deprecation warning from ddtrace (third-party)
131135
"ignore:Please use `import python_multipart`:PendingDeprecationWarning:ddtrace.internal.module",

delphi/tests/conftest.py

Lines changed: 113 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,92 @@
55
- Command line options --include-local and --datasets for dataset selection
66
- Fixtures for accessing dataset information
77
- @pytest.mark.use_discovered_datasets for dynamic dataset parametrization
8-
- Helper functions for parallel test execution with xdist_group markers
8+
- Session-scoped conversation cache for efficient test execution
99
"""
1010

11+
from copy import deepcopy
12+
1113
import pytest
14+
15+
from polismath.conversation.conversation import Conversation
16+
from polismath.regression import get_dataset_files
1217
from polismath.regression.datasets import (
1318
discover_datasets,
1419
list_regression_datasets,
1520
get_blob_variants,
1621
)
22+
from tests.common_utils import load_votes, load_comments
1723

1824

1925
# =============================================================================
20-
# Parallel Execution Helpers
26+
# Session-scoped Conversation Cache
2127
# =============================================================================
2228

23-
def make_dataset_params(datasets: list[str]) -> list:
29+
_SESSION_CONV_CACHE: dict = {}
30+
31+
32+
@pytest.fixture(scope="session")
33+
def get_or_compute_conversation():
34+
"""Session-wide conversation cache shared across all test files.
35+
36+
Returns a function that computes a Conversation once per dataset and
37+
returns a deepcopy each time to preserve test isolation.
38+
39+
Only ONE dataset is kept in memory at a time. When a different dataset
40+
is requested, the previous one is evicted. This works because tests are
41+
reordered by pytest_collection_modifyitems to group all tests for a
42+
dataset together (across all test files).
2443
"""
25-
Create pytest.param objects with xdist_group markers for parallel execution.
44+
import gc
2645

27-
When using pytest-xdist with --dist=loadgroup, tests with the same
28-
xdist_group marker will run on the same worker. This ensures fixtures
29-
are computed only once per dataset per worker.
46+
def _get(dataset_name: str) -> dict:
47+
if dataset_name not in _SESSION_CONV_CACHE:
48+
# Evict previous dataset (we only keep one at a time)
49+
for ds in list(_SESSION_CONV_CACHE.keys()):
50+
_SESSION_CONV_CACHE.pop(ds, None)
51+
Conversation._reset_conversion_cache()
52+
gc.collect()
53+
54+
files = get_dataset_files(dataset_name, blob_type='incremental')
55+
votes = load_votes(files['votes'])
56+
comments = load_comments(files['comments'])
57+
58+
conv = Conversation(dataset_name)
59+
conv = conv.update_votes(votes)
60+
conv = conv.recompute()
61+
62+
_SESSION_CONV_CACHE[dataset_name] = {
63+
'conv': conv,
64+
'dataset_name': dataset_name,
65+
'files': files,
66+
'comments': comments,
67+
}
68+
69+
return deepcopy(_SESSION_CONV_CACHE[dataset_name])
70+
71+
return _get
72+
73+
74+
# =============================================================================
75+
# Dataset Parametrization Helpers
76+
# =============================================================================
77+
78+
def make_dataset_params(datasets: list[str]) -> list:
79+
"""
80+
Create pytest.param objects for dataset parametrization.
3081
3182
Args:
3283
datasets: List of dataset names (or "dataset-blob_type" composite IDs)
3384
3485
Returns:
35-
List of pytest.param objects with xdist_group markers
86+
List of pytest.param objects
3687
3788
Example:
3889
@pytest.mark.parametrize("dataset_name", make_dataset_params(["biodiversity", "vw"]))
3990
def test_something(dataset_name):
4091
...
4192
"""
42-
return [
43-
pytest.param(ds, marks=pytest.mark.xdist_group(ds))
44-
for ds in datasets
45-
]
93+
return [pytest.param(ds) for ds in datasets]
4694

4795

4896
def parse_dataset_blob_id(composite_id: str) -> tuple[str, str]:
@@ -137,7 +185,7 @@ def pytest_generate_tests(metafunc):
137185
With use_blobs=True, parametrize with 'dataset-blob_type' composite IDs
138186
(e.g., 'biodiversity-incremental', 'engage-cold_start') for each filled blob variant.
139187
140-
Uses xdist_group markers for efficient parallel execution with pytest-xdist.
188+
Uses the session-scoped conversation cache for efficient test execution.
141189
"""
142190
markers = list(metafunc.definition.iter_markers("use_discovered_datasets"))
143191
if not markers:
@@ -169,6 +217,58 @@ def pytest_generate_tests(metafunc):
169217
metafunc.parametrize("dataset_name", make_dataset_params(datasets))
170218

171219

220+
# =============================================================================
221+
# Test Reordering for Cache Efficiency
222+
# =============================================================================
223+
224+
def _extract_dataset_from_test(item) -> str:
225+
"""Extract the dataset name from a test item's parameters.
226+
227+
Handles both plain dataset names ('biodiversity') and composite IDs
228+
('biodiversity-incremental'). Returns empty string if no dataset parameter.
229+
"""
230+
# Check callspec for parametrized values
231+
if hasattr(item, 'callspec') and item.callspec.params:
232+
for param_name in ('dataset_name', 'dataset_blob_id'):
233+
if param_name in item.callspec.params:
234+
value = item.callspec.params[param_name]
235+
# Extract base dataset name from composite IDs
236+
if '-incremental' in value:
237+
return value.replace('-incremental', '')
238+
elif '-cold_start' in value:
239+
return value.replace('-cold_start', '')
240+
return value
241+
return ''
242+
243+
244+
def pytest_collection_modifyitems(session, config, items):
245+
"""Reorder tests to group by dataset for cache efficiency.
246+
247+
Groups all tests for a dataset together (across all test files) so that
248+
the session-scoped conversation cache only needs to hold ONE dataset at
249+
a time. This reduces peak memory from O(N datasets) to O(1 dataset).
250+
251+
Order: dataset1[file1, file2, ...], dataset2[file1, file2, ...], ...
252+
Within each dataset, original test order is preserved.
253+
"""
254+
# Separate tests into dataset-parametrized and non-parametrized
255+
dataset_tests = []
256+
other_tests = []
257+
258+
for item in items:
259+
ds = _extract_dataset_from_test(item)
260+
if ds:
261+
dataset_tests.append((ds, item))
262+
else:
263+
other_tests.append(item)
264+
265+
# Sort dataset tests by dataset name (stable sort preserves order within dataset)
266+
dataset_tests.sort(key=lambda x: x[0])
267+
268+
# Rebuild items list: non-parametrized first, then dataset tests grouped
269+
items[:] = other_tests + [item for _, item in dataset_tests]
270+
271+
172272
# Provide summary of discovered datasets at start of test run
173273
def pytest_report_header(config):
174274
"""Add dataset discovery info to pytest header."""

delphi/tests/test_discrepancy_fixes.py

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
)
5151
from polismath.regression.datasets import discover_datasets
5252
from conftest import _get_requested_datasets, make_dataset_params, parse_dataset_blob_id
53-
from tests.common_utils import load_votes, load_comments, load_clojure_output
53+
from tests.common_utils import load_clojure_output
5454

5555

5656
# ---------------------------------------------------------------------------
@@ -85,55 +85,23 @@ def pytest_generate_tests(metafunc):
8585
# Shared fixtures
8686
# ---------------------------------------------------------------------------
8787

88-
# Module-level caches — Conversation is keyed by dataset name (shared across
89-
# blob variants), blobs are keyed by composite ID.
90-
_CONV_CACHE: dict = {}
88+
# Module-level cache for blobs (keyed by composite ID)
9189
_BLOB_CACHE: dict = {}
9290

9391

94-
def _get_or_compute_conversation(dataset_name: str) -> dict:
95-
"""Compute (or retrieve cached) conversation for a dataset."""
96-
import gc
97-
if dataset_name in _CONV_CACHE:
98-
return _CONV_CACHE[dataset_name]
99-
100-
# Evict other datasets
101-
for ds in list(_CONV_CACHE.keys()):
102-
if ds != dataset_name:
103-
_CONV_CACHE.pop(ds, None)
104-
Conversation._reset_conversion_cache()
105-
gc.collect()
106-
107-
files = get_dataset_files(dataset_name, blob_type='incremental')
108-
votes = load_votes(files['votes'])
109-
comments = load_comments(files['comments'])
110-
111-
conv = Conversation(dataset_name)
112-
conv = conv.update_votes(votes)
113-
conv = conv.recompute()
114-
115-
data = {
116-
'conv': conv,
117-
'dataset_name': dataset_name,
118-
'files': files,
119-
'comments': comments,
120-
}
121-
_CONV_CACHE[dataset_name] = data
122-
return data
123-
124-
12592
@pytest.fixture(scope="class")
126-
def conversation_data(dataset_name):
93+
def conversation_data(dataset_name, get_or_compute_conversation):
12794
"""Class-scoped fixture: runs the full pipeline once per dataset+blob_type.
12895
12996
dataset_name here is actually a composite 'dataset-blob_type' ID
130-
(e.g., 'biodiversity-full'). The Conversation is shared across blob variants.
97+
(e.g., 'biodiversity-full'). The Conversation is shared across blob variants
98+
via the session-scoped get_or_compute_conversation fixture.
13199
"""
132100
global _BLOB_CACHE
133101
ds_name, blob_type = parse_dataset_blob_id(dataset_name)
134102

135-
# Get or compute the conversation (shared across blob variants)
136-
conv_data = _get_or_compute_conversation(ds_name)
103+
# Get or compute the conversation (shared across blob variants via session cache)
104+
conv_data = get_or_compute_conversation(ds_name)
137105

138106
# Load the specific blob variant (cache per composite ID)
139107
if dataset_name not in _BLOB_CACHE:

delphi/tests/test_legacy_clojure_regression.py

Lines changed: 6 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
import pytest
2020
import pytest_check as check
21-
import gc
2221

23-
from polismath.conversation.conversation import Conversation
2422
from polismath.regression import get_dataset_files, get_blob_variants
2523
from polismath.regression.datasets import discover_datasets
26-
from tests.common_utils import load_votes, load_comments, load_clojure_output
24+
from tests.common_utils import load_clojure_output
2725
from conftest import _get_requested_datasets, make_dataset_params, parse_dataset_blob_id
2826
from polismath.regression.clojure_comparer import (
2927
ClojureComparer,
@@ -50,9 +48,7 @@ def _get_clojure_dataset_blob_ids(include_local: bool, requested: Optional[set[s
5048
return result
5149

5250

53-
# Module-level caches — Conversation is keyed by dataset name (shared across
54-
# blob variants of the same dataset), blobs are keyed by composite ID.
55-
_CONV_CACHE: dict = {}
51+
# Module-level cache for blobs (keyed by composite ID)
5652
_BLOB_CACHE: dict = {}
5753

5854

@@ -66,88 +62,17 @@ def pytest_generate_tests(metafunc):
6662
metafunc.parametrize("dataset_blob_id", params, scope="class")
6763

6864

69-
def _get_or_compute_conversation(dataset_name: str) -> dict:
70-
"""Get cached Conversation or compute it. Evicts other datasets for memory."""
71-
global _CONV_CACHE
72-
if dataset_name in _CONV_CACHE:
73-
return _CONV_CACHE[dataset_name]
74-
75-
# Evict previous datasets
76-
for ds in list(_CONV_CACHE.keys()):
77-
if ds != dataset_name:
78-
print(f"[{ds}] Cleaning up previous dataset...")
79-
_CONV_CACHE.pop(ds, None)
80-
Conversation._reset_conversion_cache()
81-
gc.collect()
82-
83-
# Get dataset files (blob_type doesn't matter here — we only need votes/comments)
84-
dataset_files = get_dataset_files(dataset_name, blob_type='incremental')
85-
86-
# Create and compute conversation
87-
votes = load_votes(dataset_files['votes'])
88-
comments = load_comments(dataset_files['comments'])
89-
90-
print(f"\n[{dataset_name}] Processing conversation with {len(votes['votes'])} votes and {len(comments['comments'])} comments")
91-
conv = Conversation(dataset_name)
92-
conv = conv.update_votes(votes)
93-
94-
print(f"[{dataset_name}] Recomputing conversation analysis...")
95-
conv = conv.recompute()
96-
97-
# Extract key metrics for reporting
98-
group_count = len(conv.group_clusters)
99-
print(f"[{dataset_name}] Found {group_count} groups")
100-
print(f"[{dataset_name}] Processed {conv.comment_count} comments")
101-
print(f"[{dataset_name}] Found {conv.participant_count} participants")
102-
103-
if conv.repness and 'comment_repness' in conv.repness:
104-
print(f"[{dataset_name}] Calculated representativeness for {len(conv.repness['comment_repness'])} comments")
105-
106-
# Print top representative comments for each group
107-
if conv.repness and 'comment_repness' in conv.repness:
108-
for group_id in range(group_count):
109-
print(f"\n[{dataset_name}] Top representative comments for Group {group_id}:")
110-
group_repness = [item for item in conv.repness['comment_repness'] if item['gid'] == group_id]
111-
112-
# Sort by representativeness
113-
group_repness.sort(key=lambda x: abs(x['repness']), reverse=True)
114-
115-
# Print top 5 comments
116-
for i, rep_item in enumerate(group_repness[:5]):
117-
comment_id = rep_item['tid']
118-
# Get the comment text if available
119-
comment_txt = next((c['txt'] for c in comments['comments'] if str(c['tid']) == str(comment_id)), 'Unknown')
120-
print(f" {i+1}. Comment {comment_id} (Repness: {rep_item['repness']:.4f}): {comment_txt[:50]}...")
121-
122-
# Save the Python conversion results for manual inspection
123-
import os
124-
import json
125-
data_dir = dataset_files['data_dir']
126-
output_dir = os.path.join(os.path.dirname(data_dir), '.test_outputs', 'python_output', dataset_name)
127-
os.makedirs(output_dir, exist_ok=True)
128-
129-
output_path = os.path.join(output_dir, 'conversation_result.json')
130-
with open(output_path, 'w') as f:
131-
json.dump(conv.to_dict(), f, indent=2)
132-
133-
print(f"[{dataset_name}] Saved results to {output_path}")
134-
135-
data = {'conv': conv, 'comments': comments}
136-
_CONV_CACHE[dataset_name] = data
137-
return data
138-
139-
14065
@pytest.fixture(scope="class")
141-
def conversation_data(dataset_blob_id):
66+
def conversation_data(dataset_blob_id, get_or_compute_conversation):
14267
"""
14368
Class-scoped fixture computed once per dataset+blob_type.
144-
Reuses the Conversation across blob variants of the same dataset.
69+
Reuses the Conversation across blob variants via the session-scoped cache.
14570
"""
14671
global _BLOB_CACHE
14772
dataset_name, blob_type = parse_dataset_blob_id(dataset_blob_id)
14873

149-
# Get or compute the conversation (shared across blob variants)
150-
conv_data = _get_or_compute_conversation(dataset_name)
74+
# Get or compute the conversation (shared across blob variants via session cache)
75+
conv_data = get_or_compute_conversation(dataset_name)
15176

15277
# Load the specific blob variant (cache per composite ID)
15378
if dataset_blob_id not in _BLOB_CACHE:

0 commit comments

Comments
 (0)