Skip to content

Commit 5a4252d

Browse files
committed
fix bug in data prep to run on HPC
1 parent 3500dc3 commit 5a4252d

1 file changed

Lines changed: 136 additions & 52 deletions

File tree

workflows/code/build_graph_inputs.R

Lines changed: 136 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -327,19 +327,123 @@ prep_spieceasi_matrix <- function(community_df, metadata_row) {
327327

328328
if (detect_relative_abundance(filtered_df) && isTRUE(scale_to_counts)) {
329329
filtered_df <- round(filtered_df * scale_factor)
330-
prep$message <- sprintf("Relative abundances were scaled to pseudo-counts with factor %s.", format(scale_factor, scientific = FALSE))
330+
prep$message <- str_trim(paste(
331+
prep$message,
332+
sprintf("Relative abundances were scaled to pseudo-counts with factor %s.", format(scale_factor, scientific = FALSE))
333+
))
334+
}
335+
336+
# SpiecEasi needs sample-to-sample variation. Pooled donor strata in this
337+
# workshop dataset often contain repeated identical profiles, so they are
338+
# intentionally skipped rather than recorded as model failures.
339+
variable_taxa <- vapply(filtered_df, function(x) {
340+
x <- as.numeric(x)
341+
length(unique(x[is.finite(x)])) > 1
342+
}, logical(1))
343+
344+
if (sum(variable_taxa) < min_taxa) {
345+
prep$skipped <- TRUE
346+
prep$n_taxa_after_filter <- sum(variable_taxa)
347+
prep$message <- sprintf(
348+
"Skipped: %s variable taxa after prevalence filtering; requires at least %s. This usually indicates repeated identical sample profiles.",
349+
sum(variable_taxa),
350+
min_taxa
351+
)
352+
return(prep)
353+
}
354+
355+
if (sum(variable_taxa) < ncol(filtered_df)) {
356+
filtered_df <- filtered_df |> select(all_of(names(variable_taxa)[variable_taxa]))
357+
prep$message <- str_trim(paste(
358+
prep$message,
359+
sprintf("Removed %s non-variable taxa before SpiecEasi.", sum(!variable_taxa))
360+
))
331361
}
332362

333363
if (any(rowSums(filtered_df, na.rm = TRUE) == 0)) {
334364
filtered_df <- filtered_df + 1
335-
prep$message <- paste(c(prep$message, "Added pseudocount of 1 to avoid zero-sum samples."), collapse = " ")
365+
prep$message <- str_trim(paste(prep$message, "Added pseudocount of 1 to avoid zero-sum samples."))
336366
}
337367

368+
prep$n_taxa_after_filter <- ncol(filtered_df)
338369
prep$matrix <- as.matrix(filtered_df)
339370
prep$prevalence <- prevalence[colnames(filtered_df)]
340371
prep
341372
}
342373

