feat: parallel Comparison.from_netcdf and .expand (not planned)#672

Closed
FBumann wants to merge 1 commit into main from feature/comparison-from-netcdf-expand

Conversation


@FBumann FBumann commented Apr 21, 2026

Summary

  • New Comparison.from_netcdf(paths, max_workers=None) classmethod — accepts a list of paths or a dict mapping paths → names, loads with a thread pool.
  • New Comparison.expand(max_workers=None) method — parallel expansion of clustered FlowSystems, passes through systems without clustering.
  • Both return a fresh Comparison instance. The file read in from_netcdf is serialized under a module-level lock because the netCDF4 C library is not thread-safe; only the CPU-bound deserialization (JSON attrs + FlowSystem.from_dataset) actually runs in parallel.
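The read/parse split described above can be sketched as follows. This is a simplified sketch, not the real implementation: `read_raw` and `parse` are illustrative stand-ins for flixopt's `load_dataset_from_netcdf` and `FlowSystem.from_dataset`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

# The netCDF4 C library is not thread-safe, so the raw file read is
# serialized under one module-level lock; only the parse step overlaps
# across threads.
_NETCDF_READ_LOCK = threading.Lock()

def read_raw(path):
    return {"path": str(path)}  # placeholder for the locked netCDF read

def parse(raw):
    # placeholder for the CPU-bound deserialization step
    return {"name": Path(raw["path"]).stem, "data": raw}

def load_all(paths, max_workers=None):
    def load_one(path):
        with _NETCDF_READ_LOCK:   # serialized section: file read
            raw = read_raw(path)
        return parse(raw)          # parallel section: deserialization
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(load_one, paths))  # map preserves input order

systems = load_all(["base.nc", "variant.nc"], max_workers=2)
print([s["name"] for s in systems])  # → ['base', 'variant']
```

`executor.map` returns results in input order, which is what lets the case names line up with the paths regardless of which thread finishes first.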

Benchmark results (why this may be closed)

Honest numbers — the parallelism doesn't pay off much:

| workload | variant | wall time | speedup |
|---|---|---|---|
| 4× 82 MB files (100 components, 8760 h, 10 periods) | serial | 1.48 s | 1.00× |
| | thread pool (default) | 1.25 s | 1.19× |
| | process pool (4) | 3.93 s | 0.38× |
| 6× 335 MB files (400 components) | serial | 7.46 s | 1.00× |
| | thread pool (default) | 5.83 s | 1.28× |
| Comparison.expand (6× clustered, 720 h) | serial | 0.164 s | 1.00× |
| | thread pool (default) | 0.167 s | 0.98× |

Profile: ~70 % of per-file work is in load_dataset_from_netcdf (GIL-held netCDF4 read, now under our lock) and only ~30 % in FlowSystem.from_dataset. Theoretical thread ceiling ≈ 1.4×, real ≈ 1.2–1.3×. Process pool startup + FlowSystem pickling dwarf any parallelism win at every size we tried.
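The quoted ceiling is just Amdahl's law applied to the ~70 % serialized fraction:

```python
def amdahl_speedup(serial_fraction, workers):
    """Amdahl's-law upper bound on speedup when `serial_fraction`
    of the per-file work (here the lock-held netCDF read) cannot
    overlap across threads."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / workers)

print(round(amdahl_speedup(0.7, 4), 2))    # 4 threads → 1.29
print(round(amdahl_speedup(0.7, 1e9), 2))  # limit as workers grow → 1.43
```

With 70 % of the work serialized, even infinite workers cap out at 1/0.7 ≈ 1.43×, which matches the measured 1.2–1.3×.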

Test plan

  • Unit tests in tests/test_comparison.py::TestComparisonFromNetcdf and TestComparisonExpand (list/dict input, serial==parallel equivalence, mixed-cluster pass-through, full-timestep restoration).
  • Full tests/test_comparison.py suite passes (54 tests).

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added a method to load multiple comparison datasets from NetCDF files with parallel processing support.
    • Added an expansion method for clustered systems that processes multiple cases concurrently while preserving case names.
  • Tests

    • Added comprehensive test coverage for dataset loading and system expansion functionality.

…workers

Both methods accept `max_workers` and use a thread pool. `from_netcdf`
serializes the actual netCDF4 read under a module-level lock (the C
library is not thread-safe) and parallelizes the FlowSystem deserialization;
`expand` parallelizes per-system expansion and passes through systems
without clustering so mixed comparisons work.
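The pass-through behaviour can be sketched with plain dicts standing in for `FlowSystem`; the `clustering` key mirrors the attribute the real method checks, and a marker replaces the actual `fs.transform.expand()` call (all names here are illustrative, not the flixopt API).

```python
from concurrent.futures import ThreadPoolExecutor

def expand_all(systems, max_workers=None):
    def expand_one(fs):
        if fs.get("clustering") is None:
            return fs  # no clustering: pass through unchanged
        # stand-in for fs.transform.expand(): drop clustering, mark expanded
        return dict(fs, clustering=None, expanded=True)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(expand_one, systems))

mixed = [{"name": "clustered", "clustering": "10-period"},
         {"name": "plain", "clustering": None}]
out = expand_all(mixed, max_workers=2)
print([(s["name"], s.get("expanded", False)) for s in out])
# → [('clustered', True), ('plain', False)]
```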

Benchmarked speedup is modest (~1.2-1.3x for loads, ~1x for expand);
thread overhead largely cancels out the small fraction of work that
actually releases the GIL.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

coderabbitai Bot commented Apr 21, 2026

📝 Walkthrough

The changes add two new methods to the Comparison class: a classmethod from_netcdf() that concurrently loads multiple NetCDF files into FlowSystem objects with serialized file reading due to thread-safety constraints, and an instance method expand() that applies clustering expansion to FlowSystem objects in parallel while maintaining case names.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Concurrent NetCDF Loading and Expansion**<br>`flixopt/comparison.py` | Added global `_NETCDF_READ_LOCK` and threading imports for safe concurrent dataset loading. Introduced `from_netcdf()` classmethod supporting both path lists and path-to-name mappings, using a thread pool with serialized file reads. Added `expand()` method that applies clustering expansion to systems in parallel, passing through non-clustered systems unchanged. |
| **Test Coverage**<br>`tests/test_comparison.py` | Added `TestComparisonFromNetcdf` validating list and dict input modes, and determinism with a single worker. Added `TestComparisonExpand` verifying expanded systems preserve case names, restore full time dimensions, and maintain consistency across worker configurations. |

Sequence Diagrams

```mermaid
sequenceDiagram
    participant Client
    participant ThreadPool
    participant Lock
    participant FileSystem as File System
    participant FlowSystem
    participant Comparison

    Client->>ThreadPool: from_netcdf(paths, max_workers)
    ThreadPool->>ThreadPool: Create worker tasks for each path

    loop For each NetCDF file
        ThreadPool->>Lock: Acquire _NETCDF_READ_LOCK
        Lock-->>ThreadPool: Lock acquired
        ThreadPool->>FileSystem: load_dataset_from_netcdf(path)
        FileSystem-->>ThreadPool: Dataset
        ThreadPool->>Lock: Release _NETCDF_READ_LOCK

        ThreadPool->>FlowSystem: FlowSystem.from_dataset(dataset)
        FlowSystem-->>ThreadPool: FlowSystem instance
    end

    ThreadPool-->>Comparison: All FlowSystem objects
    Comparison-->>Client: Comparison with case names
```
```mermaid
sequenceDiagram
    participant Client
    participant ThreadPool
    participant FlowSystem
    participant Transform
    participant Comparison

    Client->>ThreadPool: expand(max_workers)
    ThreadPool->>ThreadPool: Create tasks for each FlowSystem

    loop For each FlowSystem
        alt FlowSystem.clustering is not None
            ThreadPool->>Transform: fs.transform.expand()
            Transform-->>ThreadPool: Expanded FlowSystem
        else
            ThreadPool-->>ThreadPool: Pass through unchanged
        end
    end

    ThreadPool-->>Comparison: Expanded systems (names preserved)
    Comparison-->>Client: New Comparison instance
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 A thread-safe tale of parallel dreams,
NetCDF files flowing in concurrent streams,
Locks keep the rabbits from tangling their thread,
While expansions multiply what was said!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 77.78%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly describes the main changes: introducing parallel versions of Comparison.from_netcdf and .expand methods with thread pool support. |
| Description check | ✅ Passed | The description comprehensively covers the changes, includes implementation details, benchmark results, and testing verification; all key sections from the template are addressed. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
flixopt/comparison.py (1)

34-38: Consider moving the netCDF lock into load_dataset_from_netcdf itself.

Putting the lock inside flixopt/io.py::load_dataset_from_netcdf would protect all concurrent callers, not just Comparison.from_netcdf. As written, any other thread-pool consumer of load_dataset_from_netcdf (now or in the future) can still race with from_netcdf readers and segfault the shared HDF5/netCDF4 global state, because each call site must remember to take _NETCDF_READ_LOCK itself. Centralizing the lock next to the actual xr.load_dataset(..., engine='netcdf4') call makes the thread-safety guarantee a property of the loader rather than of each caller.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flixopt/comparison.py` around lines 34 - 38, The module-level
_NETCDF_READ_LOCK should be moved into flixopt/io.py and used inside
load_dataset_from_netcdf so the xr.load_dataset(..., engine='netcdf4') call is
wrapped by the lock; update load_dataset_from_netcdf to create or reference a
threading.Lock (e.g., a module-scoped lock in flixopt/io.py) and acquire/release
it around the actual netCDF4 read, and remove external locking in
Comparison.from_netcdf so callers no longer need to remember to take
_NETCDF_READ_LOCK; this centralizes thread-safety at the loader
(load_dataset_from_netcdf) rather than at each caller.
tests/test_comparison.py (1)

543-663: Solid coverage for the new parallel paths.

The list/dict input forms, serial-vs-parallel equivalence via xr.testing.assert_identical, full-timestep restoration (168 + 1), and the mixed clustered/non-clustered identity check together cover the meaningful behavioral contracts of from_netcdf and expand. The pytest.importorskip('tsam') guard in clustered_systems keeps the suite portable.

One small note: test_from_netcdf_serial_matches_parallel compares the default path against max_workers=1, but on single-core CI runners ThreadPoolExecutor's default may also collapse to 1 worker, so this asserts determinism rather than true serial-vs-parallel equivalence. Consider pinning the "parallel" side to max_workers=2 to make the intent explicit.
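A generic illustration of the pinning suggestion; the real test would call `fx.Comparison.from_netcdf` and compare results with `xr.testing.assert_identical`, while the loader here is a trivial stand-in.

```python
from concurrent.futures import ThreadPoolExecutor

# Comparing "default workers" vs max_workers=1 can degenerate to 1-vs-1
# on small machines. Pinning the parallel side to max_workers=2 makes the
# comparison explicitly serial-vs-multithreaded.
def load_all(paths, max_workers=None):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(str.upper, paths))  # stand-in for the real loader

paths = ["a.nc", "b.nc"]
assert load_all(paths, max_workers=1) == load_all(paths, max_workers=2)
print("serial and pinned-parallel results match")
```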

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_comparison.py` around lines 543 - 663, The test
test_from_netcdf_serial_matches_parallel can falsely pass on single-core CI
because the ThreadPoolExecutor default may be 1; make the "parallel" call
explicitly multi-worker so the comparison is meaningful: in
test_from_netcdf_serial_matches_parallel call fx.Comparison.from_netcdf([p1,
p2], max_workers=2) for comp_parallel (leave comp_serial as max_workers=1) and
keep the subsequent assertions comparing comp_parallel and comp_serial
(references: test_from_netcdf_serial_matches_parallel,
fx.Comparison.from_netcdf, variables comp_parallel and comp_serial).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6fe5013d-8ea7-4470-bc0d-50c5d5cd7d5c

📥 Commits

Reviewing files that changed from the base of the PR and between 10103d2 and cbe2d7e.

📒 Files selected for processing (2)
  • flixopt/comparison.py
  • tests/test_comparison.py

Comment thread: `flixopt/comparison.py`, lines +246 to +251

```python
        def _load_one(path: str | _pl.Path) -> FlowSystem:
            with _NETCDF_READ_LOCK:
                ds = load_dataset_from_netcdf(path)
            fs = FlowSystem.from_dataset(ds)
            fs.name = _pl.Path(path).stem
            return fs
```

⚠️ Potential issue | 🟡 Minor

fs.name is overwritten with the filename stem even when the user supplies explicit names via a dict.

For the dict input form ({path: 'baseline', ...}), the explicit name is only applied at the Comparison level (via names=), while each FlowSystem.name is still forced to Path(path).stem. That leaves the underlying system with a name that differs from the case label the user asked for, which can be confusing when accessing comp[i].name or later rebuilding a Comparison from those systems.

Consider using the provided name when available:

Proposed tweak:

```diff
-        if isinstance(paths, dict):
-            path_list = list(paths.keys())
-            names: list[str] | None = list(paths.values())
-        else:
-            path_list = list(paths)
-            names = None
-
-        def _load_one(path: str | _pl.Path) -> FlowSystem:
-            with _NETCDF_READ_LOCK:
-                ds = load_dataset_from_netcdf(path)
-            fs = FlowSystem.from_dataset(ds)
-            fs.name = _pl.Path(path).stem
-            return fs
-
-        with ThreadPoolExecutor(max_workers=max_workers) as executor:
-            flow_systems = list(executor.map(_load_one, path_list))
+        if isinstance(paths, dict):
+            path_list = list(paths.keys())
+            names: list[str] | None = list(paths.values())
+        else:
+            path_list = list(paths)
+            names = None
+
+        stem_names = [_pl.Path(p).stem for p in path_list]
+        fs_names = names if names is not None else stem_names
+
+        def _load_one(item: tuple[str | _pl.Path, str]) -> FlowSystem:
+            path, fs_name = item
+            with _NETCDF_READ_LOCK:
+                ds = load_dataset_from_netcdf(path)
+            fs = FlowSystem.from_dataset(ds)
+            fs.name = fs_name
+            return fs
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            flow_systems = list(executor.map(_load_one, zip(path_list, fs_names, strict=True)))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flixopt/comparison.py` around lines 246 - 251, The helper _load_one currently
unconditionally sets FlowSystem.name to Path(path).stem, overwriting
user-supplied names; modify the loading logic so that when callers supply an
explicit name (e.g., via the dict form feeding Comparison names=), you preserve
that name instead of replacing it — detect the provided label before calling
_load_one (or change _load_one to accept an optional name parameter), call
FlowSystem.from_dataset(ds) and only set fs.name = Path(path).stem if no
explicit name was given; update any call sites that construct Comparison from a
mapping to pass the provided label into _load_one.

@FBumann FBumann closed this Apr 21, 2026