feat: HDF5-backed dataloader for memory-efficient large-scale training #146
Merged
5 commits
48e75ff
feat: HDF5-backed dataloader for memory-efficient large-scale training
amitpande74 0809bae
style: sort imports (isort) in h5_dataloader.py and csv_to_h5.py
amitpande74 6b3fd80
style: fix E221 (whitespace before operator) for flake8 compliance
amitpande74 7fe2541
refactor: make HDF5 dataloader and converter project-agnostic
amitpande74 9c436ef
fix: CLI autodetect for HDF5 inputs; correct feature_names in converter
amitpande74
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| #!/usr/bin/env python3 | ||
| """ | ||
| csv_to_h5.py - Convert a feature matrix CSV to HDF5 for memory-efficient loading. | ||
|
|
||
| Large feature-matrix CSV files are slow to parse and, when read with pandas, | ||
| are stored as float64 by default, twice the in-memory footprint of float32. | ||
| Converting such a CSV to HDF5 once allows it to be loaded later as native | ||
| float32 with substantially lower peak memory. | ||
|
|
||
| The input CSV is expected to have features as rows and samples as columns, with | ||
| the first column containing feature identifiers and the header row containing | ||
| sample identifiers. This matches the layout used elsewhere in Flexynesis. | ||
|
|
||
| The output HDF5 file has the following layout: | ||
| /matrix (n_samples, n_features) float32, chunked (1, n_features) | ||
| /sample_ids (n_samples,) byte strings | ||
| /feature_names (n_features,) byte strings | ||
| attrs: created_by, source_csv, orientation, n_samples, n_features | ||
|
|
||
| Data are stored samples-as-rows with per-row chunking so that individual | ||
| samples can be read efficiently. H5DataImporter reads files in this layout. | ||
|
|
||
| Usage: | ||
| python -m flexynesis.csv_to_h5 input.csv output.h5 | ||
| python -m flexynesis.csv_to_h5 input.csv output.h5 --chunksize 1000 | ||
| """ | ||
| import argparse | ||
| import sys | ||
| import time | ||
| from pathlib import Path | ||
|
|
||
| import h5py | ||
| import numpy as np | ||
| import pandas as pd | ||
|
|
||
| DEFAULT_CHUNKSIZE = 500 # rows (features) read per pandas chunk | ||
|
|
||
|
|
||
| def log(msg): | ||
| print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) | ||
|
|
||
|
|
||
| def convert_csv_to_h5(src_csv, dst_h5, chunksize=DEFAULT_CHUNKSIZE): | ||
| """ | ||
| Convert a single feature-matrix CSV to HDF5. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| src_csv : str or Path | ||
| Input CSV: features as rows, samples as columns, first column is the | ||
| feature index. | ||
| dst_h5 : str or Path | ||
| Output HDF5 path. Parent directories are created if needed. | ||
| chunksize : int | ||
| Number of CSV rows parsed per chunk while streaming the file. | ||
|
|
||
| Returns | ||
| ------- | ||
| Path | ||
| The path to the written HDF5 file. | ||
| """ | ||
| src_csv = Path(src_csv) | ||
| dst_h5 = Path(dst_h5) | ||
|
|
||
| if not src_csv.exists(): | ||
| raise FileNotFoundError(f"Input CSV not found: {src_csv}") | ||
|
|
||
| dst_h5.parent.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| log(f"Converting {src_csv} -> {dst_h5}") | ||
|
|
||
| # ---------- Scan structure: sample IDs (header) and feature IDs (col 0) ---------- | ||
| header_df = pd.read_csv(src_csv, nrows=0, index_col=0) | ||
| sample_ids = header_df.columns.tolist() | ||
| n_samples = len(sample_ids) | ||
|
|
||
| # usecols=[0] restricts parsing to the first column, which index_col=0 | ||
| # turns into the row index; no value columns are loaded, so this stays light. | ||
| feature_index = pd.read_csv(src_csv, index_col=0, usecols=[0]).index | ||
| feature_names = feature_index.astype(str).tolist() | ||
| n_features = len(feature_names) | ||
| del feature_index | ||
|
|
||
| log(f" {n_samples:,} samples x {n_features:,} features") | ||
|
|
||
| # ---------- Stream CSV into a pre-allocated (n_features, n_samples) array ---------- | ||
| arr = np.empty((n_features, n_samples), dtype=np.float32) | ||
|
|
||
| chunks = pd.read_csv(src_csv, index_col=0, chunksize=chunksize) | ||
| row_idx = 0 | ||
| for chunk in chunks: | ||
| chunk_arr = chunk.values.astype(np.float32, copy=False) | ||
| n = chunk_arr.shape[0] | ||
| arr[row_idx:row_idx + n] = chunk_arr | ||
| row_idx += n | ||
| del chunk_arr | ||
|
|
||
| if row_idx != n_features: | ||
| raise ValueError( | ||
| f"Row count mismatch: scanned {n_features} features, " | ||
| f"read {row_idx} while streaming." | ||
| ) | ||
|
|
||
| # ---------- Transpose to (n_samples, n_features) for samples-as-rows storage ---------- | ||
| arr_t = np.ascontiguousarray(arr.T) | ||
| del arr | ||
|
|
||
| # ---------- Write HDF5 ---------- | ||
| with h5py.File(dst_h5, "w") as h5f: | ||
| # Per-row chunking: one chunk == one sample, for fast single-sample reads. | ||
| h5f.create_dataset("matrix", data=arr_t, chunks=(1, n_features)) | ||
| h5f.create_dataset("sample_ids", data=np.array(sample_ids, dtype="S")) | ||
| h5f.create_dataset("feature_names", | ||
| data=np.array(feature_names, dtype="S")) | ||
| h5f.attrs["created_by"] = "csv_to_h5.py" | ||
| h5f.attrs["source_csv"] = str(src_csv) | ||
| h5f.attrs["orientation"] = "samples_as_rows" | ||
| h5f.attrs["n_samples"] = n_samples | ||
| h5f.attrs["n_features"] = n_features | ||
| del arr_t | ||
|
|
||
| size_gb = dst_h5.stat().st_size / 1e9 | ||
| log(f" Wrote {size_gb:.2f} GB to {dst_h5}") | ||
| return dst_h5 | ||
|
|
||
|
|
||
| def main(argv=None): | ||
| parser = argparse.ArgumentParser( | ||
| description="Convert a feature-matrix CSV to HDF5 for " | ||
| "memory-efficient loading." | ||
| ) | ||
| parser.add_argument("input_csv", help="Input CSV (features x samples).") | ||
| parser.add_argument("output_h5", help="Output HDF5 file path.") | ||
| parser.add_argument( | ||
| "--chunksize", type=int, default=DEFAULT_CHUNKSIZE, | ||
| help=f"CSV rows parsed per chunk (default: {DEFAULT_CHUNKSIZE}).", | ||
| ) | ||
| args = parser.parse_args(argv) | ||
|
|
||
| try: | ||
| convert_csv_to_h5(args.input_csv, args.output_h5, args.chunksize) | ||
| except (FileNotFoundError, ValueError) as exc: | ||
| sys.exit(f"ERROR: {exc}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
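
The converter's docstring says the per-row chunking exists so that individual samples can be read efficiently. The sketch below illustrates that access pattern on a toy file. `write_demo_h5` and `read_sample` are hypothetical helpers that are not part of the PR; they only assume the `/matrix`, `/sample_ids`, `/feature_names` layout described above.

```python
import os
import tempfile

import h5py
import numpy as np


def write_demo_h5(path, matrix, sample_ids, feature_names):
    """Write a tiny file in the samples-as-rows layout (hypothetical helper)."""
    matrix = np.asarray(matrix, dtype=np.float32)
    n_samples, n_features = matrix.shape
    with h5py.File(path, "w") as h5f:
        # One chunk per sample row, mirroring chunks=(1, n_features) in the converter.
        h5f.create_dataset("matrix", data=matrix, chunks=(1, n_features))
        h5f.create_dataset("sample_ids", data=np.array(sample_ids, dtype="S"))
        h5f.create_dataset("feature_names", data=np.array(feature_names, dtype="S"))
        h5f.attrs["orientation"] = "samples_as_rows"


def read_sample(path, sample_id):
    """Return one sample's feature vector without loading the full matrix."""
    with h5py.File(path, "r") as h5f:
        ids = [s.decode() for s in h5f["sample_ids"][:]]
        row = ids.index(sample_id)      # raises ValueError for an unknown ID
        return h5f["matrix"][row, :]    # slices a single row from disk


with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.h5")
    write_demo_h5(demo_path, [[1, 2, 3], [4, 5, 6]],
                  ["s1", "s2"], ["f1", "f2", "f3"])
    vec = read_sample(demo_path, "s2")

print(vec.dtype, vec.tolist())
```

Note that `H5DataImporter` in this PR reads the whole matrix with `f['matrix'][:]`, so the row-wise read above is only an illustration of what the chunk layout makes possible, not something the loader does.
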
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| """ | ||
| h5_dataloader.py - HDF5-backed DataImporter for Flexynesis. | ||
|
|
||
| The stock Flexynesis DataImporter reads each modality with pd.read_csv, which | ||
| loads values as float64 by default. For large feature matrices this doubles | ||
| the in-memory footprint and makes data loading the dominant memory cost of a | ||
| training run. | ||
|
|
||
| H5DataImporter is a drop-in subclass of DataImporter that loads any modality | ||
| from HDF5 as native float32 when an .h5 file is present, and otherwise falls | ||
| back to the standard CSV path. All other Flexynesis behaviour - data cleanup, | ||
| NaN imputation, label encoding, scaling, and torch dataset construction - is | ||
| inherited unchanged. | ||
|
|
||
| HDF5 files are expected in the layout produced by csv_to_h5.py: | ||
| /matrix (n_samples, n_features) float32 | ||
| /sample_ids (n_samples,) byte strings | ||
| /feature_names (n_features,) byte strings | ||
|
|
||
| Each data folder (training and testing) is expected to contain clin.csv (as | ||
| required by the stock DataImporter); every modality in data_types may be | ||
| supplied as either an .h5 file or a .csv file. | ||
|
|
||
| Usage: | ||
| from flexynesis.h5_dataloader import H5DataImporter | ||
| di = H5DataImporter(path='my_dataset', data_types=['gex']) | ||
| train_ds, test_ds = di.import_data() | ||
| """ | ||
| import os | ||
|
|
||
| import h5py | ||
| import numpy as np | ||
| import pandas as pd | ||
|
|
||
| from flexynesis.data import DataImporter | ||
|
|
||
|
|
||
| class H5DataImporter(DataImporter): | ||
| """ | ||
| DataImporter subclass that loads modality matrices from HDF5 when | ||
| available, and from CSV otherwise. | ||
|
|
||
| Only read_data() and validate_data_folders() are overridden; the parent | ||
| DataImporter and all downstream Flexynesis components are unchanged. | ||
|
|
||
| HDF5 files are expected in the layout written by csv_to_h5.py: | ||
| /matrix (n_samples, n_features) float32 | ||
| /sample_ids (n_samples,) byte strings | ||
| /feature_names (n_features,) byte strings | ||
| """ | ||
|
|
||
| def read_data(self, folder_path): | ||
| """ | ||
| Override of DataImporter.read_data. | ||
|
|
||
| Returns a dict {file_name: DataFrame} in the format Flexynesis | ||
| expects. Each modality in self.data_types is loaded from an .h5 file | ||
| if present, otherwise from the corresponding .csv. clin.csv is always | ||
| loaded from CSV. | ||
| """ | ||
| print("\n[INFO] ----------------- Reading Data (HDF5) ----------------- ") | ||
| data = {} | ||
| required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types} | ||
|
|
||
| for file in required_files: | ||
| file_name = os.path.splitext(file)[0] | ||
|
|
||
| # Modality matrices may be HDF5; clin.csv is always CSV. | ||
| if file_name in self.data_types: | ||
| h5_path = os.path.join(folder_path, f"{file_name}.h5") | ||
| if not os.path.exists(h5_path): | ||
| # Fall back to CSV when no HDF5 file is present. | ||
| csv_path = os.path.join(folder_path, file) | ||
| print(f"[INFO] HDF5 not found at {h5_path}, " | ||
| f"falling back to CSV: {csv_path}") | ||
| data[file_name] = pd.read_csv(csv_path, index_col=0) | ||
| else: | ||
| print(f"[INFO] Importing {h5_path} (HDF5)...") | ||
| data[file_name] = self._read_h5_as_dataframe(h5_path) | ||
| else: | ||
| csv_path = os.path.join(folder_path, file) | ||
| print(f"[INFO] Importing {csv_path}...") | ||
| data[file_name] = pd.read_csv(csv_path, index_col=0) | ||
|
|
||
| return data | ||
|
|
||
| @staticmethod | ||
| def _read_h5_as_dataframe(h5_path): | ||
| """ | ||
| Read an HDF5 modality file into a DataFrame matching the Flexynesis | ||
| CSV convention: | ||
| index = feature identifiers (str) | ||
| columns = sample identifiers (str) | ||
| values = float32 | ||
|
|
||
| HDF5 stores the matrix samples-as-rows; it is transposed here to | ||
| features-as-rows during DataFrame construction. | ||
| """ | ||
| with h5py.File(h5_path, 'r') as f: | ||
| n_samples, n_features = f['matrix'].shape | ||
| print(f"[INFO] HDF5 shape: {n_samples:,} samples x " | ||
| f"{n_features:,} features (float32)") | ||
|
|
||
| arr = f['matrix'][:] # (n_samples, n_features) float32 | ||
| sample_ids = [s.decode() for s in f['sample_ids'][:]] | ||
| feature_names = [g.decode() for g in f['feature_names'][:]] | ||
|
|
||
| # Transpose to (n_features, n_samples) to match the CSV convention. | ||
| # np.ascontiguousarray forces a real copy in C-order. | ||
| arr_t = np.ascontiguousarray(arr.T) | ||
| del arr | ||
|
|
||
| df = pd.DataFrame(arr_t, index=feature_names, columns=sample_ids) | ||
| print(f"[INFO] DataFrame ready: {df.shape} " | ||
| f"dtype: {df.dtypes.iloc[0]}") | ||
| return df | ||
|
|
||
| def validate_data_folders(self, training_path, testing_path): | ||
| """ | ||
| Override to accept either a .csv or an .h5 file for each modality in | ||
| self.data_types. clin.csv must still be present as CSV. | ||
| """ | ||
| for split_name, path in [("training", training_path), | ||
| ("testing", testing_path)]: | ||
| if not os.path.isdir(path): | ||
| raise ValueError( | ||
| f"{split_name} folder does not exist: {path}") | ||
| missing = [] | ||
| if not os.path.exists(os.path.join(path, "clin.csv")): | ||
| missing.append("clin.csv") | ||
| for dt in self.data_types: | ||
| h5_ok = os.path.exists(os.path.join(path, f"{dt}.h5")) | ||
| csv_ok = os.path.exists(os.path.join(path, f"{dt}.csv")) | ||
| if not (h5_ok or csv_ok): | ||
| missing.append(f"{dt}.h5 or {dt}.csv") | ||
| if missing: | ||
| raise ValueError( | ||
| f"Missing files in {split_name} folder: " | ||
| f"{', '.join(missing)}") | ||
| print("[INFO] Validating data folders... OK (HDF5 or CSV accepted)") |
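
One way to sanity-check the HDF5 path against the CSV fallback is a small round trip: build the same toy matrix in both formats, load each, and compare labels, values, and memory. The sketch below is a minimal illustration under that assumption. `csv_frame`, `h5_frame`, and `make_demo_files` are hypothetical stand-ins written for this example; they mimic, but do not call, `pd.read_csv(..., index_col=0)` and `_read_h5_as_dataframe` from the diff.

```python
import os
import tempfile

import h5py
import numpy as np
import pandas as pd


def csv_frame(csv_path):
    """Load a features-as-rows CSV the way the CSV fallback does."""
    return pd.read_csv(csv_path, index_col=0)


def h5_frame(h5_path):
    """Load the samples-as-rows HDF5 layout and transpose to features-as-rows."""
    with h5py.File(h5_path, "r") as f:
        arr = f["matrix"][:]
        sample_ids = [s.decode() for s in f["sample_ids"][:]]
        feature_names = [s.decode() for s in f["feature_names"][:]]
    return pd.DataFrame(np.ascontiguousarray(arr.T),
                        index=feature_names, columns=sample_ids)


def make_demo_files(folder):
    """Write a toy CSV and a hand-built HDF5 twin; return both paths."""
    df = pd.DataFrame(
        [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
        index=["gene_a", "gene_b", "gene_c"],  # features as rows
        columns=["s1", "s2"],                  # samples as columns
    )
    csv_path = os.path.join(folder, "gex.csv")
    h5_path = os.path.join(folder, "gex.h5")
    df.to_csv(csv_path)
    # Store samples-as-rows, i.e. the transpose of the CSV orientation.
    matrix = np.ascontiguousarray(df.to_numpy(dtype=np.float32).T)
    with h5py.File(h5_path, "w") as f:
        f.create_dataset("matrix", data=matrix)
        f.create_dataset("sample_ids",
                         data=np.array(df.columns.tolist(), dtype="S"))
        f.create_dataset("feature_names",
                         data=np.array(df.index.tolist(), dtype="S"))
    return csv_path, h5_path


with tempfile.TemporaryDirectory() as tmp:
    csv_path, h5_path = make_demo_files(tmp)
    from_csv, from_h5 = csv_frame(csv_path), h5_frame(h5_path)

same_labels = (list(from_csv.index) == list(from_h5.index)
               and list(from_csv.columns) == list(from_h5.columns))
same_values = np.allclose(from_csv.to_numpy(), from_h5.to_numpy())
csv_bytes = from_csv.to_numpy().nbytes  # float64: 8 bytes per value
h5_bytes = from_h5.to_numpy().nbytes    # float32: 4 bytes per value
print(same_labels, same_values, csv_bytes, h5_bytes)
```

The values are compared with `np.allclose` rather than exact equality because the HDF5 copy is float32 while the CSV copy is float64; on real data the two can differ by float32 rounding.
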
This script altogether is not acceptable. It is not generic. It is for your specific project use-case. It would be useful if it worked on any csv file(s).
made it generic