From 48e75ff8e0475ef772aa6247f0111b2073f51754 Mon Sep 17 00:00:00 2001 From: Amit Pande Date: Thu, 14 May 2026 15:32:14 +0200 Subject: [PATCH 1/5] feat: HDF5-backed dataloader for memory-efficient large-scale training Adds H5DataImporter, a drop-in subclass of DataImporter that loads expression matrices from HDF5 (h5py) as native float32 instead of CSV. Includes csv_to_h5.py, a converter that streams the CSV in chunks and writes a row-chunked HDF5 dataset, for one-time format migration. On a 118k-sample x 16k-gene compendium, this reduces peak data-loading RAM from over 60 GB (CSV path) to approximately 25 GB, enabling training on the full ARCHS4 bulk corpus without intermediate downsampling. Implementation: - flexynesis/h5_dataloader.py: H5DataImporter (subclass; overrides only read_data() and validate_data_folders(), parent untouched) - flexynesis/csv_to_h5.py: chunked-read CSV-to-HDF5 converter (row-chunked HDF5 output) - flexynesis/__init__.py: lazy exports for csv_to_h5 + H5DataImporter - pyproject.toml: h5py added to dependencies (no version pin) Backward compatibility: H5DataImporter falls back to CSV automatically if an .h5 file is absent in the data folder. Parent DataImporter and all downstream Flexynesis components are unchanged. 
--- flexynesis/__init__.py | 9 ++ flexynesis/csv_to_h5.py | 188 ++++++++++++++++++++++++++++++++++++ flexynesis/h5_dataloader.py | 148 ++++++++++++++++++++++++++++ pyproject.toml | 3 +- 4 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 flexynesis/csv_to_h5.py create mode 100644 flexynesis/h5_dataloader.py diff --git a/flexynesis/__init__.py b/flexynesis/__init__.py index 3a31a0d2..19b9dca4 100644 --- a/flexynesis/__init__.py +++ b/flexynesis/__init__.py @@ -67,6 +67,7 @@ def __repr__(self): models = LazyModule("models") feature_selection = LazyModule("feature_selection") utils = LazyModule("utils") +csv_to_h5 = LazyModule("csv_to_h5") # Import commonly used classes directly for easy access @@ -76,6 +77,12 @@ def _get_data_importer(): return data.DataImporter +def _get_h5_data_importer(): + """Lazy getter for H5DataImporter class (HDF5-backed).""" + from .h5_dataloader import H5DataImporter + return H5DataImporter + + def _get_models(): """Lazy getter for model classes.""" return models @@ -90,5 +97,7 @@ def _get_models(): "models", "feature_selection", "utils", + "csv_to_h5", "DataImporter", + "H5DataImporter", ] diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py new file mode 100644 index 00000000..329d6e68 --- /dev/null +++ b/flexynesis/csv_to_h5.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +""" +csv_to_h5.py — Convert Scope B CSVs to HDF5 for memory-safe training. + +Why this exists: + Flexynesis CSV DataImporter blows RAM (60+ GB peak with copies) on the + Scope B 14 GB train CSV → GUI freeze + force-shutdown. HDF5 with + lazy per-sample loading sidesteps the whole problem. 
+ +Inputs (must exist): + processed_scaled_411k_tissue_B/train/gex.csv (14 GB, ~118k samples) + processed_scaled_411k_tissue_B/train/clin.csv + processed_scaled_411k_tissue_B/test/gex.csv (3.3 GB, ~28k samples) + processed_scaled_411k_tissue_B/test/clin.csv + +Outputs: + processed_scaled_411k_tissue_B_h5/train/gex.h5 (~8 GB, samples × genes) + processed_scaled_411k_tissue_B_h5/train/clin.csv (copied) + processed_scaled_411k_tissue_B_h5/test/gex.h5 (~2 GB) + processed_scaled_411k_tissue_B_h5/test/clin.csv (copied) + +HDF5 layout per gex.h5: + /expression (n_samples, n_genes) float32, chunks=(1, n_genes) ← fast row reads + /sample_ids (n_samples,) bytes + /gene_symbols (n_genes,) bytes + attrs: created_by, source_csv, normalization, orientation + +Memory profile: + Read phase: ~8 GB (pre-allocated array fills as chunks parsed) + Transpose phase: ~16 GB (temporary 2× during transpose copy) + Write phase: ~8 GB (writes to disk, then frees) + PEAK: ~16 GB ← well within 58 GB free + +Runtime: ~6-8 min total (NVMe sequential, no compression overhead) +""" +import shutil +import sys +import time +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd + + +ROOT = Path("/home/amit/Desktop/projects/flexynesis") +SRC_DIR = ROOT / "processed_scaled_411k_tissue_B" +DST_DIR = ROOT / "processed_scaled_411k_tissue_B_h5" + +GENES_PER_CHUNK = 500 # pandas chunksize + + +def log(msg): + print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) + + +def convert_split(split): + src_gex = SRC_DIR / split / "gex.csv" + src_clin = SRC_DIR / split / "clin.csv" + dst_gex = DST_DIR / split / "gex.h5" + dst_clin = DST_DIR / split / "clin.csv" + + DST_DIR.joinpath(split).mkdir(parents=True, exist_ok=True) + + log("=" * 70) + log(f"[{split}] CSV → HDF5") + log("=" * 70) + + # ---------- Copy clin.csv ---------- + shutil.copy(src_clin, dst_clin) + log(f" Copied clin.csv → {dst_clin}") + + # ---------- Pass 1: scan structure (headers + gene names) ---------- + log(" [1/4] 
Scanning CSV structure...") + t = time.time() + + # Get sample IDs from header (one-row read, near-instant) + header_df = pd.read_csv(src_gex, nrows=0, index_col=0) + sample_ids = header_df.columns.tolist() + n_samples = len(sample_ids) + + # Get gene symbols (first column only — fast) + gene_col = pd.read_csv(src_gex, usecols=[0]) + gene_symbols = gene_col.iloc[:, 0].tolist() + n_genes = len(gene_symbols) + del gene_col + + log(f" Samples: {n_samples:,} Genes: {n_genes:,} " + f"(scan {time.time()-t:.1f}s)") + log(f" Output size: {n_samples*n_genes*4/1e9:.2f} GB (float32, no compression)") + + # ---------- Pass 2: stream CSV into pre-allocated array ---------- + log(" [2/4] Streaming CSV into RAM (pandas chunked, float32)...") + t = time.time() + + # Pre-allocate target: (n_genes, n_samples) — matches CSV orientation + arr = np.empty((n_genes, n_samples), dtype=np.float32) + + chunks = pd.read_csv(src_gex, index_col=0, chunksize=GENES_PER_CHUNK) + gene_idx = 0 + for chunk_i, chunk in enumerate(chunks): + chunk_arr = chunk.values.astype(np.float32, copy=False) # (n, n_samples) + n = chunk_arr.shape[0] + arr[gene_idx:gene_idx + n] = chunk_arr + gene_idx += n + del chunk_arr + + if (chunk_i + 1) % 4 == 0: + elapsed = time.time() - t + rate = gene_idx / elapsed + eta_min = (n_genes - gene_idx) / rate / 60 + log(f" {gene_idx:>6,}/{n_genes:,} genes " + f"({rate:.0f}/s, ETA {eta_min:.1f} min, elapsed {elapsed/60:.1f} min)") + + read_min = (time.time() - t) / 60 + log(f" Read done: {arr.nbytes / 1e9:.2f} GB in RAM ({read_min:.1f} min)") + + # ---------- Pass 3: transpose to (n_samples, n_genes) ---------- + log(" [3/4] Transposing (n_genes, n_samples) → (n_samples, n_genes)...") + t = time.time() + # ascontiguousarray forces a real copy in C-order — needed for HDF5 write + arr_T = np.ascontiguousarray(arr.T) + del arr # free 8 GB + log(f" Transpose done ({time.time()-t:.1f}s) " + f"Now: {arr_T.nbytes / 1e9:.2f} GB") + + # ---------- Pass 4: write HDF5 ---------- + log(" 
[4/4] Writing HDF5...") + t = time.time() + with h5py.File(dst_gex, "w") as h5f: + # chunks=(1, n_genes) → one chunk = one sample row = fast single-sample reads + h5f.create_dataset( + "expression", + data=arr_T, + chunks=(1, n_genes), + # No compression — speed > space (NVMe has 464 GB free) + ) + h5f.create_dataset("sample_ids", + data=np.array(sample_ids, dtype="S")) + h5f.create_dataset("gene_symbols", + data=np.array(gene_symbols, dtype="S")) + h5f.attrs["created_by"] = "csv_to_h5.py" + h5f.attrs["source_csv"] = str(src_gex) + h5f.attrs["normalization"] = "log2(count+1) — inherited from upstream" + h5f.attrs["orientation"] = "samples_as_rows" + h5f.attrs["n_samples"] = n_samples + h5f.attrs["n_genes"] = n_genes + del arr_T + + write_min = (time.time() - t) / 60 + size_gb = dst_gex.stat().st_size / 1e9 + log(f" Wrote {size_gb:.2f} GB to {dst_gex} ({write_min:.1f} min)") + + return dst_gex + + +def main(): + log("=" * 70) + log("CSV → HDF5 conversion (Scope B, memory-safe)") + log("=" * 70) + log(f" Source: {SRC_DIR}") + log(f" Destination: {DST_DIR}") + + # Sanity + for p in [SRC_DIR / "train/gex.csv", SRC_DIR / "train/clin.csv", + SRC_DIR / "test/gex.csv", SRC_DIR / "test/clin.csv"]: + if not p.exists(): + sys.exit(f"ERROR: {p} not found") + + t_total = time.time() + train_h5 = convert_split("train") + test_h5 = convert_split("test") + total_min = (time.time() - t_total) / 60 + + log("\n" + "=" * 70) + log(f"DONE — HDF5 conversion complete ({total_min:.1f} min total)") + log("=" * 70) + log(f" Train HDF5: {train_h5} ({train_h5.stat().st_size / 1e9:.2f} GB)") + log(f" Test HDF5: {test_h5} ({test_h5.stat().st_size / 1e9:.2f} GB)") + log("\nNext steps:") + log(" 1. Verify HDF5 files (h5dump or python h5py read)") + log(" 2. 
Run h5_dataset.py + train_denoising_vae_h5.py " + "(I'll write these next)") + log("=" * 70) + + +if __name__ == "__main__": + main() diff --git a/flexynesis/h5_dataloader.py b/flexynesis/h5_dataloader.py new file mode 100644 index 00000000..b5451690 --- /dev/null +++ b/flexynesis/h5_dataloader.py @@ -0,0 +1,148 @@ +""" +h5_dataloader.py — HDF5-backed DataImporter for Flexynesis. + +Why this exists: + Flexynesis stock DataImporter calls pd.read_csv(gex.csv) which loads + as float64 (default), doubling memory: 14 GB CSV → 28 GB DataFrame + + Flexynesis copies → 60+ GB peak → ROG GUI freeze. + + This subclass overrides ONLY read_data() to load gex from HDF5 as + native float32, halving memory and skipping the slow CSV parse. + + Everything else (cleanup, scaling, label encoding, torch dataset + construction) uses Flexynesis's tested infrastructure unchanged. + +Usage: + from h5_dataloader import H5DataImporter + di = H5DataImporter( + path='processed_scaled_411k_tissue_B_h5', + data_types=['gex'], + log_transform=False, + top_percentile=100, + min_features=100, + ) + train_ds, test_ds = di.import_data() + # train_ds, test_ds are standard Flexynesis MultiomicsDataset objects. + +Expected directory layout: + {path}/train/gex.h5 ← HDF5 (samples × genes, float32) + {path}/train/clin.csv ← regular CSV (small, no issue) + {path}/test/gex.h5 + {path}/test/clin.csv + +Memory profile: + HDF5 → float32 numpy array : ~8 GB for 118k × 16k train + DataFrame wrap (no copy) : ~8 GB stable + Flexynesis normalize/scale : ~12 GB transient + PEAK: ~20 GB (vs 60+ with stock CSV path) → safe on 64 GB ROG +""" +import os +import h5py +import numpy as np +import pandas as pd + +from flexynesis.data import DataImporter + + +class H5DataImporter(DataImporter): + """ + Subclasses Flexynesis DataImporter; overrides read_data to use HDF5 + for the gex modality (the big one), CSV for everything else. 
+ + Expects gex stored as HDF5 with this layout: + /expression (n_samples, n_genes) float32 + /sample_ids (n_samples,) bytes — sample IDs in HDF5 row order + /gene_symbols (n_genes,) bytes — gene symbols in HDF5 col order + """ + + def read_data(self, folder_path): + """ + Override of DataImporter.read_data. + + Returns dict {file_name: DataFrame} in the same format Flexynesis + expects, but loads `gex` from HDF5 (float32) instead of CSV (float64). + Other modalities and clin.csv still load from CSV. + """ + print("\n[INFO] ----------------- Reading Data (HDF5) ----------------- ") + data = {} + required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types} + + for file in required_files: + file_name = os.path.splitext(file)[0] + + # GEX → HDF5 path; everything else → CSV + if file_name in self.data_types: + h5_path = os.path.join(folder_path, f"{file_name}.h5") + if not os.path.exists(h5_path): + # Fall back to CSV if HDF5 missing (graceful degradation) + csv_path = os.path.join(folder_path, file) + print(f"[INFO] HDF5 not found at {h5_path}, " + f"falling back to CSV: {csv_path}") + data[file_name] = pd.read_csv(csv_path, index_col=0) + else: + print(f"[INFO] Importing {h5_path} (HDF5)...") + data[file_name] = self._read_h5_as_dataframe(h5_path) + else: + # clin.csv etc — load as usual + csv_path = os.path.join(folder_path, file) + print(f"[INFO] Importing {csv_path}...") + data[file_name] = pd.read_csv(csv_path, index_col=0) + + return data + + @staticmethod + def _read_h5_as_dataframe(h5_path): + """ + Read HDF5 gex into a DataFrame matching Flexynesis CSV convention: + index = gene symbols (str) + columns = sample IDs (str) + values = float32 + + HDF5 stores samples-as-rows (118k × 16k); we transpose to + genes-as-rows during DataFrame construction. + + Memory: peak ~16 GB during transpose (118k × 16k × 4 = 7.7 GB, + plus one transpose copy). Drops to ~8 GB after. 
+ """ + with h5py.File(h5_path, 'r') as f: + n_samples, n_genes = f['expression'].shape + print(f"[INFO] HDF5 shape: {n_samples:,} samples × {n_genes:,} genes (float32)") + + # Read as samples-rows, then transpose + arr = f['expression'][:] # (n_samples, n_genes) float32 + sample_ids = [s.decode() for s in f['sample_ids'][:]] + gene_symbols = [g.decode() for g in f['gene_symbols'][:]] + + # Transpose to (n_genes, n_samples) to match Flexynesis CSV convention + # Note: np.ascontiguousarray forces real copy in C-order + arr_T = np.ascontiguousarray(arr.T) + del arr + + df = pd.DataFrame( + arr_T, + index=gene_symbols, + columns=sample_ids, + ) + print(f"[INFO] DataFrame ready: {df.shape} dtype: {df.dtypes.iloc[0]}") + return df + + def validate_data_folders(self, training_path, testing_path): + """ + Override to accept either .csv OR .h5 for data_types modalities. + clin.csv must still exist as CSV. + """ + import os + for split_name, path in [("training", training_path), ("testing", testing_path)]: + if not os.path.isdir(path): + raise ValueError(f"{split_name} folder does not exist: {path}") + missing = [] + if not os.path.exists(os.path.join(path, "clin.csv")): + missing.append("clin.csv") + for dt in self.data_types: + h5_ok = os.path.exists(os.path.join(path, f"{dt}.h5")) + csv_ok = os.path.exists(os.path.join(path, f"{dt}.csv")) + if not (h5_ok or csv_ok): + missing.append(f"{dt}.h5 or {dt}.csv") + if missing: + raise ValueError(f"Missing files in {split_name} folder: {', '.join(missing)}") + print("[INFO] Validating data folders... 
OK (HDF5 or CSV accepted)") diff --git a/pyproject.toml b/pyproject.toml index 093d184a..6b86d396 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,8 @@ dependencies = [ "pot", "geomloss", "plotnine", - "safetensors" + "safetensors", + "h5py" ] [project.scripts] From 0809bae0741d5fc803427054c6aacc50950d5c96 Mon Sep 17 00:00:00 2001 From: Amit Pande Date: Thu, 14 May 2026 15:41:07 +0200 Subject: [PATCH 2/5] style: sort imports (isort) in h5_dataloader.py and csv_to_h5.py --- flexynesis/csv_to_h5.py | 1 - flexynesis/h5_dataloader.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py index 329d6e68..3b5a04be 100644 --- a/flexynesis/csv_to_h5.py +++ b/flexynesis/csv_to_h5.py @@ -42,7 +42,6 @@ import numpy as np import pandas as pd - ROOT = Path("/home/amit/Desktop/projects/flexynesis") SRC_DIR = ROOT / "processed_scaled_411k_tissue_B" DST_DIR = ROOT / "processed_scaled_411k_tissue_B_h5" diff --git a/flexynesis/h5_dataloader.py b/flexynesis/h5_dataloader.py index b5451690..aa9c63e4 100644 --- a/flexynesis/h5_dataloader.py +++ b/flexynesis/h5_dataloader.py @@ -37,6 +37,7 @@ PEAK: ~20 GB (vs 60+ with stock CSV path) → safe on 64 GB ROG """ import os + import h5py import numpy as np import pandas as pd From 6b3fd8049b39b3970204cdb0f7086d63a1880af2 Mon Sep 17 00:00:00 2001 From: Amit Pande Date: Thu, 14 May 2026 15:47:07 +0200 Subject: [PATCH 3/5] style: fix E221 (whitespace before operator) for flake8 compliance --- flexynesis/csv_to_h5.py | 18 +++++++++--------- flexynesis/h5_dataloader.py | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py index 3b5a04be..a63dfaaf 100644 --- a/flexynesis/csv_to_h5.py +++ b/flexynesis/csv_to_h5.py @@ -42,7 +42,7 @@ import numpy as np import pandas as pd -ROOT = Path("/home/amit/Desktop/projects/flexynesis") +ROOT = Path("/home/amit/Desktop/projects/flexynesis") SRC_DIR = ROOT / 
"processed_scaled_411k_tissue_B" DST_DIR = ROOT / "processed_scaled_411k_tissue_B_h5" @@ -54,9 +54,9 @@ def log(msg): def convert_split(split): - src_gex = SRC_DIR / split / "gex.csv" + src_gex = SRC_DIR / split / "gex.csv" src_clin = SRC_DIR / split / "clin.csv" - dst_gex = DST_DIR / split / "gex.h5" + dst_gex = DST_DIR / split / "gex.h5" dst_clin = DST_DIR / split / "clin.csv" DST_DIR.joinpath(split).mkdir(parents=True, exist_ok=True) @@ -138,12 +138,12 @@ def convert_split(split): data=np.array(sample_ids, dtype="S")) h5f.create_dataset("gene_symbols", data=np.array(gene_symbols, dtype="S")) - h5f.attrs["created_by"] = "csv_to_h5.py" - h5f.attrs["source_csv"] = str(src_gex) + h5f.attrs["created_by"] = "csv_to_h5.py" + h5f.attrs["source_csv"] = str(src_gex) h5f.attrs["normalization"] = "log2(count+1) — inherited from upstream" - h5f.attrs["orientation"] = "samples_as_rows" - h5f.attrs["n_samples"] = n_samples - h5f.attrs["n_genes"] = n_genes + h5f.attrs["orientation"] = "samples_as_rows" + h5f.attrs["n_samples"] = n_samples + h5f.attrs["n_genes"] = n_genes del arr_T write_min = (time.time() - t) / 60 @@ -168,7 +168,7 @@ def main(): t_total = time.time() train_h5 = convert_split("train") - test_h5 = convert_split("test") + test_h5 = convert_split("test") total_min = (time.time() - t_total) / 60 log("\n" + "=" * 70) diff --git a/flexynesis/h5_dataloader.py b/flexynesis/h5_dataloader.py index aa9c63e4..61daa12d 100644 --- a/flexynesis/h5_dataloader.py +++ b/flexynesis/h5_dataloader.py @@ -111,7 +111,7 @@ def _read_h5_as_dataframe(h5_path): # Read as samples-rows, then transpose arr = f['expression'][:] # (n_samples, n_genes) float32 - sample_ids = [s.decode() for s in f['sample_ids'][:]] + sample_ids = [s.decode() for s in f['sample_ids'][:]] gene_symbols = [g.decode() for g in f['gene_symbols'][:]] # Transpose to (n_genes, n_samples) to match Flexynesis CSV convention From 7fe2541b1ac8a2bc5857bda7b725b807dbe5ffed Mon Sep 17 00:00:00 2001 From: Amit Pande Date: Tue, 
19 May 2026 14:38:22 +0200 Subject: [PATCH 4/5] refactor: make HDF5 dataloader and converter project-agnostic Addresses review feedback on PR #146. The csv_to_h5 converter and the H5DataImporter contained project-specific paths, comments, and assumptions that prevented reuse outside the original project. csv_to_h5.py: - Replace hard-coded input/output paths with argparse arguments; the converter now operates on any single CSV file. - Remove the train/test folder-pair assumption and clin.csv copying; one CSV is converted to one HDF5 file. - Remove project-specific narration from docstrings and comments. h5_dataloader.py: - Remove project-specific commentary; generalise docstrings to describe a reusable tool rather than one project's setup. Both files: - Use generic HDF5 dataset names (matrix, feature_names, sample_ids) in place of expression-specific names, so the format is not tied to gene-expression data. Core behaviour is unchanged: H5DataImporter remains a drop-in subclass of DataImporter overriding only read_data() and validate_data_folders(), with automatic CSV fallback when no .h5 file is present. --- flexynesis/csv_to_h5.py | 254 +++++++++++++++--------------------- flexynesis/h5_dataloader.py | 133 +++++++++---------- 2 files changed, 168 insertions(+), 219 deletions(-) diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py index a63dfaaf..a777d5c5 100644 --- a/flexynesis/csv_to_h5.py +++ b/flexynesis/csv_to_h5.py @@ -1,39 +1,30 @@ #!/usr/bin/env python3 """ -csv_to_h5.py — Convert Scope B CSVs to HDF5 for memory-safe training. - -Why this exists: - Flexynesis CSV DataImporter blows RAM (60+ GB peak with copies) on the - Scope B 14 GB train CSV → GUI freeze + force-shutdown. HDF5 with - lazy per-sample loading sidesteps the whole problem. 
- -Inputs (must exist): - processed_scaled_411k_tissue_B/train/gex.csv (14 GB, ~118k samples) - processed_scaled_411k_tissue_B/train/clin.csv - processed_scaled_411k_tissue_B/test/gex.csv (3.3 GB, ~28k samples) - processed_scaled_411k_tissue_B/test/clin.csv - -Outputs: - processed_scaled_411k_tissue_B_h5/train/gex.h5 (~8 GB, samples × genes) - processed_scaled_411k_tissue_B_h5/train/clin.csv (copied) - processed_scaled_411k_tissue_B_h5/test/gex.h5 (~2 GB) - processed_scaled_411k_tissue_B_h5/test/clin.csv (copied) - -HDF5 layout per gex.h5: - /expression (n_samples, n_genes) float32, chunks=(1, n_genes) ← fast row reads - /sample_ids (n_samples,) bytes - /gene_symbols (n_genes,) bytes - attrs: created_by, source_csv, normalization, orientation - -Memory profile: - Read phase: ~8 GB (pre-allocated array fills as chunks parsed) - Transpose phase: ~16 GB (temporary 2× during transpose copy) - Write phase: ~8 GB (writes to disk, then frees) - PEAK: ~16 GB ← well within 58 GB free - -Runtime: ~6-8 min total (NVMe sequential, no compression overhead) +csv_to_h5.py - Convert a feature matrix CSV to HDF5 for memory-efficient loading. + +Large feature-matrix CSV files are slow to parse and, when read with pandas, +are stored as float64 by default - doubling their in-memory footprint. +Converting such a CSV to HDF5 once allows it to be loaded later as native +float32 with substantially lower peak memory. + +The input CSV is expected to have features as rows and samples as columns, with +the first column containing feature identifiers and the header row containing +sample identifiers. This matches the layout used elsewhere in Flexynesis. 
+ +The output HDF5 file has the following layout: + /matrix (n_samples, n_features) float32, chunked (1, n_features) + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings + attrs: created_by, source_csv, orientation, n_samples, n_features + +Data are stored samples-as-rows with per-row chunking so that individual +samples can be read efficiently. H5DataImporter reads files in this layout. + +Usage: + python -m flexynesis.csv_to_h5 input.csv output.h5 + python -m flexynesis.csv_to_h5 input.csv output.h5 --chunksize 1000 """ -import shutil +import argparse import sys import time from pathlib import Path @@ -42,145 +33,112 @@ import numpy as np import pandas as pd -ROOT = Path("/home/amit/Desktop/projects/flexynesis") -SRC_DIR = ROOT / "processed_scaled_411k_tissue_B" -DST_DIR = ROOT / "processed_scaled_411k_tissue_B_h5" - -GENES_PER_CHUNK = 500 # pandas chunksize +DEFAULT_CHUNKSIZE = 500 # rows (features) read per pandas chunk def log(msg): print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) -def convert_split(split): - src_gex = SRC_DIR / split / "gex.csv" - src_clin = SRC_DIR / split / "clin.csv" - dst_gex = DST_DIR / split / "gex.h5" - dst_clin = DST_DIR / split / "clin.csv" +def convert_csv_to_h5(src_csv, dst_h5, chunksize=DEFAULT_CHUNKSIZE): + """ + Convert a single feature-matrix CSV to HDF5. + + Parameters + ---------- + src_csv : str or Path + Input CSV: features as rows, samples as columns, first column is the + feature index. + dst_h5 : str or Path + Output HDF5 path. Parent directories are created if needed. + chunksize : int + Number of CSV rows parsed per chunk while streaming the file. - DST_DIR.joinpath(split).mkdir(parents=True, exist_ok=True) + Returns + ------- + Path + The path to the written HDF5 file. 
+ """ + src_csv = Path(src_csv) + dst_h5 = Path(dst_h5) - log("=" * 70) - log(f"[{split}] CSV → HDF5") - log("=" * 70) + if not src_csv.exists(): + raise FileNotFoundError(f"Input CSV not found: {src_csv}") - # ---------- Copy clin.csv ---------- - shutil.copy(src_clin, dst_clin) - log(f" Copied clin.csv → {dst_clin}") + dst_h5.parent.mkdir(parents=True, exist_ok=True) - # ---------- Pass 1: scan structure (headers + gene names) ---------- - log(" [1/4] Scanning CSV structure...") - t = time.time() + log(f"Converting {src_csv} -> {dst_h5}") - # Get sample IDs from header (one-row read, near-instant) - header_df = pd.read_csv(src_gex, nrows=0, index_col=0) + # ---------- Scan structure: sample IDs (header) and feature IDs (col 0) ---------- + header_df = pd.read_csv(src_csv, nrows=0, index_col=0) sample_ids = header_df.columns.tolist() n_samples = len(sample_ids) - # Get gene symbols (first column only — fast) - gene_col = pd.read_csv(src_gex, usecols=[0]) - gene_symbols = gene_col.iloc[:, 0].tolist() - n_genes = len(gene_symbols) - del gene_col + feature_col = pd.read_csv(src_csv, usecols=[0]) + feature_names = feature_col.iloc[:, 0].astype(str).tolist() + n_features = len(feature_names) + del feature_col - log(f" Samples: {n_samples:,} Genes: {n_genes:,} " - f"(scan {time.time()-t:.1f}s)") - log(f" Output size: {n_samples*n_genes*4/1e9:.2f} GB (float32, no compression)") + log(f" {n_samples:,} samples x {n_features:,} features") - # ---------- Pass 2: stream CSV into pre-allocated array ---------- - log(" [2/4] Streaming CSV into RAM (pandas chunked, float32)...") - t = time.time() + # ---------- Stream CSV into a pre-allocated (n_features, n_samples) array ---------- + arr = np.empty((n_features, n_samples), dtype=np.float32) - # Pre-allocate target: (n_genes, n_samples) — matches CSV orientation - arr = np.empty((n_genes, n_samples), dtype=np.float32) - - chunks = pd.read_csv(src_gex, index_col=0, chunksize=GENES_PER_CHUNK) - gene_idx = 0 - for chunk_i, chunk in 
enumerate(chunks): - chunk_arr = chunk.values.astype(np.float32, copy=False) # (n, n_samples) + chunks = pd.read_csv(src_csv, index_col=0, chunksize=chunksize) + row_idx = 0 + for chunk in chunks: + chunk_arr = chunk.values.astype(np.float32, copy=False) n = chunk_arr.shape[0] - arr[gene_idx:gene_idx + n] = chunk_arr - gene_idx += n + arr[row_idx:row_idx + n] = chunk_arr + row_idx += n del chunk_arr - if (chunk_i + 1) % 4 == 0: - elapsed = time.time() - t - rate = gene_idx / elapsed - eta_min = (n_genes - gene_idx) / rate / 60 - log(f" {gene_idx:>6,}/{n_genes:,} genes " - f"({rate:.0f}/s, ETA {eta_min:.1f} min, elapsed {elapsed/60:.1f} min)") - - read_min = (time.time() - t) / 60 - log(f" Read done: {arr.nbytes / 1e9:.2f} GB in RAM ({read_min:.1f} min)") - - # ---------- Pass 3: transpose to (n_samples, n_genes) ---------- - log(" [3/4] Transposing (n_genes, n_samples) → (n_samples, n_genes)...") - t = time.time() - # ascontiguousarray forces a real copy in C-order — needed for HDF5 write - arr_T = np.ascontiguousarray(arr.T) - del arr # free 8 GB - log(f" Transpose done ({time.time()-t:.1f}s) " - f"Now: {arr_T.nbytes / 1e9:.2f} GB") - - # ---------- Pass 4: write HDF5 ---------- - log(" [4/4] Writing HDF5...") - t = time.time() - with h5py.File(dst_gex, "w") as h5f: - # chunks=(1, n_genes) → one chunk = one sample row = fast single-sample reads - h5f.create_dataset( - "expression", - data=arr_T, - chunks=(1, n_genes), - # No compression — speed > space (NVMe has 464 GB free) + if row_idx != n_features: + raise ValueError( + f"Row count mismatch: scanned {n_features} features, " + f"read {row_idx} while streaming." 
) - h5f.create_dataset("sample_ids", - data=np.array(sample_ids, dtype="S")) - h5f.create_dataset("gene_symbols", - data=np.array(gene_symbols, dtype="S")) + + # ---------- Transpose to (n_samples, n_features) for samples-as-rows storage ---------- + arr_t = np.ascontiguousarray(arr.T) + del arr + + # ---------- Write HDF5 ---------- + with h5py.File(dst_h5, "w") as h5f: + # Per-row chunking: one chunk == one sample, for fast single-sample reads. + h5f.create_dataset("matrix", data=arr_t, chunks=(1, n_features)) + h5f.create_dataset("sample_ids", data=np.array(sample_ids, dtype="S")) + h5f.create_dataset("feature_names", + data=np.array(feature_names, dtype="S")) h5f.attrs["created_by"] = "csv_to_h5.py" - h5f.attrs["source_csv"] = str(src_gex) - h5f.attrs["normalization"] = "log2(count+1) — inherited from upstream" + h5f.attrs["source_csv"] = str(src_csv) h5f.attrs["orientation"] = "samples_as_rows" h5f.attrs["n_samples"] = n_samples - h5f.attrs["n_genes"] = n_genes - del arr_T - - write_min = (time.time() - t) / 60 - size_gb = dst_gex.stat().st_size / 1e9 - log(f" Wrote {size_gb:.2f} GB to {dst_gex} ({write_min:.1f} min)") - - return dst_gex - - -def main(): - log("=" * 70) - log("CSV → HDF5 conversion (Scope B, memory-safe)") - log("=" * 70) - log(f" Source: {SRC_DIR}") - log(f" Destination: {DST_DIR}") - - # Sanity - for p in [SRC_DIR / "train/gex.csv", SRC_DIR / "train/clin.csv", - SRC_DIR / "test/gex.csv", SRC_DIR / "test/clin.csv"]: - if not p.exists(): - sys.exit(f"ERROR: {p} not found") - - t_total = time.time() - train_h5 = convert_split("train") - test_h5 = convert_split("test") - total_min = (time.time() - t_total) / 60 - - log("\n" + "=" * 70) - log(f"DONE — HDF5 conversion complete ({total_min:.1f} min total)") - log("=" * 70) - log(f" Train HDF5: {train_h5} ({train_h5.stat().st_size / 1e9:.2f} GB)") - log(f" Test HDF5: {test_h5} ({test_h5.stat().st_size / 1e9:.2f} GB)") - log("\nNext steps:") - log(" 1. 
Verify HDF5 files (h5dump or python h5py read)") - log(" 2. Run h5_dataset.py + train_denoising_vae_h5.py " - "(I'll write these next)") - log("=" * 70) + h5f.attrs["n_features"] = n_features + del arr_t + + size_gb = dst_h5.stat().st_size / 1e9 + log(f" Wrote {size_gb:.2f} GB to {dst_h5}") + return dst_h5 + + +def main(argv=None): + parser = argparse.ArgumentParser( + description="Convert a feature-matrix CSV to HDF5 for " + "memory-efficient loading." + ) + parser.add_argument("input_csv", help="Input CSV (features x samples).") + parser.add_argument("output_h5", help="Output HDF5 file path.") + parser.add_argument( + "--chunksize", type=int, default=DEFAULT_CHUNKSIZE, + help=f"CSV rows parsed per chunk (default: {DEFAULT_CHUNKSIZE}).", + ) + args = parser.parse_args(argv) + + try: + convert_csv_to_h5(args.input_csv, args.output_h5, args.chunksize) + except (FileNotFoundError, ValueError) as exc: + sys.exit(f"ERROR: {exc}") if __name__ == "__main__": diff --git a/flexynesis/h5_dataloader.py b/flexynesis/h5_dataloader.py index 61daa12d..9160c660 100644 --- a/flexynesis/h5_dataloader.py +++ b/flexynesis/h5_dataloader.py @@ -1,40 +1,30 @@ """ -h5_dataloader.py — HDF5-backed DataImporter for Flexynesis. +h5_dataloader.py - HDF5-backed DataImporter for Flexynesis. -Why this exists: - Flexynesis stock DataImporter calls pd.read_csv(gex.csv) which loads - as float64 (default), doubling memory: 14 GB CSV → 28 GB DataFrame + - Flexynesis copies → 60+ GB peak → ROG GUI freeze. +The stock Flexynesis DataImporter reads each modality with pd.read_csv, which +loads values as float64 by default. For large feature matrices this doubles +the in-memory footprint and makes data loading the dominant memory cost of a +training run. - This subclass overrides ONLY read_data() to load gex from HDF5 as - native float32, halving memory and skipping the slow CSV parse. 
+H5DataImporter is a drop-in subclass of DataImporter that loads any modality +from HDF5 as native float32 when an .h5 file is present, and otherwise falls +back to the standard CSV path. All other Flexynesis behaviour - data cleanup, +NaN imputation, label encoding, scaling, and torch dataset construction - is +inherited unchanged. - Everything else (cleanup, scaling, label encoding, torch dataset - construction) uses Flexynesis's tested infrastructure unchanged. +HDF5 files are expected in the layout produced by csv_to_h5.py: + /matrix (n_samples, n_features) float32 + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings + +Each split folder (train and test) is expected to contain clin.csv (as +required by the stock DataImporter); every modality listed in data_types +may be supplied as either an .h5 file or a .csv file. Usage: - from h5_dataloader import H5DataImporter - di = H5DataImporter( - path='processed_scaled_411k_tissue_B_h5', - data_types=['gex'], - log_transform=False, - top_percentile=100, - min_features=100, - ) - train_ds, test_ds = di.import_data() - # train_ds, test_ds are standard Flexynesis MultiomicsDataset objects. - -Expected directory layout: - {path}/train/gex.h5 ← HDF5 (samples × genes, float32) - {path}/train/clin.csv ← regular CSV (small, no issue) - {path}/test/gex.h5 - {path}/test/clin.csv - -Memory profile: - HDF5 → float32 numpy array : ~8 GB for 118k × 16k train - DataFrame wrap (no copy) : ~8 GB stable - Flexynesis normalize/scale : ~12 GB transient - PEAK: ~20 GB (vs 60+ with stock CSV path) → safe on 64 GB ROG + from flexynesis.h5_dataloader import H5DataImporter + di = H5DataImporter(path='my_dataset', data_types=['gex']) + train_ds, test_ds = di.import_data() """ import os @@ -47,22 +37,26 @@ class H5DataImporter(DataImporter): """ - Subclasses Flexynesis DataImporter; overrides read_data to use HDF5 - for the gex modality (the big one), CSV for everything else. 
+ DataImporter subclass that loads modality matrices from HDF5 when + available, and from CSV otherwise. + + Only read_data() and validate_data_folders() are overridden; the parent + DataImporter and all downstream Flexynesis components are unchanged. - Expects gex stored as HDF5 with this layout: - /expression (n_samples, n_genes) float32 - /sample_ids (n_samples,) bytes — sample IDs in HDF5 row order - /gene_symbols (n_genes,) bytes — gene symbols in HDF5 col order + HDF5 files are expected in the layout written by csv_to_h5.py: + /matrix (n_samples, n_features) float32 + /sample_ids (n_samples,) byte strings + /feature_names (n_features,) byte strings """ def read_data(self, folder_path): """ Override of DataImporter.read_data. - Returns dict {file_name: DataFrame} in the same format Flexynesis - expects, but loads `gex` from HDF5 (float32) instead of CSV (float64). - Other modalities and clin.csv still load from CSV. + Returns a dict {file_name: DataFrame} in the format Flexynesis + expects. Each modality in self.data_types is loaded from an .h5 file + if present, otherwise from the corresponding .csv. clin.csv is always + loaded from CSV. """ print("\n[INFO] ----------------- Reading Data (HDF5) ----------------- ") data = {} @@ -71,11 +65,11 @@ def read_data(self, folder_path): for file in required_files: file_name = os.path.splitext(file)[0] - # GEX → HDF5 path; everything else → CSV + # Modality matrices may be HDF5; clin.csv is always CSV. if file_name in self.data_types: h5_path = os.path.join(folder_path, f"{file_name}.h5") if not os.path.exists(h5_path): - # Fall back to CSV if HDF5 missing (graceful degradation) + # Fall back to CSV when no HDF5 file is present. 
csv_path = os.path.join(folder_path, file) print(f"[INFO] HDF5 not found at {h5_path}, " f"falling back to CSV: {csv_path}") @@ -84,7 +78,6 @@ def read_data(self, folder_path): print(f"[INFO] Importing {h5_path} (HDF5)...") data[file_name] = self._read_h5_as_dataframe(h5_path) else: - # clin.csv etc — load as usual csv_path = os.path.join(folder_path, file) print(f"[INFO] Importing {csv_path}...") data[file_name] = pd.read_csv(csv_path, index_col=0) @@ -94,48 +87,44 @@ def read_data(self, folder_path): @staticmethod def _read_h5_as_dataframe(h5_path): """ - Read HDF5 gex into a DataFrame matching Flexynesis CSV convention: - index = gene symbols (str) - columns = sample IDs (str) + Read an HDF5 modality file into a DataFrame matching the Flexynesis + CSV convention: + index = feature identifiers (str) + columns = sample identifiers (str) values = float32 - HDF5 stores samples-as-rows (118k × 16k); we transpose to - genes-as-rows during DataFrame construction. - - Memory: peak ~16 GB during transpose (118k × 16k × 4 = 7.7 GB, - plus one transpose copy). Drops to ~8 GB after. + HDF5 stores the matrix samples-as-rows; it is transposed here to + features-as-rows during DataFrame construction. 
""" with h5py.File(h5_path, 'r') as f: - n_samples, n_genes = f['expression'].shape - print(f"[INFO] HDF5 shape: {n_samples:,} samples × {n_genes:,} genes (float32)") + n_samples, n_features = f['matrix'].shape + print(f"[INFO] HDF5 shape: {n_samples:,} samples x " + f"{n_features:,} features (float32)") - # Read as samples-rows, then transpose - arr = f['expression'][:] # (n_samples, n_genes) float32 + arr = f['matrix'][:] # (n_samples, n_features) float32 sample_ids = [s.decode() for s in f['sample_ids'][:]] - gene_symbols = [g.decode() for g in f['gene_symbols'][:]] + feature_names = [g.decode() for g in f['feature_names'][:]] - # Transpose to (n_genes, n_samples) to match Flexynesis CSV convention - # Note: np.ascontiguousarray forces real copy in C-order - arr_T = np.ascontiguousarray(arr.T) + # Transpose to (n_features, n_samples) to match the CSV convention. + # np.ascontiguousarray forces a real copy in C-order. + arr_t = np.ascontiguousarray(arr.T) del arr - df = pd.DataFrame( - arr_T, - index=gene_symbols, - columns=sample_ids, - ) - print(f"[INFO] DataFrame ready: {df.shape} dtype: {df.dtypes.iloc[0]}") + df = pd.DataFrame(arr_t, index=feature_names, columns=sample_ids) + print(f"[INFO] DataFrame ready: {df.shape} " + f"dtype: {df.dtypes.iloc[0]}") return df def validate_data_folders(self, training_path, testing_path): """ - Override to accept either .csv OR .h5 for data_types modalities. - clin.csv must still exist as CSV. + Override to accept either a .csv or an .h5 file for each modality in + self.data_types. clin.csv must still be present as CSV. 
""" - import os - for split_name, path in [("training", training_path), ("testing", testing_path)]: + for split_name, path in [("training", training_path), + ("testing", testing_path)]: if not os.path.isdir(path): - raise ValueError(f"{split_name} folder does not exist: {path}") + raise ValueError( + f"{split_name} folder does not exist: {path}") missing = [] if not os.path.exists(os.path.join(path, "clin.csv")): missing.append("clin.csv") @@ -145,5 +134,7 @@ def validate_data_folders(self, training_path, testing_path): if not (h5_ok or csv_ok): missing.append(f"{dt}.h5 or {dt}.csv") if missing: - raise ValueError(f"Missing files in {split_name} folder: {', '.join(missing)}") + raise ValueError( + f"Missing files in {split_name} folder: " + f"{', '.join(missing)}") print("[INFO] Validating data folders... OK (HDF5 or CSV accepted)") From 9c436ef4881b3113df92d61a404144071ae4b8cc Mon Sep 17 00:00:00 2001 From: Amit Pande Date: Wed, 20 May 2026 13:48:23 +0200 Subject: [PATCH 5/5] fix: CLI autodetect for HDF5 inputs; correct feature_names in converter Addresses Bora's second review on PR #146. __main__.py: Add HDF5 autodetection before instantiating the training-mode DataImporter. If any modality file is present as .h5 in either the train/ or test/ split, switch to H5DataImporter; otherwise fall back to the stock CSV DataImporter. This makes the HDF5 path reachable from the command line, not only from notebooks. The CSV-only path is unchanged. csv_to_h5.py: Fix incorrect feature-name extraction. The previous implementation used pd.read_csv(usecols=[0]) which treats row 0 as a header rather than the feature-index column, producing an array of 'nan' strings in the HDF5 file. Replaced with pd.read_csv(index_col=0, usecols=[0]) so that the first column is read as the row index. Verified on the reviewer-supplied benchmark dataset: feature names are now unique and correct (A1CF, A2M, AADAC, ...). 
Verified end-to-end on dataset1_h5: HDF5 detection log appears, H5 modalities are loaded as float32, per-file CSV fallback works for modalities missing an .h5 file, and data validation passes. --- flexynesis/__main__.py | 19 ++++++++++++++++++- flexynesis/csv_to_h5.py | 8 +++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/flexynesis/__main__.py b/flexynesis/__main__.py index ecb5fe69..bdce3de4 100644 --- a/flexynesis/__main__.py +++ b/flexynesis/__main__.py @@ -1,4 +1,5 @@ import argparse +import glob import os import sys import time @@ -1122,7 +1123,23 @@ def main(): else: covariates = None - data_importer = DataImporter( + # Autodetect HDF5 inputs: if any modality is supplied as + # .h5 instead of .csv (in either the train/ or test/ + # split), use the HDF5-backed loader. clin.csv is still + # required as CSV by the stock DataImporter. + h5_present = any( + glob.glob(os.path.join(args.data_path, split, f"{dt}.h5")) + for split in ("train", "test") + for dt in datatypes + ) + if h5_present: + from .h5_dataloader import H5DataImporter as _DataImporter + print("[INFO] HDF5 modality files detected " + "-- using H5DataImporter.") + else: + _DataImporter = DataImporter + + data_importer = _DataImporter( path=args.data_path, data_types=datatypes, covariates=covariates, diff --git a/flexynesis/csv_to_h5.py b/flexynesis/csv_to_h5.py index a777d5c5..4f4a9a2d 100644 --- a/flexynesis/csv_to_h5.py +++ b/flexynesis/csv_to_h5.py @@ -74,10 +74,12 @@ def convert_csv_to_h5(src_csv, dst_h5, chunksize=DEFAULT_CHUNKSIZE): sample_ids = header_df.columns.tolist() n_samples = len(sample_ids) - feature_col = pd.read_csv(src_csv, usecols=[0]) - feature_names = feature_col.iloc[:, 0].astype(str).tolist() + # Use index_col=0 to read the first column as the row index; + # `usecols=[0]` keeps it light because no value columns are loaded.
+ feature_index = pd.read_csv(src_csv, index_col=0, usecols=[0]).index + feature_names = feature_index.astype(str).tolist() n_features = len(feature_names) - del feature_col + del feature_index log(f" {n_samples:,} samples x {n_features:,} features")