diff --git a/flexynesis/__init__.py b/flexynesis/__init__.py index 3a31a0d2..19b9dca4 100644 --- a/flexynesis/__init__.py +++ b/flexynesis/__init__.py @@ -67,6 +67,7 @@ def __repr__(self): models = LazyModule("models") feature_selection = LazyModule("feature_selection") utils = LazyModule("utils") +csv_to_h5 = LazyModule("csv_to_h5") # Import commonly used classes directly for easy access @@ -76,6 +77,12 @@ def _get_data_importer(): return data.DataImporter +def _get_h5_data_importer(): + """Lazy getter for H5DataImporter class (HDF5-backed).""" + from .h5_dataloader import H5DataImporter + return H5DataImporter + + def _get_models(): """Lazy getter for model classes.""" return models @@ -90,5 +97,7 @@ def _get_models(): "models", "feature_selection", "utils", + "csv_to_h5", "DataImporter", + "H5DataImporter", ] diff --git a/flexynesis/__main__.py b/flexynesis/__main__.py index ecb5fe69..bdce3de4 100644 --- a/flexynesis/__main__.py +++ b/flexynesis/__main__.py @@ -1,4 +1,5 @@ import argparse +import glob import os import sys import time @@ -1122,7 +1123,23 @@ def main(): else: covariates = None - data_importer = DataImporter( + # Autodetect HDF5 inputs: if any modality is supplied as + # .h5 instead of .csv (in either the train/ or test/ + # split), use the HDF5-backed loader. clin.csv is still + # required as CSV by the stock DataImporter. + h5_present = any( + glob.glob(os.path.join(args.data_path, split, f"{dt}.h5")) + for split in ("train", "test") + for dt in datatypes + ) + if h5_present: + from .h5_dataloader import H5DataImporter as _DataImporter + print("[INFO] HDF5 modality files detected " + "-- using H5DataImporter.") + else: + _DataImporter = DataImporter + + data_importer = _DataImporter( path=args.data_path, data_types=datatypes, covariates=covariates, diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py new file mode 100644 index 00000000..4f4a9a2d --- /dev/null +++ b/flexynesis/csv_to_h5.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +csv_to_h5.py - Convert a feature matrix CSV to HDF5 for memory-efficient loading. + +Large feature-matrix CSV files are slow to parse and, when read with pandas, +are stored as float64 by default - doubling their in-memory footprint. +Converting such a CSV to HDF5 once allows it to be loaded later as native +float32 with substantially lower peak memory. + +The input CSV is expected to have features as rows and samples as columns, with +the first column containing feature identifiers and the header row containing +sample identifiers. This matches the layout used elsewhere in Flexynesis. + +The output HDF5 file has the following layout: + /matrix (n_samples, n_features) float32, chunked (1, n_features) + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings + attrs: created_by, source_csv, orientation, n_samples, n_features + +Data are stored samples-as-rows with per-row chunking so that individual +samples can be read efficiently. H5DataImporter reads files in this layout. + +Usage: + python -m flexynesis.csv_to_h5 input.csv output.h5 + python -m flexynesis.csv_to_h5 input.csv output.h5 --chunksize 1000 +""" +import argparse +import sys +import time +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd + +DEFAULT_CHUNKSIZE = 500 # rows (features) read per pandas chunk + + +def log(msg): + print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) + + +def convert_csv_to_h5(src_csv, dst_h5, chunksize=DEFAULT_CHUNKSIZE): + """ + Convert a single feature-matrix CSV to HDF5. + + Parameters + ---------- + src_csv : str or Path + Input CSV: features as rows, samples as columns, first column is the + feature index. + dst_h5 : str or Path + Output HDF5 path. Parent directories are created if needed. + chunksize : int + Number of CSV rows parsed per chunk while streaming the file. + + Returns + ------- + Path + The path to the written HDF5 file. + """ + src_csv = Path(src_csv) + dst_h5 = Path(dst_h5) + + if not src_csv.exists(): + raise FileNotFoundError(f"Input CSV not found: {src_csv}") + + dst_h5.parent.mkdir(parents=True, exist_ok=True) + + log(f"Converting {src_csv} -> {dst_h5}") + + # ---------- Scan structure: sample IDs (header) and feature IDs (col 0) ---------- + header_df = pd.read_csv(src_csv, nrows=0, index_col=0) + sample_ids = header_df.columns.tolist() + n_samples = len(sample_ids) + + # Use index_col=0 to read the first column as the row index; + # `nrows=None` keeps it light because no value columns are loaded. + feature_index = pd.read_csv(src_csv, index_col=0, usecols=[0]).index + feature_names = feature_index.astype(str).tolist() + n_features = len(feature_names) + del feature_index + + log(f" {n_samples:,} samples x {n_features:,} features") + + # ---------- Stream CSV into a pre-allocated (n_features, n_samples) array ---------- + arr = np.empty((n_features, n_samples), dtype=np.float32) + + chunks = pd.read_csv(src_csv, index_col=0, chunksize=chunksize) + row_idx = 0 + for chunk in chunks: + chunk_arr = chunk.values.astype(np.float32, copy=False) + n = chunk_arr.shape[0] + arr[row_idx:row_idx + n] = chunk_arr + row_idx += n + del chunk_arr + + if row_idx != n_features: + raise ValueError( + f"Row count mismatch: scanned {n_features} features, " + f"read {row_idx} while streaming." + ) + + # ---------- Transpose to (n_samples, n_features) for samples-as-rows storage ---------- + arr_t = np.ascontiguousarray(arr.T) + del arr + + # ---------- Write HDF5 ---------- + with h5py.File(dst_h5, "w") as h5f: + # Per-row chunking: one chunk == one sample, for fast single-sample reads. + h5f.create_dataset("matrix", data=arr_t, chunks=(1, n_features)) + h5f.create_dataset("sample_ids", data=np.array(sample_ids, dtype="S")) + h5f.create_dataset("feature_names", + data=np.array(feature_names, dtype="S")) + h5f.attrs["created_by"] = "csv_to_h5.py" + h5f.attrs["source_csv"] = str(src_csv) + h5f.attrs["orientation"] = "samples_as_rows" + h5f.attrs["n_samples"] = n_samples + h5f.attrs["n_features"] = n_features + del arr_t + + size_gb = dst_h5.stat().st_size / 1e9 + log(f" Wrote {size_gb:.2f} GB to {dst_h5}") + return dst_h5 + + +def main(argv=None): + parser = argparse.ArgumentParser( + description="Convert a feature-matrix CSV to HDF5 for " + "memory-efficient loading." + ) + parser.add_argument("input_csv", help="Input CSV (features x samples).") + parser.add_argument("output_h5", help="Output HDF5 file path.") + parser.add_argument( + "--chunksize", type=int, default=DEFAULT_CHUNKSIZE, + help=f"CSV rows parsed per chunk (default: {DEFAULT_CHUNKSIZE}).", + ) + args = parser.parse_args(argv) + + try: + convert_csv_to_h5(args.input_csv, args.output_h5, args.chunksize) + except (FileNotFoundError, ValueError) as exc: + sys.exit(f"ERROR: {exc}") + + +if __name__ == "__main__": + main() diff --git a/flexynesis/h5_dataloader.py b/flexynesis/h5_dataloader.py new file mode 100644 index 00000000..9160c660 --- /dev/null +++ b/flexynesis/h5_dataloader.py @@ -0,0 +1,140 @@ +""" +h5_dataloader.py - HDF5-backed DataImporter for Flexynesis. + +The stock Flexynesis DataImporter reads each modality with pd.read_csv, which +loads values as float64 by default. For large feature matrices this doubles +the in-memory footprint and makes data loading the dominant memory cost of a +training run. + +H5DataImporter is a drop-in subclass of DataImporter that loads any modality +from HDF5 as native float32 when an .h5 file is present, and otherwise falls +back to the standard CSV path. All other Flexynesis behaviour - data cleanup, +NaN imputation, label encoding, scaling, and torch dataset construction - is +inherited unchanged. + +HDF5 files are expected in the layout produced by csv_to_h5.py: + /matrix (n_samples, n_features) float32 + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings + +Each modality folder is expected to contain clin.csv (as required by the +stock DataImporter); every other modality may be supplied as either an .h5 +file or a .csv file. + +Usage: + from flexynesis.h5_dataloader import H5DataImporter + di = H5DataImporter(path='my_dataset', data_types=['gex']) + train_ds, test_ds = di.import_data() +""" +import os + +import h5py +import numpy as np +import pandas as pd + +from flexynesis.data import DataImporter + + +class H5DataImporter(DataImporter): + """ + DataImporter subclass that loads modality matrices from HDF5 when + available, and from CSV otherwise. + + Only read_data() and validate_data_folders() are overridden; the parent + DataImporter and all downstream Flexynesis components are unchanged. + + HDF5 files are expected in the layout written by csv_to_h5.py: + /matrix (n_samples, n_features) float32 + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings + """ + + def read_data(self, folder_path): + """ + Override of DataImporter.read_data. + + Returns a dict {file_name: DataFrame} in the format Flexynesis + expects. Each modality in self.data_types is loaded from an .h5 file + if present, otherwise from the corresponding .csv. clin.csv is always + loaded from CSV. + """ + print("\n[INFO] ----------------- Reading Data (HDF5) ----------------- ") + data = {} + required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types} + + for file in required_files: + file_name = os.path.splitext(file)[0] + + # Modality matrices may be HDF5; clin.csv is always CSV. + if file_name in self.data_types: + h5_path = os.path.join(folder_path, f"{file_name}.h5") + if not os.path.exists(h5_path): + # Fall back to CSV when no HDF5 file is present. + csv_path = os.path.join(folder_path, file) + print(f"[INFO] HDF5 not found at {h5_path}, " + f"falling back to CSV: {csv_path}") + data[file_name] = pd.read_csv(csv_path, index_col=0) + else: + print(f"[INFO] Importing {h5_path} (HDF5)...") + data[file_name] = self._read_h5_as_dataframe(h5_path) + else: + csv_path = os.path.join(folder_path, file) + print(f"[INFO] Importing {csv_path}...") + data[file_name] = pd.read_csv(csv_path, index_col=0) + + return data + + @staticmethod + def _read_h5_as_dataframe(h5_path): + """ + Read an HDF5 modality file into a DataFrame matching the Flexynesis + CSV convention: + index = feature identifiers (str) + columns = sample identifiers (str) + values = float32 + + HDF5 stores the matrix samples-as-rows; it is transposed here to + features-as-rows during DataFrame construction. + """ + with h5py.File(h5_path, 'r') as f: + n_samples, n_features = f['matrix'].shape + print(f"[INFO] HDF5 shape: {n_samples:,} samples x " + f"{n_features:,} features (float32)") + + arr = f['matrix'][:] # (n_samples, n_features) float32 + sample_ids = [s.decode() for s in f['sample_ids'][:]] + feature_names = [g.decode() for g in f['feature_names'][:]] + + # Transpose to (n_features, n_samples) to match the CSV convention. + # np.ascontiguousarray forces a real copy in C-order. + arr_t = np.ascontiguousarray(arr.T) + del arr + + df = pd.DataFrame(arr_t, index=feature_names, columns=sample_ids) + print(f"[INFO] DataFrame ready: {df.shape} " + f"dtype: {df.dtypes.iloc[0]}") + return df + + def validate_data_folders(self, training_path, testing_path): + """ + Override to accept either a .csv or an .h5 file for each modality in + self.data_types. clin.csv must still be present as CSV. + """ + for split_name, path in [("training", training_path), + ("testing", testing_path)]: + if not os.path.isdir(path): + raise ValueError( + f"{split_name} folder does not exist: {path}") + missing = [] + if not os.path.exists(os.path.join(path, "clin.csv")): + missing.append("clin.csv") + for dt in self.data_types: + h5_ok = os.path.exists(os.path.join(path, f"{dt}.h5")) + csv_ok = os.path.exists(os.path.join(path, f"{dt}.csv")) + if not (h5_ok or csv_ok): + missing.append(f"{dt}.h5 or {dt}.csv") + if missing: + raise ValueError( + f"Missing files in {split_name} folder: " + f"{', '.join(missing)}") + print("[INFO] Validating data folders... OK (HDF5 or CSV accepted)") diff --git a/pyproject.toml b/pyproject.toml index 093d184a..6b86d396 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,8 @@ dependencies = [ "pot", "geomloss", "plotnine", - "safetensors" + "safetensors", + "h5py" ] [project.scripts]