Commit f812944

mwiewior and claude authored
feat: contig-by-contig VEP annotation with partitioned parquet cache (#47)
* feat: contig-by-contig VEP annotation with partitioned parquet cache

Refactor the parquet annotation path so everything is contig-scoped: VCF reading (filter pushdown → tabix seek), variation lookup (per-contig COITree), context loading (per-contig parquet files), and annotation (per-contig PreparedContext). Memory is freed after each contig.

The partitioned cache layout (variation/chrN.parquet, transcript/chrN.parquet, etc.) is auto-detected by PartitionedParquetCache::detect() and can be controlled via "partitioned": true/false in options_json.

Contig discovery uses zero-cost VCF schema metadata (bio.vcf.contigs) with SQL fallback.

The existing monolithic path is completely untouched.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: use TBI-indexed contigs for zero-cost data-bearing contig discovery

Prefer bio.vcf.contigs.indexed metadata (TBI-derived, only contigs with actual data) over bio.vcf.contigs (all header contigs). Fall back to SELECT DISTINCT chrom when indexed metadata is unavailable.

This eliminates empty contig overhead: for a chr1-only VCF, processes 1 contig instead of 24 (saving ~3s / 11% on 1K variant benchmark).

Bumps datafusion-bio-format-vcf to rev 47e7ad3 (PR #136).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: prefer TBI-indexed contigs, fall back to SELECT DISTINCT chrom

Skip bio.vcf.contigs (all header contigs) which includes ~195 GRCh38 sequences even for single-chrom VCFs. Prefer bio.vcf.contigs.indexed (data-bearing only), fall back to SELECT DISTINCT chrom.

Bumps datafusion-bio-format-vcf to rev e92ff6f. Eliminates ~10s empty-contig overhead on chr1 benchmark (24→1 contig).
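The contig-discovery preference order described above (indexed metadata first, a distinct-chrom query as fallback) can be sketched roughly as follows. This is an illustration only: `discover_contigs` is a hypothetical helper, the fallback closure stands in for the `SELECT DISTINCT chrom` query, and the comma-separated encoding of the metadata value is an assumption, since the commit message does not say how `bio.vcf.contigs.indexed` is serialized.

```rust
use std::collections::HashMap;

/// Metadata key named in the commit message.
const INDEXED_CONTIGS_KEY: &str = "bio.vcf.contigs.indexed";

/// Returns the contigs to process.
///
/// Preference order: the indexed-contigs metadata entry if it is present and
/// non-empty, otherwise whatever the fallback produces.
///
/// ASSUMPTION: the metadata value is modelled as a comma-separated list.
/// The real encoding used by the VCF reader may differ.
fn discover_contigs<F>(schema_metadata: &HashMap<String, String>, fallback: F) -> Vec<String>
where
    F: FnOnce() -> Vec<String>,
{
    if let Some(raw) = schema_metadata.get(INDEXED_CONTIGS_KEY) {
        let contigs: Vec<String> = raw
            .split(',')
            .map(str::trim)
            .filter(|c| !c.is_empty())
            .map(str::to_string)
            .collect();
        if !contigs.is_empty() {
            return contigs;
        }
    }
    // Stand-in for running `SELECT DISTINCT chrom` against the VCF table.
    fallback()
}
```

Because the fallback is a closure, the potentially expensive query only runs when the metadata entry is missing or empty.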

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: streaming pipelined contig annotation with per-contig memory reclamation

Replace MemTable-based batch accumulation with a pull-based ContigAnnotationExec / ContigAnnotationStream state machine that processes one contig at a time and reclaims memory after each.

Key changes:
- Add ContigAnnotationExec (leaf ExecutionPlan) and ContigAnnotationStream (StartContig → PreparingContig → Draining → Done state machine)
- Extract per-contig logic into prepare_and_annotate_contig() async fn
- Add MissWorklist::for_chrom() for single-contig worklist without scanning base batches
- Add Clone derive to PartitionedParquetCache

Verified: 323K chr1 variants, 80 fields --everything, 100% accuracy against VEP 115 golden truth (0 mismatches in 2,997,504 CSQ entries). Timing: 72s (no regression vs previous MemTable baseline).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: true e2e streaming annotation with window-based HGVS hydration

Two key changes enable fully streaming contig annotation:

1. VariantLookupExec: buffer matched rows during probe phase, emit only after probe completes. This ensures the colocated sink is fully populated before any downstream consumer sees the first batch. New EmitMatched state yields buffered matches, then EmitUnmatched.

2. ContigAnnotationStream: rich state machine with window-based processing.
   - PreparingContig: parallel context loading + lookup stream setup
   - AnnotatingContig: pull lookup batches into windows of 1000, then per-window: hydrate HGVS (cumulative, skip already-hydrated transcripts, same sliding-window pattern as SIFT), rebuild PreparedContext, annotate, yield
   - DrainingWindow: yield annotated batches one at a time
   - CleaningUp: deregister ephemeral tables

Context loaded via MissWorklist::for_chrom() (no dependency on lookup results for the partitioned path).
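The per-contig state machine listed above can be pictured with a synchronous toy model. This is only a sketch under loud assumptions: `ContigStream` is hypothetical, rows are plain integers, "annotation" is a string format, the async `PreparingContig` step is folded into `StartContig`, and the error-handling states are left out. The real stream is an async DataFusion stream, which this does not attempt to reproduce.

```rust
use std::collections::VecDeque;

/// States loosely modelled on the names in the commit message.
/// `PreparingContig` is folded into `StartContig` in this sketch.
#[derive(Clone, Copy, Debug, PartialEq)]
enum State {
    StartContig,
    AnnotatingContig,
    DrainingWindow,
    CleaningUp,
    Done,
}

/// Hypothetical stand-in for a contig annotation stream.
struct ContigStream {
    state: State,
    contigs: VecDeque<Vec<u32>>,  // one Vec of rows per contig
    pending_rows: VecDeque<u32>,  // rows of the current contig not yet annotated
    window_out: VecDeque<String>, // annotated output of the current window
    window_size: usize,
    cleanups: usize,              // how many per-contig cleanups have run
}

impl ContigStream {
    fn new(contigs: Vec<Vec<u32>>, window_size: usize) -> Self {
        Self {
            state: State::StartContig,
            contigs: contigs.into(),
            pending_rows: VecDeque::new(),
            window_out: VecDeque::new(),
            window_size: window_size.max(1), // a zero-sized window would never progress
            cleanups: 0,
        }
    }
}

impl Iterator for ContigStream {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        loop {
            match self.state {
                State::StartContig => match self.contigs.pop_front() {
                    Some(rows) => {
                        self.pending_rows = rows.into();
                        self.state = State::AnnotatingContig;
                    }
                    None => self.state = State::Done,
                },
                State::AnnotatingContig => {
                    if self.pending_rows.is_empty() {
                        self.state = State::CleaningUp;
                        continue;
                    }
                    // Take one window of rows and "annotate" it.
                    let n = self.window_size.min(self.pending_rows.len());
                    self.window_out = self
                        .pending_rows
                        .drain(..n)
                        .map(|row| format!("annotated:{row}"))
                        .collect();
                    self.state = State::DrainingWindow;
                }
                State::DrainingWindow => match self.window_out.pop_front() {
                    Some(item) => return Some(item),
                    None => self.state = State::AnnotatingContig,
                },
                State::CleaningUp => {
                    // Per-contig resources would be released here.
                    self.cleanups += 1;
                    self.state = State::StartContig;
                }
                State::Done => return None,
            }
        }
    }
}
```

Only one window of annotated output is held at a time, and cleanup runs once per contig before the next contig starts, which is the property the commit message is after.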

Verified: 323K chr1 variants, 80 fields --everything, 100% accuracy (0 mismatches in 2,997,504 CSQ entries). 72.3s (no regression).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: fix rustfmt formatting for CI consistency

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address PR review findings (7 fixes)

1. Chr-prefix normalization in contig intersection (Critical)
   VCF "chr1" now matches cache "1" and vice versa, matching MissWorklist::expanded_chroms() behavior.
2. Ephemeral table cleanup on error paths (Critical)
   Three error paths (lookup stream, hydrate_window, annotate_window) now transition to ErrorCleaningUp which deregisters tables before propagating the error. Added make_cleanup_future() helper.
3. Corrected misleading "parallel" docstring (Moderate)
   Removed false claim about tokio::try_join! parallelism.
4. Pass reference_fasta_path to LookupProvider (Moderate)
   Was hardcoded None, disabling reference-based allele shifting for colocated variant matching in partitioned path.
5. Named constant ANNOTATION_COLUMN_COUNT replaces magic 2 (Moderate)
   Documents that output schema appends csq + most_severe_consequence + CACHE_OUTPUT_COLUMNS after VCF fields.
6. Documented miRNA/structural gap in partitioned path (Minor)
7. Removed unnecessary filter() just to read VCF schema (Minor)

Skipped #7 from review (auto-detection opt-out semantics): existing behavior, low risk, not worth changing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: O(n²) Vec::remove(0) → VecDeque::pop_front() in EmitMatched

Change matched_batches from Vec to VecDeque so each emit is O(1) instead of O(n) shift. For chr1 WGS with ~10K batches this avoids ~50M element moves.

Also documented that matched_batches peaks at full chromosome size (inherent: colocated sink must be complete before annotation).
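To make the Vec-versus-VecDeque point concrete, here is a small standalone comparison. The function names are hypothetical and batches are modelled as plain integers; it only illustrates why draining from the front of a `Vec` is quadratic, and where a figure of roughly 50M moves for about 10K batches would come from.

```rust
use std::collections::VecDeque;

/// Drains front-to-back with `Vec::remove(0)`. Each call shifts every
/// remaining element left by one, so draining n items costs O(n^2) moves.
fn drain_with_vec(mut batches: Vec<u64>) -> Vec<u64> {
    let mut emitted = Vec::with_capacity(batches.len());
    while !batches.is_empty() {
        emitted.push(batches.remove(0));
    }
    emitted
}

/// Drains front-to-back with `VecDeque::pop_front`, which is O(1) per item.
fn drain_with_deque(batches: Vec<u64>) -> Vec<u64> {
    let mut queue: VecDeque<u64> = batches.into();
    let mut emitted = Vec::with_capacity(queue.len());
    while let Some(batch) = queue.pop_front() {
        emitted.push(batch);
    }
    emitted
}

/// Element moves performed by the `Vec::remove(0)` drain of n items:
/// (n - 1) + (n - 2) + ... + 0 = n * (n - 1) / 2.
fn shifts_for_vec_drain(n: u64) -> u64 {
    n * n.saturating_sub(1) / 2
}
```

Both drains emit the same sequence; only the cost differs. For n = 10,000 the formula gives 49,995,000 moves, in line with the "~50M" estimate in the commit message.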

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: eager per-contig memory reclamation (drop BuildSide, sink, context)

After the lookup stream is exhausted:
- Drop the lookup stream (reclaims BuildSide: COITrees, hash indices, concatenated VCF batch; several hundred MB for chr1)
- Clear the colocated sink (data already copied to colocated_map)

After the last annotation window:
- Clear colocated_map, transcripts, exons, translations, regulatory, motifs before entering the async cleanup phase

Previously these stayed alive inside ContigAnnotationState until the cleanup future completed, preventing per-contig memory reclamation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix project lifecycle

* refactor: remove monolithic annotation path, partitioned-only

Remove scan_with_transcript_engine (monolithic single-parquet path) and all supporting helpers (resolve_cache_table_name, generated_cache_table_name, resolve_transcript_context_tables, resolve_optional_context_table). All annotation now goes through the partitioned streaming path (ContigAnnotationExec → ContigAnnotationStream). When no partitioned cache directory is detected, scan() returns a clear error message.

Refactored 17 tests to use partitioned cache layout:
- Added write_partitioned_cache/write_batch_to_cache/write_batch_to_chrom helpers that write per-chrom parquet files to TempDir
- Updated cache_batch() to include both chrom "1" and "2" variation data
- Changed tests from register_table("var_cache") pattern to writing partitioned parquet files and passing directory path with {"partitioned":true} in options_json
- Updated assertions for partitioned behavior (intergenic_variant when no context tables, vs old sequence_variant placeholder)
- Exon/translation batches (no chrom column) use write_batch_to_chrom

Net: -1186 lines removed (monolithic path + old test patterns).
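For orientation, the partitioned layout named in this commit (one parquet file per chromosome under a per-kind directory such as `variation/` or `transcript/`) can be sketched with two small path helpers. Both function names are hypothetical. The detection check mirrors the `variation` directory test used by the benchmark example in this commit; whether `PartitionedParquetCache::detect()` applies the same rule is an assumption.

```rust
use std::path::{Path, PathBuf};

/// Builds `<root>/<kind>/<chrom>.parquet`, e.g. `cache/variation/chr1.parquet`.
/// `kind` is a directory such as "variation" or "transcript".
fn chrom_parquet_path(root: &Path, kind: &str, chrom: &str) -> PathBuf {
    root.join(kind).join(format!("{chrom}.parquet"))
}

/// Heuristic used in this sketch: a cache root counts as partitioned when it
/// contains a `variation` subdirectory.
/// ASSUMPTION: the library's own detection may check more than this.
fn looks_partitioned(root: &Path) -> bool {
    root.join("variation").is_dir()
}
```

A test helper in this style would create `<tmp>/variation/`, write one file per chromosome at the path returned by `chrom_parquet_path`, and pass the directory together with `{"partitioned":true}` in options_json.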

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
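The chr-prefix normalization from fix 1 of the review-findings commit (VCF "chr1" matching cache "1" and vice versa) can be illustrated with a short sketch. The helper names are hypothetical and the rule shown is the simplest possible one, stripping a leading "chr". It does not claim to reproduce `MissWorklist::expanded_chroms()`, and aliases such as "chrM" versus "MT" are not handled here.

```rust
use std::collections::BTreeSet;

/// Strips a leading "chr" so that "chr1" and "1" compare equal.
fn normalize_contig(name: &str) -> &str {
    name.strip_prefix("chr").unwrap_or(name)
}

/// Returns the VCF contigs (spelled as in the VCF, input order preserved)
/// that have a counterpart in the cache once both sides are normalized.
fn intersect_contigs<'a>(vcf: &[&'a str], cache: &[&str]) -> Vec<&'a str> {
    let cache_keys: BTreeSet<&str> = cache.iter().copied().map(normalize_contig).collect();
    vcf.iter()
        .copied()
        .filter(|contig| cache_keys.contains(normalize_contig(contig)))
        .collect()
}
```

Keeping the VCF spelling in the result means later VCF filters can still use the name exactly as it appears in the file, while the comparison itself ignores the prefix.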
1 parent 1c68844 commit f812944

12 files changed

Lines changed: 2545 additions & 2092 deletions

File tree

Cargo.lock

Lines changed: 76 additions & 20 deletions
Some generated files are not rendered by default.

benchmark-infra/terraform/main.tf

Lines changed: 2 additions & 2 deletions
@@ -184,8 +184,8 @@ resource "ansible_playbook" "provision" {
 # ── Outputs ──────────────────────────────────────────────────────────
 
 output "project_id" {
-  description = "GCP project ID (protected from destroy)"
-  value = google_project.benchmark.project_id
+  description = "GCP project ID (survives destroy)"
+  value = var.project_id
 }
 
 output "instance_ip" {

datafusion/bio-function-vep/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ parquet = { version = "56", features = ["arrow"] }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
-datafusion-bio-format-vcf = { git = "https://github.com/biodatageeks/datafusion-bio-formats.git", rev = "45562f3beb230c23008b19bfe6c172bd1c5923fa" }
+datafusion-bio-format-vcf = { git = "https://github.com/biodatageeks/datafusion-bio-formats.git", rev = "e92ff6fa169d611d67e280551cd3e1a037254093" }
 noodles-vcf = { git = "https://github.com/biodatageeks/noodles.git", rev = "9b7b2c5b6531373918302d4c07410e583f1b5b5c" }
 env_logger = "0.11"
 tempfile = "3"

datafusion/bio-function-vep/examples/annotate_vep_golden_bench.rs

Lines changed: 59 additions & 0 deletions
@@ -403,6 +403,65 @@ async fn main() -> Result<()> {
 /// Scans context_dir for files matching known patterns and builds the
 /// JSON string to pass as the 4th argument to `annotate_vep()`.
 fn build_options_json(args: &Args) -> Result<Option<String>> {
+    // Detect partitioned per-chromosome cache layout.
+    let is_partitioned = Path::new(&args.cache_source).join("variation").is_dir();
+    if is_partitioned {
+        // Partitioned path: AnnotateProvider::scan() handles everything.
+        let mut entries = Vec::new();
+        entries.push("\"partitioned\":true".to_string());
+        entries.push(format!("\"extended_probes\":{}", args.extended_probes));
+
+        if args.everything {
+            entries.push("\"everything\":true".to_string());
+            let fasta_path = args.reference_fasta_path.as_ref().ok_or_else(|| {
+                DataFusionError::Execution(
+                    "--everything requires --reference-fasta-path=/path/to/reference.fa[.gz]"
+                        .to_string(),
+                )
+            })?;
+            entries.push(format!(
+                "\"reference_fasta_path\":\"{}\"",
+                sql_literal(fasta_path.to_str().ok_or_else(|| {
+                    DataFusionError::Execution(
+                        "reference_fasta_path must be valid UTF-8".to_string(),
+                    )
+                })?)
+            ));
+        } else {
+            if args.hgvs {
+                entries.push("\"hgvs\":true".to_string());
+                if let Some(shift_hgvs) = args.shift_hgvs {
+                    entries.push(format!("\"shift_hgvs\":{shift_hgvs}"));
+                }
+                let fasta_path = args.reference_fasta_path.as_ref().ok_or_else(|| {
+                    DataFusionError::Execution(
+                        "--hgvs requires --reference-fasta-path=/path/to/reference.fa[.gz]"
+                            .to_string(),
+                    )
+                })?;
+                entries.push(format!(
+                    "\"reference_fasta_path\":\"{}\"",
+                    sql_literal(fasta_path.to_str().ok_or_else(|| {
+                        DataFusionError::Execution(
+                            "reference_fasta_path must be valid UTF-8".to_string(),
+                        )
+                    })?)
+                ));
+            }
+            entries.push("\"check_existing\":true".to_string());
+            entries.push("\"af\":true".to_string());
+            entries.push("\"af_1kg\":true".to_string());
+            entries.push("\"af_gnomade\":true".to_string());
+            entries.push("\"af_gnomadg\":true".to_string());
+            entries.push("\"max_af\":true".to_string());
+            entries.push("\"pubmed\":true".to_string());
+        }
+        if args.merged {
+            entries.push("\"merged\":true".to_string());
+        }
+        return Ok(Some(format!("{{{}}}", entries.join(","))));
+    }
+
     // Use explicit context_dir, or derive from cache_source parent directory.
     let context_dir = args
         .context_dir
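
To see what the partitioned branch of `build_options_json` produces, here is a reduced, standalone sketch of the same string assembly. `SketchArgs` and `partitioned_options_json` are hypothetical names, and only the case where neither `--everything` nor `--hgvs` is set is covered, so the FASTA path handling is left out entirely.

```rust
/// Hypothetical, reduced stand-in for the benchmark's `Args` struct.
struct SketchArgs {
    extended_probes: bool,
    merged: bool,
}

/// Assembles the options JSON for the partitioned path when neither
/// `--everything` nor `--hgvs` is requested.
fn partitioned_options_json(args: &SketchArgs) -> String {
    let mut entries = vec![
        "\"partitioned\":true".to_string(),
        format!("\"extended_probes\":{}", args.extended_probes),
    ];
    // Flags the diff switches on unconditionally in the non-`everything` branch.
    for flag in [
        "check_existing",
        "af",
        "af_1kg",
        "af_gnomade",
        "af_gnomadg",
        "max_af",
        "pubmed",
    ] {
        entries.push(format!("\"{flag}\":true"));
    }
    if args.merged {
        entries.push("\"merged\":true".to_string());
    }
    // `{{` and `}}` are escaped braces, yielding a single JSON object.
    format!("{{{}}}", entries.join(","))
}
```

With `extended_probes: false` and `merged: true` this yields `{"partitioned":true,"extended_probes":false,"check_existing":true,"af":true,"af_1kg":true,"af_gnomade":true,"af_gnomadg":true,"max_af":true,"pubmed":true,"merged":true}`. Building the object by joining strings is compact, though a JSON serializer would be the safer choice once values need escaping.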
