From 70f204b1fa8553edb351a04407fa100d5bb6fc4b Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:50 -0400 Subject: [PATCH 01/37] add fertilization config --- workflows/fertilization-statewide/config.yml | 24 ++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 workflows/fertilization-statewide/config.yml diff --git a/workflows/fertilization-statewide/config.yml b/workflows/fertilization-statewide/config.yml new file mode 100644 index 0000000000..8a29757a77 --- /dev/null +++ b/workflows/fertilization-statewide/config.yml @@ -0,0 +1,24 @@ +default: + n_parcels: 1000 + n_ensemble: 20 + nh4_no3_ratio: 0.5 + seed: 42 + years: [2016, 2018, 2019, 2020, 2021, 2022, 2023] + matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 + crosswalk_path: /projectnb/dietzelab/ccmmf/usr/akash/management/fertilization/CCMMF Fertilization - Crop_types.tsv + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files + output_subdir: fertilization + batch_size: 100 + workers: 1 + +small: + +medium: + n_parcels: 10000 + batch_size: 1000 + workers: 4 + +all: + n_parcels: null # null means all parcels in the matched product + batch_size: 5000 + workers: 8 From 676b47831561b93c0db8ae5208ca390840ede5e6 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:50 -0400 Subject: [PATCH 02/37] add fertilization parcel design step --- .../01-build-parcel-design.R | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 workflows/fertilization-statewide/01-build-parcel-design.R diff --git a/workflows/fertilization-statewide/01-build-parcel-design.R b/workflows/fertilization-statewide/01-build-parcel-design.R new file mode 100644 index 0000000000..505b166b20 --- /dev/null +++ b/workflows/fertilization-statewide/01-build-parcel-design.R @@ -0,0 +1,175 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) + +options(arrow.unsafe_metadata = TRUE) + +# strip parenthetical annotations, conjunctions, and punctuation so the +# LandIQ, FREP, and UC ANR strings can be matched on a common key. +normalize_name <- function(s) { + s |> tolower() |> + stringr::str_replace_all("\\(.*?\\)", "") |> + stringr::str_replace_all("grouped for remote sensing only", "") |> + stringr::str_replace_all("\\bor\\b", "") |> + stringr::str_replace_all("\\band\\b", "") |> + stringr::str_replace_all("&", "") |> + stringr::str_replace_all("[[:punct:]]", " ") |> + stringr::str_squish() +} + +# split slash separated UC ANR or FREP candidates and reattach the shared +# prefix when a part is missing it. e.g. "Plums, dried / fresh" expands to +# c("Plums, dried", "Plums, fresh"). only candidates that match the known +# crop list are kept. +parse_candidates <- function(s, known) { + if (is.na(s) || nchar(s) == 0) return(character(0)) + parts <- stringr::str_split(s, " ?/ ?")[[1]] |> stringr::str_squish() + prefix <- if (stringr::str_detect(parts[1], ",")) { + stringr::str_extract(parts[1], "^[^,]+, ") + } else "" + result <- character() + for (p in parts) { + if (tolower(p) %in% tolower(known)) { + result <- c(result, p) + } else if (nchar(prefix) > 0) { + combined <- paste0(prefix, p) + if (tolower(combined) %in% tolower(known)) { + result <- c(result, combined) + } + } + } + unique(result) +} + +## load lookups +PEcAn.logger::logger.info("Loading bundled lookups and crosswalk") +crosswalk <- readr::read_tsv(config[["crosswalk_path"]], show_col_types = FALSE) |> + dplyr::select(landiq = "LandIQ Name", frep = "FREP Name", ucanr = "UC ANR") +ca_rates <- PEcAn.data.land::ca_n_application_rate +code_map <- PEcAn.data.land::landiq_crop_mapping_codes + +## crosswalk lookup +# resolve each LandIQ CLASS+SUBCLASS code to an N rate envelope by walking +# the crosswalk to UC ANR or FREP names and matching against the bundled +# rate table. +known_crops <- ca_rates$crop +xw_norm <- crosswalk |> + dplyr::mutate( + key = normalize_name(.data$landiq), + candidates = dplyr::coalesce(.data$ucanr, .data$frep) + ) + +code_lookup <- code_map |> + dplyr::mutate( + code = paste0(.data$CLASS, .data$SUBCLASS), + key = normalize_name(.data$subclass_name) + ) |> + dplyr::left_join(xw_norm |> dplyr::select("key", "candidates"), by = "key") |> + dplyr::rowwise() |> + dplyr::mutate(matched_crops = list(parse_candidates(.data$candidates, known_crops))) |> + dplyr::ungroup() |> + dplyr::filter(lengths(.data$matched_crops) > 0) |> + dplyr::mutate(rates = lapply(.data$matched_crops, function(cc) { + ca_rates |> + dplyr::filter(.data$crop %in% cc) |> + dplyr::summarize( + min_n_lbs_acre = min(.data$min_n_lbs_acre, na.rm = TRUE), + max_n_lbs_acre = max(.data$max_n_lbs_acre, na.rm = TRUE) + ) + })) |> + tidyr::unnest("rates") |> + dplyr::select("code", "min_n_lbs_acre", "max_n_lbs_acre") + +PEcAn.logger::logger.info(sprintf("Resolved %d LandIQ codes via crosswalk", nrow(code_lookup))) + +## load matched product +# parcel_id is stored as a string in the source so cast to int on the way +# in. filter keeps matched cycles with non NA crop class plus a valid EVI +# signal. +read_matched_year <- function(year) { + fn <- file.path(config[["matched_dir"]], + sprintf("assigned_year=%d.parquet", year)) + if (!file.exists(fn)) { + PEcAn.logger::logger.warn("Missing matched file for year ", year, ": ", fn) + return(NULL) + } + arrow::read_parquet(fn) |> + dplyr::filter(.data$assigned_by == "matched", + !is.na(.data$landiq_CLASS), + !is.na(.data$landiq_SUBCLASS), + !is.na(.data$landiq_PFT), + !is.na(.data$mslsp_EVImax), + !is.na(.data$mslsp_EVIamp)) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + year = as.integer(.data$year), + season = as.integer(.data$season), + date = as.Date(.data$mslsp_OGI), + code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS), + PFT = as.character(.data$landiq_PFT) + ) +} + +PEcAn.logger::logger.info("Reading matched LandIQ MSLSP for years: ", + paste(config[["years"]], collapse = ", ")) +plant <- purrr::map_dfr(config[["years"]], read_matched_year) +PEcAn.logger::logger.info(sprintf("Loaded %d cycles across %d parcels", + nrow(plant), dplyr::n_distinct(plant$parcel_id))) + +## subsample +# parcel set is sampled once and applied to all years so the same parcels +# appear in every year. +n_parcels <- config[["n_parcels"]] +if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { + picked <- plant |> + dplyr::distinct(.data$parcel_id) |> + dplyr::slice_sample(n = n_parcels) |> + dplyr::pull(.data$parcel_id) + plant <- plant |> dplyr::filter(.data$parcel_id %in% picked) + PEcAn.logger::logger.info(sprintf("Sampled %d parcels (n_parcels=%d)", + length(picked), n_parcels)) +} + +## attach rates +# cycles with no resolvable rate are dropped with an audit log. no PFT +# median fallback. +design <- plant |> + dplyr::left_join(code_lookup, by = "code") |> + dplyr::mutate( + rate_source = ifelse(is.na(.data$min_n_lbs_acre), "skip", "crosswalk") + ) + +skipped <- design |> dplyr::filter(.data$rate_source == "skip") +if (nrow(skipped) > 0) { + by_code <- skipped |> + dplyr::count(.data$code, name = "n_events", sort = TRUE) |> + head(15) + PEcAn.logger::logger.warn(sprintf( + "Dropping %d cycles across %d codes with no resolvable N rate. Top offenders:", + nrow(skipped), dplyr::n_distinct(skipped$code) + )) + for (i in seq_len(nrow(by_code))) { + PEcAn.logger::logger.warn(sprintf(" %s: %d cycles", + by_code$code[i], by_code$n_events[i])) + } +} + +design <- design |> + dplyr::filter(.data$rate_source != "skip") |> + dplyr::select("parcel_id", "year", "season", "date", "code", "PFT", + "min_n_lbs_acre", "max_n_lbs_acre") + +PEcAn.logger::logger.info(sprintf("Design table: %d events, %d parcels, %d years", + nrow(design), + dplyr::n_distinct(design$parcel_id), + dplyr::n_distinct(design$year))) + +staging_file <- file.path(staging_dir, "_staging_01_design.rds") +saveRDS(design, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) From 7188dd8d046f43f482b04ddc324554e24b9cd666 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:50 -0400 Subject: [PATCH 03/37] add n rate sampling step --- .../02-sample-n-rates.R | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 workflows/fertilization-statewide/02-sample-n-rates.R diff --git a/workflows/fertilization-statewide/02-sample-n-rates.R b/workflows/fertilization-statewide/02-sample-n-rates.R new file mode 100644 index 0000000000..33533eb8c6 --- /dev/null +++ b/workflows/fertilization-statewide/02-sample-n-rates.R @@ -0,0 +1,46 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +design_file <- file.path(staging_dir, "_staging_01_design.rds") +if (!file.exists(design_file)) { + PEcAn.logger::logger.severe( + "Stage 01 output not found: ", design_file, + ". Run 01-build-parcel-design.R first." + ) +} + +PEcAn.logger::logger.info("Reading design from ", design_file) +design <- readRDS(design_file) + +n_ensemble <- config[["n_ensemble"]] +PEcAn.logger::logger.info(sprintf( + "Sampling %d ensemble members across %d design rows", + n_ensemble, nrow(design) +)) + +# expand each design row to one row per ensemble member and draw the annual +# N rate uniformly from the resolved min/max envelope. +events <- design |> + tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> + dplyr::mutate( + annual_n_lb_acre = stats::runif( + dplyr::n(), + min = .data$min_n_lbs_acre, + max = .data$max_n_lbs_acre + ), + ens_id = sprintf("fert_ens_%03d", .data$ensemble_member) + ) + +PEcAn.logger::logger.info(sprintf( + "Sampled %d events. annual N range: %.2f to %.2f lb/acre", + nrow(events), min(events$annual_n_lb_acre), max(events$annual_n_lb_acre) +)) + +staging_file <- file.path(staging_dir, "_staging_02_events.rds") +saveRDS(events, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) From 6166a4b105b9c54c5163d29840fa69cf2ecab7b3 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:50 -0400 Subject: [PATCH 04/37] add fertilization parquet writer --- .../03-write-parquet.R | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 workflows/fertilization-statewide/03-write-parquet.R diff --git a/workflows/fertilization-statewide/03-write-parquet.R b/workflows/fertilization-statewide/03-write-parquet.R new file mode 100644 index 0000000000..09610c9261 --- /dev/null +++ b/workflows/fertilization-statewide/03-write-parquet.R @@ -0,0 +1,98 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +# 1 lb/acre = 0.112085 g/m^2 = 0.000112085 kg/m^2. +LBS_ACRE_TO_KG_M2 <- 0.112085 / 1000 + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +events_file <- file.path(staging_dir, "_staging_02_events.rds") +if (!file.exists(events_file)) { + PEcAn.logger::logger.severe( + "Stage 02 output not found: ", events_file, + ". Run 02-sample-n-rates.R first." + ) +} + +PEcAn.logger::logger.info("Reading events from ", events_file) +events <- readRDS(events_file) + +# synthetic N is split between nh4 and no3 by the configured ratio. org_c +# and org_n stay zero here because organic amendments live in the separate +# ncc product. +nh4_ratio <- config[["nh4_no3_ratio"]] + +out <- events |> + dplyr::mutate( + total_n_kg_m2 = .data$annual_n_lb_acre * .env$LBS_ACRE_TO_KG_M2, + nh4_n_kg_m2 = .data$total_n_kg_m2 * .env$nh4_ratio, + no3_n_kg_m2 = .data$total_n_kg_m2 * (1 - .env$nh4_ratio), + org_c_kg_m2 = 0, + org_n_kg_m2 = 0, + fert_subtype = "synthetic" + ) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), + .data$nh4_n_kg_m2, + .data$no3_n_kg_m2, + .data$org_c_kg_m2, + .data$org_n_kg_m2, + .data$fert_subtype, + crop_code = .data$code, + PFT = .data$PFT + ) + +out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +dir.create(out_path, showWarnings = FALSE, recursive = TRUE) + +## clean prior shards +existing <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(existing) > 0) { + PEcAn.logger::logger.info(sprintf("Removing %d existing parquet shards", length(existing))) + unlink(existing) +} + +# partition by parcel_id range and write one parquet per batch named +# _.parquet. +all_parcels <- sort(unique(out[["parcel_id"]])) +batch_size <- config[["batch_size"]] +n_batches <- ceiling(length(all_parcels) / batch_size) +batches <- split(all_parcels, ceiling(seq_along(all_parcels) / batch_size)) + +PEcAn.logger::logger.info(sprintf( + "Writing %d rows across %d parcel batches (batch_size=%d) to %s", + nrow(out), n_batches, batch_size, out_path +)) + +# prefer ZSTD; fall back to snappy when zstd is not in the local arrow +# build. +parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY" +PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec) + +write_batch <- function(pids) { + shard <- out |> dplyr::filter(.data$parcel_id %in% pids) + pid_min <- min(shard[["parcel_id"]]) + pid_max <- max(shard[["parcel_id"]]) + fn <- file.path(out_path, sprintf("%d_%d.parquet", pid_min, pid_max)) + arrow::write_parquet(shard, fn, compression = parquet_codec) + fn +} + +workers <- as.integer(config[["workers"]]) +if (workers > 1) { + PEcAn.logger::logger.info(sprintf("Using mclapply with %d workers", workers)) + written <- parallel::mclapply(batches, write_batch, mc.cores = workers) +} else { + written <- lapply(batches, write_batch) +} + +PEcAn.logger::logger.info(sprintf( + "Done. wrote %d shards, %d total rows, parcels=%d, years=%d, ensemble=%d", + length(written), nrow(out), + dplyr::n_distinct(out[["parcel_id"]]), + dplyr::n_distinct(format(out[["date"]], "%Y")), + dplyr::n_distinct(out[["ens_id"]]) +)) From 2f1b4bdedb273c9e360337728932296947eb97e3 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:50 -0400 Subject: [PATCH 05/37] add fertilization runner script --- .../fertilization-statewide/run-statewide.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100755 workflows/fertilization-statewide/run-statewide.sh diff --git a/workflows/fertilization-statewide/run-statewide.sh b/workflows/fertilization-statewide/run-statewide.sh new file mode 100755 index 0000000000..b01d2906d9 --- /dev/null +++ b/workflows/fertilization-statewide/run-statewide.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# runs the fertilization statewide pipeline. FERT_PROJECT picks a profile +# from config.yml (default, small, medium, all). + +set -euo pipefail + +# run from PEcAn repo root regardless of caller cwd +cd "$(dirname "$0")/../.." + +FERT_PROJECT="${FERT_PROJECT:-default}" +export FERT_PROJECT + +DIR="workflows/fertilization-statewide" + +Rscript "$DIR/01-build-parcel-design.R" +Rscript "$DIR/02-sample-n-rates.R" +Rscript "$DIR/03-write-parquet.R" From 5f585b5787aede3a1ce2e7dac7e02a4642362bad Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:51 -0400 Subject: [PATCH 06/37] add fertilization sanity check --- .../fertilization-statewide/check-result.R | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 workflows/fertilization-statewide/check-result.R diff --git a/workflows/fertilization-statewide/check-result.R b/workflows/fertilization-statewide/check-result.R new file mode 100644 index 0000000000..b8409735d5 --- /dev/null +++ b/workflows/fertilization-statewide/check-result.R @@ -0,0 +1,59 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +options(arrow.unsafe_metadata = TRUE) + +out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +if (!dir.exists(out_path)) { + PEcAn.logger::logger.severe("Output not found: ", out_path) +} + +shards <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(shards) == 0) { + PEcAn.logger::logger.severe("No parquet shards under: ", out_path) +} +ds <- arrow::open_dataset(shards) + +# schema +print(ds$schema) + +PEcAn.logger::logger.info(sprintf( + "%d shards: %s ... %s", + length(shards), basename(shards[1]), basename(shards[length(shards)]))) + +tot <- ds |> dplyr::count() |> dplyr::collect() +PEcAn.logger::logger.info(sprintf("total rows: %d", tot[["n"]])) + +# rows per ensemble member +print( + ds |> + dplyr::count(.data$ens_id) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per year +print( + ds |> + dplyr::mutate(year = lubridate::year(.data$date)) |> + dplyr::count(.data$year) |> + dplyr::collect() |> + dplyr::arrange(.data$year) |> + as.data.frame() +) + +# first 5 rows +print(ds |> dplyr::slice_head(n = 5) |> dplyr::collect() |> as.data.frame()) + +# nh4 and no3 total range (kg N/m^2) +stats <- ds |> + dplyr::mutate(total_n = .data$nh4_n_kg_m2 + .data$no3_n_kg_m2) |> + dplyr::summarize( + min_total_n = min(.data$total_n, na.rm = TRUE), + max_total_n = max(.data$total_n, na.rm = TRUE), + mean_total_n = mean(.data$total_n, na.rm = TRUE) + ) |> + dplyr::collect() +print(stats |> as.data.frame()) From b20f9d91bdb6fd36367f7b1c4cc9269ab7f5aed2 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:51 -0400 Subject: [PATCH 07/37] add fertilization s3 sync --- workflows/fertilization-statewide/push-to-carb.sh | 5 +++++ 1 file changed, 5 insertions(+) create mode 100755 workflows/fertilization-statewide/push-to-carb.sh diff --git a/workflows/fertilization-statewide/push-to-carb.sh b/workflows/fertilization-statewide/push-to-carb.sh new file mode 100755 index 0000000000..5298e90433 --- /dev/null +++ b/workflows/fertilization-statewide/push-to-carb.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +aws s3 sync --profile ccmmf \ + /projectnb/dietzelab/ccmmf/management/fertilization_event_files/ \ + s3://carb/management/fertilization/v1.0/ From 608af23108761be931f33913d223d18cc7099e8b Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:17:51 -0400 Subject: [PATCH 08/37] add fertilization workflow readme --- workflows/fertilization-statewide/README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 workflows/fertilization-statewide/README.md diff --git a/workflows/fertilization-statewide/README.md b/workflows/fertilization-statewide/README.md new file mode 100644 index 0000000000..b5c7aed341 --- /dev/null +++ b/workflows/fertilization-statewide/README.md @@ -0,0 +1,15 @@ +# Statewide fertilization events workflow + +Builds an ensemble of synthetic N fertilization events for California ag parcels across 2016 and 2018 to 2023. + +Run from PEcAn project root. Pick a profile (default, small, medium, all) with `FERT_PROJECT`: + +``` +FERT_PROJECT=default bash workflows/fertilization-statewide/run-statewide.sh +``` + +The three scripts run in order. `01-build-parcel-design.R` reads the LandIQ MSLSP matched product for each configured year, walks the Crop_types crosswalk to map each LandIQ CLASS+SUBCLASS code onto a per crop N rate envelope from `PEcAn.data.land::ca_n_application_rate`, and saves the resulting design table. Cycles whose code does not resolve to a rate get logged and dropped. `02-sample-n-rates.R` crosses the design with the ensemble dimension and draws annual N uniformly from each cycle's min/max envelope. `03-write-parquet.R` converts lb/acre to kg/m^2, splits between nh4 and no3 by the configured `nh4_no3_ratio`, partitions parcels into batches, and writes one parquet per batch named `_.parquet`. Compression is ZSTD when available, snappy otherwise. + +Output lands at `/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization/`. Intermediate `.rds` files live under `_staging/` next to the output and can be deleted after a successful run. `check-result.R` opens the dataset and prints schema, shard count, rows per ensemble member, rows per year, a sample of rows, and the total N range. `push-to-carb.sh` is the eventual `aws s3 sync` to `s3://carb/management/fertilization/v1.0/`. + +The output columns are `parcel_id` (int), `ens_id` (string `fert_ens_NNN`), `date` (MSLSP `mslsp_OGI`), `nh4_n_kg_m2`, `no3_n_kg_m2`, `org_c_kg_m2` (zero here, compost lives in the ncc product), `org_n_kg_m2` (zero), `fert_subtype` (`"synthetic"`), `crop_code` (LandIQ CLASS+SUBCLASS), `PFT`. The rename to the schema the JSON converter consumes (`site_id`, `event_member_id`, hive partitioned by ensemble) happens in the downstream cleaner under `workflows/preprocess-event-parquet/`. From fadbff9bcfb55b2ccc40edf0c8f2e8849005f0cc Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:04 -0400 Subject: [PATCH 09/37] add ncc config --- workflows/ncc-statewide/config.yml | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 workflows/ncc-statewide/config.yml diff --git a/workflows/ncc-statewide/config.yml b/workflows/ncc-statewide/config.yml new file mode 100644 index 0000000000..017bc307bd --- /dev/null +++ b/workflows/ncc-statewide/config.yml @@ -0,0 +1,29 @@ +default: + n_parcels: 1000 + n_ensemble: 20 + seed: 42 + years: [2016, 2018, 2019, 2020, 2021, 2022, 2023] + matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 + + # probability of compost amendment per parcel year per ensemble member. + # scenario knob, useful range 0.05 to 0.25. the rest of the compost + # distributions live in PEcAn.data.land bundled data + p_apply_default: 0.10 + + crosswalk_path: /projectnb/dietzelab/ccmmf/usr/akash/management/fertilization/CCMMF Fertilization - Crop_types.tsv + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files + output_subdir: ncc + batch_size: 100 + workers: 1 + +small: + +medium: + n_parcels: 10000 + batch_size: 1000 + workers: 4 + +all: + n_parcels: null + batch_size: 5000 + workers: 8 From 0ce93d1c26af3724b8311e9bdd8a2959a8ac8d80 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:04 -0400 Subject: [PATCH 10/37] add ncc parcel design step --- .../ncc-statewide/01-build-parcel-design.R | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 workflows/ncc-statewide/01-build-parcel-design.R diff --git a/workflows/ncc-statewide/01-build-parcel-design.R b/workflows/ncc-statewide/01-build-parcel-design.R new file mode 100644 index 0000000000..c4c0b92381 --- /dev/null +++ b/workflows/ncc-statewide/01-build-parcel-design.R @@ -0,0 +1,96 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) + +options(arrow.unsafe_metadata = TRUE) + +# compost application rate is conditioned on PFT family rather than crop +# code so no rate crosswalk is needed at this stage. +read_matched_year <- function(year) { + fn <- file.path(config[["matched_dir"]], + sprintf("assigned_year=%d.parquet", year)) + if (!file.exists(fn)) { + PEcAn.logger::logger.warn("Missing matched file for year ", year, ": ", fn) + return(NULL) + } + arrow::read_parquet(fn) |> + dplyr::filter(.data$assigned_by == "matched", + !is.na(.data$landiq_CLASS), + !is.na(.data$landiq_SUBCLASS), + !is.na(.data$landiq_PFT), + !is.na(.data$mslsp_EVImax), + !is.na(.data$mslsp_EVIamp)) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + year = as.integer(.data$year), + season = as.integer(.data$season), + anchor = as.Date(.data$mslsp_OGI), + code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS), + PFT = as.character(.data$landiq_PFT) + ) +} + +PEcAn.logger::logger.info("Reading matched LandIQ MSLSP for years: ", + paste(config[["years"]], collapse = ", ")) +plant <- purrr::map_dfr(config[["years"]], read_matched_year) +PEcAn.logger::logger.info(sprintf("Loaded %d cycles across %d parcels", + nrow(plant), dplyr::n_distinct(plant$parcel_id))) + +## subsample +n_parcels <- config[["n_parcels"]] +if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { + picked <- plant |> + dplyr::distinct(.data$parcel_id) |> + dplyr::slice_sample(n = n_parcels) |> + dplyr::pull(.data$parcel_id) + plant <- plant |> dplyr::filter(.data$parcel_id %in% picked) + PEcAn.logger::logger.info(sprintf("Sampled %d parcels (n_parcels=%d)", + length(picked), n_parcels)) +} + +# classify each PFT into annual vs perennial. anything not in this set +# gets dropped with a warning since the rate envelope is per family. +pft_family <- function(pft) { + dplyr::case_when( + pft %in% c("row", "vegetable") ~ "annual", + pft %in% c("woody", "vine") ~ "perennial", + TRUE ~ NA_character_ + ) +} + +design <- plant |> + dplyr::mutate(pft_family = pft_family(.data$PFT)) + +unknown <- design |> dplyr::filter(is.na(.data$pft_family)) +if (nrow(unknown) > 0) { + by_pft <- unknown |> dplyr::count(.data$PFT, sort = TRUE) + PEcAn.logger::logger.warn(sprintf( + "Dropping %d cycles with unknown PFT family. Breakdown:", nrow(unknown))) + for (i in seq_len(nrow(by_pft))) { + PEcAn.logger::logger.warn(sprintf(" PFT=%s: %d cycles", + by_pft$PFT[i], by_pft$n[i])) + } +} + +design <- design |> + dplyr::filter(!is.na(.data$pft_family)) |> + dplyr::select("parcel_id", "year", "season", "anchor", + "code", "PFT", "pft_family") + +PEcAn.logger::logger.info(sprintf("Design table: %d cycles, %d parcels, %d years", + nrow(design), + dplyr::n_distinct(design$parcel_id), + dplyr::n_distinct(design$year))) +PEcAn.logger::logger.info(sprintf("PFT family split: annual=%d, perennial=%d", + sum(design$pft_family == "annual"), + sum(design$pft_family == "perennial"))) + +staging_file <- file.path(staging_dir, "_staging_01_design.rds") +saveRDS(design, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) From 4c9fbad28543f73f9bc61ba2e749812743114fc4 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:04 -0400 Subject: [PATCH 11/37] add ncc event sampling step --- .../ncc-statewide/02-sample-ncc-events.R | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 workflows/ncc-statewide/02-sample-ncc-events.R diff --git a/workflows/ncc-statewide/02-sample-ncc-events.R b/workflows/ncc-statewide/02-sample-ncc-events.R new file mode 100644 index 0000000000..a39d1f679d --- /dev/null +++ b/workflows/ncc-statewide/02-sample-ncc-events.R @@ -0,0 +1,66 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +design_file <- file.path(staging_dir, "_staging_01_design.rds") +if (!file.exists(design_file)) { + PEcAn.logger::logger.severe( + "Stage 01 output not found: ", design_file, + ". Run 01-build-parcel-design.R first." + ) +} + +PEcAn.logger::logger.info("Reading design from ", design_file) +design <- readRDS(design_file) + +n_ensemble <- as.integer(config[["n_ensemble"]]) +p_apply <- as.numeric(config[["p_apply_default"]]) + +PEcAn.logger::logger.info(sprintf( + "Sampling %d ensemble members per cycle with p_apply=%.2f", n_ensemble, p_apply)) + +# three nested conditional draws: probability of amendment, then material +# / rate / date / %C / C:N given amendment. only rows where the bernoulli +# fires get emitted. all per family lookups live in PEcAn.data.land bundled +# tables so the science ships with package +events <- design |> + tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> + dplyr::mutate( + applied = stats::rbinom(dplyr::n(), 1, p_apply) == 1L + ) |> + dplyr::filter(.data$applied) + +n_total <- nrow(design) * n_ensemble +PEcAn.logger::logger.info(sprintf( + "Probability draw fired %d of %d possible events (%.1f%%)", + nrow(events), n_total, 100 * nrow(events) / n_total)) + +if (nrow(events) == 0) { + PEcAn.logger::logger.severe("No events fired. Check p_apply_default.") +} + +events <- events |> + dplyr::mutate( + date_offset_days = PEcAn.data.land::sample_ca_compost_date_offset(.data$pft_family), + date = .data$anchor - .data$date_offset_days, + material = PEcAn.data.land::sample_ca_compost_material(.data$pft_family), + app_rate_t_ac = PEcAn.data.land::sample_ca_compost_app_rate(.data$pft_family), + pct_c = PEcAn.data.land::sample_ca_compost_pct_c(dplyr::n()), + cn_ratio = PEcAn.data.land::sample_ca_compost_cn(dplyr::n()), + ens_id = sprintf("ncc_ens_%03d", .data$ensemble_member) + ) + +PEcAn.logger::logger.info(sprintf( + "Sampled %d NCC events. app_rate %.2f-%.2f t/ac, pct_c %.1f-%.1f, cn %.1f-%.1f", + nrow(events), + min(events[["app_rate_t_ac"]]), max(events[["app_rate_t_ac"]]), + min(events[["pct_c"]]), max(events[["pct_c"]]), + min(events[["cn_ratio"]]), max(events[["cn_ratio"]]))) + +staging_file <- file.path(staging_dir, "_staging_02_events.rds") +saveRDS(events, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) From 418410b31bf05cfd44aff0b4f2c06e5a450d626e Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:04 -0400 Subject: [PATCH 12/37] add ncc parquet writer --- workflows/ncc-statewide/03-write-parquet.R | 95 ++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 workflows/ncc-statewide/03-write-parquet.R diff --git a/workflows/ncc-statewide/03-write-parquet.R b/workflows/ncc-statewide/03-write-parquet.R new file mode 100644 index 0000000000..a8c9c56ca7 --- /dev/null +++ b/workflows/ncc-statewide/03-write-parquet.R @@ -0,0 +1,95 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +# 1 t/acre = 224.1702 g/m^2 = 0.2241702 kg/m^2. +T_ACRE_TO_KG_M2 <- 0.2241702 + +staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +events_file <- file.path(staging_dir, "_staging_02_events.rds") +if (!file.exists(events_file)) { + PEcAn.logger::logger.severe( + "Stage 02 output not found: ", events_file, + ". Run 02-sample-ncc-events.R first." + ) +} + +PEcAn.logger::logger.info("Reading events from ", events_file) +events <- readRDS(events_file) + +# org_c = app_rate * (pct_c / 100); org_n = org_c / cn. nh4 and no3 stay +# zero here because compost releases organic N that mineralizes through +# the soil pool in SIPNET rather than as direct mineral N. +# TODO: in v2, sample %C and C:N jointly since they covary through %N. +out <- events |> + dplyr::mutate( + org_c_kg_m2 = .data$app_rate_t_ac * .env$T_ACRE_TO_KG_M2 * (.data$pct_c / 100), + org_n_kg_m2 = .data$org_c_kg_m2 / .data$cn_ratio, + nh4_n_kg_m2 = 0, + no3_n_kg_m2 = 0, + ncc_subtype = "compost" + ) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), + .data$material, + .data$org_c_kg_m2, + .data$org_n_kg_m2, + .data$nh4_n_kg_m2, + .data$no3_n_kg_m2, + .data$ncc_subtype, + crop_code = .data$code, + PFT = .data$PFT + ) + +out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +dir.create(out_path, showWarnings = FALSE, recursive = TRUE) + +## clean prior shards +existing <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(existing) > 0) { + PEcAn.logger::logger.info(sprintf("Removing %d existing parquet shards", length(existing))) + unlink(existing) +} + +# partition by parcel_id range and write one parquet per batch named +# _.parquet. +all_parcels <- sort(unique(out[["parcel_id"]])) +batch_size <- as.integer(config[["batch_size"]]) +n_batches <- ceiling(length(all_parcels) / batch_size) +batches <- split(all_parcels, ceiling(seq_along(all_parcels) / batch_size)) + +PEcAn.logger::logger.info(sprintf( + "Writing %d rows across %d parcel batches (batch_size=%d) to %s", + nrow(out), n_batches, batch_size, out_path)) + +# prefer ZSTD; fall back to snappy when zstd is not in the local arrow +# build. +parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY" +PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec) + +write_batch <- function(pids) { + shard <- out |> dplyr::filter(.data$parcel_id %in% pids) + pid_min <- min(shard[["parcel_id"]]) + pid_max <- max(shard[["parcel_id"]]) + fn <- file.path(out_path, sprintf("%d_%d.parquet", pid_min, pid_max)) + arrow::write_parquet(shard, fn, compression = parquet_codec) + fn +} + +workers <- as.integer(config[["workers"]]) +if (workers > 1) { + PEcAn.logger::logger.info(sprintf("Using mclapply with %d workers", workers)) + written <- parallel::mclapply(batches, write_batch, mc.cores = workers) +} else { + written <- lapply(batches, write_batch) +} + +PEcAn.logger::logger.info(sprintf( + "Done. wrote %d shards, %d total rows, parcels=%d, materials=%d, ensemble=%d", + length(written), nrow(out), + dplyr::n_distinct(out[["parcel_id"]]), + dplyr::n_distinct(out[["material"]]), + dplyr::n_distinct(out[["ens_id"]]))) From 659c53293f80bb3cc0af0c99d33b685a1f0821bf Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:05 -0400 Subject: [PATCH 13/37] add ncc runner script --- workflows/ncc-statewide/run-statewide.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100755 workflows/ncc-statewide/run-statewide.sh diff --git a/workflows/ncc-statewide/run-statewide.sh b/workflows/ncc-statewide/run-statewide.sh new file mode 100755 index 0000000000..6e9f16d652 --- /dev/null +++ b/workflows/ncc-statewide/run-statewide.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# runs the ncc (compost) statewide pipeline. NCC_PROJECT picks a profile +# from config.yml (default, small, medium, all). + +set -euo pipefail + +# run from the PEcAn repo root regardless of caller cwd. +cd "$(dirname "$0")/../.." + +NCC_PROJECT="${NCC_PROJECT:-default}" +export NCC_PROJECT + +DIR="workflows/ncc-statewide" + +Rscript "$DIR/01-build-parcel-design.R" +Rscript "$DIR/02-sample-ncc-events.R" +Rscript "$DIR/03-write-parquet.R" From 27591640af29ef4d58f3146519dcbf852b373c56 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:05 -0400 Subject: [PATCH 14/37] add ncc sanity check --- workflows/ncc-statewide/check-result.R | 69 ++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 workflows/ncc-statewide/check-result.R diff --git a/workflows/ncc-statewide/check-result.R b/workflows/ncc-statewide/check-result.R new file mode 100644 index 0000000000..95b1a99129 --- /dev/null +++ b/workflows/ncc-statewide/check-result.R @@ -0,0 +1,69 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +options(arrow.unsafe_metadata = TRUE) + +out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +if (!dir.exists(out_path)) { + PEcAn.logger::logger.severe("Output not found: ", out_path) +} + +shards <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(shards) == 0) { + PEcAn.logger::logger.severe("No parquet shards under: ", out_path) +} +ds <- arrow::open_dataset(shards) + +# schema +print(ds$schema) + +PEcAn.logger::logger.info(sprintf( + "%d shards: %s ... %s", + length(shards), basename(shards[1]), basename(shards[length(shards)]))) + +tot <- ds |> dplyr::count() |> dplyr::collect() +PEcAn.logger::logger.info(sprintf("total rows: %d", tot[["n"]])) + +# rows per ensemble member +print( + ds |> + dplyr::count(.data$ens_id) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per material +print( + ds |> + dplyr::count(.data$material) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per year +print( + ds |> + dplyr::mutate(year = lubridate::year(.data$date)) |> + dplyr::count(.data$year) |> + dplyr::collect() |> + dplyr::arrange(.data$year) |> + as.data.frame() +) + +# first 5 rows +print(ds |> dplyr::slice_head(n = 5) |> dplyr::collect() |> as.data.frame()) + +# org_c and org_n ranges +stats <- ds |> + dplyr::summarize( + min_org_c = min(.data$org_c_kg_m2, na.rm = TRUE), + max_org_c = max(.data$org_c_kg_m2, na.rm = TRUE), + mean_org_c = mean(.data$org_c_kg_m2, na.rm = TRUE), + min_org_n = min(.data$org_n_kg_m2, na.rm = TRUE), + max_org_n = max(.data$org_n_kg_m2, na.rm = TRUE), + mean_org_n = mean(.data$org_n_kg_m2, na.rm = TRUE) + ) |> + dplyr::collect() +print(stats |> as.data.frame()) From 7d4246baa7853d537df6fe5445c66cc1b50b44fe Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:05 -0400 Subject: [PATCH 15/37] add ncc s3 sync --- workflows/ncc-statewide/push-to-carb.sh | 5 +++++ 1 file changed, 5 insertions(+) create mode 100755 workflows/ncc-statewide/push-to-carb.sh diff --git a/workflows/ncc-statewide/push-to-carb.sh b/workflows/ncc-statewide/push-to-carb.sh new file mode 100755 index 0000000000..745e5cfeb0 --- /dev/null +++ b/workflows/ncc-statewide/push-to-carb.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +aws s3 sync --profile ccmmf \ + /projectnb/dietzelab/ccmmf/management/ncc_event_files/ \ + s3://carb/management/ncc/v1.0/ From 6af1c011ecf3890edb652792e9a4442e24f66339 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:05 -0400 Subject: [PATCH 16/37] add ncc workflow readme --- workflows/ncc-statewide/README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 workflows/ncc-statewide/README.md diff --git a/workflows/ncc-statewide/README.md b/workflows/ncc-statewide/README.md new file mode 100644 index 0000000000..242c91169b --- /dev/null +++ b/workflows/ncc-statewide/README.md @@ -0,0 +1,17 @@ +# Statewide NCC (compost) events workflow + +Builds an ensemble of compost amendment events for California ag parcels across 2016 and 2018 to 2023. Implements the monitoring framework's three nested conditional draws: probability of amendment per parcel year, then date conditional on amendment, then amount + material conditional on amendment. 2017 is skipped because LandIQ has no statewide survey for that year. + +Run from the PEcAn project root. Pick a profile (default, small, medium, all) with `NCC_PROJECT`: + +``` +NCC_PROJECT=default bash workflows/ncc-statewide/run-statewide.sh +``` + +The three scripts run in order. `01-build-parcel-design.R` reads the LandIQ MSLSP matched product for each configured year, classifies each PFT into the annual or perennial family (compost rate depends on family, not crop), and saves the design table. Cycles whose PFT is outside that set get logged and dropped. `02-sample-ncc-events.R` expands the design across the ensemble dimension, runs the Bernoulli probability draw, and for cycles that fire pulls material, application rate, %C, C:N, and a date offset by calling the bundled samplers in `PEcAn.data.land::sample_ca_compost_*`. `03-write-parquet.R` computes `org_c_kg_m2 = app_rate * 0.2241702 * (pct_c / 100)` and `org_n_kg_m2 = org_c / cn_ratio`, partitions parcels into batches, and writes one parquet per batch named `_.parquet`. Compression is ZSTD when available, snappy otherwise. + +Output lands at `/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc/`. Intermediate `.rds` files live under `_staging/` next to the output. `check-result.R` opens the dataset and prints schema, shard count, rows per ensemble member, rows per material, rows per year, a sample of rows, and the org_c / org_n ranges. `push-to-carb.sh` is the eventual `aws s3 sync` to `s3://carb/management/ncc/v1.0/`. + +The output columns are `parcel_id` (int), `ens_id` (string `ncc_ens_NNN`), `date` (sampled within the per family calendar window), `material` (one of the CalRecycle classes: green, food, ag, yard), `org_c_kg_m2`, `org_n_kg_m2`, `nh4_n_kg_m2` (zero, compost N mineralizes through the soil pool in SIPNET), `no3_n_kg_m2` (zero), `ncc_subtype` (`"compost"`), `crop_code`, `PFT`. Only cycle ensemble rows where the probability draw fires get emitted, so the row count varies by `p_apply_default` in `config.yml`. + +The bundled sampling distributions and per family lookups live in `PEcAn.data.land` (tables `ca_compost_pct_c_distribution`, `ca_compost_cn_distribution`, `ca_compost_app_rate_envelope`, `ca_compost_calendar_window`, `ca_compost_material_whitelist`) so the workflow only carries the probability knob in its own config. Defaults: %C 15 to 30 from the CDFA and CalRecycle characterizations of CA finished compost, C:N 8 to 25 from the CDFA HSP white paper, rates from CDFA HSP Table 2 plus NRCS CPS 336, materials from 14 CCR section 17852, calendar windows from UC ANR practice guidance. From b8e99cba315d529935a1dc93b2de5465898278b5 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:49 -0400 Subject: [PATCH 17/37] add fertilization cleaner --- .../01c-clean-fertilization.R | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 workflows/preprocess-event-parquet/01c-clean-fertilization.R diff --git a/workflows/preprocess-event-parquet/01c-clean-fertilization.R b/workflows/preprocess-event-parquet/01c-clean-fertilization.R new file mode 100644 index 0000000000..d496133855 --- /dev/null +++ b/workflows/preprocess-event-parquet/01c-clean-fertilization.R @@ -0,0 +1,42 @@ +#!/usr/bin/env Rscript + +fert_path <- "/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization" + +outdir <- "_output" +dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + +dbdir <- file.path(Sys.getenv("TMPDIR", "/tmp"), "temp.duckdb") +conn <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir)) + +# cast ensemble id to an enum to accelerate and reduce the memory pressure +# of the sort. +DBI::dbExecute(conn, glue::glue(" + CREATE OR REPLACE TYPE fert_ens_id_enum AS ENUM ( + SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet') + ) + " +)) + +# sort and write the partitioned parquet output. rename parcel_id to site_id +# and ens_id to event_member_id to match the schema the json converter +# consumes. +DBI::dbExecute(conn, glue::glue(" + COPY ( + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS fert_ens_id_enum) AS event_member_id, + date, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + fert_subtype, + crop_code, + PFT + FROM read_parquet('{fert_path}/*.parquet') + ORDER BY event_member_id, site_id, date + ) TO + '{outdir}/fertilization.parquet' + (FORMAT PARQUET, COMPRESSION ZSTD, OVERWRITE, PARTITION_BY (event_member_id)) + " +)) From bbbb1daaf901a8a635866b4f9f42303b6f1bcdd9 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:49 -0400 Subject: [PATCH 18/37] add ncc cleaner --- .../preprocess-event-parquet/01d-clean-ncc.R | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 workflows/preprocess-event-parquet/01d-clean-ncc.R diff --git a/workflows/preprocess-event-parquet/01d-clean-ncc.R b/workflows/preprocess-event-parquet/01d-clean-ncc.R new file mode 100644 index 0000000000..45698da8de --- /dev/null +++ b/workflows/preprocess-event-parquet/01d-clean-ncc.R @@ -0,0 +1,43 @@ +#!/usr/bin/env Rscript + +ncc_path <- "/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc" + +outdir <- "_output" +dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + +dbdir <- file.path(Sys.getenv("TMPDIR", "/tmp"), "temp.duckdb") +conn <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir)) + +# cast ensemble id to an enum to accelerate and reduce the memory pressure +# of the sort. +DBI::dbExecute(conn, glue::glue(" + CREATE OR REPLACE TYPE ncc_ens_id_enum AS ENUM ( + SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet') + ) + " +)) + +# sort and write the partitioned parquet output. rename parcel_id to site_id +# and ens_id to event_member_id to match the schema the json converter +# consumes. +DBI::dbExecute(conn, glue::glue(" + COPY ( + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS ncc_ens_id_enum) AS event_member_id, + date, + material, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + ncc_subtype, + crop_code, + PFT + FROM read_parquet('{ncc_path}/*.parquet') + ORDER BY event_member_id, site_id, date + ) TO + '{outdir}/ncc.parquet' + (FORMAT PARQUET, COMPRESSION ZSTD, OVERWRITE, PARTITION_BY (event_member_id)) + " +)) From d87c873b0247c293976f9bbd11f34e7551362583 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:49 -0400 Subject: [PATCH 19/37] list new cleaners in readme --- workflows/preprocess-event-parquet/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/workflows/preprocess-event-parquet/README.md b/workflows/preprocess-event-parquet/README.md index c64ce25d7e..b8e3aa0775 100644 --- a/workflows/preprocess-event-parquet/README.md +++ b/workflows/preprocess-event-parquet/README.md @@ -4,4 +4,6 @@ This converts the raw outputs of the management/monitoring CCMMF pipeline into p - `01a-clean-irrigation.R` --- Preprocess irrigation. This is handled separately because the raw data are really large (600M rows). - `01b-clean-other-events.R` --- Preprocess remaining events. These are done together because they are much smaller. +- `01c-clean-fertilization.R` --- Preprocess synthetic N fertilization (`workflows/fertilization-statewide/`). +- `01d-clean-ncc.R` --- Preprocess ncc (compost) events (`workflows/ncc-statewide/`). - `02-events-to-json.R` --- Example of running From 83df132fb9a0905efc9dd7f12274588b736187d4 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Mon, 18 May 2026 16:18:49 -0400 Subject: [PATCH 20/37] add changelog entry for event workflows --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e966311292..efecb01ec7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ For more information about this file see also [Keep a Changelog](http://keepacha - PEcAn.SIPNET gains support for SIPNET v2, whose features includes management events, nitrogen cycle tracking, explicit N2O and methane fluxes, runtime setting of feature flags, and changes to the parameter set (now 73 parameters). SIPNET v1 is still fully supported, but workarounds for bugs in the legacy `sipnet.unk` version have been removed. - Added `PEcAn.data.land::to_co2e()` for converting SOC change, CH4, and N2O to CO2-equivalent emissions using IPCC Global Warming Potential values. - Added `PEcAn.data.land::event_parquet_to_json` for generating PEcAn `event.json` files from well-formatted event parquet files, with support for ensembles of events. +- Added statewide fertilization and ncc compost event workflows that emit ensemble parquet shards for California ag parcels. ### Fixed - Fixed broken pecanproject.github.io, pecan.gitbooks.io, and other outdated documentation links across book_source, tutorials, models, modules, web, and shiny files (#3710). From 3ab64b3e36f1d9bd78dbc8069ebbb8f360b32b7a Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:12:25 -0400 Subject: [PATCH 21/37] drop output_subdir, fix nh4 split knob name, point crosswalk at shared path --- workflows/fertilization-statewide/config.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/workflows/fertilization-statewide/config.yml b/workflows/fertilization-statewide/config.yml index 8a29757a77..0a37d286b9 100644 --- a/workflows/fertilization-statewide/config.yml +++ b/workflows/fertilization-statewide/config.yml @@ -1,24 +1,24 @@ default: n_parcels: 1000 n_ensemble: 20 - nh4_no3_ratio: 0.5 + # fraction of total synthetic N going to nh4. set to 0.5 for a 50/50 + # nh4/no3 split. value 0 makes it all nitrate, value 1 makes it all + # ammonium. + nh4_fraction: 0.5 seed: 42 years: [2016, 2018, 2019, 2020, 2021, 2022, 2023] matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 - crosswalk_path: /projectnb/dietzelab/ccmmf/usr/akash/management/fertilization/CCMMF Fertilization - Crop_types.tsv - output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files - output_subdir: fertilization + crosswalk_path: /projectnb/dietzelab/ccmmf/management/fertilization/CCMMF_Fertilization_Crop_types.tsv + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization batch_size: 100 workers: 1 -small: - medium: n_parcels: 10000 batch_size: 1000 workers: 4 all: - n_parcels: null # null means all parcels in the matched product + n_parcels: null # null means all parcels in the matched product batch_size: 5000 workers: 8 From 96f8002e6d0d09baac3a1683c8849a7cf923ac4b Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:13:46 -0400 Subject: [PATCH 22/37] split skip_zero_envelope drops from skip_no_rate, drop unused PFT column, use single output_dir --- .../01-build-parcel-design.R | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/workflows/fertilization-statewide/01-build-parcel-design.R b/workflows/fertilization-statewide/01-build-parcel-design.R index 505b166b20..4ee9962398 100644 --- a/workflows/fertilization-statewide/01-build-parcel-design.R +++ b/workflows/fertilization-statewide/01-build-parcel-design.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/fertilization-statewide/config.yml", set.seed(config[["seed"]]) -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) options(arrow.unsafe_metadata = TRUE) @@ -111,8 +111,7 @@ read_matched_year <- function(year) { year = as.integer(.data$year), season = as.integer(.data$season), date = as.Date(.data$mslsp_OGI), - code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS), - PFT = as.character(.data$landiq_PFT) + code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS) ) } @@ -142,17 +141,24 @@ if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { design <- plant |> dplyr::left_join(code_lookup, by = "code") |> dplyr::mutate( - rate_source = ifelse(is.na(.data$min_n_lbs_acre), "skip", "crosswalk") + rate_source = dplyr::case_when( + is.na(.data$min_n_lbs_acre) ~ "skip_no_rate", + .data$min_n_lbs_acre == 0 & .data$max_n_lbs_acre == 0 ~ "skip_zero_envelope", + TRUE ~ "crosswalk" + ) ) -skipped <- design |> dplyr::filter(.data$rate_source == "skip") -if (nrow(skipped) > 0) { - by_code <- skipped |> +# log both kinds of drops separately so it's clear which are missing data +# vs which are intentional zero envelopes (e.g. Alfalfa, legumes that fix +# their own N and carry a cited 0 to 0 rate) +unresolved <- design |> dplyr::filter(.data$rate_source == "skip_no_rate") +if (nrow(unresolved) > 0) { + by_code <- unresolved |> dplyr::count(.data$code, name = "n_events", sort = TRUE) |> head(15) PEcAn.logger::logger.warn(sprintf( "Dropping %d cycles across %d codes with no resolvable N rate. Top offenders:", - nrow(skipped), dplyr::n_distinct(skipped$code) + nrow(unresolved), dplyr::n_distinct(unresolved$code) )) for (i in seq_len(nrow(by_code))) { PEcAn.logger::logger.warn(sprintf(" %s: %d cycles", @@ -160,9 +166,24 @@ if (nrow(skipped) > 0) { } } +zero_env <- design |> dplyr::filter(.data$rate_source == "skip_zero_envelope") +if (nrow(zero_env) > 0) { + by_code <- zero_env |> + dplyr::count(.data$code, name = "n_events", sort = TRUE) |> + head(15) + PEcAn.logger::logger.info(sprintf( + "Dropping %d cycles across %d codes with a cited 0 to 0 N rate (no synthetic application). Top:", + nrow(zero_env), dplyr::n_distinct(zero_env$code) + )) + for (i in seq_len(nrow(by_code))) { + PEcAn.logger::logger.info(sprintf(" %s: %d cycles", + by_code$code[i], by_code$n_events[i])) + } +} + design <- design |> - dplyr::filter(.data$rate_source != "skip") |> - dplyr::select("parcel_id", "year", "season", "date", "code", "PFT", + dplyr::filter(.data$rate_source == "crosswalk") |> + dplyr::select("parcel_id", "year", "season", "date", "code", "min_n_lbs_acre", "max_n_lbs_acre") PEcAn.logger::logger.info(sprintf("Design table: %d events, %d parcels, %d years", From 3a2309707bd441614fc4fc3902214f66db69de41 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:14:25 -0400 Subject: [PATCH 23/37] switch fert sampler to shared ens_NNN naming --- .../fertilization-statewide/02-sample-n-rates.R | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/workflows/fertilization-statewide/02-sample-n-rates.R b/workflows/fertilization-statewide/02-sample-n-rates.R index 33533eb8c6..c42c6bc249 100644 --- a/workflows/fertilization-statewide/02-sample-n-rates.R +++ b/workflows/fertilization-statewide/02-sample-n-rates.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/fertilization-statewide/config.yml", set.seed(config[["seed"]]) -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") design_file <- file.path(staging_dir, "_staging_01_design.rds") if (!file.exists(design_file)) { PEcAn.logger::logger.severe( @@ -24,16 +24,14 @@ PEcAn.logger::logger.info(sprintf( )) # expand each design row to one row per ensemble member and draw the annual -# N rate uniformly from the resolved min/max envelope. +# N rate uniformly from each cycle's min/max envelope. ens_id is shared with +# ncc workflow so a given ensemble member can carry both synthetic and +# compost rows under one fertilization event_type in json output events <- design |> tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> dplyr::mutate( - annual_n_lb_acre = stats::runif( - dplyr::n(), - min = .data$min_n_lbs_acre, - max = .data$max_n_lbs_acre - ), - ens_id = sprintf("fert_ens_%03d", .data$ensemble_member) + annual_n_lb_acre = stats::runif(dplyr::n(), min = .data$min_n_lbs_acre, max = .data$max_n_lbs_acre), + ens_id = sprintf("ens_%03d", .data$ensemble_member) ) PEcAn.logger::logger.info(sprintf( From 147c303a5ac96323e872a192def17e805f4b8bfb Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:15:10 -0400 Subject: [PATCH 24/37] swap fert constant for ud_convert, drop fert_subtype and pft cols --- .../03-write-parquet.R | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/workflows/fertilization-statewide/03-write-parquet.R b/workflows/fertilization-statewide/03-write-parquet.R index 09610c9261..17eee4628e 100644 --- a/workflows/fertilization-statewide/03-write-parquet.R +++ b/workflows/fertilization-statewide/03-write-parquet.R @@ -3,10 +3,7 @@ config <- config::get(file = "workflows/fertilization-statewide/config.yml", config = Sys.getenv("FERT_PROJECT", "default")) -# 1 lb/acre = 0.112085 g/m^2 = 0.000112085 kg/m^2. -LBS_ACRE_TO_KG_M2 <- 0.112085 / 1000 - -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") events_file <- file.path(staging_dir, "_staging_02_events.rds") if (!file.exists(events_file)) { PEcAn.logger::logger.severe( @@ -18,34 +15,32 @@ if (!file.exists(events_file)) { PEcAn.logger::logger.info("Reading events from ", events_file) events <- readRDS(events_file) -# synthetic N is split between nh4 and no3 by the configured ratio. org_c -# and org_n stay zero here because organic amendments live in the separate -# ncc product. -nh4_ratio <- config[["nh4_no3_ratio"]] +# synthetic N is split between nh4 and no3 using nh4_fraction (share going +# to nh4, rest goes to no3). org_c and org_n stay zero here; compost rows +# come in via ncc workflow and end up unioned into same fertilization +# parquet by cleaner +nh4_frac <- config[["nh4_fraction"]] out <- events |> dplyr::mutate( - total_n_kg_m2 = .data$annual_n_lb_acre * .env$LBS_ACRE_TO_KG_M2, - nh4_n_kg_m2 = .data$total_n_kg_m2 * .env$nh4_ratio, - no3_n_kg_m2 = .data$total_n_kg_m2 * (1 - .env$nh4_ratio), - org_c_kg_m2 = 0, - org_n_kg_m2 = 0, - fert_subtype = "synthetic" + total_n_kg_m2 = PEcAn.utils::ud_convert(.data$annual_n_lb_acre, "lb/acre", "kg/m^2"), + nh4_n_kg_m2 = .data$total_n_kg_m2 * .env$nh4_frac, + no3_n_kg_m2 = .data$total_n_kg_m2 * (1 - .env$nh4_frac), + org_c_kg_m2 = 0, + org_n_kg_m2 = 0 ) |> dplyr::transmute( - parcel_id = as.integer(.data$parcel_id), - ens_id = .data$ens_id, - date = as.Date(.data$date), + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), .data$nh4_n_kg_m2, .data$no3_n_kg_m2, .data$org_c_kg_m2, .data$org_n_kg_m2, - .data$fert_subtype, - crop_code = .data$code, - PFT = .data$PFT + crop_code = .data$code ) -out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +out_path <- config[["output_dir"]] dir.create(out_path, showWarnings = FALSE, recursive = TRUE) ## clean prior shards From 3f9a540afb4323e57460c15c0607805b93cb3bf9 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:16:01 -0400 Subject: [PATCH 25/37] fert check result, and collapsed output_dir --- workflows/fertilization-statewide/check-result.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflows/fertilization-statewide/check-result.R b/workflows/fertilization-statewide/check-result.R index b8409735d5..e7b33238ad 100644 --- a/workflows/fertilization-statewide/check-result.R +++ b/workflows/fertilization-statewide/check-result.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/fertilization-statewide/config.yml", options(arrow.unsafe_metadata = TRUE) -out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +out_path <- config[["output_dir"]] if (!dir.exists(out_path)) { PEcAn.logger::logger.severe("Output not found: ", out_path) } From ae7055b50fbe68e88ae57b1d6130553624cfdd8a Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:17:31 -0400 Subject: [PATCH 26/37] update readme --- workflows/fertilization-statewide/README.md | 24 ++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/workflows/fertilization-statewide/README.md b/workflows/fertilization-statewide/README.md index b5c7aed341..6ccd5121d3 100644 --- a/workflows/fertilization-statewide/README.md +++ b/workflows/fertilization-statewide/README.md @@ -1,15 +1,29 @@ # Statewide fertilization events workflow -Builds an ensemble of synthetic N fertilization events for California ag parcels across 2016 and 2018 to 2023. +Builds an ensemble of synthetic N fertilization events for every California ag parcel in the LandIQ MSLSP matched product, for 2016 and 2018 to 2023. 2017 is skipped because LandIQ did not run a statewide survey that year. N rate envelopes come from `PEcAn.data.land::ca_n_application_rate`. -Run from PEcAn project root. Pick a profile (default, small, medium, all) with `FERT_PROJECT`: +# Config + +Everything tweakable lives in `config.yml`. Most setups only need to look at: + +- `matched_dir`: the LandIQ MSLSP matched product directory +- `crosswalk_path`: the LandIQ to FREP to UC ANR crop name crosswalk TSV +- `output_dir`: where the parquet shards go +- `n_parcels`, `n_ensemble`, `batch_size`, `workers`: scale knobs per profile +- `nh4_fraction`: share of total synthetic N going to ammonium; the rest goes to nitrate (default 0.5 for a 50/50 split) + +# Run + +Pick a profile (`default`, `medium`, `all`) and run from the PEcAn project root: ``` FERT_PROJECT=default bash workflows/fertilization-statewide/run-statewide.sh ``` -The three scripts run in order. `01-build-parcel-design.R` reads the LandIQ MSLSP matched product for each configured year, walks the Crop_types crosswalk to map each LandIQ CLASS+SUBCLASS code onto a per crop N rate envelope from `PEcAn.data.land::ca_n_application_rate`, and saves the resulting design table. Cycles whose code does not resolve to a rate get logged and dropped. `02-sample-n-rates.R` crosses the design with the ensemble dimension and draws annual N uniformly from each cycle's min/max envelope. `03-write-parquet.R` converts lb/acre to kg/m^2, splits between nh4 and no3 by the configured `nh4_no3_ratio`, partitions parcels into batches, and writes one parquet per batch named `_.parquet`. Compression is ZSTD when available, snappy otherwise. +`default` is 1000 random parcels, single threaded. `all` is full statewide (~660k parcels), eight workers. The three R scripts chain together: 01 builds the design, 02 samples rates, 03 writes parquet. `check-result.R` reads the output back and prints a summary. + +# Output -Output lands at `/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization/`. Intermediate `.rds` files live under `_staging/` next to the output and can be deleted after a successful run. `check-result.R` opens the dataset and prints schema, shard count, rows per ensemble member, rows per year, a sample of rows, and the total N range. `push-to-carb.sh` is the eventual `aws s3 sync` to `s3://carb/management/fertilization/v1.0/`. +Parcel range sharded parquet at `/`. Columns: `parcel_id`, `ens_id` (`ens_NNN`, shared with ncc workflow), `date`, `nh4_n_kg_m2`, `no3_n_kg_m2`, `org_c_kg_m2` (zero), `org_n_kg_m2` (zero), `crop_code`. -The output columns are `parcel_id` (int), `ens_id` (string `fert_ens_NNN`), `date` (MSLSP `mslsp_OGI`), `nh4_n_kg_m2`, `no3_n_kg_m2`, `org_c_kg_m2` (zero here, compost lives in the ncc product), `org_n_kg_m2` (zero), `fert_subtype` (`"synthetic"`), `crop_code` (LandIQ CLASS+SUBCLASS), `PFT`. The rename to the schema the JSON converter consumes (`site_id`, `event_member_id`, hive partitioned by ensemble) happens in the downstream cleaner under `workflows/preprocess-event-parquet/`. +Downstream, `workflows/preprocess-event-parquet/01c-clean-fertilization.R` renames `parcel_id` to `site_id` and `ens_id` to `event_member_id`, unions compost rows from `workflows/ncc-statewide/`, and writes JSON converter input. From f67b0b70c0f1c1f2493b0192aabc92f974661f87 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:18:35 -0400 Subject: [PATCH 27/37] drop output_subdir from ncc config and unused crosswalk_path --- workflows/ncc-statewide/config.yml | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/workflows/ncc-statewide/config.yml b/workflows/ncc-statewide/config.yml index 017bc307bd..fec0b17b32 100644 --- a/workflows/ncc-statewide/config.yml +++ b/workflows/ncc-statewide/config.yml @@ -6,18 +6,13 @@ default: matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 # probability of compost amendment per parcel year per ensemble member. - # scenario knob, useful range 0.05 to 0.25. the rest of the compost - # distributions live in PEcAn.data.land bundled data + # scenario knob, useful range 0.05 to 0.25 p_apply_default: 0.10 - crosswalk_path: /projectnb/dietzelab/ccmmf/usr/akash/management/fertilization/CCMMF Fertilization - Crop_types.tsv - output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files - output_subdir: ncc + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc batch_size: 100 workers: 1 -small: - medium: n_parcels: 10000 batch_size: 1000 From 57ec447df3cad6034c8d9decf238eb448816ca1b Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:19:11 -0400 Subject: [PATCH 28/37] include rice and hay in pft families instead of dropping them --- workflows/ncc-statewide/01-build-parcel-design.R | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/workflows/ncc-statewide/01-build-parcel-design.R b/workflows/ncc-statewide/01-build-parcel-design.R index c4c0b92381..51867d51c5 100644 --- a/workflows/ncc-statewide/01-build-parcel-design.R +++ b/workflows/ncc-statewide/01-build-parcel-design.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/ncc-statewide/config.yml", set.seed(config[["seed"]]) -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) options(arrow.unsafe_metadata = TRUE) @@ -54,13 +54,11 @@ if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { length(picked), n_parcels)) } -# classify each PFT into annual vs perennial. anything not in this set -# gets dropped with a warning since the rate envelope is per family. pft_family <- function(pft) { dplyr::case_when( - pft %in% c("row", "vegetable") ~ "annual", - pft %in% c("woody", "vine") ~ "perennial", - TRUE ~ NA_character_ + pft %in% c("row", "hay", "rice") ~ "annual", + pft == "woody" ~ "perennial", + TRUE ~ NA_character_ ) } From 86f1de86a9a8e4075189d28734fbcff11fe0e966 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:19:42 -0400 Subject: [PATCH 29/37] rewrite ncc sampler against ca_compost_amendment with uniform quantile draws --- .../ncc-statewide/02-sample-ncc-events.R | 71 ++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/workflows/ncc-statewide/02-sample-ncc-events.R b/workflows/ncc-statewide/02-sample-ncc-events.R index a39d1f679d..b4c1382856 100644 --- a/workflows/ncc-statewide/02-sample-ncc-events.R +++ b/workflows/ncc-statewide/02-sample-ncc-events.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/ncc-statewide/config.yml", set.seed(config[["seed"]]) -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") design_file <- file.path(staging_dir, "_staging_01_design.rds") if (!file.exists(design_file)) { PEcAn.logger::logger.severe( @@ -18,15 +18,13 @@ PEcAn.logger::logger.info("Reading design from ", design_file) design <- readRDS(design_file) n_ensemble <- as.integer(config[["n_ensemble"]]) -p_apply <- as.numeric(config[["p_apply_default"]]) +p_apply <- as.numeric(config[["p_apply_default"]]) PEcAn.logger::logger.info(sprintf( "Sampling %d ensemble members per cycle with p_apply=%.2f", n_ensemble, p_apply)) -# three nested conditional draws: probability of amendment, then material -# / rate / date / %C / C:N given amendment. only rows where the bernoulli -# fires get emitted. all per family lookups live in PEcAn.data.land bundled -# tables so the science ships with package +# bernoulli gate first so most parcel year ensemble combinations drop out +# before heavier material lookup events <- design |> tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> dplyr::mutate( @@ -43,23 +41,62 @@ if (nrow(events) == 0) { PEcAn.logger::logger.severe("No events fired. Check p_apply_default.") } +# wood class is excluded from annuals; high C:N immobilizes more N than +# row crop rotations can absorb within one season +ca_compost <- PEcAn.data.land::ca_compost_amendment + +allowed_classes <- list( + annual = c("green", "food", "yard", "ag"), + perennial = c("green", "food", "yard", "ag", "wood") +) + +mat_idx_by_family <- lapply(allowed_classes, function(cls) { + which(ca_compost$material_class %in% cls) +}) + +# one material draw per event row, conditional on pft family +events$mat_idx <- vapply( + events$pft_family, + function(fam) sample(mat_idx_by_family[[fam]], 1L), + integer(1) +) + +mat_cols <- ca_compost[events$mat_idx, + c("material", "material_class", + "app_rate_min", "app_rate_max", + "n_pct", "pan_pct", + "cn_min", "cn_max")] +events <- dplyr::bind_cols(events, mat_cols) + +# date offset windows are working assumptions: perennials get fall/winter +# application (Niederholzer 2019, UCCE), annuals get a wider pre planting +# window (Fulford et al 2023, CA processing tomatoes) +ANNUAL_OFFSET_MIN <- 14L +ANNUAL_OFFSET_MAX <- 180L +PERENNIAL_OFFSET_MIN <- 30L +PERENNIAL_OFFSET_MAX <- 210L + events <- events |> dplyr::mutate( - date_offset_days = PEcAn.data.land::sample_ca_compost_date_offset(.data$pft_family), - date = .data$anchor - .data$date_offset_days, - material = PEcAn.data.land::sample_ca_compost_material(.data$pft_family), - app_rate_t_ac = PEcAn.data.land::sample_ca_compost_app_rate(.data$pft_family), - pct_c = PEcAn.data.land::sample_ca_compost_pct_c(dplyr::n()), - cn_ratio = PEcAn.data.land::sample_ca_compost_cn(dplyr::n()), - ens_id = sprintf("ncc_ens_%03d", .data$ensemble_member) + u_rate = stats::runif(dplyr::n()), + u_cn = stats::runif(dplyr::n()), + app_rate_lb_acre = .data$app_rate_min + .data$u_rate * (.data$app_rate_max - .data$app_rate_min), + cn_ratio = .data$cn_min + .data$u_cn * (.data$cn_max - .data$cn_min), + date_offset_days = ifelse( + .data$pft_family == "annual", + sample(ANNUAL_OFFSET_MIN:ANNUAL_OFFSET_MAX, dplyr::n(), replace = TRUE), + sample(PERENNIAL_OFFSET_MIN:PERENNIAL_OFFSET_MAX, dplyr::n(), replace = TRUE) + ), + date = .data$anchor - .data$date_offset_days, + ens_id = sprintf("ens_%03d", .data$ensemble_member) ) PEcAn.logger::logger.info(sprintf( - "Sampled %d NCC events. app_rate %.2f-%.2f t/ac, pct_c %.1f-%.1f, cn %.1f-%.1f", + "Sampled %d compost events. app_rate %.0f to %.0f lb/acre, n_pct %.2f to %.2f, cn %.1f to %.1f", nrow(events), - min(events[["app_rate_t_ac"]]), max(events[["app_rate_t_ac"]]), - min(events[["pct_c"]]), max(events[["pct_c"]]), - min(events[["cn_ratio"]]), max(events[["cn_ratio"]]))) + min(events[["app_rate_lb_acre"]]), max(events[["app_rate_lb_acre"]]), + min(events[["n_pct"]]), max(events[["n_pct"]]), + min(events[["cn_ratio"]]), max(events[["cn_ratio"]]))) staging_file <- file.path(staging_dir, "_staging_02_events.rds") saveRDS(events, staging_file) From 889b3a161ec08c00efeb97b33a54c9121fb5ea0a Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:20:12 -0400 Subject: [PATCH 30/37] ud_convert for units and PAN based mineral N split with negative clamp --- workflows/ncc-statewide/03-write-parquet.R | 50 ++++++++++------------ 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/workflows/ncc-statewide/03-write-parquet.R b/workflows/ncc-statewide/03-write-parquet.R index a8c9c56ca7..c6031438f9 100644 --- a/workflows/ncc-statewide/03-write-parquet.R +++ b/workflows/ncc-statewide/03-write-parquet.R @@ -3,10 +3,7 @@ config <- config::get(file = "workflows/ncc-statewide/config.yml", config = Sys.getenv("NCC_PROJECT", "default")) -# 1 t/acre = 224.1702 g/m^2 = 0.2241702 kg/m^2. -T_ACRE_TO_KG_M2 <- 0.2241702 - -staging_dir <- file.path(config[["output_dir"]], config[["output_subdir"]], "_staging") +staging_dir <- file.path(config[["output_dir"]], "_staging") events_file <- file.path(staging_dir, "_staging_02_events.rds") if (!file.exists(events_file)) { PEcAn.logger::logger.severe( @@ -18,33 +15,33 @@ if (!file.exists(events_file)) { PEcAn.logger::logger.info("Reading events from ", events_file) events <- readRDS(events_file) -# org_c = app_rate * (pct_c / 100); org_n = org_c / cn. nh4 and no3 stay -# zero here because compost releases organic N that mineralizes through -# the soil pool in SIPNET rather than as direct mineral N. -# TODO: in v2, sample %C and C:N jointly since they covary through %N. +# split total N using PAN (plant available N at 4 weeks). pan_pct can be +# negative for high C:N materials (immobilization); clamp to 0 in that +# case since SIPNET doesn't take negative minN. total C is just bulk +# material carbon, doesn't depend on PAN out <- events |> dplyr::mutate( - org_c_kg_m2 = .data$app_rate_t_ac * .env$T_ACRE_TO_KG_M2 * (.data$pct_c / 100), - org_n_kg_m2 = .data$org_c_kg_m2 / .data$cn_ratio, - nh4_n_kg_m2 = 0, - no3_n_kg_m2 = 0, - ncc_subtype = "compost" + dry_mass_kg_m2 = PEcAn.utils::ud_convert(.data$app_rate_lb_acre, "lb/acre", "kg/m^2"), + total_n_kg_m2 = .data$dry_mass_kg_m2 * (.data$n_pct / 100), + pan_frac = pmax(0, .data$pan_pct / 100), + nh4_n_kg_m2 = .data$total_n_kg_m2 * .data$pan_frac, + org_n_kg_m2 = .data$total_n_kg_m2 * (1 - .data$pan_frac), + org_c_kg_m2 = .data$total_n_kg_m2 * .data$cn_ratio, + no3_n_kg_m2 = 0 ) |> dplyr::transmute( - parcel_id = as.integer(.data$parcel_id), - ens_id = .data$ens_id, - date = as.Date(.data$date), - .data$material, - .data$org_c_kg_m2, - .data$org_n_kg_m2, + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), .data$nh4_n_kg_m2, .data$no3_n_kg_m2, - .data$ncc_subtype, - crop_code = .data$code, - PFT = .data$PFT + .data$org_c_kg_m2, + .data$org_n_kg_m2, + crop_code = .data$code, + .data$material ) -out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +out_path <- config[["output_dir"]] dir.create(out_path, showWarnings = FALSE, recursive = TRUE) ## clean prior shards @@ -55,7 +52,7 @@ if (length(existing) > 0) { } # partition by parcel_id range and write one parquet per batch named -# _.parquet. +# _.parquet all_parcels <- sort(unique(out[["parcel_id"]])) batch_size <- as.integer(config[["batch_size"]]) n_batches <- ceiling(length(all_parcels) / batch_size) @@ -66,7 +63,7 @@ PEcAn.logger::logger.info(sprintf( nrow(out), n_batches, batch_size, out_path)) # prefer ZSTD; fall back to snappy when zstd is not in the local arrow -# build. +# build parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY" PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec) @@ -88,8 +85,7 @@ if (workers > 1) { } PEcAn.logger::logger.info(sprintf( - "Done. wrote %d shards, %d total rows, parcels=%d, materials=%d, ensemble=%d", + "Done. wrote %d shards, %d total rows, parcels=%d, ensemble=%d", length(written), nrow(out), dplyr::n_distinct(out[["parcel_id"]]), - dplyr::n_distinct(out[["material"]]), dplyr::n_distinct(out[["ens_id"]]))) From 77996780d482a70261065ecffb4bab6671d1c8c7 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:21:02 -0400 Subject: [PATCH 31/37] collapsed output_dir --- workflows/ncc-statewide/check-result.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflows/ncc-statewide/check-result.R b/workflows/ncc-statewide/check-result.R index 95b1a99129..e1c60e054f 100644 --- a/workflows/ncc-statewide/check-result.R +++ b/workflows/ncc-statewide/check-result.R @@ -5,7 +5,7 @@ config <- config::get(file = "workflows/ncc-statewide/config.yml", options(arrow.unsafe_metadata = TRUE) -out_path <- file.path(config[["output_dir"]], config[["output_subdir"]]) +out_path <- config[["output_dir"]] if (!dir.exists(out_path)) { PEcAn.logger::logger.severe("Output not found: ", out_path) } From 3987a26a5afd9efda2406facb6e0a1764a1da166 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:21:02 -0400 Subject: [PATCH 32/37] ncc acronym defined and timing window cited --- workflows/ncc-statewide/README.md | 44 ++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/workflows/ncc-statewide/README.md b/workflows/ncc-statewide/README.md index 242c91169b..9cc63fda8f 100644 --- a/workflows/ncc-statewide/README.md +++ b/workflows/ncc-statewide/README.md @@ -1,17 +1,47 @@ -# Statewide NCC (compost) events workflow +# Statewide compost (NCC) events workflow -Builds an ensemble of compost amendment events for California ag parcels across 2016 and 2018 to 2023. Implements the monitoring framework's three nested conditional draws: probability of amendment per parcel year, then date conditional on amendment, then amount + material conditional on amendment. 2017 is skipped because LandIQ has no statewide survey for that year. +NCC is project shorthand for organic amendments like compost and manure. This workflow builds an ensemble of compost events for every California ag parcel in the LandIQ MSLSP matched product, for 2016 and 2018 to 2023. 2017 is skipped because LandIQ did not run a statewide survey that year. -Run from the PEcAn project root. Pick a profile (default, small, medium, all) with `NCC_PROJECT`: +Material properties (application rate, %N, C:N, PAN) come from `PEcAn.data.land::ca_compost_amendment`. + +# Config + +Tweakable bits in `config.yml`: + +- `matched_dir`: where LandIQ MSLSP matched product lives +- `output_dir`: where parquet shards go +- `n_parcels`, `n_ensemble`, `batch_size`, `workers`: scale knobs per profile +- `p_apply_default`: probability of compost per parcel year per ensemble member (default 0.10) + +# Run + +Pick a profile (`default`, `medium`, `all`) and run from PEcAn project root: ``` NCC_PROJECT=default bash workflows/ncc-statewide/run-statewide.sh ``` -The three scripts run in order. `01-build-parcel-design.R` reads the LandIQ MSLSP matched product for each configured year, classifies each PFT into the annual or perennial family (compost rate depends on family, not crop), and saves the design table. Cycles whose PFT is outside that set get logged and dropped. `02-sample-ncc-events.R` expands the design across the ensemble dimension, runs the Bernoulli probability draw, and for cycles that fire pulls material, application rate, %C, C:N, and a date offset by calling the bundled samplers in `PEcAn.data.land::sample_ca_compost_*`. `03-write-parquet.R` computes `org_c_kg_m2 = app_rate * 0.2241702 * (pct_c / 100)` and `org_n_kg_m2 = org_c / cn_ratio`, partitions parcels into batches, and writes one parquet per batch named `_.parquet`. Compression is ZSTD when available, snappy otherwise. +01 builds a design table and tags each PFT as annual (row, hay, rice) or perennial (woody). 02 runs Bernoulli gate, picks a material per fired row from `ca_compost_amendment` (constrained to CalRecycle classes that fit PFT family), draws app rate and C:N from that material's envelope. 03 does unit conversion, splits N into mineral and organic using PAN (clamped to zero for high C:N materials that immobilize), and writes parcel range sharded parquet. + +`check-result.R` reads output back and prints a summary. + +# Output columns + +- `parcel_id`, `ens_id` (`ens_NNN`, shared with fertilization workflow), `date` +- `nh4_n_kg_m2`: PAN release fraction. Zero when `pan_pct` is negative (high C:N materials immobilize N instead of releasing it) +- `no3_n_kg_m2`: zero. Compost releases ammonium, nitrification happens later in soil pool +- `org_c_kg_m2`: total organic C from application. Does not depend on PAN; C is C regardless of N fate +- `org_n_kg_m2`: leftover organic N after PAN split +- `crop_code`: passthrough +- `material`: diagnostic column for `check-result.R` only. Cleaner strips it before union so it never reaches SIPNET + +Downstream, `workflows/preprocess-event-parquet/01c-clean-fertilization.R` unions these with synthetic N rows from fertilization workflow and writes one `fertilization.parquet`. SIPNET handles both under same FERT event type. + +# Compost timing -Output lands at `/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc/`. Intermediate `.rds` files live under `_staging/` next to the output. `check-result.R` opens the dataset and prints schema, shard count, rows per ensemble member, rows per material, rows per year, a sample of rows, and the org_c / org_n ranges. `push-to-carb.sh` is the eventual `aws s3 sync` to `s3://carb/management/ncc/v1.0/`. +Anchor is MSLSP green up date (`mslsp_OGI`). For annuals that's close to but not exactly planting date (emergence lags planting by a few days to weeks). For perennials it's bud break. Date offset is uniform within a family window: -The output columns are `parcel_id` (int), `ens_id` (string `ncc_ens_NNN`), `date` (sampled within the per family calendar window), `material` (one of the CalRecycle classes: green, food, ag, yard), `org_c_kg_m2`, `org_n_kg_m2`, `nh4_n_kg_m2` (zero, compost N mineralizes through the soil pool in SIPNET), `no3_n_kg_m2` (zero), `ncc_subtype` (`"compost"`), `crop_code`, `PFT`. Only cycle ensemble rows where the probability draw fires get emitted, so the row count varies by `p_apply_default` in `config.yml`. +- annuals (row, hay, rice): 14 to 180 days before green up +- perennials (woody): 30 to 210 days before green up -The bundled sampling distributions and per family lookups live in `PEcAn.data.land` (tables `ca_compost_pct_c_distribution`, `ca_compost_cn_distribution`, `ca_compost_app_rate_envelope`, `ca_compost_calendar_window`, `ca_compost_material_whitelist`) so the workflow only carries the probability knob in its own config. Defaults: %C 15 to 30 from the CDFA and CalRecycle characterizations of CA finished compost, C:N 8 to 25 from the CDFA HSP white paper, rates from CDFA HSP Table 2 plus NRCS CPS 336, materials from 14 CCR section 17852, calendar windows from UC ANR practice guidance. +These are working assumptions consistent with broad direction in literature (fall application for perennials, pre plant for annuals). From bb1147f94b44136af9abf6c4a8276fe5326b4138 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:26:54 -0400 Subject: [PATCH 33/37] 01c unions fert and ncc into one fertilization parquet --- .../01c-clean-fertilization.R | 102 ++++++++++++++---- 1 file changed, 80 insertions(+), 22 deletions(-) diff --git a/workflows/preprocess-event-parquet/01c-clean-fertilization.R b/workflows/preprocess-event-parquet/01c-clean-fertilization.R index d496133855..2ec83a9afa 100644 --- a/workflows/preprocess-event-parquet/01c-clean-fertilization.R +++ b/workflows/preprocess-event-parquet/01c-clean-fertilization.R @@ -1,42 +1,100 @@ #!/usr/bin/env Rscript -fert_path <- "/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization" +# both workflows feed this cleaner. synthetic N comes from fertilization- +# statewide as nh4 + no3 rows; compost comes from ncc-statewide as org_c + +# org_n + (optionally) nh4 from the PAN fraction. they share the ens_NNN +# naming so an ensemble member can carry both kinds of rows under one +# event_type = fertilization in the json output +# +# input paths default to where the two workflows ship their parquet +# shards but can be overridden with env vars FERT_RAW_DIR and NCC_RAW_DIR +# so a different output_dir in either workflow config does not require +# editing this file +fert_path <- Sys.getenv("FERT_RAW_DIR", + unset = "/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization") +ncc_path <- Sys.getenv("NCC_RAW_DIR", + unset = "/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc") outdir <- "_output" dir.create(outdir, showWarnings = FALSE, recursive = TRUE) dbdir <- file.path(Sys.getenv("TMPDIR", "/tmp"), "temp.duckdb") conn <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir)) +on.exit({ + DBI::dbDisconnect(conn, shutdown = TRUE) + unlink(dbdir) +}, add = TRUE) + +# detect what is actually on disk so cleaner is runnable in three +# cases: fert only, ncc only, both +has_fert <- length(Sys.glob(file.path(fert_path, "*.parquet"))) > 0 +has_ncc <- length(Sys.glob(file.path(ncc_path, "*.parquet"))) > 0 + +if (!has_fert && !has_ncc) { + stop("No fertilization or compost parquet shards found at ", + fert_path, " or ", ncc_path) +} + +fert_select <- glue::glue(" + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS event_ens_id_enum) AS event_member_id, + date, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + crop_code + FROM read_parquet('{fert_path}/*.parquet') +") + +ncc_select <- glue::glue(" + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS event_ens_id_enum) AS event_member_id, + date, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + crop_code + FROM read_parquet('{ncc_path}/*.parquet') +") + +ens_source <- if (has_fert && has_ncc) { + glue::glue(" + SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet') + UNION + SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet') + ") +} else if (has_fert) { + glue::glue("SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet')") +} else { + glue::glue("SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet')") +} + +union_query <- if (has_fert && has_ncc) { + paste(fert_select, "UNION ALL", ncc_select) +} else if (has_fert) { + fert_select +} else { + ncc_select +} # cast ensemble id to an enum to accelerate and reduce the memory pressure -# of the sort. +# of the sort DBI::dbExecute(conn, glue::glue(" - CREATE OR REPLACE TYPE fert_ens_id_enum AS ENUM ( - SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet') - ) - " -)) + CREATE OR REPLACE TYPE event_ens_id_enum AS ENUM ({ens_source}) +")) # sort and write the partitioned parquet output. rename parcel_id to site_id # and ens_id to event_member_id to match the schema the json converter -# consumes. +# consumes DBI::dbExecute(conn, glue::glue(" COPY ( - SELECT - CAST (parcel_id AS INTEGER) AS site_id, - CAST (ens_id AS fert_ens_id_enum) AS event_member_id, - date, - CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, - CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, - CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, - CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, - fert_subtype, - crop_code, - PFT - FROM read_parquet('{fert_path}/*.parquet') + {union_query} ORDER BY event_member_id, site_id, date ) TO '{outdir}/fertilization.parquet' (FORMAT PARQUET, COMPRESSION ZSTD, OVERWRITE, PARTITION_BY (event_member_id)) - " -)) +")) From 7500d4786add3da427cb18a75a375a81b3d2e17e Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:27:28 -0400 Subject: [PATCH 34/37] drop 01d since 01c covers compost too --- .../preprocess-event-parquet/01d-clean-ncc.R | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 workflows/preprocess-event-parquet/01d-clean-ncc.R diff --git a/workflows/preprocess-event-parquet/01d-clean-ncc.R b/workflows/preprocess-event-parquet/01d-clean-ncc.R deleted file mode 100644 index 45698da8de..0000000000 --- a/workflows/preprocess-event-parquet/01d-clean-ncc.R +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env Rscript - -ncc_path <- "/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc" - -outdir <- "_output" -dir.create(outdir, showWarnings = FALSE, recursive = TRUE) - -dbdir <- file.path(Sys.getenv("TMPDIR", "/tmp"), "temp.duckdb") -conn <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir)) - -# cast ensemble id to an enum to accelerate and reduce the memory pressure -# of the sort. -DBI::dbExecute(conn, glue::glue(" - CREATE OR REPLACE TYPE ncc_ens_id_enum AS ENUM ( - SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet') - ) - " -)) - -# sort and write the partitioned parquet output. rename parcel_id to site_id -# and ens_id to event_member_id to match the schema the json converter -# consumes. -DBI::dbExecute(conn, glue::glue(" - COPY ( - SELECT - CAST (parcel_id AS INTEGER) AS site_id, - CAST (ens_id AS ncc_ens_id_enum) AS event_member_id, - date, - material, - CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, - CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, - CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, - CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, - ncc_subtype, - crop_code, - PFT - FROM read_parquet('{ncc_path}/*.parquet') - ORDER BY event_member_id, site_id, date - ) TO - '{outdir}/ncc.parquet' - (FORMAT PARQUET, COMPRESSION ZSTD, OVERWRITE, PARTITION_BY (event_member_id)) - " -)) From e24b371a98990b86266b92577b817ba15ab3781f Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:27:28 -0400 Subject: [PATCH 35/37] note 01c now does both --- workflows/preprocess-event-parquet/README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/workflows/preprocess-event-parquet/README.md b/workflows/preprocess-event-parquet/README.md index b8e3aa0775..f1e16e615d 100644 --- a/workflows/preprocess-event-parquet/README.md +++ b/workflows/preprocess-event-parquet/README.md @@ -4,6 +4,5 @@ This converts the raw outputs of the management/monitoring CCMMF pipeline into p - `01a-clean-irrigation.R` --- Preprocess irrigation. This is handled separately because the raw data are really large (600M rows). - `01b-clean-other-events.R` --- Preprocess remaining events. These are done together because they are much smaller. -- `01c-clean-fertilization.R` --- Preprocess synthetic N fertilization (`workflows/fertilization-statewide/`). -- `01d-clean-ncc.R` --- Preprocess ncc (compost) events (`workflows/ncc-statewide/`). +- `01c-clean-fertilization.R` --- Preprocess fertilization events. Reads both `workflows/fertilization-statewide/` (synthetic N) and `workflows/ncc-statewide/` (compost) raw outputs and writes them as one `fertilization.parquet` since SIPNET's FERT handler accumulates the org and mineral channels on same event_type - `02-events-to-json.R` --- Example of running From a97b22bb3128316a3d3a23b256c09d4d266b453e Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:27:29 -0400 Subject: [PATCH 36/37] changelog entry for the rework --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index efecb01ec7..6363494af5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ For more information about this file see also [Keep a Changelog](http://keepacha - PEcAn.SIPNET gains support for SIPNET v2, whose features includes management events, nitrogen cycle tracking, explicit N2O and methane fluxes, runtime setting of feature flags, and changes to the parameter set (now 73 parameters). SIPNET v1 is still fully supported, but workarounds for bugs in the legacy `sipnet.unk` version have been removed. - Added `PEcAn.data.land::to_co2e()` for converting SOC change, CH4, and N2O to CO2-equivalent emissions using IPCC Global Warming Potential values. - Added `PEcAn.data.land::event_parquet_to_json` for generating PEcAn `event.json` files from well-formatted event parquet files, with support for ensembles of events. -- Added statewide fertilization and ncc compost event workflows that emit ensemble parquet shards for California ag parcels. +- Added statewide synthetic fertilization and compost amendment event workflows for CA ag parcels. Outputs share an ensemble naming so a downstream cleaner unions them into one fertilization event type for SIPNET. ### Fixed - Fixed broken pecanproject.github.io, pecan.gitbooks.io, and other outdated documentation links across book_source, tutorials, models, modules, web, and shiny files (#3710). From 6b7573ced6af5e55859bab6219200bf8108f1a04 Mon Sep 17 00:00:00 2001 From: divne7022 Date: Sun, 24 May 2026 13:51:18 -0400 Subject: [PATCH 37/37] drop ghost small profile from runner help text --- workflows/fertilization-statewide/run-statewide.sh | 2 +- workflows/ncc-statewide/run-statewide.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/workflows/fertilization-statewide/run-statewide.sh b/workflows/fertilization-statewide/run-statewide.sh index b01d2906d9..2c56685aae 100755 --- a/workflows/fertilization-statewide/run-statewide.sh +++ b/workflows/fertilization-statewide/run-statewide.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash # # runs the fertilization statewide pipeline. FERT_PROJECT picks a profile -# from config.yml (default, small, medium, all). +# from config.yml (default, medium, all). set -euo pipefail diff --git a/workflows/ncc-statewide/run-statewide.sh b/workflows/ncc-statewide/run-statewide.sh index 6e9f16d652..ea9b20d02b 100755 --- a/workflows/ncc-statewide/run-statewide.sh +++ b/workflows/ncc-statewide/run-statewide.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash # # runs the ncc (compost) statewide pipeline. NCC_PROJECT picks a profile -# from config.yml (default, small, medium, all). +# from config.yml (default, medium, all). set -euo pipefail