diff --git a/CHANGELOG.md b/CHANGELOG.md index e966311292..6363494af5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ For more information about this file see also [Keep a Changelog](http://keepacha - PEcAn.SIPNET gains support for SIPNET v2, whose features includes management events, nitrogen cycle tracking, explicit N2O and methane fluxes, runtime setting of feature flags, and changes to the parameter set (now 73 parameters). SIPNET v1 is still fully supported, but workarounds for bugs in the legacy `sipnet.unk` version have been removed. - Added `PEcAn.data.land::to_co2e()` for converting SOC change, CH4, and N2O to CO2-equivalent emissions using IPCC Global Warming Potential values. - Added `PEcAn.data.land::event_parquet_to_json` for generating PEcAn `event.json` files from well-formatted event parquet files, with support for ensembles of events. +- Added statewide synthetic fertilization and compost amendment event workflows for CA ag parcels. Outputs share an ensemble naming so a downstream cleaner unions them into one fertilization event type for SIPNET. ### Fixed - Fixed broken pecanproject.github.io, pecan.gitbooks.io, and other outdated documentation links across book_source, tutorials, models, modules, web, and shiny files (#3710). diff --git a/workflows/fertilization-statewide/01-build-parcel-design.R b/workflows/fertilization-statewide/01-build-parcel-design.R new file mode 100644 index 0000000000..4ee9962398 --- /dev/null +++ b/workflows/fertilization-statewide/01-build-parcel-design.R @@ -0,0 +1,196 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) + +options(arrow.unsafe_metadata = TRUE) + +# strip parenthetical annotations, conjunctions, and punctuation so the +# LandIQ, FREP, and UC ANR strings can be matched on a common key. +normalize_name <- function(s) { + s |> tolower() |> + stringr::str_replace_all("\\(.*?\\)", "") |> + stringr::str_replace_all("grouped for remote sensing only", "") |> + stringr::str_replace_all("\\bor\\b", "") |> + stringr::str_replace_all("\\band\\b", "") |> + stringr::str_replace_all("&", "") |> + stringr::str_replace_all("[[:punct:]]", " ") |> + stringr::str_squish() +} + +# split slash separated UC ANR or FREP candidates and reattach the shared +# prefix when a part is missing it. e.g. "Plums, dried / fresh" expands to +# c("Plums, dried", "Plums, fresh"). only candidates that match the known +# crop list are kept. +parse_candidates <- function(s, known) { + if (is.na(s) || nchar(s) == 0) return(character(0)) + parts <- stringr::str_split(s, " ?/ ?")[[1]] |> stringr::str_squish() + prefix <- if (stringr::str_detect(parts[1], ",")) { + stringr::str_extract(parts[1], "^[^,]+, ") + } else "" + result <- character() + for (p in parts) { + if (tolower(p) %in% tolower(known)) { + result <- c(result, p) + } else if (nchar(prefix) > 0) { + combined <- paste0(prefix, p) + if (tolower(combined) %in% tolower(known)) { + result <- c(result, combined) + } + } + } + unique(result) +} + +## load lookups +PEcAn.logger::logger.info("Loading bundled lookups and crosswalk") +crosswalk <- readr::read_tsv(config[["crosswalk_path"]], show_col_types = FALSE) |> + dplyr::select(landiq = "LandIQ Name", frep = "FREP Name", ucanr = "UC ANR") +ca_rates <- PEcAn.data.land::ca_n_application_rate +code_map <- PEcAn.data.land::landiq_crop_mapping_codes + +## crosswalk lookup +# resolve each LandIQ CLASS+SUBCLASS code to an N rate envelope by walking +# the crosswalk to UC ANR or FREP names and matching against the bundled +# rate table. +known_crops <- ca_rates$crop +xw_norm <- crosswalk |> + dplyr::mutate( + key = normalize_name(.data$landiq), + candidates = dplyr::coalesce(.data$ucanr, .data$frep) + ) + +code_lookup <- code_map |> + dplyr::mutate( + code = paste0(.data$CLASS, .data$SUBCLASS), + key = normalize_name(.data$subclass_name) + ) |> + dplyr::left_join(xw_norm |> dplyr::select("key", "candidates"), by = "key") |> + dplyr::rowwise() |> + dplyr::mutate(matched_crops = list(parse_candidates(.data$candidates, known_crops))) |> + dplyr::ungroup() |> + dplyr::filter(lengths(.data$matched_crops) > 0) |> + dplyr::mutate(rates = lapply(.data$matched_crops, function(cc) { + ca_rates |> + dplyr::filter(.data$crop %in% cc) |> + dplyr::summarize( + min_n_lbs_acre = min(.data$min_n_lbs_acre, na.rm = TRUE), + max_n_lbs_acre = max(.data$max_n_lbs_acre, na.rm = TRUE) + ) + })) |> + tidyr::unnest("rates") |> + dplyr::select("code", "min_n_lbs_acre", "max_n_lbs_acre") + +PEcAn.logger::logger.info(sprintf("Resolved %d LandIQ codes via crosswalk", nrow(code_lookup))) + +## load matched product +# parcel_id is stored as a string in the source so cast to int on the way +# in. filter keeps matched cycles with non NA crop class plus a valid EVI +# signal. +read_matched_year <- function(year) { + fn <- file.path(config[["matched_dir"]], + sprintf("assigned_year=%d.parquet", year)) + if (!file.exists(fn)) { + PEcAn.logger::logger.warn("Missing matched file for year ", year, ": ", fn) + return(NULL) + } + arrow::read_parquet(fn) |> + dplyr::filter(.data$assigned_by == "matched", + !is.na(.data$landiq_CLASS), + !is.na(.data$landiq_SUBCLASS), + !is.na(.data$landiq_PFT), + !is.na(.data$mslsp_EVImax), + !is.na(.data$mslsp_EVIamp)) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + year = as.integer(.data$year), + season = as.integer(.data$season), + date = as.Date(.data$mslsp_OGI), + code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS) + ) +} + +PEcAn.logger::logger.info("Reading matched LandIQ MSLSP for years: ", + paste(config[["years"]], collapse = ", ")) +plant <- purrr::map_dfr(config[["years"]], read_matched_year) +PEcAn.logger::logger.info(sprintf("Loaded %d cycles across %d parcels", + nrow(plant), dplyr::n_distinct(plant$parcel_id))) + +## subsample +# parcel set is sampled once and applied to all years so the same parcels +# appear in every year. +n_parcels <- config[["n_parcels"]] +if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { + picked <- plant |> + dplyr::distinct(.data$parcel_id) |> + dplyr::slice_sample(n = n_parcels) |> + dplyr::pull(.data$parcel_id) + plant <- plant |> dplyr::filter(.data$parcel_id %in% picked) + PEcAn.logger::logger.info(sprintf("Sampled %d parcels (n_parcels=%d)", + length(picked), n_parcels)) +} + +## attach rates +# cycles with no resolvable rate are dropped with an audit log. no PFT +# median fallback. +design <- plant |> + dplyr::left_join(code_lookup, by = "code") |> + dplyr::mutate( + rate_source = dplyr::case_when( + is.na(.data$min_n_lbs_acre) ~ "skip_no_rate", + .data$min_n_lbs_acre == 0 & .data$max_n_lbs_acre == 0 ~ "skip_zero_envelope", + TRUE ~ "crosswalk" + ) + ) + +# log both kinds of drops separately so it's clear which are missing data +# vs which are intentional zero envelopes (e.g. Alfalfa, legumes that fix +# their own N and carry a cited 0 to 0 rate) +unresolved <- design |> dplyr::filter(.data$rate_source == "skip_no_rate") +if (nrow(unresolved) > 0) { + by_code <- unresolved |> + dplyr::count(.data$code, name = "n_events", sort = TRUE) |> + head(15) + PEcAn.logger::logger.warn(sprintf( + "Dropping %d cycles across %d codes with no resolvable N rate. Top offenders:", + nrow(unresolved), dplyr::n_distinct(unresolved$code) + )) + for (i in seq_len(nrow(by_code))) { + PEcAn.logger::logger.warn(sprintf(" %s: %d cycles", + by_code$code[i], by_code$n_events[i])) + } +} + +zero_env <- design |> dplyr::filter(.data$rate_source == "skip_zero_envelope") +if (nrow(zero_env) > 0) { + by_code <- zero_env |> + dplyr::count(.data$code, name = "n_events", sort = TRUE) |> + head(15) + PEcAn.logger::logger.info(sprintf( + "Dropping %d cycles across %d codes with a cited 0 to 0 N rate (no synthetic application). Top:", + nrow(zero_env), dplyr::n_distinct(zero_env$code) + )) + for (i in seq_len(nrow(by_code))) { + PEcAn.logger::logger.info(sprintf(" %s: %d cycles", + by_code$code[i], by_code$n_events[i])) + } +} + +design <- design |> + dplyr::filter(.data$rate_source == "crosswalk") |> + dplyr::select("parcel_id", "year", "season", "date", "code", + "min_n_lbs_acre", "max_n_lbs_acre") + +PEcAn.logger::logger.info(sprintf("Design table: %d events, %d parcels, %d years", + nrow(design), + dplyr::n_distinct(design$parcel_id), + dplyr::n_distinct(design$year))) + +staging_file <- file.path(staging_dir, "_staging_01_design.rds") +saveRDS(design, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) diff --git a/workflows/fertilization-statewide/02-sample-n-rates.R b/workflows/fertilization-statewide/02-sample-n-rates.R new file mode 100644 index 0000000000..c42c6bc249 --- /dev/null +++ b/workflows/fertilization-statewide/02-sample-n-rates.R @@ -0,0 +1,44 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +design_file <- file.path(staging_dir, "_staging_01_design.rds") +if (!file.exists(design_file)) { + PEcAn.logger::logger.severe( + "Stage 01 output not found: ", design_file, + ". Run 01-build-parcel-design.R first." + ) +} + +PEcAn.logger::logger.info("Reading design from ", design_file) +design <- readRDS(design_file) + +n_ensemble <- config[["n_ensemble"]] +PEcAn.logger::logger.info(sprintf( + "Sampling %d ensemble members across %d design rows", + n_ensemble, nrow(design) +)) + +# expand each design row to one row per ensemble member and draw the annual +# N rate uniformly from each cycle's min/max envelope. ens_id is shared with +# ncc workflow so a given ensemble member can carry both synthetic and +# compost rows under one fertilization event_type in json output +events <- design |> + tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> + dplyr::mutate( + annual_n_lb_acre = stats::runif(dplyr::n(), min = .data$min_n_lbs_acre, max = .data$max_n_lbs_acre), + ens_id = sprintf("ens_%03d", .data$ensemble_member) + ) + +PEcAn.logger::logger.info(sprintf( + "Sampled %d events. annual N range: %.2f to %.2f lb/acre", + nrow(events), min(events$annual_n_lb_acre), max(events$annual_n_lb_acre) +)) + +staging_file <- file.path(staging_dir, "_staging_02_events.rds") +saveRDS(events, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) diff --git a/workflows/fertilization-statewide/03-write-parquet.R b/workflows/fertilization-statewide/03-write-parquet.R new file mode 100644 index 0000000000..17eee4628e --- /dev/null +++ b/workflows/fertilization-statewide/03-write-parquet.R @@ -0,0 +1,93 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +events_file <- file.path(staging_dir, "_staging_02_events.rds") +if (!file.exists(events_file)) { + PEcAn.logger::logger.severe( + "Stage 02 output not found: ", events_file, + ". Run 02-sample-n-rates.R first." + ) +} + +PEcAn.logger::logger.info("Reading events from ", events_file) +events <- readRDS(events_file) + +# synthetic N is split between nh4 and no3 using nh4_fraction (share going +# to nh4, rest goes to no3). org_c and org_n stay zero here; compost rows +# come in via ncc workflow and end up unioned into same fertilization +# parquet by cleaner +nh4_frac <- config[["nh4_fraction"]] + +out <- events |> + dplyr::mutate( + total_n_kg_m2 = PEcAn.utils::ud_convert(.data$annual_n_lb_acre, "lb/acre", "kg/m^2"), + nh4_n_kg_m2 = .data$total_n_kg_m2 * .env$nh4_frac, + no3_n_kg_m2 = .data$total_n_kg_m2 * (1 - .env$nh4_frac), + org_c_kg_m2 = 0, + org_n_kg_m2 = 0 + ) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), + .data$nh4_n_kg_m2, + .data$no3_n_kg_m2, + .data$org_c_kg_m2, + .data$org_n_kg_m2, + crop_code = .data$code + ) + +out_path <- config[["output_dir"]] +dir.create(out_path, showWarnings = FALSE, recursive = TRUE) + +## clean prior shards +existing <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(existing) > 0) { + PEcAn.logger::logger.info(sprintf("Removing %d existing parquet shards", length(existing))) + unlink(existing) +} + +# partition by parcel_id range and write one parquet per batch named +# _.parquet. +all_parcels <- sort(unique(out[["parcel_id"]])) +batch_size <- config[["batch_size"]] +n_batches <- ceiling(length(all_parcels) / batch_size) +batches <- split(all_parcels, ceiling(seq_along(all_parcels) / batch_size)) + +PEcAn.logger::logger.info(sprintf( + "Writing %d rows across %d parcel batches (batch_size=%d) to %s", + nrow(out), n_batches, batch_size, out_path +)) + +# prefer ZSTD; fall back to snappy when zstd is not in the local arrow +# build. +parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY" +PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec) + +write_batch <- function(pids) { + shard <- out |> dplyr::filter(.data$parcel_id %in% pids) + pid_min <- min(shard[["parcel_id"]]) + pid_max <- max(shard[["parcel_id"]]) + fn <- file.path(out_path, sprintf("%d_%d.parquet", pid_min, pid_max)) + arrow::write_parquet(shard, fn, compression = parquet_codec) + fn +} + +workers <- as.integer(config[["workers"]]) +if (workers > 1) { + PEcAn.logger::logger.info(sprintf("Using mclapply with %d workers", workers)) + written <- parallel::mclapply(batches, write_batch, mc.cores = workers) +} else { + written <- lapply(batches, write_batch) +} + +PEcAn.logger::logger.info(sprintf( + "Done. wrote %d shards, %d total rows, parcels=%d, years=%d, ensemble=%d", + length(written), nrow(out), + dplyr::n_distinct(out[["parcel_id"]]), + dplyr::n_distinct(format(out[["date"]], "%Y")), + dplyr::n_distinct(out[["ens_id"]]) +)) diff --git a/workflows/fertilization-statewide/README.md b/workflows/fertilization-statewide/README.md new file mode 100644 index 0000000000..6ccd5121d3 --- /dev/null +++ b/workflows/fertilization-statewide/README.md @@ -0,0 +1,29 @@ +# Statewide fertilization events workflow + +Builds an ensemble of synthetic N fertilization events for every California ag parcel in the LandIQ MSLSP matched product, for 2016 and 2018 to 2023. 2017 is skipped because LandIQ did not run a statewide survey that year. N rate envelopes come from `PEcAn.data.land::ca_n_application_rate`. + +# Config + +Everything tweakable lives in `config.yml`. Most setups only need to look at: + +- `matched_dir`: the LandIQ MSLSP matched product directory +- `crosswalk_path`: the LandIQ to FREP to UC ANR crop name crosswalk TSV +- `output_dir`: where the parquet shards go +- `n_parcels`, `n_ensemble`, `batch_size`, `workers`: scale knobs per profile +- `nh4_fraction`: share of total synthetic N going to ammonium; the rest goes to nitrate (default 0.5 for a 50/50 split) + +# Run + +Pick a profile (`default`, `medium`, `all`) and run from the PEcAn project root: + +``` +FERT_PROJECT=default bash workflows/fertilization-statewide/run-statewide.sh +``` + +`default` is 1000 random parcels, single threaded. `all` is full statewide (~660k parcels), eight workers. The three R scripts chain together: 01 builds the design, 02 samples rates, 03 writes parquet. `check-result.R` reads the output back and prints a summary. + +# Output + +Parcel range sharded parquet at `/`. Columns: `parcel_id`, `ens_id` (`ens_NNN`, shared with ncc workflow), `date`, `nh4_n_kg_m2`, `no3_n_kg_m2`, `org_c_kg_m2` (zero), `org_n_kg_m2` (zero), `crop_code`. + +Downstream, `workflows/preprocess-event-parquet/01c-clean-fertilization.R` renames `parcel_id` to `site_id` and `ens_id` to `event_member_id`, unions compost rows from `workflows/ncc-statewide/`, and writes JSON converter input. diff --git a/workflows/fertilization-statewide/check-result.R b/workflows/fertilization-statewide/check-result.R new file mode 100644 index 0000000000..e7b33238ad --- /dev/null +++ b/workflows/fertilization-statewide/check-result.R @@ -0,0 +1,59 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/fertilization-statewide/config.yml", + config = Sys.getenv("FERT_PROJECT", "default")) + +options(arrow.unsafe_metadata = TRUE) + +out_path <- config[["output_dir"]] +if (!dir.exists(out_path)) { + PEcAn.logger::logger.severe("Output not found: ", out_path) +} + +shards <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(shards) == 0) { + PEcAn.logger::logger.severe("No parquet shards under: ", out_path) +} +ds <- arrow::open_dataset(shards) + +# schema +print(ds$schema) + +PEcAn.logger::logger.info(sprintf( + "%d shards: %s ... %s", + length(shards), basename(shards[1]), basename(shards[length(shards)]))) + +tot <- ds |> dplyr::count() |> dplyr::collect() +PEcAn.logger::logger.info(sprintf("total rows: %d", tot[["n"]])) + +# rows per ensemble member +print( + ds |> + dplyr::count(.data$ens_id) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per year +print( + ds |> + dplyr::mutate(year = lubridate::year(.data$date)) |> + dplyr::count(.data$year) |> + dplyr::collect() |> + dplyr::arrange(.data$year) |> + as.data.frame() +) + +# first 5 rows +print(ds |> dplyr::slice_head(n = 5) |> dplyr::collect() |> as.data.frame()) + +# nh4 and no3 total range (kg N/m^2) +stats <- ds |> + dplyr::mutate(total_n = .data$nh4_n_kg_m2 + .data$no3_n_kg_m2) |> + dplyr::summarize( + min_total_n = min(.data$total_n, na.rm = TRUE), + max_total_n = max(.data$total_n, na.rm = TRUE), + mean_total_n = mean(.data$total_n, na.rm = TRUE) + ) |> + dplyr::collect() +print(stats |> as.data.frame()) diff --git a/workflows/fertilization-statewide/config.yml b/workflows/fertilization-statewide/config.yml new file mode 100644 index 0000000000..0a37d286b9 --- /dev/null +++ b/workflows/fertilization-statewide/config.yml @@ -0,0 +1,24 @@ +default: + n_parcels: 1000 + n_ensemble: 20 + # fraction of total synthetic N going to nh4. set to 0.5 for a 50/50 + # nh4/no3 split. value 0 makes it all nitrate, value 1 makes it all + # ammonium. + nh4_fraction: 0.5 + seed: 42 + years: [2016, 2018, 2019, 2020, 2021, 2022, 2023] + matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 + crosswalk_path: /projectnb/dietzelab/ccmmf/management/fertilization/CCMMF_Fertilization_Crop_types.tsv + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization + batch_size: 100 + workers: 1 + +medium: + n_parcels: 10000 + batch_size: 1000 + workers: 4 + +all: + n_parcels: null # null means all parcels in the matched product + batch_size: 5000 + workers: 8 diff --git a/workflows/fertilization-statewide/push-to-carb.sh b/workflows/fertilization-statewide/push-to-carb.sh new file mode 100755 index 0000000000..5298e90433 --- /dev/null +++ b/workflows/fertilization-statewide/push-to-carb.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +aws s3 sync --profile ccmmf \ + /projectnb/dietzelab/ccmmf/management/fertilization_event_files/ \ + s3://carb/management/fertilization/v1.0/ diff --git a/workflows/fertilization-statewide/run-statewide.sh b/workflows/fertilization-statewide/run-statewide.sh new file mode 100755 index 0000000000..2c56685aae --- /dev/null +++ b/workflows/fertilization-statewide/run-statewide.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# runs the fertilization statewide pipeline. FERT_PROJECT picks a profile +# from config.yml (default, medium, all). + +set -euo pipefail + +# run from PEcAn repo root regardless of caller cwd +cd "$(dirname "$0")/../.." + +FERT_PROJECT="${FERT_PROJECT:-default}" +export FERT_PROJECT + +DIR="workflows/fertilization-statewide" + +Rscript "$DIR/01-build-parcel-design.R" +Rscript "$DIR/02-sample-n-rates.R" +Rscript "$DIR/03-write-parquet.R" diff --git a/workflows/ncc-statewide/01-build-parcel-design.R b/workflows/ncc-statewide/01-build-parcel-design.R new file mode 100644 index 0000000000..51867d51c5 --- /dev/null +++ b/workflows/ncc-statewide/01-build-parcel-design.R @@ -0,0 +1,94 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE) + +options(arrow.unsafe_metadata = TRUE) + +# compost application rate is conditioned on PFT family rather than crop +# code so no rate crosswalk is needed at this stage. +read_matched_year <- function(year) { + fn <- file.path(config[["matched_dir"]], + sprintf("assigned_year=%d.parquet", year)) + if (!file.exists(fn)) { + PEcAn.logger::logger.warn("Missing matched file for year ", year, ": ", fn) + return(NULL) + } + arrow::read_parquet(fn) |> + dplyr::filter(.data$assigned_by == "matched", + !is.na(.data$landiq_CLASS), + !is.na(.data$landiq_SUBCLASS), + !is.na(.data$landiq_PFT), + !is.na(.data$mslsp_EVImax), + !is.na(.data$mslsp_EVIamp)) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + year = as.integer(.data$year), + season = as.integer(.data$season), + anchor = as.Date(.data$mslsp_OGI), + code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS), + PFT = as.character(.data$landiq_PFT) + ) +} + +PEcAn.logger::logger.info("Reading matched LandIQ MSLSP for years: ", + paste(config[["years"]], collapse = ", ")) +plant <- purrr::map_dfr(config[["years"]], read_matched_year) +PEcAn.logger::logger.info(sprintf("Loaded %d cycles across %d parcels", + nrow(plant), dplyr::n_distinct(plant$parcel_id))) + +## subsample +n_parcels <- config[["n_parcels"]] +if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) { + picked <- plant |> + dplyr::distinct(.data$parcel_id) |> + dplyr::slice_sample(n = n_parcels) |> + dplyr::pull(.data$parcel_id) + plant <- plant |> dplyr::filter(.data$parcel_id %in% picked) + PEcAn.logger::logger.info(sprintf("Sampled %d parcels (n_parcels=%d)", + length(picked), n_parcels)) +} + +pft_family <- function(pft) { + dplyr::case_when( + pft %in% c("row", "hay", "rice") ~ "annual", + pft == "woody" ~ "perennial", + TRUE ~ NA_character_ + ) +} + +design <- plant |> + dplyr::mutate(pft_family = pft_family(.data$PFT)) + +unknown <- design |> dplyr::filter(is.na(.data$pft_family)) +if (nrow(unknown) > 0) { + by_pft <- unknown |> dplyr::count(.data$PFT, sort = TRUE) + PEcAn.logger::logger.warn(sprintf( + "Dropping %d cycles with unknown PFT family. Breakdown:", nrow(unknown))) + for (i in seq_len(nrow(by_pft))) { + PEcAn.logger::logger.warn(sprintf(" PFT=%s: %d cycles", + by_pft$PFT[i], by_pft$n[i])) + } +} + +design <- design |> + dplyr::filter(!is.na(.data$pft_family)) |> + dplyr::select("parcel_id", "year", "season", "anchor", + "code", "PFT", "pft_family") + +PEcAn.logger::logger.info(sprintf("Design table: %d cycles, %d parcels, %d years", + nrow(design), + dplyr::n_distinct(design$parcel_id), + dplyr::n_distinct(design$year))) +PEcAn.logger::logger.info(sprintf("PFT family split: annual=%d, perennial=%d", + sum(design$pft_family == "annual"), + sum(design$pft_family == "perennial"))) + +staging_file <- file.path(staging_dir, "_staging_01_design.rds") +saveRDS(design, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) diff --git a/workflows/ncc-statewide/02-sample-ncc-events.R b/workflows/ncc-statewide/02-sample-ncc-events.R new file mode 100644 index 0000000000..b4c1382856 --- /dev/null +++ b/workflows/ncc-statewide/02-sample-ncc-events.R @@ -0,0 +1,103 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +set.seed(config[["seed"]]) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +design_file <- file.path(staging_dir, "_staging_01_design.rds") +if (!file.exists(design_file)) { + PEcAn.logger::logger.severe( + "Stage 01 output not found: ", design_file, + ". Run 01-build-parcel-design.R first." + ) +} + +PEcAn.logger::logger.info("Reading design from ", design_file) +design <- readRDS(design_file) + +n_ensemble <- as.integer(config[["n_ensemble"]]) +p_apply <- as.numeric(config[["p_apply_default"]]) + +PEcAn.logger::logger.info(sprintf( + "Sampling %d ensemble members per cycle with p_apply=%.2f", n_ensemble, p_apply)) + +# bernoulli gate first so most parcel year ensemble combinations drop out +# before heavier material lookup +events <- design |> + tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |> + dplyr::mutate( + applied = stats::rbinom(dplyr::n(), 1, p_apply) == 1L + ) |> + dplyr::filter(.data$applied) + +n_total <- nrow(design) * n_ensemble +PEcAn.logger::logger.info(sprintf( + "Probability draw fired %d of %d possible events (%.1f%%)", + nrow(events), n_total, 100 * nrow(events) / n_total)) + +if (nrow(events) == 0) { + PEcAn.logger::logger.severe("No events fired. Check p_apply_default.") +} + +# wood class is excluded from annuals; high C:N immobilizes more N than +# row crop rotations can absorb within one season +ca_compost <- PEcAn.data.land::ca_compost_amendment + +allowed_classes <- list( + annual = c("green", "food", "yard", "ag"), + perennial = c("green", "food", "yard", "ag", "wood") +) + +mat_idx_by_family <- lapply(allowed_classes, function(cls) { + which(ca_compost$material_class %in% cls) +}) + +# one material draw per event row, conditional on pft family +events$mat_idx <- vapply( + events$pft_family, + function(fam) sample(mat_idx_by_family[[fam]], 1L), + integer(1) +) + +mat_cols <- ca_compost[events$mat_idx, + c("material", "material_class", + "app_rate_min", "app_rate_max", + "n_pct", "pan_pct", + "cn_min", "cn_max")] +events <- dplyr::bind_cols(events, mat_cols) + +# date offset windows are working assumptions: perennials get fall/winter +# application (Niederholzer 2019, UCCE), annuals get a wider pre planting +# window (Fulford et al 2023, CA processing tomatoes) +ANNUAL_OFFSET_MIN <- 14L +ANNUAL_OFFSET_MAX <- 180L +PERENNIAL_OFFSET_MIN <- 30L +PERENNIAL_OFFSET_MAX <- 210L + +events <- events |> + dplyr::mutate( + u_rate = stats::runif(dplyr::n()), + u_cn = stats::runif(dplyr::n()), + app_rate_lb_acre = .data$app_rate_min + .data$u_rate * (.data$app_rate_max - .data$app_rate_min), + cn_ratio = .data$cn_min + .data$u_cn * (.data$cn_max - .data$cn_min), + date_offset_days = ifelse( + .data$pft_family == "annual", + sample(ANNUAL_OFFSET_MIN:ANNUAL_OFFSET_MAX, dplyr::n(), replace = TRUE), + sample(PERENNIAL_OFFSET_MIN:PERENNIAL_OFFSET_MAX, dplyr::n(), replace = TRUE) + ), + date = .data$anchor - .data$date_offset_days, + ens_id = sprintf("ens_%03d", .data$ensemble_member) + ) + +PEcAn.logger::logger.info(sprintf( + "Sampled %d compost events. app_rate %.0f to %.0f lb/acre, n_pct %.2f to %.2f, cn %.1f to %.1f", + nrow(events), + min(events[["app_rate_lb_acre"]]), max(events[["app_rate_lb_acre"]]), + min(events[["n_pct"]]), max(events[["n_pct"]]), + min(events[["cn_ratio"]]), max(events[["cn_ratio"]]))) + +staging_file <- file.path(staging_dir, "_staging_02_events.rds") +saveRDS(events, staging_file) +PEcAn.logger::logger.info("Wrote ", staging_file) diff --git a/workflows/ncc-statewide/03-write-parquet.R b/workflows/ncc-statewide/03-write-parquet.R new file mode 100644 index 0000000000..c6031438f9 --- /dev/null +++ b/workflows/ncc-statewide/03-write-parquet.R @@ -0,0 +1,91 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +staging_dir <- file.path(config[["output_dir"]], "_staging") +events_file <- file.path(staging_dir, "_staging_02_events.rds") +if (!file.exists(events_file)) { + PEcAn.logger::logger.severe( + "Stage 02 output not found: ", events_file, + ". Run 02-sample-ncc-events.R first." + ) +} + +PEcAn.logger::logger.info("Reading events from ", events_file) +events <- readRDS(events_file) + +# split total N using PAN (plant available N at 4 weeks). pan_pct can be +# negative for high C:N materials (immobilization); clamp to 0 in that +# case since SIPNET doesn't take negative minN. total C is just bulk +# material carbon, doesn't depend on PAN +out <- events |> + dplyr::mutate( + dry_mass_kg_m2 = PEcAn.utils::ud_convert(.data$app_rate_lb_acre, "lb/acre", "kg/m^2"), + total_n_kg_m2 = .data$dry_mass_kg_m2 * (.data$n_pct / 100), + pan_frac = pmax(0, .data$pan_pct / 100), + nh4_n_kg_m2 = .data$total_n_kg_m2 * .data$pan_frac, + org_n_kg_m2 = .data$total_n_kg_m2 * (1 - .data$pan_frac), + org_c_kg_m2 = .data$total_n_kg_m2 * .data$cn_ratio, + no3_n_kg_m2 = 0 + ) |> + dplyr::transmute( + parcel_id = as.integer(.data$parcel_id), + ens_id = .data$ens_id, + date = as.Date(.data$date), + .data$nh4_n_kg_m2, + .data$no3_n_kg_m2, + .data$org_c_kg_m2, + .data$org_n_kg_m2, + crop_code = .data$code, + .data$material + ) + +out_path <- config[["output_dir"]] +dir.create(out_path, showWarnings = FALSE, recursive = TRUE) + +## clean prior shards +existing <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(existing) > 0) { + PEcAn.logger::logger.info(sprintf("Removing %d existing parquet shards", length(existing))) + unlink(existing) +} + +# partition by parcel_id range and write one parquet per batch named +# _.parquet +all_parcels <- sort(unique(out[["parcel_id"]])) +batch_size <- as.integer(config[["batch_size"]]) +n_batches <- ceiling(length(all_parcels) / batch_size) +batches <- split(all_parcels, ceiling(seq_along(all_parcels) / batch_size)) + +PEcAn.logger::logger.info(sprintf( + "Writing %d rows across %d parcel batches (batch_size=%d) to %s", + nrow(out), n_batches, batch_size, out_path)) + +# prefer ZSTD; fall back to snappy when zstd is not in the local arrow +# build +parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY" +PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec) + +write_batch <- function(pids) { + shard <- out |> dplyr::filter(.data$parcel_id %in% pids) + pid_min <- min(shard[["parcel_id"]]) + pid_max <- max(shard[["parcel_id"]]) + fn <- file.path(out_path, sprintf("%d_%d.parquet", pid_min, pid_max)) + arrow::write_parquet(shard, fn, compression = parquet_codec) + fn +} + +workers <- as.integer(config[["workers"]]) +if (workers > 1) { + PEcAn.logger::logger.info(sprintf("Using mclapply with %d workers", workers)) + written <- parallel::mclapply(batches, write_batch, mc.cores = workers) +} else { + written <- lapply(batches, write_batch) +} + +PEcAn.logger::logger.info(sprintf( + "Done. wrote %d shards, %d total rows, parcels=%d, ensemble=%d", + length(written), nrow(out), + dplyr::n_distinct(out[["parcel_id"]]), + dplyr::n_distinct(out[["ens_id"]]))) diff --git a/workflows/ncc-statewide/README.md b/workflows/ncc-statewide/README.md new file mode 100644 index 0000000000..9cc63fda8f --- /dev/null +++ b/workflows/ncc-statewide/README.md @@ -0,0 +1,47 @@ +# Statewide compost (NCC) events workflow + +NCC is project shorthand for organic amendments like compost and manure. This workflow builds an ensemble of compost events for every California ag parcel in the LandIQ MSLSP matched product, for 2016 and 2018 to 2023. 2017 is skipped because LandIQ did not run a statewide survey that year. + +Material properties (application rate, %N, C:N, PAN) come from `PEcAn.data.land::ca_compost_amendment`. + +# Config + +Tweakable bits in `config.yml`: + +- `matched_dir`: where LandIQ MSLSP matched product lives +- `output_dir`: where parquet shards go +- `n_parcels`, `n_ensemble`, `batch_size`, `workers`: scale knobs per profile +- `p_apply_default`: probability of compost per parcel year per ensemble member (default 0.10) + +# Run + +Pick a profile (`default`, `medium`, `all`) and run from PEcAn project root: + +``` +NCC_PROJECT=default bash workflows/ncc-statewide/run-statewide.sh +``` + +01 builds a design table and tags each PFT as annual (row, hay, rice) or perennial (woody). 02 runs Bernoulli gate, picks a material per fired row from `ca_compost_amendment` (constrained to CalRecycle classes that fit PFT family), draws app rate and C:N from that material's envelope. 03 does unit conversion, splits N into mineral and organic using PAN (clamped to zero for high C:N materials that immobilize), and writes parcel range sharded parquet. + +`check-result.R` reads output back and prints a summary. + +# Output columns + +- `parcel_id`, `ens_id` (`ens_NNN`, shared with fertilization workflow), `date` +- `nh4_n_kg_m2`: PAN release fraction. Zero when `pan_pct` is negative (high C:N materials immobilize N instead of releasing it) +- `no3_n_kg_m2`: zero. Compost releases ammonium, nitrification happens later in soil pool +- `org_c_kg_m2`: total organic C from application. Does not depend on PAN; C is C regardless of N fate +- `org_n_kg_m2`: leftover organic N after PAN split +- `crop_code`: passthrough +- `material`: diagnostic column for `check-result.R` only. Cleaner strips it before union so it never reaches SIPNET + +Downstream, `workflows/preprocess-event-parquet/01c-clean-fertilization.R` unions these with synthetic N rows from fertilization workflow and writes one `fertilization.parquet`. SIPNET handles both under same FERT event type. + +# Compost timing + +Anchor is MSLSP green up date (`mslsp_OGI`). For annuals that's close to but not exactly planting date (emergence lags planting by a few days to weeks). For perennials it's bud break. Date offset is uniform within a family window: + +- annuals (row, hay, rice): 14 to 180 days before green up +- perennials (woody): 30 to 210 days before green up + +These are working assumptions consistent with broad direction in literature (fall application for perennials, pre plant for annuals). diff --git a/workflows/ncc-statewide/check-result.R b/workflows/ncc-statewide/check-result.R new file mode 100644 index 0000000000..e1c60e054f --- /dev/null +++ b/workflows/ncc-statewide/check-result.R @@ -0,0 +1,69 @@ +#!/usr/bin/env Rscript + +config <- config::get(file = "workflows/ncc-statewide/config.yml", + config = Sys.getenv("NCC_PROJECT", "default")) + +options(arrow.unsafe_metadata = TRUE) + +out_path <- config[["output_dir"]] +if (!dir.exists(out_path)) { + PEcAn.logger::logger.severe("Output not found: ", out_path) +} + +shards <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE) +if (length(shards) == 0) { + PEcAn.logger::logger.severe("No parquet shards under: ", out_path) +} +ds <- arrow::open_dataset(shards) + +# schema +print(ds$schema) + +PEcAn.logger::logger.info(sprintf( + "%d shards: %s ... %s", + length(shards), basename(shards[1]), basename(shards[length(shards)]))) + +tot <- ds |> dplyr::count() |> dplyr::collect() +PEcAn.logger::logger.info(sprintf("total rows: %d", tot[["n"]])) + +# rows per ensemble member +print( + ds |> + dplyr::count(.data$ens_id) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per material +print( + ds |> + dplyr::count(.data$material) |> + dplyr::collect() |> + as.data.frame() +) + +# rows per year +print( + ds |> + dplyr::mutate(year = lubridate::year(.data$date)) |> + dplyr::count(.data$year) |> + dplyr::collect() |> + dplyr::arrange(.data$year) |> + as.data.frame() +) + +# first 5 rows +print(ds |> dplyr::slice_head(n = 5) |> dplyr::collect() |> as.data.frame()) + +# org_c and org_n ranges +stats <- ds |> + dplyr::summarize( + min_org_c = min(.data$org_c_kg_m2, na.rm = TRUE), + max_org_c = max(.data$org_c_kg_m2, na.rm = TRUE), + mean_org_c = mean(.data$org_c_kg_m2, na.rm = TRUE), + min_org_n = min(.data$org_n_kg_m2, na.rm = TRUE), + max_org_n = max(.data$org_n_kg_m2, na.rm = TRUE), + mean_org_n = mean(.data$org_n_kg_m2, na.rm = TRUE) + ) |> + dplyr::collect() +print(stats |> as.data.frame()) diff --git a/workflows/ncc-statewide/config.yml b/workflows/ncc-statewide/config.yml new file mode 100644 index 0000000000..fec0b17b32 --- /dev/null +++ b/workflows/ncc-statewide/config.yml @@ -0,0 +1,24 @@ +default: + n_parcels: 1000 + n_ensemble: 20 + seed: 42 + years: [2016, 2018, 2019, 2020, 2021, 2022, 2023] + matched_dir: /projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1 + + # probability of compost amendment per parcel year per ensemble member. + # scenario knob, useful range 0.05 to 0.25 + p_apply_default: 0.10 + + output_dir: /projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc + batch_size: 100 + workers: 1 + +medium: + n_parcels: 10000 + batch_size: 1000 + workers: 4 + +all: + n_parcels: null + batch_size: 5000 + workers: 8 diff --git a/workflows/ncc-statewide/push-to-carb.sh b/workflows/ncc-statewide/push-to-carb.sh new file mode 100755 index 0000000000..745e5cfeb0 --- /dev/null +++ b/workflows/ncc-statewide/push-to-carb.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +aws s3 sync --profile ccmmf \ + /projectnb/dietzelab/ccmmf/management/ncc_event_files/ \ + s3://carb/management/ncc/v1.0/ diff --git a/workflows/ncc-statewide/run-statewide.sh b/workflows/ncc-statewide/run-statewide.sh new file mode 100755 index 0000000000..ea9b20d02b --- /dev/null +++ b/workflows/ncc-statewide/run-statewide.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# runs the ncc (compost) statewide pipeline. NCC_PROJECT picks a profile +# from config.yml (default, medium, all). + +set -euo pipefail + +# run from the PEcAn repo root regardless of caller cwd. +cd "$(dirname "$0")/../.." + +NCC_PROJECT="${NCC_PROJECT:-default}" +export NCC_PROJECT + +DIR="workflows/ncc-statewide" + +Rscript "$DIR/01-build-parcel-design.R" +Rscript "$DIR/02-sample-ncc-events.R" +Rscript "$DIR/03-write-parquet.R" diff --git a/workflows/preprocess-event-parquet/01c-clean-fertilization.R b/workflows/preprocess-event-parquet/01c-clean-fertilization.R new file mode 100644 index 0000000000..2ec83a9afa --- /dev/null +++ b/workflows/preprocess-event-parquet/01c-clean-fertilization.R @@ -0,0 +1,100 @@ +#!/usr/bin/env Rscript + +# both workflows feed this cleaner. synthetic N comes from fertilization- +# statewide as nh4 + no3 rows; compost comes from ncc-statewide as org_c + +# org_n + (optionally) nh4 from the PAN fraction. they share the ens_NNN +# naming so an ensemble member can carry both kinds of rows under one +# event_type = fertilization in the json output +# +# input paths default to where the two workflows ship their parquet +# shards but can be overridden with env vars FERT_RAW_DIR and NCC_RAW_DIR +# so a different output_dir in either workflow config does not require +# editing this file +fert_path <- Sys.getenv("FERT_RAW_DIR", + unset = "/projectnb/dietzelab/ccmmf/usr/akash/event_files/fertilization") +ncc_path <- Sys.getenv("NCC_RAW_DIR", + unset = "/projectnb/dietzelab/ccmmf/usr/akash/event_files/ncc") + +outdir <- "_output" +dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + +dbdir <- file.path(Sys.getenv("TMPDIR", "/tmp"), "temp.duckdb") +conn <- DBI::dbConnect(duckdb::duckdb(dbdir = dbdir)) +on.exit({ + DBI::dbDisconnect(conn, shutdown = TRUE) + unlink(dbdir) +}, add = TRUE) + +# detect what is actually on disk so cleaner is runnable in three +# cases: fert only, ncc only, both +has_fert <- length(Sys.glob(file.path(fert_path, "*.parquet"))) > 0 +has_ncc <- length(Sys.glob(file.path(ncc_path, "*.parquet"))) > 0 + +if (!has_fert && !has_ncc) { + stop("No fertilization or compost parquet shards found at ", + fert_path, " or ", ncc_path) +} + +fert_select <- glue::glue(" + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS event_ens_id_enum) AS event_member_id, + date, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + crop_code + FROM read_parquet('{fert_path}/*.parquet') +") + +ncc_select <- glue::glue(" + SELECT + CAST (parcel_id AS INTEGER) AS site_id, + CAST (ens_id AS event_ens_id_enum) AS event_member_id, + date, + CAST (nh4_n_kg_m2 AS DECIMAL(10, 8)) AS nh4_n_kg_m2, + CAST (no3_n_kg_m2 AS DECIMAL(10, 8)) AS no3_n_kg_m2, + CAST (org_c_kg_m2 AS DECIMAL(10, 8)) AS org_c_kg_m2, + CAST (org_n_kg_m2 AS DECIMAL(10, 8)) AS org_n_kg_m2, + crop_code + FROM read_parquet('{ncc_path}/*.parquet') +") + +ens_source <- if (has_fert && has_ncc) { + glue::glue(" + SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet') + UNION + SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet') + ") +} else if (has_fert) { + glue::glue("SELECT DISTINCT ens_id FROM read_parquet('{fert_path}/*.parquet')") +} else { + glue::glue("SELECT DISTINCT ens_id FROM read_parquet('{ncc_path}/*.parquet')") +} + +union_query <- if (has_fert && has_ncc) { + paste(fert_select, "UNION ALL", ncc_select) +} else if (has_fert) { + fert_select +} else { + ncc_select +} + +# cast ensemble id to an enum to accelerate and reduce the memory pressure +# of the sort +DBI::dbExecute(conn, glue::glue(" + CREATE OR REPLACE TYPE event_ens_id_enum AS ENUM ({ens_source}) +")) + +# sort and write the partitioned parquet output. rename parcel_id to site_id +# and ens_id to event_member_id to match the schema the json converter +# consumes +DBI::dbExecute(conn, glue::glue(" + COPY ( + {union_query} + ORDER BY event_member_id, site_id, date + ) TO + '{outdir}/fertilization.parquet' + (FORMAT PARQUET, COMPRESSION ZSTD, OVERWRITE, PARTITION_BY (event_member_id)) +")) diff --git a/workflows/preprocess-event-parquet/README.md b/workflows/preprocess-event-parquet/README.md index c64ce25d7e..f1e16e615d 100644 --- a/workflows/preprocess-event-parquet/README.md +++ b/workflows/preprocess-event-parquet/README.md @@ -4,4 +4,5 @@ This converts the raw outputs of the management/monitoring CCMMF pipeline into p - `01a-clean-irrigation.R` --- Preprocess irrigation. This is handled separately because the raw data are really large (600M rows). - `01b-clean-other-events.R` --- Preprocess remaining events. These are done together because they are much smaller. +- `01c-clean-fertilization.R` --- Preprocess fertilization events. Reads both `workflows/fertilization-statewide/` (synthetic N) and `workflows/ncc-statewide/` (compost) raw outputs and writes them as one `fertilization.parquet` since SIPNET's FERT handler accumulates the org and mineral channels on same event_type - `02-events-to-json.R` --- Example of running