|
5 | 5 | - Command line options --include-local and --datasets for dataset selection |
6 | 6 | - Fixtures for accessing dataset information |
7 | 7 | - @pytest.mark.use_discovered_datasets for dynamic dataset parametrization |
8 | | -- Helper functions for parallel test execution with xdist_group markers |
| 8 | +- Session-scoped conversation cache for efficient test execution |
9 | 9 | """ |
10 | 10 |
|
| 11 | +from copy import deepcopy |
| 12 | + |
11 | 13 | import pytest |
| 14 | + |
| 15 | +from polismath.conversation.conversation import Conversation |
| 16 | +from polismath.regression import get_dataset_files |
12 | 17 | from polismath.regression.datasets import ( |
13 | 18 | discover_datasets, |
14 | 19 | list_regression_datasets, |
15 | 20 | get_blob_variants, |
16 | 21 | ) |
| 22 | +from tests.common_utils import load_votes, load_comments |
17 | 23 |
|
18 | 24 |
|
19 | 25 | # ============================================================================= |
20 | | -# Parallel Execution Helpers |
| 26 | +# Session-scoped Conversation Cache |
21 | 27 | # ============================================================================= |
22 | 28 |
|
23 | | -def make_dataset_params(datasets: list[str]) -> list: |
_SESSION_CONV_CACHE: dict = {}


@pytest.fixture(scope="session")
def get_or_compute_conversation():
    """Expose a session-wide, single-slot Conversation cache to tests.

    The fixture value is a callable that takes a dataset name. On a cache
    miss it loads the dataset's 'incremental' files, feeds the votes into a
    fresh Conversation and recomputes it, then stores the result. Every call
    hands back a deepcopy of the cached entry so tests cannot mutate the
    shared state.

    At most ONE dataset is held at a time: requesting a different dataset
    evicts the previous entry first. This relies on
    pytest_collection_modifyitems grouping all tests of a dataset together
    (across all test files).

    Returns:
        A function ``_get(dataset_name) -> dict`` whose result has the keys
        'conv', 'dataset_name', 'files' and 'comments'.
    """
    import gc

    def _get(dataset_name: str) -> dict:
        cached = _SESSION_CONV_CACHE.get(dataset_name)
        if cached is None:
            # Single-slot cache: drop whatever dataset was held before.
            _SESSION_CONV_CACHE.clear()
            # NOTE(review): assumed to run once after eviction (not once per
            # evicted entry) -- confirm against the original indentation.
            Conversation._reset_conversion_cache()
            gc.collect()

            dataset_files = get_dataset_files(dataset_name, blob_type='incremental')
            vote_data = load_votes(dataset_files['votes'])
            comment_data = load_comments(dataset_files['comments'])

            conversation = (
                Conversation(dataset_name)
                .update_votes(vote_data)
                .recompute()
            )

            cached = {
                'conv': conversation,
                'dataset_name': dataset_name,
                'files': dataset_files,
                'comments': comment_data,
            }
            _SESSION_CONV_CACHE[dataset_name] = cached

        # Always return a copy so callers cannot corrupt the cached entry.
        return deepcopy(cached)

    return _get
| 72 | + |
| 73 | + |
| 74 | +# ============================================================================= |
| 75 | +# Dataset Parametrization Helpers |
| 76 | +# ============================================================================= |
| 77 | + |
def make_dataset_params(datasets: list[str]) -> list:
    """
    Wrap each dataset identifier in a pytest.param for parametrization.

    Args:
        datasets: List of dataset names (or "dataset-blob_type" composite IDs)

    Returns:
        List of pytest.param objects, one per input entry, in input order

    Example:
        @pytest.mark.parametrize("dataset_name", make_dataset_params(["biodiversity", "vw"]))
        def test_something(dataset_name):
            ...
    """
    return list(map(pytest.param, datasets))
46 | 94 |
|
47 | 95 |
|
48 | 96 | def parse_dataset_blob_id(composite_id: str) -> tuple[str, str]: |
@@ -137,7 +185,7 @@ def pytest_generate_tests(metafunc): |
137 | 185 | With use_blobs=True, parametrize with 'dataset-blob_type' composite IDs |
138 | 186 | (e.g., 'biodiversity-incremental', 'engage-cold_start') for each filled blob variant. |
139 | 187 |
|
140 | | - Uses xdist_group markers for efficient parallel execution with pytest-xdist. |
| 188 | + Uses the session-scoped conversation cache for efficient test execution. |
141 | 189 | """ |
142 | 190 | markers = list(metafunc.definition.iter_markers("use_discovered_datasets")) |
143 | 191 | if not markers: |
@@ -169,6 +217,58 @@ def pytest_generate_tests(metafunc): |
169 | 217 | metafunc.parametrize("dataset_name", make_dataset_params(datasets)) |
170 | 218 |
|
171 | 219 |
|
| 220 | +# ============================================================================= |
| 221 | +# Test Reordering for Cache Efficiency |
| 222 | +# ============================================================================= |
| 223 | + |
| 224 | +def _extract_dataset_from_test(item) -> str: |
| 225 | + """Extract the dataset name from a test item's parameters. |
| 226 | +
|
| 227 | + Handles both plain dataset names ('biodiversity') and composite IDs |
| 228 | + ('biodiversity-incremental'). Returns empty string if no dataset parameter. |
| 229 | + """ |
| 230 | + # Check callspec for parametrized values |
| 231 | + if hasattr(item, 'callspec') and item.callspec.params: |
| 232 | + for param_name in ('dataset_name', 'dataset_blob_id'): |
| 233 | + if param_name in item.callspec.params: |
| 234 | + value = item.callspec.params[param_name] |
| 235 | + # Extract base dataset name from composite IDs |
| 236 | + if value.endswith('-incremental'): |
| 237 | + return value[:-len('-incremental')] |
| 238 | + elif value.endswith('-cold_start'): |
| 239 | + return value[:-len('-cold_start')] |
| 240 | + return value |
| 241 | + return '' |
| 242 | + |
| 243 | + |
def pytest_collection_modifyitems(session, config, items):
    """Reorder collected tests so each dataset's tests run back to back.

    Tests that carry a dataset parameter are grouped by dataset name (across
    all test files) so that the session-scoped conversation cache only needs
    to hold ONE dataset at a time. Tests without a dataset parameter run
    first, in their original order.

    Order: other tests, dataset1[file1, file2, ...], dataset2[...], ...
    Within each dataset, original test order is preserved.

    Args:
        session: The pytest session (unused).
        config: The pytest config object (unused).
        items: Collected test items; reordered in place.
    """
    # Tag every item with its dataset name once ('' means "no dataset").
    tagged = [(_extract_dataset_from_test(item), item) for item in items]

    ungrouped = [item for dataset, item in tagged if not dataset]
    # sorted() is stable, so items keep their relative order within a dataset.
    grouped = sorted(
        (pair for pair in tagged if pair[0]),
        key=lambda pair: pair[0],
    )

    # Mutate in place: pytest holds a reference to this exact list object.
    items[:] = ungrouped + [item for _, item in grouped]
| 270 | + |
| 271 | + |
172 | 272 | # Provide summary of discovered datasets at start of test run |
173 | 273 | def pytest_report_header(config): |
174 | 274 | """Add dataset discovery info to pytest header.""" |
|
0 commit comments