Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions flexynesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __repr__(self):
models = LazyModule("models")
feature_selection = LazyModule("feature_selection")
utils = LazyModule("utils")
csv_to_h5 = LazyModule("csv_to_h5")


# Import commonly used classes directly for easy access
Expand All @@ -76,6 +77,12 @@ def _get_data_importer():
return data.DataImporter


def _get_h5_data_importer():
"""Lazy getter for H5DataImporter class (HDF5-backed)."""
from .h5_dataloader import H5DataImporter
return H5DataImporter


def _get_models():
"""Lazy getter for model classes."""
return models
Expand All @@ -90,5 +97,7 @@ def _get_models():
"models",
"feature_selection",
"utils",
"csv_to_h5",
"DataImporter",
"H5DataImporter",
]
19 changes: 18 additions & 1 deletion flexynesis/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import argparse
import glob
import os
import sys
import time
Expand Down Expand Up @@ -1122,7 +1123,23 @@ def main():
else:
covariates = None

data_importer = DataImporter(
# Autodetect HDF5 inputs: if any modality is supplied as
# .h5 instead of .csv (in either the train/ or test/
# split), use the HDF5-backed loader. clin.csv is still
# required as CSV by the stock DataImporter.
h5_present = any(
glob.glob(os.path.join(args.data_path, split, f"{dt}.h5"))
for split in ("train", "test")
for dt in datatypes
)
if h5_present:
from .h5_dataloader import H5DataImporter as _DataImporter
print("[INFO] HDF5 modality files detected "
"-- using H5DataImporter.")
else:
_DataImporter = DataImporter

data_importer = _DataImporter(
path=args.data_path,
data_types=datatypes,
covariates=covariates,
Expand Down
147 changes: 147 additions & 0 deletions flexynesis/csv_to_h5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#!/usr/bin/env python3

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This script altogether is not acceptable. It is not generic. It is for your specific project use-case. It would be useful if it worked on any csv file(s).

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made it generic

"""
csv_to_h5.py - Convert a feature matrix CSV to HDF5 for memory-efficient loading.

Large feature-matrix CSV files are slow to parse and, when read with pandas,
are stored as float64 by default - doubling their in-memory footprint.
Converting such a CSV to HDF5 once allows it to be loaded later as native
float32 with substantially lower peak memory.

The input CSV is expected to have features as rows and samples as columns, with
the first column containing feature identifiers and the header row containing
sample identifiers. This matches the layout used elsewhere in Flexynesis.

The output HDF5 file has the following layout:
/matrix (n_samples, n_features) float32, chunked (1, n_features)
/sample_ids (n_samples,) byte strings
/feature_names (n_features,) byte strings
attrs: created_by, source_csv, orientation, n_samples, n_features

Data are stored samples-as-rows with per-row chunking so that individual
samples can be read efficiently. H5DataImporter reads files in this layout.

Usage:
python -m flexynesis.csv_to_h5 input.csv output.h5
python -m flexynesis.csv_to_h5 input.csv output.h5 --chunksize 1000
"""
import argparse
import sys
import time
from pathlib import Path

import h5py
import numpy as np
import pandas as pd

DEFAULT_CHUNKSIZE = 500 # rows (features) read per pandas chunk


def log(msg):
print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)


def convert_csv_to_h5(src_csv, dst_h5, chunksize=DEFAULT_CHUNKSIZE):
"""
Convert a single feature-matrix CSV to HDF5.

Parameters
----------
src_csv : str or Path
Input CSV: features as rows, samples as columns, first column is the
feature index.
dst_h5 : str or Path
Output HDF5 path. Parent directories are created if needed.
chunksize : int
Number of CSV rows parsed per chunk while streaming the file.

Returns
-------
Path
The path to the written HDF5 file.
"""
src_csv = Path(src_csv)
dst_h5 = Path(dst_h5)

if not src_csv.exists():
raise FileNotFoundError(f"Input CSV not found: {src_csv}")

dst_h5.parent.mkdir(parents=True, exist_ok=True)

log(f"Converting {src_csv} -> {dst_h5}")

# ---------- Scan structure: sample IDs (header) and feature IDs (col 0) ----------
header_df = pd.read_csv(src_csv, nrows=0, index_col=0)
sample_ids = header_df.columns.tolist()
n_samples = len(sample_ids)

# Use index_col=0 to read the first column as the row index;
# `nrows=None` keeps it light because no value columns are loaded.
feature_index = pd.read_csv(src_csv, index_col=0, usecols=[0]).index
feature_names = feature_index.astype(str).tolist()
n_features = len(feature_names)
del feature_index

log(f" {n_samples:,} samples x {n_features:,} features")

# ---------- Stream CSV into a pre-allocated (n_features, n_samples) array ----------
arr = np.empty((n_features, n_samples), dtype=np.float32)

chunks = pd.read_csv(src_csv, index_col=0, chunksize=chunksize)
row_idx = 0
for chunk in chunks:
chunk_arr = chunk.values.astype(np.float32, copy=False)
n = chunk_arr.shape[0]
arr[row_idx:row_idx + n] = chunk_arr
row_idx += n
del chunk_arr

if row_idx != n_features:
raise ValueError(
f"Row count mismatch: scanned {n_features} features, "
f"read {row_idx} while streaming."
)

# ---------- Transpose to (n_samples, n_features) for samples-as-rows storage ----------
arr_t = np.ascontiguousarray(arr.T)
del arr

# ---------- Write HDF5 ----------
with h5py.File(dst_h5, "w") as h5f:
# Per-row chunking: one chunk == one sample, for fast single-sample reads.
h5f.create_dataset("matrix", data=arr_t, chunks=(1, n_features))
h5f.create_dataset("sample_ids", data=np.array(sample_ids, dtype="S"))
h5f.create_dataset("feature_names",
data=np.array(feature_names, dtype="S"))
h5f.attrs["created_by"] = "csv_to_h5.py"
h5f.attrs["source_csv"] = str(src_csv)
h5f.attrs["orientation"] = "samples_as_rows"
h5f.attrs["n_samples"] = n_samples
h5f.attrs["n_features"] = n_features
del arr_t

size_gb = dst_h5.stat().st_size / 1e9
log(f" Wrote {size_gb:.2f} GB to {dst_h5}")
return dst_h5


def main(argv=None):
parser = argparse.ArgumentParser(
description="Convert a feature-matrix CSV to HDF5 for "
"memory-efficient loading."
)
parser.add_argument("input_csv", help="Input CSV (features x samples).")
parser.add_argument("output_h5", help="Output HDF5 file path.")
parser.add_argument(
"--chunksize", type=int, default=DEFAULT_CHUNKSIZE,
help=f"CSV rows parsed per chunk (default: {DEFAULT_CHUNKSIZE}).",
)
args = parser.parse_args(argv)

try:
convert_csv_to_h5(args.input_csv, args.output_h5, args.chunksize)
except (FileNotFoundError, ValueError) as exc:
sys.exit(f"ERROR: {exc}")


if __name__ == "__main__":
main()
140 changes: 140 additions & 0 deletions flexynesis/h5_dataloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
h5_dataloader.py - HDF5-backed DataImporter for Flexynesis.

The stock Flexynesis DataImporter reads each modality with pd.read_csv, which
loads values as float64 by default. For large feature matrices this doubles
the in-memory footprint and makes data loading the dominant memory cost of a
training run.

H5DataImporter is a drop-in subclass of DataImporter that loads any modality
from HDF5 as native float32 when an .h5 file is present, and otherwise falls
back to the standard CSV path. All other Flexynesis behaviour - data cleanup,
NaN imputation, label encoding, scaling, and torch dataset construction - is
inherited unchanged.

HDF5 files are expected in the layout produced by csv_to_h5.py:
/matrix (n_samples, n_features) float32
/sample_ids (n_samples,) byte strings
/feature_names (n_features,) byte strings

Each modality folder is expected to contain clin.csv (as required by the
stock DataImporter); every other modality may be supplied as either an .h5
file or a .csv file.

Usage:
from flexynesis.h5_dataloader import H5DataImporter
di = H5DataImporter(path='my_dataset', data_types=['gex'])
train_ds, test_ds = di.import_data()
"""
import os

import h5py
import numpy as np
import pandas as pd

from flexynesis.data import DataImporter


class H5DataImporter(DataImporter):
"""
DataImporter subclass that loads modality matrices from HDF5 when
available, and from CSV otherwise.

Only read_data() and validate_data_folders() are overridden; the parent
DataImporter and all downstream Flexynesis components are unchanged.

HDF5 files are expected in the layout written by csv_to_h5.py:
/matrix (n_samples, n_features) float32
/sample_ids (n_samples,) byte strings
/feature_names (n_features,) byte strings
"""

def read_data(self, folder_path):
"""
Override of DataImporter.read_data.

Returns a dict {file_name: DataFrame} in the format Flexynesis
expects. Each modality in self.data_types is loaded from an .h5 file
if present, otherwise from the corresponding .csv. clin.csv is always
loaded from CSV.
"""
print("\n[INFO] ----------------- Reading Data (HDF5) ----------------- ")
data = {}
required_files = {'clin.csv'} | {f"{dt}.csv" for dt in self.data_types}

for file in required_files:
file_name = os.path.splitext(file)[0]

# Modality matrices may be HDF5; clin.csv is always CSV.
if file_name in self.data_types:
h5_path = os.path.join(folder_path, f"{file_name}.h5")
if not os.path.exists(h5_path):
# Fall back to CSV when no HDF5 file is present.
csv_path = os.path.join(folder_path, file)
print(f"[INFO] HDF5 not found at {h5_path}, "
f"falling back to CSV: {csv_path}")
data[file_name] = pd.read_csv(csv_path, index_col=0)
else:
print(f"[INFO] Importing {h5_path} (HDF5)...")
data[file_name] = self._read_h5_as_dataframe(h5_path)
else:
csv_path = os.path.join(folder_path, file)
print(f"[INFO] Importing {csv_path}...")
data[file_name] = pd.read_csv(csv_path, index_col=0)

return data

@staticmethod
def _read_h5_as_dataframe(h5_path):
"""
Read an HDF5 modality file into a DataFrame matching the Flexynesis
CSV convention:
index = feature identifiers (str)
columns = sample identifiers (str)
values = float32

HDF5 stores the matrix samples-as-rows; it is transposed here to
features-as-rows during DataFrame construction.
"""
with h5py.File(h5_path, 'r') as f:
n_samples, n_features = f['matrix'].shape
print(f"[INFO] HDF5 shape: {n_samples:,} samples x "
f"{n_features:,} features (float32)")

arr = f['matrix'][:] # (n_samples, n_features) float32
sample_ids = [s.decode() for s in f['sample_ids'][:]]
feature_names = [g.decode() for g in f['feature_names'][:]]

# Transpose to (n_features, n_samples) to match the CSV convention.
# np.ascontiguousarray forces a real copy in C-order.
arr_t = np.ascontiguousarray(arr.T)
del arr

df = pd.DataFrame(arr_t, index=feature_names, columns=sample_ids)
print(f"[INFO] DataFrame ready: {df.shape} "
f"dtype: {df.dtypes.iloc[0]}")
return df

def validate_data_folders(self, training_path, testing_path):
"""
Override to accept either a .csv or an .h5 file for each modality in
self.data_types. clin.csv must still be present as CSV.
"""
for split_name, path in [("training", training_path),
("testing", testing_path)]:
if not os.path.isdir(path):
raise ValueError(
f"{split_name} folder does not exist: {path}")
missing = []
if not os.path.exists(os.path.join(path, "clin.csv")):
missing.append("clin.csv")
for dt in self.data_types:
h5_ok = os.path.exists(os.path.join(path, f"{dt}.h5"))
csv_ok = os.path.exists(os.path.join(path, f"{dt}.csv"))
if not (h5_ok or csv_ok):
missing.append(f"{dt}.h5 or {dt}.csv")
if missing:
raise ValueError(
f"Missing files in {split_name} folder: "
f"{', '.join(missing)}")
print("[INFO] Validating data folders... OK (HDF5 or CSV accepted)")
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ dependencies = [
"pot",
"geomloss",
"plotnine",
"safetensors"
"safetensors",
"h5py"
]

[project.scripts]
Expand Down
Loading