374+
safe_get_spieceasi_matrix <- function(fit, getter_name) {
375+
getter <- get0(getter_name, envir = asNamespace("SpiecEasi"), mode = "function")
376+
if (is.null(getter)) {
377+
return(NULL)
378+
}
379+
tryCatch(
380+
Matrix::as.matrix(getter(fit)),
381+
error = function(err) NULL
382+
)
383+
}
384+
385+
build_spieceasi_edges <- function(fit, taxa_names, prepped, input_file) {
386+
adjacency <- safe_get_spieceasi_matrix(fit, "getRefit")
387+
if (is.null(adjacency) || is.null(dim(adjacency)) || any(dim(adjacency) == 0)) {
388+
return(tibble())
389+
}
390+
391+
adjacency[is.na(adjacency)] <- 0
392+
diag(adjacency) <- 0
393+
edge_positions <- which(upper.tri(adjacency) & adjacency != 0, arr.ind = TRUE)
394+
if (nrow(edge_positions) == 0) {
395+
return(tibble())
396+
}
397+
398+
weight_matrix <- abs(adjacency)
399+
sign_matrix <- matrix(NA_real_, nrow = nrow(adjacency), ncol = ncol(adjacency))
400+
method_label <- paste0("spieceasi_", spieceasi_method)
401+
402+
if (spieceasi_method == "mb") {
403+
beta <- safe_get_spieceasi_matrix(fit, "getOptBeta")
404+
if (!is.null(beta) && all(dim(beta) == dim(adjacency))) {
405+
beta[is.na(beta)] <- 0
406+
weight_matrix <- pmax(abs(beta), abs(t(beta)))
407+
signed_matrix <- beta + t(beta)
408+
sign_matrix <- sign(signed_matrix)
409+
sign_matrix[sign_matrix == 0] <- NA_real_
410+
}
411+
} else if (spieceasi_method == "glasso") {
412+
theta <- safe_get_spieceasi_matrix(fit, "getOptTheta")
413+
if (is.null(theta)) {
414+
theta <- safe_get_spieceasi_matrix(fit, "getOptiCov")
415+
}
416+
if (is.null(theta)) {
417+
theta <- safe_get_spieceasi_matrix(fit, "getOptCov")
418+
}
419+
if (!is.null(theta) && all(dim(theta) == dim(adjacency))) {
420+
diag_theta <- diag(theta)
421+
if (all(is.finite(diag_theta)) && all(diag_theta > 0)) {
422+
inv_sqrt_diag <- diag(1 / sqrt(diag_theta))
423+
partial_corr <- -inv_sqrt_diag %*% theta %*% inv_sqrt_diag
424+
diag(partial_corr) <- 0
425+
weight_matrix <- abs(partial_corr)
426+
sign_matrix <- sign(partial_corr)
427+
}
428+
}
429+
}
430+
431+
tibble(
432+
kingdom = prepped$kingdom,
433+
donor_id = prepped$donor_id,
434+
community_type = prepped$community_type,
435+
taxon_a = taxa_names[edge_positions[, "row"]],
436+
taxon_b = taxa_names[edge_positions[, "col"]],
437+
weight = weight_matrix[edge_positions],
438+
sign = sign_matrix[edge_positions],
439+
method = method_label,
440+
n_samples = prepped$n_samples,
441+
prevalence_a = unname(prepped$prevalence[taxa_names[edge_positions[, "row"]]]),
442+
prevalence_b = unname(prepped$prevalence[taxa_names[edge_positions[, "col"]]])
443+
) |>
444+
arrange(kingdom, donor_id, community_type, taxon_a, taxon_b)
445+
}
446+
343447
run_spieceasi_for_stratum <- function(prepped, input_file) {
344448
summary_row <- tibble(
345449
kingdom = prepped$kingdom,
@@ -373,23 +477,38 @@ run_spieceasi_for_stratum <- function(prepped, input_file) {
373477
result <- tryCatch(
374478
{
375479
set.seed(random_seed)
376-
setTimeLimit(elapsed = spieceasi_time_limit_seconds, transient = TRUE)
377-
on.exit(setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE), add = TRUE)
378-
fit <- spiec.easi(
379-
prepped$matrix,
380-
method = spieceasi_method,
381-
nlambda = nlambda,
382-
lambda.min.ratio = lambda_min_ratio,
383-
sel.criterion = spieceasi_sel_criterion,
384-
pulsar.params = list(rep.num = rep_num, ncores = spieceasi_ncores),
385-
verbose = FALSE
480+
if (is.finite(spieceasi_time_limit_seconds)) {
481+
setTimeLimit(elapsed = spieceasi_time_limit_seconds, transient = TRUE)
482+
on.exit(setTimeLimit(cpu = Inf, elapsed = Inf, transient = FALSE), add = TRUE)
483+
}
484+
485+
captured_warnings <- character()
486+
fit <- withCallingHandlers(
487+
spiec.easi(
488+
prepped$matrix,
489+
method = spieceasi_method,
490+
nlambda = nlambda,
491+
lambda.min.ratio = lambda_min_ratio,
492+
sel.criterion = spieceasi_sel_criterion,
493+
pulsar.params = list(rep.num = rep_num, ncores = spieceasi_ncores),
494+
verbose = FALSE
495+
),
496+
warning = function(w) {
497+
captured_warnings <<- c(captured_warnings, conditionMessage(w))
498+
invokeRestart("muffleWarning")
499+
}
386500
)
387-
theta <- Matrix::as.matrix(getOptiCov(fit))
388-
partial_corr <- build_spieceasi_edges(theta, colnames(prepped$matrix), prepped, input_file)
389-
summary_row$n_edges <- nrow(partial_corr)
501+
502+
edges <- build_spieceasi_edges(fit, colnames(prepped$matrix), prepped, input_file)
503+
summary_row$n_edges <- nrow(edges)
390504
summary_row$status <- "success"
391-
summary_row$message <- str_trim(paste("SpiecEasi completed successfully.", prepped$message))
392-
list(edges = partial_corr, summary = summary_row)
505+
warning_text <- if (length(captured_warnings) > 0) {
506+
paste("Warnings:", paste(unique(captured_warnings), collapse = " | "))
507+
} else {
508+
""
509+
}
510+
summary_row$message <- str_trim(paste("SpiecEasi completed successfully.", prepped$message, warning_text))
511+
list(edges = edges, summary = summary_row)
393512
},
394513
error = function(err) {
395514
summary_row$status <- "failed"
@@ -401,41 +520,6 @@ run_spieceasi_for_stratum <- function(prepped, input_file) {
401520
result
402521
}
403522

404-
build_spieceasi_edges <- function(theta, taxa_names, prepped, input_file) {
405-
if (is.null(dim(theta)) || any(dim(theta) == 0)) {
406-
return(tibble())
407-
}
408-
409-
diag_theta <- diag(theta)
410-
if (any(diag_theta <= 0)) {
411-
stop(sprintf("Non-positive diagonal entries encountered in precision matrix for %s", basename(input_file)), call. = FALSE)
412-
}
413-
414-
inv_sqrt_diag <- diag(1 / sqrt(diag_theta))
415-
partial_corr <- -inv_sqrt_diag %*% theta %*% inv_sqrt_diag
416-
diag(partial_corr) <- 0
417-
418-
edge_positions <- which(upper.tri(partial_corr) & partial_corr != 0, arr.ind = TRUE)
419-
if (nrow(edge_positions) == 0) {
420-
return(tibble())
421-
}
422-
423-
tibble(
424-
kingdom = prepped$kingdom,
425-
donor_id = prepped$donor_id,
426-
community_type = prepped$community_type,
427-
taxon_a = taxa_names[edge_positions[, "row"]],
428-
taxon_b = taxa_names[edge_positions[, "col"]],
429-
weight = abs(partial_corr[edge_positions]),
430-
sign = sign(partial_corr[edge_positions]),
431-
method = paste0("spieceasi_", spieceasi_method),
432-
n_samples = prepped$n_samples,
433-
prevalence_a = unname(prepped$prevalence[taxa_names[edge_positions[, "row"]]]),
434-
prevalence_b = unname(prepped$prevalence[taxa_names[edge_positions[, "col"]]])
435-
) |>
436-
arrange(kingdom, donor_id, community_type, taxon_a, taxon_b)
437-
}
438-
439523
build_multirelational_edges <- function(sample_taxon_edges, taxon_taxon_edges) {
440524
sample_edges <- sample_taxon_edges |>
441525
transmute(

0 commit comments

Comments
 (0)