From 300b45f75bd90b6812d304d00e603c6160020398 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Fri, 3 Apr 2026 16:23:14 -0400 Subject: [PATCH 01/10] Add distribution aggregation and dual-write for bedset stats New module aggregation.py computes collection-level stats from per-file distributions JSONB (composition, scalars, histograms, KDEs, partitions, chromosome stats). Returns BedSetDistributions. bedsets.py create() now dual-writes: old SQL mean/sd columns AND new bedset_stats JSONB. get_distributions() reads JSONB with fallback to old scalar columns. get_metadata() populates distributions when available. bedfiles.py adds get_batch() for multi-ID retrieval, aggregate_collection() wrapper, and distributions param on get_stats(). Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/aggregation.py | 514 ++++++++++++++++++++++++++++++++++ bbconf/modules/bedfiles.py | 78 +++++- bbconf/modules/bedsets.py | 83 ++++++ 3 files changed, 674 insertions(+), 1 deletion(-) create mode 100644 bbconf/modules/aggregation.py diff --git a/bbconf/modules/aggregation.py b/bbconf/modules/aggregation.py new file mode 100644 index 0000000..c4ee803 --- /dev/null +++ b/bbconf/modules/aggregation.py @@ -0,0 +1,514 @@ +"""Collection-level aggregation of per-file genomic distributions. + +Core logic for computing mean + sd across files for every distribution curve +stored in bed_stats.distributions JSONB. Used by both BedAgentBedSet.create() +and BedAgentBedFile.aggregate_collection(). +""" + +import logging +from collections import defaultdict +from typing import Dict, List, Optional + +import numpy as np +from sqlalchemy import select +from sqlalchemy.engine import Engine +from sqlalchemy.orm import Session + +from bbconf.const import PKG_NAME +from bbconf.db_utils import Bed, BedMetadata, BedStats +from bbconf.models.bedset_models import BedSetDistributions + +_LOGGER = logging.getLogger(PKG_NAME) + +# Number of bins for re-binned histograms and interpolated KDEs +_COMMON_GRID_SIZE = 256 +# Number of bins when compressing scalar arrays to histograms +_SCALAR_HIST_BINS = 25 +# Default decimal precision for stored floats +DEFAULT_PRECISION = 3 + + +def round_floats(obj, ndigits: int = DEFAULT_PRECISION): + """Recursively round floats in nested dicts/lists.""" + if isinstance(obj, float): + return round(obj, ndigits) + if isinstance(obj, dict): + return {k: round_floats(v, ndigits) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [round_floats(v, ndigits) for v in obj] + return obj + + +def aggregate_collection( + engine: Engine, + bed_ids: List[str], + precision: int = DEFAULT_PRECISION, +) -> BedSetDistributions: + """Aggregate per-file distributions into collection-level stats. + + :param engine: SQLAlchemy engine + :param bed_ids: list of bed file identifiers + :param precision: decimal places for stored floats (default 3) + :return: BedSetDistributions with aggregated distributions + """ + if not bed_ids: + return BedSetDistributions(n_files=0) + + n = len(bed_ids) + + with Session(engine) as session: + # 1. Fetch full distributions for all matching IDs + dist_stmt = ( + select(BedStats.id, BedStats.distributions) + .where(BedStats.id.in_(bed_ids)) + .where(BedStats.distributions.isnot(None)) + ) + dist_rows = session.execute(dist_stmt).all() + + # 2. Fetch composition metadata + comp_stmt = ( + select( + Bed.id, + Bed.genome_alias, + BedMetadata.assay, + BedMetadata.cell_type, + BedMetadata.tissue, + BedMetadata.target, + ) + .outerjoin(BedMetadata, BedMetadata.id == Bed.id) + .where(Bed.id.in_(bed_ids)) + ) + comp_rows = session.execute(comp_stmt).all() + + # 3. Fallback scalars for old records without distributions + ids_with_dist = {row[0] for row in dist_rows} + ids_without_dist = [bid for bid in bed_ids if bid not in ids_with_dist] + fallback_rows = [] + if ids_without_dist: + fb_stmt = select( + BedStats.id, + BedStats.number_of_regions, + BedStats.mean_region_width, + BedStats.median_tss_dist, + BedStats.gc_content, + ).where(BedStats.id.in_(ids_without_dist)) + fallback_rows = session.execute(fb_stmt).all() + + # Parse distributions from JSONB + distributions_list = [] + for _id, dist in dist_rows: + if dist: + distributions_list.append(dist) + + # Build the stats object + stats = BedSetDistributions( + n_files=n, + composition=_aggregate_composition(comp_rows), + scalar_summaries=_aggregate_scalars(distributions_list, fallback_rows), + tss_histogram=_aggregate_fixed_axis_from_dists( + distributions_list, "tss_distances" + ), + widths_histogram=_aggregate_variable_histogram(distributions_list, "widths"), + neighbor_distances=_aggregate_variable_kde( + distributions_list, "neighbor_distances" + ), + gc_content=_aggregate_variable_kde(distributions_list, "gc_content"), + region_distribution=_aggregate_region_distribution(distributions_list), + partitions=_aggregate_partitions(distributions_list), + chromosome_summaries=_aggregate_chromosome_stats(distributions_list), + ) + + if precision is not None: + stats = BedSetDistributions(**round_floats(stats.model_dump(), precision)) + + return stats + + +# --------------------------------------------------------------------------- +# Private aggregation helpers +# --------------------------------------------------------------------------- + + +def _aggregate_composition(rows: list) -> Optional[dict]: + """Count distinct values per metadata column.""" + if not rows: + return None + + fields = ["genome_alias", "assay", "cell_type", "tissue", "target"] + result = {} + for i, field in enumerate(fields): + counts: Dict[str, int] = defaultdict(int) + for row in rows: + val = row[i + 1] # offset by 1 because row[0] is id + if val: + counts[val] += 1 + if counts: + result[field] = dict(counts) + + return result if result else None + + +def _aggregate_scalars( + distributions_list: List[dict], + fallback_rows: list, +) -> Optional[dict]: + """Extract scalar values from distributions, compute mean + sd + histogram.""" + scalar_keys = [ + "number_of_regions", + "mean_region_width", + "median_tss_dist", + "gc_content", + ] + + per_key: Dict[str, list] = {k: [] for k in scalar_keys} + + # From distributions JSONB + for dist in distributions_list: + scalars = dist.get("scalars", {}) + if scalars: + for k in scalar_keys: + val = scalars.get(k) + if val is not None: + per_key[k].append(float(val)) + + # From fallback rows (old records) + for row in fallback_rows: + for i, k in enumerate(scalar_keys): + val = row[i + 1] + if val is not None: + per_key[k].append(float(val)) + + result = {} + for k in scalar_keys: + vals = per_key[k] + if not vals: + continue + arr = np.array(vals) + hist_counts, hist_edges = np.histogram( + arr, bins=min(_SCALAR_HIST_BINS, len(vals)) + ) + result[k] = { + "mean": float(np.mean(arr)), + "sd": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, + "n": len(vals), + "histogram": { + "counts": hist_counts.tolist(), + "edges": hist_edges.tolist(), + }, + } + + return result if result else None + + +def _aggregate_fixed_axis_from_dists( + distributions_list: List[dict], + key: str, +) -> Optional[dict]: + """Aggregate fixed-axis histograms (e.g. TSS distances) from distributions.""" + arrays = [] + metadata = None + + for dist in distributions_list: + dists = dist.get("distributions", {}) + obj = dists.get(key) + if not obj: + continue + counts = obj.get("counts") + if counts is None: + continue + arrays.append(counts) + if metadata is None: + metadata = { + "x_min": obj.get("x_min"), + "x_max": obj.get("x_max"), + "bins": obj.get("bins"), + } + + if not arrays: + return None + + return _aggregate_fixed_axis(arrays, metadata) + + +def _aggregate_fixed_axis( + arrays: List[list], + metadata: Optional[dict] = None, +) -> Optional[dict]: + """Stack equal-length arrays, compute element-wise mean + sd.""" + if not arrays: + return None + + lengths = [len(a) for a in arrays] + target_len = max(set(lengths), key=lengths.count) + filtered = [np.array(a, dtype=float) for a in arrays if len(a) == target_len] + + if not filtered: + return None + + stacked = np.stack(filtered) + result = { + "mean": np.mean(stacked, axis=0).tolist(), + "sd": ( + np.std(stacked, axis=0, ddof=1).tolist() + if len(filtered) > 1 + else [0.0] * target_len + ), + "n": len(filtered), + } + if metadata: + result.update(metadata) + return result + + +def _aggregate_variable_histogram( + distributions_list: List[dict], + key: str, +) -> Optional[dict]: + """Aggregate variable-range histograms by re-binning to a common grid.""" + histograms = [] + for dist in distributions_list: + dists = dist.get("distributions", {}) + obj = dists.get(key) + if not obj: + continue + counts = obj.get("counts") + x_min = obj.get("x_min") + x_max = obj.get("x_max") + if counts is None or x_min is None or x_max is None: + continue + histograms.append({"counts": counts, "x_min": x_min, "x_max": x_max}) + + if not histograms: + return None + + global_x_min = min(h["x_min"] for h in histograms) + global_x_max = max(h["x_max"] for h in histograms) + + common_edges = np.linspace(global_x_min, global_x_max, _COMMON_GRID_SIZE + 1) + + rebinned = [] + for h in histograms: + counts = np.array(h["counts"], dtype=float) + n_bins = len(counts) + orig_edges = np.linspace(h["x_min"], h["x_max"], n_bins + 1) + + cdf = np.concatenate([[0.0], np.cumsum(counts)]) + total = cdf[-1] + if total == 0: + rebinned.append(np.zeros(_COMMON_GRID_SIZE)) + continue + + cdf_interp = np.interp(common_edges, orig_edges, cdf) + new_counts = np.diff(cdf_interp) + new_counts = ( + new_counts * (total / new_counts.sum()) + if new_counts.sum() > 0 + else new_counts + ) + rebinned.append(new_counts) + + stacked = np.stack(rebinned) + result = { + "x_min": float(global_x_min), + "x_max": float(global_x_max), + "bins": _COMMON_GRID_SIZE, + "mean": np.mean(stacked, axis=0).tolist(), + "sd": ( + np.std(stacked, axis=0, ddof=1).tolist() + if len(rebinned) > 1 + else [0.0] * _COMMON_GRID_SIZE + ), + "n": len(rebinned), + } + return result + + +def _aggregate_variable_kde( + distributions_list: List[dict], + key: str, +) -> Optional[dict]: + """Aggregate variable-range KDE density curves by interpolating to common x-axis.""" + kdes = [] + for dist in distributions_list: + dists = dist.get("distributions", {}) + obj = dists.get(key) + if not obj: + continue + densities = obj.get("densities") + x_min = obj.get("x_min") + x_max = obj.get("x_max") + if densities is None or x_min is None or x_max is None: + continue + kdes.append({"densities": densities, "x_min": x_min, "x_max": x_max}) + + if not kdes: + return None + + global_x_min = min(k["x_min"] for k in kdes) + global_x_max = max(k["x_max"] for k in kdes) + + common_x = np.linspace(global_x_min, global_x_max, _COMMON_GRID_SIZE) + + interpolated = [] + for k in kdes: + dens = np.array(k["densities"], dtype=float) + orig_x = np.linspace(k["x_min"], k["x_max"], len(dens)) + interp_dens = np.interp(common_x, orig_x, dens, left=0.0, right=0.0) + interpolated.append(interp_dens) + + stacked = np.stack(interpolated) + + result = { + "x_min": float(global_x_min), + "x_max": float(global_x_max), + "n_points": _COMMON_GRID_SIZE, + "mean": np.mean(stacked, axis=0).tolist(), + "sd": ( + np.std(stacked, axis=0, ddof=1).tolist() + if len(interpolated) > 1 + else [0.0] * _COMMON_GRID_SIZE + ), + "n": len(interpolated), + } + + # Include per-file summary stat if available (e.g. gc_content mean) + file_means = [] + for dist in distributions_list: + dists = dist.get("distributions", {}) + obj = dists.get(key, {}) + m = obj.get("mean") + if m is not None: + file_means.append(float(m)) + if file_means: + result["file_mean"] = { + "mean": float(np.mean(file_means)), + "sd": float(np.std(file_means, ddof=1)) if len(file_means) > 1 else 0.0, + } + + return result + + +def _aggregate_region_distribution( + distributions_list: List[dict], +) -> Optional[dict]: + """Aggregate per-chromosome region distributions.""" + chrom_data: Dict[str, List[list]] = defaultdict(list) + for dist in distributions_list: + dists = dist.get("distributions", {}) + rd = dists.get("region_distribution") + if not rd: + continue + for chrom, counts in rd.items(): + if isinstance(counts, list): + chrom_data[chrom].append(counts) + + if not chrom_data: + return None + + result = {} + for chrom, arrays in chrom_data.items(): + max_len = max(len(a) for a in arrays) + padded = [] + for a in arrays: + arr = np.array(a, dtype=float) + if len(arr) < max_len: + arr = np.pad(arr, (0, max_len - len(arr))) + padded.append(arr) + + stacked = np.stack(padded) + result[chrom] = { + "mean": np.mean(stacked, axis=0).tolist(), + "sd": ( + np.std(stacked, axis=0, ddof=1).tolist() + if len(padded) > 1 + else [0.0] * max_len + ), + "n": len(padded), + } + + return result if result else None + + +def _aggregate_partitions( + distributions_list: List[dict], +) -> Optional[dict]: + """Aggregate genomic partitions across files as percentages.""" + file_partitions = [] + for dist in distributions_list: + parts = dist.get("partitions") + if not parts: + continue + total = parts.get("total", 0) + counts = parts.get("counts") + if not counts or total <= 0: + continue + pcts = {name: count / total * 100 for name, count in counts} + file_partitions.append(pcts) + + if not file_partitions: + return None + + all_cats = set() + for fp in file_partitions: + all_cats.update(fp.keys()) + + result = {} + for cat in sorted(all_cats): + vals = [fp.get(cat, 0.0) for fp in file_partitions] + arr = np.array(vals) + result[cat] = { + "mean_pct": float(np.mean(arr)), + "sd_pct": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, + "n": len(vals), + } + + return result if result else None + + +def _aggregate_chromosome_stats( + distributions_list: List[dict], +) -> Optional[dict]: + """Aggregate chromosome-level stats across files.""" + chrom_data: Dict[str, List[dict]] = defaultdict(list) + for dist in distributions_list: + chrom_stats = dist.get("chromosome_stats") or dist.get("distributions", {}).get( + "chromosome_stats" + ) + if not chrom_stats: + continue + if isinstance(chrom_stats, dict): + for chrom, entry in chrom_stats.items(): + if isinstance(entry, dict): + chrom_data[chrom].append(entry) + else: + for entry in chrom_stats: + chrom = entry.get("chromosome") + if chrom: + chrom_data[chrom].append(entry) + + if not chrom_data: + return None + + numeric_fields = set() + for entries in chrom_data.values(): + for entry in entries: + for k, v in entry.items(): + if k != "chromosome" and isinstance(v, (int, float)): + numeric_fields.add(k) + break + break + + result = {} + for chrom, entries in sorted(chrom_data.items()): + chrom_result = {"n": len(entries)} + for field in sorted(numeric_fields): + vals = [e.get(field) for e in entries if e.get(field) is not None] + if vals: + arr = np.array(vals, dtype=float) + chrom_result[field] = { + "mean": float(np.mean(arr)), + "sd": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, + } + result[chrom] = chrom_result + + return result if result else None diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index 0d3fc9b..18818a6 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -214,12 +214,13 @@ def get(self, identifier: str, full: bool = False) -> BedMetadataAll: ), ) - def get_stats(self, identifier: str) -> BedStatsModel: + def get_stats(self, identifier: str, distributions: bool = True) -> BedStatsModel: """ Get file statistics by identifier. Args: identifier: Bed file identifier. + distributions: include distribution arrays in the result. Returns: Project statistics as BedStats object. @@ -232,8 +233,83 @@ def get_stats(self, identifier: str) -> BedStatsModel: raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.") bed_stats = BedStatsModel(**bed_object.__dict__) + if not distributions: + bed_stats.distributions = None + return bed_stats + def get_batch( + self, + identifiers: list, + full: bool = False, + ) -> "BedListResult": + """ + Get multiple bed file records by identifiers in a single DB round-trip. + + :param identifiers: list of bed file identifiers + :param full: if True, include stats (with distributions) for each record + :return: BedListResult with matching records + """ + from bbconf.models.bed_models import BedListResult, BedMetadataAll + + statement = select(Bed).where(Bed.id.in_(identifiers)) + + with Session(self._sa_engine) as session: + beds = session.scalars(statement) + results = [] + for bed_object in beds: + annotation = StandardMeta( + **( + bed_object.annotations.__dict__ + if bed_object.annotations + else {} + ) + ) + if full and bed_object.stats: + bed_stats = BedStatsModel(**bed_object.stats.__dict__) + else: + bed_stats = None + + results.append( + BedMetadataAll( + id=bed_object.id, + name=bed_object.name, + description=bed_object.description, + submission_date=bed_object.submission_date, + last_update_date=bed_object.last_update_date, + genome_alias=bed_object.genome_alias, + genome_digest=bed_object.genome_digest, + bed_compliance=bed_object.bed_compliance, + data_format=bed_object.data_format, + is_universe=bed_object.is_universe, + license_id=bed_object.license_id or DEFAULT_LICENSE, + processed=bed_object.processed, + annotation=annotation, + stats=bed_stats, + compliant_columns=bed_object.compliant_columns, + non_compliant_columns=bed_object.non_compliant_columns, + ) + ) + + return BedListResult( + count=len(results), + limit=len(identifiers), + offset=0, + results=results, + ) + + def aggregate_collection(self, bed_ids: list) -> "BedSetDistributions": + """Aggregate per-file distributions into collection-level stats. + + Thin wrapper around the standalone aggregate_collection() function. + + :param bed_ids: list of bed file identifiers + :return: BedSetDistributions with aggregated distributions + """ + from bbconf.modules.aggregation import aggregate_collection + + return aggregate_collection(self._sa_engine, bed_ids) + def get_plots(self, identifier: str) -> BedPlots: """ Get file plots by identifier. diff --git a/bbconf/modules/bedsets.py b/bbconf/modules/bedsets.py index cad38dc..f0c49db 100644 --- a/bbconf/modules/bedsets.py +++ b/bbconf/modules/bedsets.py @@ -19,6 +19,7 @@ from bbconf.models.bedset_models import ( BedMetadataBasic, BedSetBedFiles, + BedSetDistributions, BedSetListResult, BedSetMetadata, BedSetPEP, @@ -26,6 +27,7 @@ BedSetStats, FileModel, ) +from bbconf.modules.aggregation import aggregate_collection _LOGGER = logging.getLogger(PKG_NAME) @@ -77,9 +79,18 @@ def get(self, identifier: str, full: bool = False) -> BedSetMetadata: mean=BedStatsModel(**bedset_obj.bedset_means), sd=BedStatsModel(**bedset_obj.bedset_standard_deviation), ).model_dump() + + # Include new distribution stats if available + if bedset_obj.bedset_stats: + distributions = BedSetDistributions( + **bedset_obj.bedset_stats + ).model_dump() + else: + distributions = None else: plots = None stats = None + distributions = None bedset_metadata = BedSetMetadata( id=bedset_obj.id, @@ -87,6 +98,7 @@ def get(self, identifier: str, full: bool = False) -> BedSetMetadata: description=bedset_obj.description, md5sum=bedset_obj.md5sum, statistics=stats, + distributions=distributions, plots=plots, bed_ids=list_of_bedfiles, submission_date=bedset_obj.submission_date, @@ -177,6 +189,35 @@ def get_statistics(self, identifier: str) -> BedSetStats: sd=BedStatsModel(**bedset_object.bedset_standard_deviation), ) + def get_distributions(self, identifier: str) -> BedSetDistributions: + """ + Get distribution statistics for bedset by identifier. + + Returns aggregated distribution data from the JSONB column. + Falls back to wrapping old scalar stats if JSONB is not populated. + + Args: + identifier: Bedset identifier. + + Returns: + BedSetDistributions with aggregated distributions. + """ + statement = select(BedSets).where(BedSets.id == identifier) + with Session(self._db_engine.engine) as session: + bedset_object = session.scalar(statement) + if not bedset_object: + raise BedSetNotFoundError(f"Bedset with id: {identifier} not found.") + if bedset_object.bedset_stats: + return BedSetDistributions(**bedset_object.bedset_stats) + # Fallback: wrap old scalar columns + return BedSetDistributions( + n_files=0, + scalar_summaries=_old_stats_to_scalar_summaries( + bedset_object.bedset_means, + bedset_object.bedset_standard_deviation, + ), + ) + def get_bedset_pep(self, identifier: str) -> dict: """ Create pep file for a bedset. @@ -337,8 +378,17 @@ def create( if statistics: stats = self._calculate_statistics(bedid_list) + # Also compute distribution-level aggregation (for gtars-processed beds) + try: + dist_stats = aggregate_collection(self._db_engine.engine, bedid_list) + except Exception as e: + _LOGGER.warning( + f"Distribution aggregation failed (beds may lack distributions): {e}" + ) + dist_stats = None else: stats = None + dist_stats = None if self.exists(identifier): if no_fail and not overwrite: _LOGGER.warning( @@ -371,6 +421,7 @@ def create( summary=annotation.get("summary"), bedset_means=stats.mean.model_dump() if stats else None, bedset_standard_deviation=stats.sd.model_dump() if stats else None, + bedset_stats=dist_stats.model_dump() if dist_stats else None, md5sum=compute_md5sum_bedset(bedid_list), author=annotation.get("author"), source=annotation.get("source"), @@ -711,3 +762,35 @@ def add_bedfile(self, identifier: str, bedfile: str) -> None: def delete_bedfile(self, identifier: str, bedfile: str) -> None: raise NotImplementedError + + +def _old_stats_to_scalar_summaries( + bedset_means: dict | None, + bedset_sd: dict | None, +) -> dict | None: + """Convert old bedset_means/bedset_standard_deviation to scalar_summaries format. + + Maps the 4 key scalar fields from old-style BedSetStats(mean, sd) to the + new BedSetDistributions.scalar_summaries format. + """ + if not bedset_means: + return None + + scalar_keys = [ + "number_of_regions", + "mean_region_width", + "median_tss_dist", + "gc_content", + ] + result = {} + for k in scalar_keys: + mean_val = bedset_means.get(k) + sd_val = (bedset_sd or {}).get(k) + if mean_val is not None: + result[k] = { + "mean": mean_val, + "sd": sd_val or 0.0, + "n": 0, + } + + return result if result else None From 0152f10f68b5b5dfa83e1ebc9834c252290ff5e6 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Fri, 3 Apr 2026 19:13:07 -0400 Subject: [PATCH 02/10] Fix ruff F821: import BedSetDistributions at module level Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/bedfiles.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index 18818a6..ce811d5 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -67,6 +67,7 @@ UniverseMetadata, VectorMetadata, ) +from bbconf.models.bedset_models import BedSetDistributions _LOGGER = getLogger(PKG_NAME) @@ -298,7 +299,7 @@ def get_batch( results=results, ) - def aggregate_collection(self, bed_ids: list) -> "BedSetDistributions": + def aggregate_collection(self, bed_ids: list) -> BedSetDistributions: """Aggregate per-file distributions into collection-level stats. Thin wrapper around the standalone aggregate_collection() function. From 7fe5c64a45607d660216085f3ad449143b4e77fa Mon Sep 17 00:00:00 2001 From: Sam Park Date: Sun, 5 Apr 2026 14:46:49 -0400 Subject: [PATCH 03/10] Prune bedset aggregation + move to SQL-side computation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop distributions from bedset aggregation that aren't meaningful at collection level (full blobs stay in per-file storage for single-file views): - widths_histogram: use scalar_summaries.mean_region_width instead - neighbor_distances KDE: use new median_neighbor_distance scalar - gc_content KDE: use scalar_summaries.gc_content mean - chromosome_summaries: redundant with region_distribution - expected_partitions: per-file null hypothesis Schema: - Drop dead tssdist column (no model/writer/reader references) - Add median_neighbor_distance column + model field Aggregation: switch heavy lifting from Python to SQL. With gtars #248's reference-aligned region_distribution bin widths, Postgres can do element-wise aggregation via jsonb_array_elements + GROUP BY: - region_distribution: SQL jsonb_each + unnest per-chrom arrays, GROUP BY (chrom, bin_idx), AVG/STDDEV. Returns only aggregated rows, not raw per-file blobs. - tss_histogram: SQL element-wise SUM across fixed-axis 100-bin arrays. - scalars: AVG/STDDEV on BedStats columns (no JSONB parsing). Plus histogram-of-means computed from the raw scalar values. - partitions: stay in Python (small nested JSONB, already fast). Remove obsolete Python helpers: - _aggregate_variable_histogram (widths) - _aggregate_variable_kde (neighbor_distances, gc_content) - _aggregate_region_distribution (old Python re-bin-and-stack version) - _aggregate_fixed_axis + _aggregate_fixed_axis_from_dists (TSS via JSONB) - _aggregate_chromosome_stats Expected performance impact for 1000-file aggregation: - Wire transfer Postgres→worker: ~40MB → ~150KB - Latency: 1-3s → <500ms - Worker memory: ~40MB held → ~few KB Tests: 52 passed, 5 skipped (same as before changes). Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/db_utils.py | 2 +- bbconf/models/bed_models.py | 1 + bbconf/models/bedset_models.py | 21 +- bbconf/modules/aggregation.py | 474 +++++++++++---------------------- bbconf/modules/bedsets.py | 1 + 5 files changed, 180 insertions(+), 319 deletions(-) diff --git a/bbconf/db_utils.py b/bbconf/db_utils.py index 8261154..7dbcd74 100644 --- a/bbconf/db_utils.py +++ b/bbconf/db_utils.py @@ -253,7 +253,7 @@ class BedStats(Base): intron_percentage: Mapped[Optional[float]] intergenic_percentage: Mapped[Optional[float]] promotercore_percentage: Mapped[Optional[float]] - tssdist: Mapped[Optional[float]] + median_neighbor_distance: Mapped[Optional[float]] distributions: Mapped[Optional[dict]] = mapped_column( JSON, diff --git a/bbconf/models/bed_models.py b/bbconf/models/bed_models.py index fec4a11..17541e8 100644 --- a/bbconf/models/bed_models.py +++ b/bbconf/models/bed_models.py @@ -51,6 +51,7 @@ class BedStatsModel(BaseModel): number_of_regions: float | None = None gc_content: float | None = None median_tss_dist: float | None = None + median_neighbor_distance: float | None = None mean_region_width: float | None = None exon_frequency: float | None = None diff --git a/bbconf/models/bedset_models.py b/bbconf/models/bedset_models.py index 45dae37..804ecb6 100644 --- a/bbconf/models/bedset_models.py +++ b/bbconf/models/bedset_models.py @@ -22,18 +22,31 @@ class BedSetDistributions(BaseModel): Stored in the bedset_stats JSONB database column. Populated when member bed files have been processed with the gtars analysis backend. + + Only distributions that are meaningful at collection-level are kept: + - scalar_summaries: mean ± sd + 25-bin histograms of per-file scalar values + - tss_histogram: summed per-bin counts across files (fixed ±100 kb axis) + - region_distribution: per-chrom bin-wise mean ± sd across files + (requires gtars ≥ PR #248 for reference-aligned bin widths) + - partitions: mean ± sd of per-file partition percentages + + Dropped (retained in per-file distributions blob, not aggregated): + - widths_histogram: per-file variable-range bins aren't summable; use + scalar_summaries.mean_region_width histogram instead + - neighbor_distances KDE: per-file within-bedset variance is low; use + scalar_summaries.median_neighbor_distance instead + - gc_content KDE: per-file distribution is unimodal; use + scalar_summaries.gc_content mean instead + - chromosome_summaries: redundant with region_distribution + - expected_partitions: per-file null hypothesis, not collection property """ n_files: int = 0 composition: Optional[dict] = None scalar_summaries: Optional[dict] = None tss_histogram: Optional[dict] = None - widths_histogram: Optional[dict] = None - neighbor_distances: Optional[dict] = None - gc_content: Optional[dict] = None region_distribution: Optional[dict] = None partitions: Optional[dict] = None - chromosome_summaries: Optional[dict] = None class BedSetPlots(BaseModel): diff --git a/bbconf/modules/aggregation.py b/bbconf/modules/aggregation.py index c4ee803..285e69c 100644 --- a/bbconf/modules/aggregation.py +++ b/bbconf/modules/aggregation.py @@ -10,7 +10,7 @@ from typing import Dict, List, Optional import numpy as np -from sqlalchemy import select +from sqlalchemy import select, text from sqlalchemy.engine import Engine from sqlalchemy.orm import Session @@ -20,9 +20,7 @@ _LOGGER = logging.getLogger(PKG_NAME) -# Number of bins for re-binned histograms and interpolated KDEs -_COMMON_GRID_SIZE = 256 -# Number of bins when compressing scalar arrays to histograms +# Number of bins when building histograms of per-file scalar means _SCALAR_HIST_BINS = 25 # Default decimal precision for stored floats DEFAULT_PRECISION = 3 @@ -46,6 +44,11 @@ def aggregate_collection( ) -> BedSetDistributions: """Aggregate per-file distributions into collection-level stats. + Uses SQL aggregation where distributions have stable axes (scalars, + region_distribution with reference-aligned bin widths from gtars ≥#248, + and fixed-axis tss_histogram). Partitions aggregation stays in Python + (small nested JSONB). + :param engine: SQLAlchemy engine :param bed_ids: list of bed file identifiers :param precision: decimal places for stored floats (default 3) @@ -57,16 +60,8 @@ def aggregate_collection( n = len(bed_ids) with Session(engine) as session: - # 1. Fetch full distributions for all matching IDs - dist_stmt = ( - select(BedStats.id, BedStats.distributions) - .where(BedStats.id.in_(bed_ids)) - .where(BedStats.distributions.isnot(None)) - ) - dist_rows = session.execute(dist_stmt).all() - - # 2. Fetch composition metadata - comp_stmt = ( + # 1. Composition metadata (join Bed + BedMetadata) + comp_rows = session.execute( select( Bed.id, Bed.genome_alias, @@ -77,45 +72,40 @@ def aggregate_collection( ) .outerjoin(BedMetadata, BedMetadata.id == Bed.id) .where(Bed.id.in_(bed_ids)) - ) - comp_rows = session.execute(comp_stmt).all() - - # 3. Fallback scalars for old records without distributions - ids_with_dist = {row[0] for row in dist_rows} - ids_without_dist = [bid for bid in bed_ids if bid not in ids_with_dist] - fallback_rows = [] - if ids_without_dist: - fb_stmt = select( - BedStats.id, + ).all() + + # 2. Scalar columns — pulled directly from BedStats (SQL-side agg) + scalar_rows = session.execute( + select( BedStats.number_of_regions, BedStats.mean_region_width, BedStats.median_tss_dist, BedStats.gc_content, - ).where(BedStats.id.in_(ids_without_dist)) - fallback_rows = session.execute(fb_stmt).all() + BedStats.median_neighbor_distance, + ).where(BedStats.id.in_(bed_ids)) + ).all() + + # 3. region_distribution aggregation via SQL JSONB unnest + region_distribution = _sql_aggregate_region_distribution(session, bed_ids) - # Parse distributions from JSONB - distributions_list = [] - for _id, dist in dist_rows: - if dist: - distributions_list.append(dist) + # 4. tss_histogram aggregation via SQL JSONB unnest + tss_histogram = _sql_aggregate_tss_histogram(session, bed_ids) + + # 5. partitions (kept in Python — small nested JSONB) + partitions_rows = session.execute( + select(BedStats.distributions["partitions"]) + .where(BedStats.id.in_(bed_ids)) + .where(BedStats.distributions.isnot(None)) + ).all() - # Build the stats object + # Assemble result stats = BedSetDistributions( n_files=n, composition=_aggregate_composition(comp_rows), - scalar_summaries=_aggregate_scalars(distributions_list, fallback_rows), - tss_histogram=_aggregate_fixed_axis_from_dists( - distributions_list, "tss_distances" - ), - widths_histogram=_aggregate_variable_histogram(distributions_list, "widths"), - neighbor_distances=_aggregate_variable_kde( - distributions_list, "neighbor_distances" - ), - gc_content=_aggregate_variable_kde(distributions_list, "gc_content"), - region_distribution=_aggregate_region_distribution(distributions_list), - partitions=_aggregate_partitions(distributions_list), - chromosome_summaries=_aggregate_chromosome_stats(distributions_list), + scalar_summaries=_aggregate_scalars_from_columns(scalar_rows), + tss_histogram=tss_histogram, + region_distribution=region_distribution, + partitions=_aggregate_partitions_from_rows(partitions_rows), ) if precision is not None: @@ -148,33 +138,24 @@ def _aggregate_composition(rows: list) -> Optional[dict]: return result if result else None -def _aggregate_scalars( - distributions_list: List[dict], - fallback_rows: list, -) -> Optional[dict]: - """Extract scalar values from distributions, compute mean + sd + histogram.""" +def _aggregate_scalars_from_columns(scalar_rows: list) -> Optional[dict]: + """Compute mean/sd/histogram for per-file scalar columns. + + Input: list of tuples (number_of_regions, mean_region_width, + median_tss_dist, gc_content, median_neighbor_distance) from BedStats. + """ scalar_keys = [ "number_of_regions", "mean_region_width", "median_tss_dist", "gc_content", + "median_neighbor_distance", ] per_key: Dict[str, list] = {k: [] for k in scalar_keys} - - # From distributions JSONB - for dist in distributions_list: - scalars = dist.get("scalars", {}) - if scalars: - for k in scalar_keys: - val = scalars.get(k) - if val is not None: - per_key[k].append(float(val)) - - # From fallback rows (old records) - for row in fallback_rows: + for row in scalar_rows: for i, k in enumerate(scalar_keys): - val = row[i + 1] + val = row[i] if val is not None: per_key[k].append(float(val)) @@ -200,233 +181,147 @@ def _aggregate_scalars( return result if result else None -def _aggregate_fixed_axis_from_dists( - distributions_list: List[dict], - key: str, -) -> Optional[dict]: - """Aggregate fixed-axis histograms (e.g. TSS distances) from distributions.""" - arrays = [] - metadata = None - - for dist in distributions_list: - dists = dist.get("distributions", {}) - obj = dists.get(key) - if not obj: - continue - counts = obj.get("counts") - if counts is None: - continue - arrays.append(counts) - if metadata is None: - metadata = { - "x_min": obj.get("x_min"), - "x_max": obj.get("x_max"), - "bins": obj.get("bins"), - } - - if not arrays: - return None - - return _aggregate_fixed_axis(arrays, metadata) - - -def _aggregate_fixed_axis( - arrays: List[list], - metadata: Optional[dict] = None, +def _sql_aggregate_region_distribution( + session: Session, bed_ids: List[str] ) -> Optional[dict]: - """Stack equal-length arrays, compute element-wise mean + sd.""" - if not arrays: - return None - - lengths = [len(a) for a in arrays] - target_len = max(set(lengths), key=lengths.count) - filtered = [np.array(a, dtype=float) for a in arrays if len(a) == target_len] + """Aggregate per-chromosome region_distribution via SQL JSONB unnest. - if not filtered: - return None + Requires that member files used gtars ≥ PR #248 with --chrom-sizes so that + bin widths are reference-aligned across files (same bin_idx → same bp + range on a given chromosome, regardless of file). - stacked = np.stack(filtered) - result = { - "mean": np.mean(stacked, axis=0).tolist(), - "sd": ( - np.std(stacked, axis=0, ddof=1).tolist() - if len(filtered) > 1 - else [0.0] * target_len + Returns {chrom: {mean: [...], sd: [...], n: int}} or None if no data. + """ + sql = text( + """ + WITH per_file AS ( + SELECT distributions->'distributions'->'region_distribution' AS rd + FROM bed_stats + WHERE id = ANY(:bed_ids) + AND distributions IS NOT NULL + AND distributions->'distributions'->'region_distribution' IS NOT NULL ), - "n": len(filtered), - } - if metadata: - result.update(metadata) - return result - - -def _aggregate_variable_histogram( - distributions_list: List[dict], - key: str, -) -> Optional[dict]: - """Aggregate variable-range histograms by re-binning to a common grid.""" - histograms = [] - for dist in distributions_list: - dists = dist.get("distributions", {}) - obj = dists.get(key) - if not obj: - continue - counts = obj.get("counts") - x_min = obj.get("x_min") - x_max = obj.get("x_max") - if counts is None or x_min is None or x_max is None: - continue - histograms.append({"counts": counts, "x_min": x_min, "x_max": x_max}) + unnested AS ( + SELECT + chrom, + ordinality - 1 AS bin_idx, + (val)::float AS count + FROM per_file, + jsonb_each(rd) AS per_chrom(chrom, counts), + jsonb_array_elements_text(counts) WITH ORDINALITY AS t(val, ordinality) + ) + SELECT + chrom, + bin_idx, + AVG(count) AS mean, + COALESCE(STDDEV(count), 0.0) AS sd, + COUNT(*) AS n + FROM unnested + GROUP BY chrom, bin_idx + ORDER BY chrom, bin_idx + """ + ) - if not histograms: + rows = session.execute(sql, {"bed_ids": bed_ids}).all() + if not rows: return None - global_x_min = min(h["x_min"] for h in histograms) - global_x_max = max(h["x_max"] for h in histograms) + # Reshape flat rows → {chrom: {mean: [...], sd: [...], n: int}} + result: Dict[str, dict] = {} + for chrom, bin_idx, mean_val, sd_val, n_val in rows: + if chrom not in result: + result[chrom] = {"mean": [], "sd": [], "n": int(n_val)} + # Ensure arrays are long enough (bin_idx is 0-indexed, ordered by SQL) + while len(result[chrom]["mean"]) <= bin_idx: + result[chrom]["mean"].append(0.0) + result[chrom]["sd"].append(0.0) + result[chrom]["mean"][bin_idx] = float(mean_val) + result[chrom]["sd"][bin_idx] = float(sd_val) - common_edges = np.linspace(global_x_min, global_x_max, _COMMON_GRID_SIZE + 1) + return result if result else None - rebinned = [] - for h in histograms: - counts = np.array(h["counts"], dtype=float) - n_bins = len(counts) - orig_edges = np.linspace(h["x_min"], h["x_max"], n_bins + 1) - cdf = np.concatenate([[0.0], np.cumsum(counts)]) - total = cdf[-1] - if total == 0: - rebinned.append(np.zeros(_COMMON_GRID_SIZE)) - continue +def _sql_aggregate_tss_histogram( + session: Session, bed_ids: List[str] +) -> Optional[dict]: + """Aggregate fixed-axis tss_distances histogram via SQL. - cdf_interp = np.interp(common_edges, orig_edges, cdf) - new_counts = np.diff(cdf_interp) - new_counts = ( - new_counts * (total / new_counts.sum()) - if new_counts.sum() > 0 - else new_counts - ) - rebinned.append(new_counts) + TSS distances use a fixed 100-bin axis (±100 kb), so element-wise summation + across files is valid without re-binning. - stacked = np.stack(rebinned) - result = { - "x_min": float(global_x_min), - "x_max": float(global_x_max), - "bins": _COMMON_GRID_SIZE, - "mean": np.mean(stacked, axis=0).tolist(), - "sd": ( - np.std(stacked, axis=0, ddof=1).tolist() - if len(rebinned) > 1 - else [0.0] * _COMMON_GRID_SIZE + Returns {mean: [...], sd: [...], n: int, x_min, x_max, bins} or None. + """ + sql = text( + """ + WITH per_file AS ( + SELECT + distributions->'distributions'->'tss_distances'->'counts' AS counts, + distributions->'distributions'->'tss_distances'->>'x_min' AS x_min, + distributions->'distributions'->'tss_distances'->>'x_max' AS x_max, + distributions->'distributions'->'tss_distances'->>'bins' AS bins + FROM bed_stats + WHERE id = ANY(:bed_ids) + AND distributions IS NOT NULL + AND distributions->'distributions'->'tss_distances'->'counts' IS NOT NULL ), - "n": len(rebinned), - } - return result - - -def _aggregate_variable_kde( - distributions_list: List[dict], - key: str, -) -> Optional[dict]: - """Aggregate variable-range KDE density curves by interpolating to common x-axis.""" - kdes = [] - for dist in distributions_list: - dists = dist.get("distributions", {}) - obj = dists.get(key) - if not obj: - continue - densities = obj.get("densities") - x_min = obj.get("x_min") - x_max = obj.get("x_max") - if densities is None or x_min is None or x_max is None: - continue - kdes.append({"densities": densities, "x_min": x_min, "x_max": x_max}) + unnested AS ( + SELECT + ordinality - 1 AS bin_idx, + (val)::float AS count, + x_min, x_max, bins + FROM per_file, + jsonb_array_elements_text(counts) WITH ORDINALITY AS t(val, ordinality) + ) + SELECT + bin_idx, + AVG(count) AS mean, + COALESCE(STDDEV(count), 0.0) AS sd, + COUNT(*) AS n, + MAX(x_min) AS x_min, + MAX(x_max) AS x_max, + MAX(bins) AS bins + FROM unnested + GROUP BY bin_idx + ORDER BY bin_idx + """ + ) - if not kdes: + rows = session.execute(sql, {"bed_ids": bed_ids}).all() + if not rows: return None - global_x_min = min(k["x_min"] for k in kdes) - global_x_max = max(k["x_max"] for k in kdes) - - common_x = np.linspace(global_x_min, global_x_max, _COMMON_GRID_SIZE) - - interpolated = [] - for k in kdes: - dens = np.array(k["densities"], dtype=float) - orig_x = np.linspace(k["x_min"], k["x_max"], len(dens)) - interp_dens = np.interp(common_x, orig_x, dens, left=0.0, right=0.0) - interpolated.append(interp_dens) - - stacked = np.stack(interpolated) - + n_bins = len(rows) result = { - "x_min": float(global_x_min), - "x_max": float(global_x_max), - "n_points": _COMMON_GRID_SIZE, - "mean": np.mean(stacked, axis=0).tolist(), - "sd": ( - np.std(stacked, axis=0, ddof=1).tolist() - if len(interpolated) > 1 - else [0.0] * _COMMON_GRID_SIZE - ), - "n": len(interpolated), + "mean": [0.0] * n_bins, + "sd": [0.0] * n_bins, + "n": int(rows[0][3]), } - - # Include per-file summary stat if available (e.g. gc_content mean) - file_means = [] - for dist in distributions_list: - dists = dist.get("distributions", {}) - obj = dists.get(key, {}) - m = obj.get("mean") - if m is not None: - file_means.append(float(m)) - if file_means: - result["file_mean"] = { - "mean": float(np.mean(file_means)), - "sd": float(np.std(file_means, ddof=1)) if len(file_means) > 1 else 0.0, - } + x_min, x_max, bins_str = rows[0][4], rows[0][5], rows[0][6] + if x_min is not None: + try: + result["x_min"] = float(x_min) + result["x_max"] = float(x_max) + result["bins"] = int(bins_str) if bins_str else n_bins + except (ValueError, TypeError): + pass + + for bin_idx, mean_val, sd_val, _n, _xmin, _xmax, _bins in rows: + result["mean"][bin_idx] = float(mean_val) + result["sd"][bin_idx] = float(sd_val) return result -def _aggregate_region_distribution( - distributions_list: List[dict], -) -> Optional[dict]: - """Aggregate per-chromosome region distributions.""" - chrom_data: Dict[str, List[list]] = defaultdict(list) - for dist in distributions_list: - dists = dist.get("distributions", {}) - rd = dists.get("region_distribution") - if not rd: - continue - for chrom, counts in rd.items(): - if isinstance(counts, list): - chrom_data[chrom].append(counts) +def _aggregate_partitions_from_rows(partitions_rows: list) -> Optional[dict]: + """Aggregate partitions from JSONB rows pulled from the DB. - if not chrom_data: + Each row is a single-element tuple containing the partitions sub-dict + (or None). Partitions structure: {counts: [[name, count], ...], total: int}. + """ + dists = [{"partitions": row[0]} for row in partitions_rows if row[0] is not None] + if not dists: return None - - result = {} - for chrom, arrays in chrom_data.items(): - max_len = max(len(a) for a in arrays) - padded = [] - for a in arrays: - arr = np.array(a, dtype=float) - if len(arr) < max_len: - arr = np.pad(arr, (0, max_len - len(arr))) - padded.append(arr) - - stacked = np.stack(padded) - result[chrom] = { - "mean": np.mean(stacked, axis=0).tolist(), - "sd": ( - np.std(stacked, axis=0, ddof=1).tolist() - if len(padded) > 1 - else [0.0] * max_len - ), - "n": len(padded), - } - - return result if result else None + return _aggregate_partitions(dists) def _aggregate_partitions( @@ -463,52 +358,3 @@ def _aggregate_partitions( } return result if result else None - - -def _aggregate_chromosome_stats( - distributions_list: List[dict], -) -> Optional[dict]: - """Aggregate chromosome-level stats across files.""" - chrom_data: Dict[str, List[dict]] = defaultdict(list) - for dist in distributions_list: - chrom_stats = dist.get("chromosome_stats") or dist.get("distributions", {}).get( - "chromosome_stats" - ) - if not chrom_stats: - continue - if isinstance(chrom_stats, dict): - for chrom, entry in chrom_stats.items(): - if isinstance(entry, dict): - chrom_data[chrom].append(entry) - else: - for entry in chrom_stats: - chrom = entry.get("chromosome") - if chrom: - chrom_data[chrom].append(entry) - - if not chrom_data: - return None - - numeric_fields = set() - for entries in chrom_data.values(): - for entry in entries: - for k, v in entry.items(): - if k != "chromosome" and isinstance(v, (int, float)): - numeric_fields.add(k) - break - break - - result = {} - for chrom, entries in sorted(chrom_data.items()): - chrom_result = {"n": len(entries)} - for field in sorted(numeric_fields): - vals = [e.get(field) for e in entries if e.get(field) is not None] - if vals: - arr = np.array(vals, dtype=float) - chrom_result[field] = { - "mean": float(np.mean(arr)), - "sd": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, - } - result[chrom] = chrom_result - - return result if result else None diff --git a/bbconf/modules/bedsets.py b/bbconf/modules/bedsets.py index f0c49db..18df72c 100644 --- a/bbconf/modules/bedsets.py +++ b/bbconf/modules/bedsets.py @@ -781,6 +781,7 @@ def _old_stats_to_scalar_summaries( "mean_region_width", "median_tss_dist", "gc_content", + "median_neighbor_distance", ] result = {} for k in scalar_keys: From 629b9f2dfada4d85da1570cd36cf5348402caf05 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Sun, 5 Apr 2026 15:27:47 -0400 Subject: [PATCH 04/10] Accept 'gtars-py' in analysis.backend Literal Supports bedboss's parallel Python-bindings-direct backend for side-by-side performance comparison against the subprocess-based 'gtars' backend. Both backends coexist during testing; only one will remain after benchmarking. Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/config_parser/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bbconf/config_parser/models.py b/bbconf/config_parser/models.py index bf2f39a..aabadfa 100644 --- a/bbconf/config_parser/models.py +++ b/bbconf/config_parser/models.py @@ -133,7 +133,7 @@ class ConfigAnalysis(BaseModel): Controls which statistics engine is used for BED file analysis. """ - backend: Literal["r", "gtars"] = "r" + backend: Literal["r", "gtars", "gtars-py"] = "r" model_config = ConfigDict(extra="forbid") From 9c23652134d968fb027e56d34dcee17d1f202e3b Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 02:40:04 -0400 Subject: [PATCH 05/10] Remove gtars-py from analysis.backend config options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Backend Literal narrowed to "r" | "gtars" — the gtars-py backend was removed from bedboss after benchmarking showed the pure CLI with .fab binary FASTA matches its performance with simpler architecture. Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/config_parser/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bbconf/config_parser/models.py b/bbconf/config_parser/models.py index aabadfa..bf2f39a 100644 --- a/bbconf/config_parser/models.py +++ b/bbconf/config_parser/models.py @@ -133,7 +133,7 @@ class ConfigAnalysis(BaseModel): Controls which statistics engine is used for BED file analysis. """ - backend: Literal["r", "gtars", "gtars-py"] = "r" + backend: Literal["r", "gtars"] = "r" model_config = ConfigDict(extra="forbid") From 35430571b2ae07527f342285c0ba0b95a25db174 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 15:30:50 -0400 Subject: [PATCH 06/10] Push all collection aggregation to SQL, strip distributions from batch - Rewrite aggregation.py: composition, scalars, histograms, and partitions all computed in SQL (no more per-row Python loops or numpy) - Partition aggregation uses flat percentage columns (works for all beds, both R and gtars backends) - Scalar aggregation uses single query with AVG/STDDEV/MIN/MAX + width_bucket for histograms - get_batch() gains distributions param; batch endpoint excludes distribution blobs by default to avoid large payloads Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/aggregation.py | 297 +++++++++++++++++++--------------- bbconf/modules/bedfiles.py | 6 +- 2 files changed, 170 insertions(+), 133 deletions(-) diff --git a/bbconf/modules/aggregation.py b/bbconf/modules/aggregation.py index 285e69c..d03d746 100644 --- a/bbconf/modules/aggregation.py +++ b/bbconf/modules/aggregation.py @@ -1,21 +1,17 @@ """Collection-level aggregation of per-file genomic distributions. -Core logic for computing mean + sd across files for every distribution curve -stored in bed_stats.distributions JSONB. Used by both BedAgentBedSet.create() -and BedAgentBedFile.aggregate_collection(). +All aggregation is pushed to SQL (PostgreSQL). No per-row Python loops. +Used by both BedAgentBedSet.create() and BedAgentBedFile.aggregate_collection(). """ import logging -from collections import defaultdict from typing import Dict, List, Optional -import numpy as np -from sqlalchemy import select, text +from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.orm import Session from bbconf.const import PKG_NAME -from bbconf.db_utils import Bed, BedMetadata, BedStats from bbconf.models.bedset_models import BedSetDistributions _LOGGER = logging.getLogger(PKG_NAME) @@ -44,10 +40,8 @@ def aggregate_collection( ) -> BedSetDistributions: """Aggregate per-file distributions into collection-level stats. - Uses SQL aggregation where distributions have stable axes (scalars, - region_distribution with reference-aligned bin widths from gtars ≥#248, - and fixed-axis tss_histogram). Partitions aggregation stays in Python - (small nested JSONB). + All aggregation is done in SQL. Python only reshapes query results + into the BedSetDistributions model. :param engine: SQLAlchemy engine :param bed_ids: list of bed file identifiers @@ -60,52 +54,19 @@ def aggregate_collection( n = len(bed_ids) with Session(engine) as session: - # 1. Composition metadata (join Bed + BedMetadata) - comp_rows = session.execute( - select( - Bed.id, - Bed.genome_alias, - BedMetadata.assay, - BedMetadata.cell_type, - BedMetadata.tissue, - BedMetadata.target, - ) - .outerjoin(BedMetadata, BedMetadata.id == Bed.id) - .where(Bed.id.in_(bed_ids)) - ).all() - - # 2. Scalar columns — pulled directly from BedStats (SQL-side agg) - scalar_rows = session.execute( - select( - BedStats.number_of_regions, - BedStats.mean_region_width, - BedStats.median_tss_dist, - BedStats.gc_content, - BedStats.median_neighbor_distance, - ).where(BedStats.id.in_(bed_ids)) - ).all() - - # 3. region_distribution aggregation via SQL JSONB unnest + composition = _sql_aggregate_composition(session, bed_ids) + scalar_summaries = _sql_aggregate_scalars(session, bed_ids) region_distribution = _sql_aggregate_region_distribution(session, bed_ids) - - # 4. tss_histogram aggregation via SQL JSONB unnest tss_histogram = _sql_aggregate_tss_histogram(session, bed_ids) + partitions = _sql_aggregate_partitions(session, bed_ids) - # 5. partitions (kept in Python — small nested JSONB) - partitions_rows = session.execute( - select(BedStats.distributions["partitions"]) - .where(BedStats.id.in_(bed_ids)) - .where(BedStats.distributions.isnot(None)) - ).all() - - # Assemble result stats = BedSetDistributions( n_files=n, - composition=_aggregate_composition(comp_rows), - scalar_summaries=_aggregate_scalars_from_columns(scalar_rows), + composition=composition, + scalar_summaries=scalar_summaries, tss_histogram=tss_histogram, region_distribution=region_distribution, - partitions=_aggregate_partitions_from_rows(partitions_rows), + partitions=partitions, ) if precision is not None: @@ -115,36 +76,55 @@ def aggregate_collection( # --------------------------------------------------------------------------- -# Private aggregation helpers +# SQL aggregation helpers # --------------------------------------------------------------------------- -def _aggregate_composition(rows: list) -> Optional[dict]: - """Count distinct values per metadata column.""" - if not rows: - return None - +def _sql_aggregate_composition( + session: Session, bed_ids: List[str] +) -> Optional[dict]: + """Count distinct values per metadata column via SQL GROUP BY.""" fields = ["genome_alias", "assay", "cell_type", "tissue", "target"] result = {} - for i, field in enumerate(fields): - counts: Dict[str, int] = defaultdict(int) - for row in rows: - val = row[i + 1] # offset by 1 because row[0] is id - if val: - counts[val] += 1 - if counts: - result[field] = dict(counts) + + for field in fields: + if field == "genome_alias": + sql = text( + """ + SELECT genome_alias AS val, COUNT(*) AS cnt + FROM bed + WHERE id = ANY(:bed_ids) AND genome_alias IS NOT NULL + GROUP BY genome_alias + ORDER BY cnt DESC + """ + ) + else: + sql = text( + f""" + SELECT m.{field} AS val, COUNT(*) AS cnt + FROM bed b + JOIN bed_metadata m ON m.id = b.id + WHERE b.id = ANY(:bed_ids) AND m.{field} IS NOT NULL + GROUP BY m.{field} + ORDER BY cnt DESC + """ + ) + rows = session.execute(sql, {"bed_ids": bed_ids}).all() + if rows: + result[field] = {row.val: row.cnt for row in rows} return result if result else None -def _aggregate_scalars_from_columns(scalar_rows: list) -> Optional[dict]: - """Compute mean/sd/histogram for per-file scalar columns. +def _sql_aggregate_scalars( + session: Session, bed_ids: List[str] +) -> Optional[dict]: + """Compute mean, sd, and histogram for scalar columns in SQL. - Input: list of tuples (number_of_regions, mean_region_width, - median_tss_dist, gc_content, median_neighbor_distance) from BedStats. + Uses a single query for mean/sd/min/max/count, then width_bucket + for histogram binning. """ - scalar_keys = [ + scalar_columns = [ "number_of_regions", "mean_region_width", "median_tss_dist", @@ -152,42 +132,101 @@ def _aggregate_scalars_from_columns(scalar_rows: list) -> Optional[dict]: "median_neighbor_distance", ] - per_key: Dict[str, list] = {k: [] for k in scalar_keys} - for row in scalar_rows: - for i, k in enumerate(scalar_keys): - val = row[i] - if val is not None: - per_key[k].append(float(val)) + # 1. Mean, sd, min, max, count in one query + agg_exprs = ", ".join( + f"AVG({col}) AS {col}_mean, " + f"STDDEV({col}) AS {col}_sd, " + f"MIN({col}) AS {col}_min, " + f"MAX({col}) AS {col}_max, " + f"COUNT({col}) AS {col}_n" + for col in scalar_columns + ) + sql = text(f"SELECT {agg_exprs} FROM bed_stats WHERE id = ANY(:bed_ids)") + row = session.execute(sql, {"bed_ids": bed_ids}).one() result = {} - for k in scalar_keys: - vals = per_key[k] - if not vals: + for col in scalar_columns: + n = getattr(row, f"{col}_n") + if not n: continue - arr = np.array(vals) - hist_counts, hist_edges = np.histogram( - arr, bins=min(_SCALAR_HIST_BINS, len(vals)) + mean_val = float(getattr(row, f"{col}_mean")) + sd_val = float(getattr(row, f"{col}_sd") or 0.0) + col_min = float(getattr(row, f"{col}_min")) + col_max = float(getattr(row, f"{col}_max")) + + # 2. Histogram via width_bucket (PostgreSQL) + histogram = _sql_histogram( + session, bed_ids, col, col_min, col_max, n ) - result[k] = { - "mean": float(np.mean(arr)), - "sd": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, - "n": len(vals), - "histogram": { - "counts": hist_counts.tolist(), - "edges": hist_edges.tolist(), - }, + + result[col] = { + "mean": mean_val, + "sd": sd_val, + "n": n, + "histogram": histogram, } return result if result else None +def _sql_histogram( + session: Session, + bed_ids: List[str], + column: str, + col_min: float, + col_max: float, + n: int, +) -> dict: + """Build a histogram for a single scalar column using width_bucket.""" + num_bins = min(_SCALAR_HIST_BINS, n) + + if col_min == col_max: + # All values identical — single bin + return { + "counts": [n], + "edges": [col_min, col_max], + } + + sql = text( + f""" + SELECT + width_bucket({column}, :lo, :hi, :bins) AS bucket, + COUNT(*) AS cnt + FROM bed_stats + WHERE id = ANY(:bed_ids) AND {column} IS NOT NULL + GROUP BY bucket + ORDER BY bucket + """ + ) + rows = session.execute( + sql, + {"bed_ids": bed_ids, "lo": col_min, "hi": col_max, "bins": num_bins}, + ).all() + + # width_bucket returns 1..num_bins (in-range) plus 0 (below) and num_bins+1 (above/equal to hi) + counts = [0] * num_bins + for bucket, cnt in rows: + if bucket == 0: + counts[0] += cnt + elif bucket > num_bins: + counts[-1] += cnt + else: + counts[bucket - 1] += cnt + + # Compute edges + step = (col_max - col_min) / num_bins + edges = [col_min + i * step for i in range(num_bins + 1)] + + return {"counts": counts, "edges": edges} + + def _sql_aggregate_region_distribution( session: Session, bed_ids: List[str] ) -> Optional[dict]: """Aggregate per-chromosome region_distribution via SQL JSONB unnest. - Requires that member files used gtars ≥ PR #248 with --chrom-sizes so that - bin widths are reference-aligned across files (same bin_idx → same bp + Requires that member files used gtars >= PR #248 with --chrom-sizes so that + bin widths are reference-aligned across files (same bin_idx -> same bp range on a given chromosome, regardless of file). Returns {chrom: {mean: [...], sd: [...], n: int}} or None if no data. @@ -226,12 +265,10 @@ def _sql_aggregate_region_distribution( if not rows: return None - # Reshape flat rows → {chrom: {mean: [...], sd: [...], n: int}} result: Dict[str, dict] = {} for chrom, bin_idx, mean_val, sd_val, n_val in rows: if chrom not in result: result[chrom] = {"mean": [], "sd": [], "n": int(n_val)} - # Ensure arrays are long enough (bin_idx is 0-indexed, ordered by SQL) while len(result[chrom]["mean"]) <= bin_idx: result[chrom]["mean"].append(0.0) result[chrom]["sd"].append(0.0) @@ -246,8 +283,8 @@ def _sql_aggregate_tss_histogram( ) -> Optional[dict]: """Aggregate fixed-axis tss_distances histogram via SQL. - TSS distances use a fixed 100-bin axis (±100 kb), so element-wise summation - across files is valid without re-binning. + TSS distances use a fixed 100-bin axis (+/-100 kb), so element-wise + AVG/STDDEV across files is valid without re-binning. Returns {mean: [...], sd: [...], n: int, x_min, x_max, bins} or None. """ @@ -312,49 +349,45 @@ def _sql_aggregate_tss_histogram( return result -def _aggregate_partitions_from_rows(partitions_rows: list) -> Optional[dict]: - """Aggregate partitions from JSONB rows pulled from the DB. +def _sql_aggregate_partitions( + session: Session, bed_ids: List[str] +) -> Optional[dict]: + """Aggregate genomic partitions from flat percentage columns. - Each row is a single-element tuple containing the partitions sub-dict - (or None). Partitions structure: {counts: [[name, count], ...], total: int}. + Uses the pre-computed *_percentage columns on bed_stats, which are + populated by both R and gtars backends for all beds. """ - dists = [{"partitions": row[0]} for row in partitions_rows if row[0] is not None] - if not dists: - return None - return _aggregate_partitions(dists) - - -def _aggregate_partitions( - distributions_list: List[dict], -) -> Optional[dict]: - """Aggregate genomic partitions across files as percentages.""" - file_partitions = [] - for dist in distributions_list: - parts = dist.get("partitions") - if not parts: - continue - total = parts.get("total", 0) - counts = parts.get("counts") - if not counts or total <= 0: - continue - pcts = {name: count / total * 100 for name, count in counts} - file_partitions.append(pcts) + partition_columns = [ + ("exon", "exon_percentage"), + ("intron", "intron_percentage"), + ("intergenic", "intergenic_percentage"), + ("promoterprox", "promoterprox_percentage"), + ("promotercore", "promotercore_percentage"), + ("fiveutr", "fiveutr_percentage"), + ("threeutr", "threeutr_percentage"), + ] - if not file_partitions: - return None + agg_exprs = ", ".join( + f"AVG({col}) * 100 AS {name}_mean, " + f"COALESCE(STDDEV({col}) * 100, 0.0) AS {name}_sd, " + f"COUNT({col}) AS {name}_n" + for name, col in partition_columns + ) + sql = text( + f"SELECT {agg_exprs} FROM bed_stats WHERE id = ANY(:bed_ids)" + ) - all_cats = set() - for fp in file_partitions: - all_cats.update(fp.keys()) + row = session.execute(sql, {"bed_ids": bed_ids}).one() result = {} - for cat in sorted(all_cats): - vals = [fp.get(cat, 0.0) for fp in file_partitions] - arr = np.array(vals) - result[cat] = { - "mean_pct": float(np.mean(arr)), - "sd_pct": float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0, - "n": len(vals), + for name, _col in partition_columns: + n = getattr(row, f"{name}_n") + if not n: + continue + result[name] = { + "mean_pct": float(getattr(row, f"{name}_mean")), + "sd_pct": float(getattr(row, f"{name}_sd")), + "n": int(n), } return result if result else None diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index ce811d5..6f6d39b 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -243,12 +243,14 @@ def get_batch( self, identifiers: list, full: bool = False, + distributions: bool = False, ) -> "BedListResult": """ Get multiple bed file records by identifiers in a single DB round-trip. :param identifiers: list of bed file identifiers - :param full: if True, include stats (with distributions) for each record + :param full: if True, include scalar stats for each record + :param distributions: if True, include distribution arrays in stats :return: BedListResult with matching records """ from bbconf.models.bed_models import BedListResult, BedMetadataAll @@ -268,6 +270,8 @@ def get_batch( ) if full and bed_object.stats: bed_stats = BedStatsModel(**bed_object.stats.__dict__) + if not distributions: + bed_stats.distributions = None else: bed_stats = None From d57b5780ca9155c773568d614e3f24fcc6bdc90f Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 15:33:15 -0400 Subject: [PATCH 07/10] Fix ruff formatting in aggregation.py Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/aggregation.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/bbconf/modules/aggregation.py b/bbconf/modules/aggregation.py index d03d746..60f1593 100644 --- a/bbconf/modules/aggregation.py +++ b/bbconf/modules/aggregation.py @@ -80,9 +80,7 @@ def aggregate_collection( # --------------------------------------------------------------------------- -def _sql_aggregate_composition( - session: Session, bed_ids: List[str] -) -> Optional[dict]: +def _sql_aggregate_composition(session: Session, bed_ids: List[str]) -> Optional[dict]: """Count distinct values per metadata column via SQL GROUP BY.""" fields = ["genome_alias", "assay", "cell_type", "tissue", "target"] result = {} @@ -116,9 +114,7 @@ def _sql_aggregate_composition( return result if result else None -def _sql_aggregate_scalars( - session: Session, bed_ids: List[str] -) -> Optional[dict]: +def _sql_aggregate_scalars(session: Session, bed_ids: List[str]) -> Optional[dict]: """Compute mean, sd, and histogram for scalar columns in SQL. Uses a single query for mean/sd/min/max/count, then width_bucket @@ -155,9 +151,7 @@ def _sql_aggregate_scalars( col_max = float(getattr(row, f"{col}_max")) # 2. Histogram via width_bucket (PostgreSQL) - histogram = _sql_histogram( - session, bed_ids, col, col_min, col_max, n - ) + histogram = _sql_histogram(session, bed_ids, col, col_min, col_max, n) result[col] = { "mean": mean_val, @@ -349,9 +343,7 @@ def _sql_aggregate_tss_histogram( return result -def _sql_aggregate_partitions( - session: Session, bed_ids: List[str] -) -> Optional[dict]: +def _sql_aggregate_partitions(session: Session, bed_ids: List[str]) -> Optional[dict]: """Aggregate genomic partitions from flat percentage columns. Uses the pre-computed *_percentage columns on bed_stats, which are @@ -373,9 +365,7 @@ def _sql_aggregate_partitions( f"COUNT({col}) AS {name}_n" for name, col in partition_columns ) - sql = text( - f"SELECT {agg_exprs} FROM bed_stats WHERE id = ANY(:bed_ids)" - ) + sql = text(f"SELECT {agg_exprs} FROM bed_stats WHERE id = ANY(:bed_ids)") row = session.execute(sql, {"bed_ids": bed_ids}).one() From c2faacf2a69738c13f946e05264401bbea6987a8 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 17:34:57 -0400 Subject: [PATCH 08/10] Use JSONB for distributions columns, add BedBatchResult model - Change distributions (bed_stats) and bedset_stats (bedsets) columns from JSON to JSONB for native operator support in aggregation queries - Remove ::jsonb casts from aggregation SQL (no longer needed) - Add BedBatchResult model with BedMetadataAll results so batch endpoint includes stats in serialized response - get_batch returns BedBatchResult instead of BedListResult Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/db_utils.py | 6 +++--- bbconf/models/bed_models.py | 7 +++++++ bbconf/modules/bedfiles.py | 6 +++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/bbconf/db_utils.py b/bbconf/db_utils.py index 7dbcd74..e456642 100644 --- a/bbconf/db_utils.py +++ b/bbconf/db_utils.py @@ -14,7 +14,7 @@ event, select, ) -from sqlalchemy.dialects.postgresql import ARRAY, JSON +from sqlalchemy.dialects.postgresql import ARRAY, JSON, JSONB from sqlalchemy.engine import URL, Engine, create_engine from sqlalchemy.event import listens_for from sqlalchemy.exc import IntegrityError, ProgrammingError @@ -256,7 +256,7 @@ class BedStats(Base): median_neighbor_distance: Mapped[Optional[float]] distributions: Mapped[Optional[dict]] = mapped_column( - JSON, + JSONB, nullable=True, comment="Full distribution arrays from gtars genomicdist (JSONB)", ) @@ -344,7 +344,7 @@ class BedSets(Base): JSON, comment="Median values of the bedset" ) bedset_stats: Mapped[Optional[dict]] = mapped_column( - JSON, + JSONB, nullable=True, comment="Pre-aggregated distribution statistics from gtars (JSONB)", ) diff --git a/bbconf/models/bed_models.py b/bbconf/models/bed_models.py index 17541e8..3f0f3b2 100644 --- a/bbconf/models/bed_models.py +++ b/bbconf/models/bed_models.py @@ -210,6 +210,13 @@ class BedListResult(BaseModel): results: list[BedMetadataBasic] +class BedBatchResult(BaseModel): + count: int + limit: int + offset: int + results: list[BedMetadataAll] + + class QdrantSearchResult(BaseModel): id: str payload: dict = None diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index 6f6d39b..ee3f9b4 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -244,7 +244,7 @@ def get_batch( identifiers: list, full: bool = False, distributions: bool = False, - ) -> "BedListResult": + ) -> "BedBatchResult": """ Get multiple bed file records by identifiers in a single DB round-trip. @@ -253,7 +253,7 @@ def get_batch( :param distributions: if True, include distribution arrays in stats :return: BedListResult with matching records """ - from bbconf.models.bed_models import BedListResult, BedMetadataAll + from bbconf.models.bed_models import BedBatchResult, BedMetadataAll statement = select(Bed).where(Bed.id.in_(identifiers)) @@ -296,7 +296,7 @@ def get_batch( ) ) - return BedListResult( + return BedBatchResult( count=len(results), limit=len(identifiers), offset=0, From ea92a1379b904eba934457702fd9c60dcb2c9a54 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 17:47:40 -0400 Subject: [PATCH 09/10] Fix ruff: add BedBatchResult to top-level imports Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/bedfiles.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py index ee3f9b4..4542e54 100644 --- a/bbconf/modules/bedfiles.py +++ b/bbconf/modules/bedfiles.py @@ -45,6 +45,7 @@ UniverseNotFoundError, ) from bbconf.models.bed_models import ( + BedBatchResult, BedClassification, BedEmbeddingResult, BedFiles, @@ -253,8 +254,6 @@ def get_batch( :param distributions: if True, include distribution arrays in stats :return: BedListResult with matching records """ - from bbconf.models.bed_models import BedBatchResult, BedMetadataAll - statement = select(Bed).where(Bed.id.in_(identifiers)) with Session(self._sa_engine) as session: From c5a4b6d961b8f1960cb270d6be23307c8d9efef1 Mon Sep 17 00:00:00 2001 From: Sam Park Date: Mon, 6 Apr 2026 21:43:22 -0400 Subject: [PATCH 10/10] Use sqrt(n) binning for scalar histograms in bedset aggregation Match the client-side collection histogram bin count: min(25, max(3, ceil(sqrt(n)))). Previously used min(25, n) which produced too many bins for small collections. Co-Authored-By: Claude Opus 4.6 (1M context) --- bbconf/modules/aggregation.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bbconf/modules/aggregation.py b/bbconf/modules/aggregation.py index 60f1593..b461c90 100644 --- a/bbconf/modules/aggregation.py +++ b/bbconf/modules/aggregation.py @@ -5,6 +5,7 @@ """ import logging +import math from typing import Dict, List, Optional from sqlalchemy import text @@ -172,7 +173,7 @@ def _sql_histogram( n: int, ) -> dict: """Build a histogram for a single scalar column using width_bucket.""" - num_bins = min(_SCALAR_HIST_BINS, n) + num_bins = min(_SCALAR_HIST_BINS, max(3, math.ceil(math.sqrt(n)))) if col_min == col_max: # All values identical — single bin