Skip to content

Commit a43b90b

Browse files
authored
Fix(DIANN): Enable annotation to be added to input (#16)
1 parent ce69a87 commit a43b90b

10 files changed

Lines changed: 284 additions & 104 deletions

R/backends.R

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,25 @@ MSstatsPreprocessBigArrow <- function(input_file,
9595
anomalyModelFeatures = c()) {
9696
input <- arrow::open_dataset(input_file, format = "csv")
9797

98+
# dynamically cast any columns Arrow inferred as 'null'
99+
null_columns <- names(input$schema)[vapply(input$schema$fields, function(f) f$type$ToString() == "null", logical(1))]
100+
if (length(null_columns) > 0) {
101+
for (col in null_columns) {
102+
input <- dplyr::mutate(input, !!rlang::sym(col) := as.numeric(!!rlang::sym(col)))
103+
}
104+
}
105+
106+
if ("PrecursorMz" %in% input$schema$names) {
107+
input <- dplyr::mutate(input, PrecursorMz = as.numeric(PrecursorMz))
108+
}
109+
98110
input <- dplyr::mutate(input,
99111
Feature = paste(PeptideSequence, PrecursorCharge,
100112
FragmentIon, ProductCharge, sep = "_"))
101113
feature_counts <- dplyr::group_by(input, ProteinName, Feature)
102114
feature_counts <- dplyr::summarize(feature_counts,
103115
MeanAbundance = mean(Intensity,
104-
na.rm <- TRUE))
116+
na.rm = TRUE))
105117
feature_counts <- dplyr::collect(feature_counts)
106118

107119
feature_counts <- dplyr::mutate(
@@ -112,11 +124,11 @@ MSstatsPreprocessBigArrow <- function(input_file,
112124
feature_rank <= max_feature_count)
113125

114126
feature_counts <- dplyr::select(feature_counts, -MeanAbundance, -feature_rank)
115-
input <- dplyr::inner_join(input, feature_counts,
127+
input <- dplyr::inner_join(input, feature_counts,
116128
by = c("ProteinName", "Feature"))
117129
input <- dplyr::select(input, -Feature)
118130

119-
arrow::write_csv_arrow(input, file = paste0("topN_", output_file_name))
131+
arrow::write_dataset(input, .prefixedPath("topN_", output_file_name), format = "csv")
120132

121133
if (filter_unique_peptides) {
122134
pp_df <- dplyr::select(input, ProteinName, PeptideSequence)
@@ -126,7 +138,6 @@ MSstatsPreprocessBigArrow <- function(input_file,
126138
pp_df <- dplyr::select(pp_df, -NumProteins)
127139
input <- dplyr::anti_join(input, pp_df, by = "PeptideSequence")
128140
}
129-
130141
if (aggregate_psms) {
131142
group_cols <- c("ProteinName", "PeptideSequence", "PrecursorCharge",
132143
"FragmentIon", "ProductCharge", "IsotopeLabelType", "Run",
@@ -158,7 +169,7 @@ MSstatsPreprocessBigArrow <- function(input_file,
158169
input <- dplyr::select(input, -Feature)
159170
}
160171

161-
arrow::write_csv_arrow(input, file = output_file_name)
172+
arrow::write_dataset(input, output_file_name, format = "csv")
162173
input
163174
}
164175

R/clean_DIANN.R

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,55 @@ reduceBigDIANN <- function(input_file, output_path, MBR = TRUE,
1717
global_qvalue_cutoff = 0.01,
1818
qvalue_cutoff = 0.01,
1919
pg_qvalue_cutoff = 0.01,
20-
calculateAnomalyScores=FALSE,
20+
calculateAnomalyScores=FALSE,
2121
anomalyModelFeatures=c(),
2222
annotation = NULL) {
23-
if (grepl("csv", input_file)) {
24-
delim = ","
25-
} else if (grepl("tsv|xls", input_file)) {
26-
delim = "\t"
27-
} else {
28-
delim <- ";"
29-
}
30-
31-
diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR,
23+
# Per-chunk callback shared by both the parquet and delimited-text paths.
24+
# `pos` drives .writeChunkToFile: pos == 1 overwrites, pos > 1 appends.
25+
diann_chunk <- function(x, pos) cleanDIANNChunk(x, output_path, MBR,
3226
quantificationColumn, pos,
33-
global_qvalue_cutoff,
34-
qvalue_cutoff,
35-
pg_qvalue_cutoff,
27+
global_qvalue_cutoff,
28+
qvalue_cutoff,
29+
pg_qvalue_cutoff,
3630
calculateAnomalyScores,
3731
anomalyModelFeatures,
3832
annotation)
3933

34+
# Parquet branch (DIANN 2.0+): stream record batches via arrow so the file
35+
# is never fully materialised. read_delim_chunked can't read parquet bytes.
36+
if (tolower(tools::file_ext(input_file)) == "parquet") {
37+
# Lazy handle to the parquet file — no data loaded yet.
38+
ds <- arrow::open_dataset(input_file, format = "parquet")
39+
# Scanner + RecordBatchReader yields one batch at a time on demand.
40+
# batch_size matches the delimited-text path's 1M-row chunks; row-group
41+
# boundaries in the parquet may cap individual batches below this.
42+
scanner <- arrow::Scanner$create(ds, batch_size = 1e6)
43+
reader <- scanner$ToRecordBatchReader()
44+
pos <- 1
45+
repeat {
46+
batch <- reader$read_next_batch()
47+
if (is.null(batch)) break # exhausted
48+
# Materialise just this batch, run it through the shared cleaner.
49+
diann_chunk(as.data.frame(batch), pos)
50+
pos <- pos + 1
51+
}
52+
return(invisible(NULL))
53+
}
54+
55+
# Delimited-text branch (DIANN 1.x TSV/CSV): sniff the delimiter from the
56+
# first line, defaulting to tab when nothing matches.
57+
first_line <- readLines(input_file, n = 1)
58+
if (grepl("\t", first_line)) {
59+
delim <- "\t"
60+
} else if (grepl(",", first_line)) {
61+
delim <- ","
62+
} else if (grepl(";", first_line)) {
63+
delim <- ";"
64+
} else {
65+
delim <- "\t"
66+
}
67+
68+
# Stream the file in 1M-row chunks, invoking diann_chunk for each.
4069
readr::read_delim_chunked(input_file,
4170
readr::DataFrameCallback$new(diann_chunk),
4271
delim = delim,

R/converters.R

Lines changed: 86 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ MSstatsPreprocessBig <- function(input_file,
6262
calculateAnomalyScores,
6363
anomalyModelFeatures)
6464
} else if (backend == "sparklyr") {
65-
MSstatsPreprocessBigSparklyr(connection, input, output_file_name,
65+
MSstatsPreprocessBigSparklyr(connection, input_file, output_file_name,
6666
max_feature_count, filter_unique_peptides,
6767
aggregate_psms, filter_few_obs,
6868
remove_annotation)
@@ -96,10 +96,16 @@ bigFragPipetoMSstatsFormat <- function(input_file, output_file_name,
9696
filter_few_obs = FALSE,
9797
remove_annotation = FALSE,
9898
connection = NULL) {
99-
MSstatsPreprocessBig(input_file, output_file_name,
100-
backend, max_feature_count, filter_unique_peptides,
101-
aggregate_psms, filter_few_obs, remove_annotation,
102-
connection = connection)
99+
MSstatsPreprocessBig(
100+
input_file = input_file,
101+
output_file_name = output_file_name,
102+
backend = backend,
103+
max_feature_count = max_feature_count,
104+
filter_unique_peptides = filter_unique_peptides,
105+
aggregate_psms = aggregate_psms,
106+
filter_few_obs = filter_few_obs,
107+
remove_annotation = remove_annotation,
108+
connection = connection)
103109
}
104110

105111

@@ -140,15 +146,23 @@ bigSpectronauttoMSstatsFormat <- function(input_file, output_file_name,
140146
calculateAnomalyScores=FALSE,
141147
anomalyModelFeatures=c(),
142148
connection = NULL) {
143-
reduceBigSpectronaut(input_file, paste0("reduce_output_", output_file_name),
149+
reduced_file <- .prefixedPath("reduce_output_", output_file_name)
150+
reduceBigSpectronaut(input_file, reduced_file,
144151
intensity, filter_by_excluded, filter_by_identified,
145152
filter_by_qvalue, qvalue_cutoff,
146153
calculateAnomalyScores, anomalyModelFeatures)
147154
msstats_data <- MSstatsPreprocessBig(
148-
paste0("reduce_output_", output_file_name),
149-
output_file_name, backend, max_feature_count,
150-
aggregate_psms, filter_few_obs, remove_annotation, calculateAnomalyScores,
151-
anomalyModelFeatures, connection)
155+
input_file = reduced_file,
156+
output_file_name = output_file_name,
157+
backend = backend,
158+
max_feature_count = max_feature_count,
159+
filter_unique_peptides = filter_unique_peptides,
160+
aggregate_psms = aggregate_psms,
161+
filter_few_obs = filter_few_obs,
162+
remove_annotation = remove_annotation,
163+
calculateAnomalyScores = calculateAnomalyScores,
164+
anomalyModelFeatures = anomalyModelFeatures,
165+
connection = connection)
152166

153167
return(msstats_data)
154168

@@ -184,22 +198,59 @@ bigDIANNtoMSstatsFormat <- function(input_file,
184198
connection = NULL) {
185199

186200
# Reduce and clean the DIANN report file in chunks
187-
reduceBigDIANN(input_file,
188-
paste0("reduce_output_", output_file_name),
201+
reduced_file <- .prefixedPath("reduce_output_", output_file_name)
202+
reduceBigDIANN(input_file,
203+
reduced_file,
189204
MBR,
190205
quantificationColumn,
191-
global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff,
206+
global_qvalue_cutoff, qvalue_cutoff, pg_qvalue_cutoff,
192207
calculateAnomalyScores, anomalyModelFeatures,
193208
annotation)
194-
209+
210+
reduced <- arrow::open_dataset(reduced_file, format = "csv")
211+
212+
# Identify columns where Arrow inferred 'null' type (all values NA)
213+
null_cols <- names(reduced$schema)[
214+
vapply(reduced$schema$fields, function(f) f$type$ToString() == "null", logical(1))
215+
]
216+
217+
if (length(null_cols) > 0) {
218+
# Drop null-typed columns using a lazy select (no data loaded into memory)
219+
reduced <- dplyr::select(reduced, -dplyr::all_of(null_cols))
220+
221+
# Write back using Arrow's streaming writer — stays out-of-memory.
222+
# write_dataset creates a directory, but open_dataset can read
223+
# directories just as easily as single files.
224+
cleaned_file <- .prefixedPath("cleaned_", output_file_name)
225+
arrow::write_dataset(reduced, cleaned_file, format = "csv")
226+
reduced_file <- cleaned_file
227+
}
228+
195229
# Preprocess the cleaned data (feature selection, etc.)
196230
msstats_data <- MSstatsPreprocessBig(
197-
paste0("reduce_output_", output_file_name),
198-
output_file_name, backend, max_feature_count,
199-
filter_unique_peptides, aggregate_psms, filter_few_obs,
200-
remove_annotation, calculateAnomalyScores,
201-
anomalyModelFeatures, connection)
202-
231+
input_file = reduced_file,
232+
output_file_name = output_file_name,
233+
backend = backend,
234+
max_feature_count = max_feature_count,
235+
filter_unique_peptides = filter_unique_peptides,
236+
aggregate_psms = aggregate_psms,
237+
filter_few_obs = filter_few_obs,
238+
remove_annotation = remove_annotation,
239+
calculateAnomalyScores = calculateAnomalyScores,
240+
anomalyModelFeatures = anomalyModelFeatures,
241+
connection = connection)
242+
243+
# Merge annotation with the preprocessed data and persist the merge so
244+
# callers reopening output_file_name see Condition/BioReplicate. The arrow
245+
# rewrite stays lazy — the underlying source is reduced_file, not
246+
# output_file_name, so we can safely overwrite the directory we just wrote.
247+
if (!is.null(annotation)) {
248+
msstats_data <- MSstatsAddAnnotationBig(msstats_data, annotation)
249+
if (backend == "arrow") {
250+
unlink(output_file_name, recursive = TRUE, force = TRUE)
251+
arrow::write_dataset(msstats_data, output_file_name, format = "csv")
252+
}
253+
}
203254
return(msstats_data)
204255
}
205256

@@ -232,5 +283,19 @@ bigDIANNtoMSstatsFormat <- function(input_file,
232283
#' @return table of `input` and `annotation` merged by Run column.
233284
#'
234285
MSstatsAddAnnotationBig <- function(input, annotation) {
235-
dplyr::inner_join(input, annotation, by = "Run")
286+
join_keys <- "Run"
287+
288+
# Use tbl_vars which works reliably on both Arrow
289+
# datasets, arrow_dplyr_query objects, and data frames
290+
input_cols <- dplyr::tbl_vars(input)
291+
292+
overlap_cols <- setdiff(
293+
intersect(input_cols, colnames(annotation)),
294+
join_keys
295+
)
296+
if (length(overlap_cols) > 0) {
297+
input <- dplyr::select(input, -dplyr::all_of(overlap_cols))
298+
}
299+
300+
dplyr::inner_join(input, annotation, by = join_keys)
236301
}

R/utils.R

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,21 @@
1+
#' Build an intermediate output path by prefixing only the basename.
2+
#'
3+
#' Naive `paste0(prefix, output_file_name)` corrupts paths that contain a
4+
#' directory (`subdir/out.csv` → `topN_subdir/out.csv`,
5+
#' `/tmp/out.csv` → `topN_/tmp/out.csv`). Splitting via dirname/basename keeps
6+
#' the directory component intact so intermediate files land beside the final
7+
#' output.
8+
#'
9+
#' @param prefix Character scalar prepended to the basename.
10+
#' @param path Output file path supplied by the caller.
11+
#' @return Character scalar.
12+
#' @keywords internal
13+
.prefixedPath <- function(prefix, path) {
14+
file.path(dirname(path), paste0(prefix, basename(path)))
15+
}
16+
117
#' Write chunk to file
2-
#'
18+
#'
319
#' @param input Data frame
420
#' @param output_path Path to output file
521
#' @param pos Chunk position

tests/testthat/test-clean_DIANN.R

Lines changed: 0 additions & 39 deletions
This file was deleted.

0 commit comments

Comments
 (0)