Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
70f204b
add fertilization config
divine7022 May 18, 2026
676b478
add fertilization parcel design step
divine7022 May 18, 2026
7188dd8
add n rate sampling step
divine7022 May 18, 2026
6166a4b
add fertilization parquet writer
divine7022 May 18, 2026
2f1b4bd
add fertilization runner script
divine7022 May 18, 2026
5f585b5
add fertilization sanity check
divine7022 May 18, 2026
b20f9d9
add fertilization s3 sync
divine7022 May 18, 2026
608af23
add fertilization workflow readme
divine7022 May 18, 2026
fadbff9
add ncc config
divine7022 May 18, 2026
0ce93d1
add ncc parcel design step
divine7022 May 18, 2026
4c9fbad
add ncc event sampling step
divine7022 May 18, 2026
418410b
add ncc parquet writer
divine7022 May 18, 2026
659c532
add ncc runner script
divine7022 May 18, 2026
2759164
add ncc sanity check
divine7022 May 18, 2026
7d4246b
add ncc s3 sync
divine7022 May 18, 2026
6af1c01
add ncc workflow readme
divine7022 May 18, 2026
b8e99cb
add fertilization cleaner
divine7022 May 18, 2026
bbbb1da
add ncc cleaner
divine7022 May 18, 2026
d87c873
list new cleaners in readme
divine7022 May 18, 2026
83df132
add changelog entry for event workflows
divine7022 May 18, 2026
3ab64b3
drop output_subdir, fix nh4 split knob name, point crosswalk at share…
divine7022 May 24, 2026
96f8002
split skip_zero_envelope drops from skip_no_rate, drop unused PFT col…
divine7022 May 24, 2026
3a23097
switch fert sampler to shared ens_NNN naming
divine7022 May 24, 2026
147c303
swap fert constant for ud_convert, drop fert_subtype and pft cols
divine7022 May 24, 2026
3f9a540
fert check result, and collapsed output_dir
divine7022 May 24, 2026
ae7055b
update readme
divine7022 May 24, 2026
f67b0b7
drop output_subdir from ncc config and unused crosswalk_path
divine7022 May 24, 2026
57ec447
include rice and hay in pft families instead of dropping them
divine7022 May 24, 2026
86f1de8
rewrite ncc sampler against ca_compost_amendment with uniform quantil…
divine7022 May 24, 2026
889b3a1
ud_convert for units and PAN based mineral N split with negative clamp
divine7022 May 24, 2026
7799678
collapsed output_dir
divine7022 May 24, 2026
3987a26
ncc acronym defined and timing window cited
divine7022 May 24, 2026
bb1147f
01c unions fert and ncc into one fertilization parquet
divine7022 May 24, 2026
7500d47
drop 01d since 01c covers compost too
divine7022 May 24, 2026
e24b371
note 01c now does both
divine7022 May 24, 2026
a97b22b
changelog entry for the rework
divine7022 May 24, 2026
6b7573c
drop ghost small profile from runner help text
divine7022 May 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ For more information about this file see also [Keep a Changelog](http://keepacha
- PEcAn.SIPNET gains support for SIPNET v2, whose features includes management events, nitrogen cycle tracking, explicit N2O and methane fluxes, runtime setting of feature flags, and changes to the parameter set (now 73 parameters). SIPNET v1 is still fully supported, but workarounds for bugs in the legacy `sipnet.unk` version have been removed.
- Added `PEcAn.data.land::to_co2e()` for converting SOC change, CH4, and N2O to CO2-equivalent emissions using IPCC Global Warming Potential values.
- Added `PEcAn.data.land::event_parquet_to_json` for generating PEcAn `event.json` files from well-formatted event parquet files, with support for ensembles of events.
- Added statewide synthetic fertilization and compost amendment event workflows for CA ag parcels. Outputs share an ensemble naming so a downstream cleaner unions them into one fertilization event type for SIPNET.

### Fixed
- Fixed broken pecanproject.github.io, pecan.gitbooks.io, and other outdated documentation links across book_source, tutorials, models, modules, web, and shiny files (#3710).
Expand Down
196 changes: 196 additions & 0 deletions workflows/fertilization-statewide/01-build-parcel-design.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#!/usr/bin/env Rscript

config <- config::get(file = "workflows/fertilization-statewide/config.yml",
config = Sys.getenv("FERT_PROJECT", "default"))

set.seed(config[["seed"]])

staging_dir <- file.path(config[["output_dir"]], "_staging")
dir.create(staging_dir, showWarnings = FALSE, recursive = TRUE)

options(arrow.unsafe_metadata = TRUE)

# strip parenthetical annotations, conjunctions, and punctuation so the
# LandIQ, FREP, and UC ANR strings can be matched on a common key.
normalize_name <- function(s) {
s |> tolower() |>
stringr::str_replace_all("\\(.*?\\)", "") |>
stringr::str_replace_all("grouped for remote sensing only", "") |>
stringr::str_replace_all("\\bor\\b", "") |>
stringr::str_replace_all("\\band\\b", "") |>
stringr::str_replace_all("&", "") |>
stringr::str_replace_all("[[:punct:]]", " ") |>
stringr::str_squish()
}

# split slash separated UC ANR or FREP candidates and reattach the shared
# prefix when a part is missing it. e.g. "Plums, dried / fresh" expands to
# c("Plums, dried", "Plums, fresh"). only candidates that match the known
# crop list are kept.
parse_candidates <- function(s, known) {
if (is.na(s) || nchar(s) == 0) return(character(0))
parts <- stringr::str_split(s, " ?/ ?")[[1]] |> stringr::str_squish()
prefix <- if (stringr::str_detect(parts[1], ",")) {
stringr::str_extract(parts[1], "^[^,]+, ")
} else ""
result <- character()
for (p in parts) {
if (tolower(p) %in% tolower(known)) {
result <- c(result, p)
} else if (nchar(prefix) > 0) {
combined <- paste0(prefix, p)
if (tolower(combined) %in% tolower(known)) {
result <- c(result, combined)
}
}
}
unique(result)
}

## load lookups
PEcAn.logger::logger.info("Loading bundled lookups and crosswalk")
crosswalk <- readr::read_tsv(config[["crosswalk_path"]], show_col_types = FALSE) |>
dplyr::select(landiq = "LandIQ Name", frep = "FREP Name", ucanr = "UC ANR")
ca_rates <- PEcAn.data.land::ca_n_application_rate
code_map <- PEcAn.data.land::landiq_crop_mapping_codes

## crosswalk lookup
# resolve each LandIQ CLASS+SUBCLASS code to an N rate envelope by walking
# the crosswalk to UC ANR or FREP names and matching against the bundled
# rate table.
known_crops <- ca_rates$crop
xw_norm <- crosswalk |>
dplyr::mutate(
key = normalize_name(.data$landiq),
candidates = dplyr::coalesce(.data$ucanr, .data$frep)
)

code_lookup <- code_map |>
dplyr::mutate(
code = paste0(.data$CLASS, .data$SUBCLASS),
key = normalize_name(.data$subclass_name)
) |>
dplyr::left_join(xw_norm |> dplyr::select("key", "candidates"), by = "key") |>
dplyr::rowwise() |>
dplyr::mutate(matched_crops = list(parse_candidates(.data$candidates, known_crops))) |>
dplyr::ungroup() |>
dplyr::filter(lengths(.data$matched_crops) > 0) |>
dplyr::mutate(rates = lapply(.data$matched_crops, function(cc) {
ca_rates |>
dplyr::filter(.data$crop %in% cc) |>
dplyr::summarize(
min_n_lbs_acre = min(.data$min_n_lbs_acre, na.rm = TRUE),
max_n_lbs_acre = max(.data$max_n_lbs_acre, na.rm = TRUE)
)
})) |>
tidyr::unnest("rates") |>
dplyr::select("code", "min_n_lbs_acre", "max_n_lbs_acre")

PEcAn.logger::logger.info(sprintf("Resolved %d LandIQ codes via crosswalk", nrow(code_lookup)))

## load matched product
# parcel_id is stored as a string in the source so cast to int on the way
# in. filter keeps matched cycles with non NA crop class plus a valid EVI
# signal.
read_matched_year <- function(year) {
fn <- file.path(config[["matched_dir"]],
sprintf("assigned_year=%d.parquet", year))
if (!file.exists(fn)) {
PEcAn.logger::logger.warn("Missing matched file for year ", year, ": ", fn)
return(NULL)
}
arrow::read_parquet(fn) |>
dplyr::filter(.data$assigned_by == "matched",
!is.na(.data$landiq_CLASS),
!is.na(.data$landiq_SUBCLASS),
!is.na(.data$landiq_PFT),
!is.na(.data$mslsp_EVImax),
!is.na(.data$mslsp_EVIamp)) |>
dplyr::transmute(
parcel_id = as.integer(.data$parcel_id),
year = as.integer(.data$year),
season = as.integer(.data$season),
date = as.Date(.data$mslsp_OGI),
code = paste0(.data$landiq_CLASS, .data$landiq_SUBCLASS)
)
}

PEcAn.logger::logger.info("Reading matched LandIQ MSLSP for years: ",
paste(config[["years"]], collapse = ", "))
plant <- purrr::map_dfr(config[["years"]], read_matched_year)
PEcAn.logger::logger.info(sprintf("Loaded %d cycles across %d parcels",
nrow(plant), dplyr::n_distinct(plant$parcel_id)))

## subsample
# parcel set is sampled once and applied to all years so the same parcels
# appear in every year.
n_parcels <- config[["n_parcels"]]
if (!is.null(n_parcels) && n_parcels < dplyr::n_distinct(plant$parcel_id)) {
picked <- plant |>
dplyr::distinct(.data$parcel_id) |>
dplyr::slice_sample(n = n_parcels) |>
dplyr::pull(.data$parcel_id)
plant <- plant |> dplyr::filter(.data$parcel_id %in% picked)
PEcAn.logger::logger.info(sprintf("Sampled %d parcels (n_parcels=%d)",
length(picked), n_parcels))
}

## attach rates
# cycles with no resolvable rate are dropped with an audit log. no PFT
# median fallback.
design <- plant |>
dplyr::left_join(code_lookup, by = "code") |>
dplyr::mutate(
rate_source = dplyr::case_when(
is.na(.data$min_n_lbs_acre) ~ "skip_no_rate",
.data$min_n_lbs_acre == 0 & .data$max_n_lbs_acre == 0 ~ "skip_zero_envelope",
TRUE ~ "crosswalk"
)
)

# log both kinds of drops separately so it's clear which are missing data
# vs which are intentional zero envelopes (e.g. Alfalfa, legumes that fix
# their own N and carry a cited 0 to 0 rate)
unresolved <- design |> dplyr::filter(.data$rate_source == "skip_no_rate")
if (nrow(unresolved) > 0) {
by_code <- unresolved |>
dplyr::count(.data$code, name = "n_events", sort = TRUE) |>
head(15)
PEcAn.logger::logger.warn(sprintf(
"Dropping %d cycles across %d codes with no resolvable N rate. Top offenders:",
nrow(unresolved), dplyr::n_distinct(unresolved$code)
))
for (i in seq_len(nrow(by_code))) {
PEcAn.logger::logger.warn(sprintf(" %s: %d cycles",
by_code$code[i], by_code$n_events[i]))
}
}

zero_env <- design |> dplyr::filter(.data$rate_source == "skip_zero_envelope")
if (nrow(zero_env) > 0) {
by_code <- zero_env |>
dplyr::count(.data$code, name = "n_events", sort = TRUE) |>
head(15)
PEcAn.logger::logger.info(sprintf(
"Dropping %d cycles across %d codes with a cited 0 to 0 N rate (no synthetic application). Top:",
nrow(zero_env), dplyr::n_distinct(zero_env$code)
))
for (i in seq_len(nrow(by_code))) {
PEcAn.logger::logger.info(sprintf(" %s: %d cycles",
by_code$code[i], by_code$n_events[i]))
}
}

design <- design |>
dplyr::filter(.data$rate_source == "crosswalk") |>
dplyr::select("parcel_id", "year", "season", "date", "code",
"min_n_lbs_acre", "max_n_lbs_acre")

PEcAn.logger::logger.info(sprintf("Design table: %d events, %d parcels, %d years",
nrow(design),
dplyr::n_distinct(design$parcel_id),
dplyr::n_distinct(design$year)))

staging_file <- file.path(staging_dir, "_staging_01_design.rds")
saveRDS(design, staging_file)
PEcAn.logger::logger.info("Wrote ", staging_file)
44 changes: 44 additions & 0 deletions workflows/fertilization-statewide/02-sample-n-rates.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env Rscript

config <- config::get(file = "workflows/fertilization-statewide/config.yml",
config = Sys.getenv("FERT_PROJECT", "default"))

set.seed(config[["seed"]])

staging_dir <- file.path(config[["output_dir"]], "_staging")
design_file <- file.path(staging_dir, "_staging_01_design.rds")
if (!file.exists(design_file)) {
PEcAn.logger::logger.severe(
"Stage 01 output not found: ", design_file,
". Run 01-build-parcel-design.R first."
)
}

PEcAn.logger::logger.info("Reading design from ", design_file)
design <- readRDS(design_file)

n_ensemble <- config[["n_ensemble"]]
PEcAn.logger::logger.info(sprintf(
"Sampling %d ensemble members across %d design rows",
n_ensemble, nrow(design)
))

# expand each design row to one row per ensemble member and draw the annual
# N rate uniformly from each cycle's min/max envelope. ens_id is shared with
# ncc workflow so a given ensemble member can carry both synthetic and
# compost rows under one fertilization event_type in json output
events <- design |>
tidyr::crossing(ensemble_member = seq_len(n_ensemble)) |>
dplyr::mutate(
annual_n_lb_acre = stats::runif(dplyr::n(), min = .data$min_n_lbs_acre, max = .data$max_n_lbs_acre),
ens_id = sprintf("ens_%03d", .data$ensemble_member)
)

PEcAn.logger::logger.info(sprintf(
"Sampled %d events. annual N range: %.2f to %.2f lb/acre",
nrow(events), min(events$annual_n_lb_acre), max(events$annual_n_lb_acre)
))

staging_file <- file.path(staging_dir, "_staging_02_events.rds")
saveRDS(events, staging_file)
PEcAn.logger::logger.info("Wrote ", staging_file)
93 changes: 93 additions & 0 deletions workflows/fertilization-statewide/03-write-parquet.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env Rscript

config <- config::get(file = "workflows/fertilization-statewide/config.yml",
config = Sys.getenv("FERT_PROJECT", "default"))

staging_dir <- file.path(config[["output_dir"]], "_staging")
events_file <- file.path(staging_dir, "_staging_02_events.rds")
if (!file.exists(events_file)) {
PEcAn.logger::logger.severe(
"Stage 02 output not found: ", events_file,
". Run 02-sample-n-rates.R first."
)
}

PEcAn.logger::logger.info("Reading events from ", events_file)
events <- readRDS(events_file)

# synthetic N is split between nh4 and no3 using nh4_fraction (share going
# to nh4, rest goes to no3). org_c and org_n stay zero here; compost rows
# come in via ncc workflow and end up unioned into same fertilization
# parquet by cleaner
nh4_frac <- config[["nh4_fraction"]]

out <- events |>
dplyr::mutate(
total_n_kg_m2 = PEcAn.utils::ud_convert(.data$annual_n_lb_acre, "lb/acre", "kg/m^2"),
nh4_n_kg_m2 = .data$total_n_kg_m2 * .env$nh4_frac,
no3_n_kg_m2 = .data$total_n_kg_m2 * (1 - .env$nh4_frac),
org_c_kg_m2 = 0,
org_n_kg_m2 = 0
) |>
dplyr::transmute(
parcel_id = as.integer(.data$parcel_id),
ens_id = .data$ens_id,
date = as.Date(.data$date),
.data$nh4_n_kg_m2,
.data$no3_n_kg_m2,
.data$org_c_kg_m2,
.data$org_n_kg_m2,
crop_code = .data$code
)

out_path <- config[["output_dir"]]
dir.create(out_path, showWarnings = FALSE, recursive = TRUE)

## clean prior shards
existing <- list.files(out_path, pattern = "\\.parquet$", full.names = TRUE)
if (length(existing) > 0) {
PEcAn.logger::logger.info(sprintf("Removing %d existing parquet shards", length(existing)))
unlink(existing)
}

# partition by parcel_id range and write one parquet per batch named
# <pid_min>_<pid_max>.parquet.
all_parcels <- sort(unique(out[["parcel_id"]]))
batch_size <- config[["batch_size"]]
n_batches <- ceiling(length(all_parcels) / batch_size)
batches <- split(all_parcels, ceiling(seq_along(all_parcels) / batch_size))

PEcAn.logger::logger.info(sprintf(
"Writing %d rows across %d parcel batches (batch_size=%d) to %s",
nrow(out), n_batches, batch_size, out_path
))

# prefer ZSTD; fall back to snappy when zstd is not in the local arrow
# build.
parquet_codec <- if (arrow::codec_is_available("zstd")) "ZSTD" else "SNAPPY"
PEcAn.logger::logger.info("Parquet compression codec: ", parquet_codec)

write_batch <- function(pids) {
shard <- out |> dplyr::filter(.data$parcel_id %in% pids)
pid_min <- min(shard[["parcel_id"]])
pid_max <- max(shard[["parcel_id"]])
fn <- file.path(out_path, sprintf("%d_%d.parquet", pid_min, pid_max))
arrow::write_parquet(shard, fn, compression = parquet_codec)
fn
}

workers <- as.integer(config[["workers"]])
if (workers > 1) {
PEcAn.logger::logger.info(sprintf("Using mclapply with %d workers", workers))
written <- parallel::mclapply(batches, write_batch, mc.cores = workers)
} else {
written <- lapply(batches, write_batch)
}

PEcAn.logger::logger.info(sprintf(
"Done. wrote %d shards, %d total rows, parcels=%d, years=%d, ensemble=%d",
length(written), nrow(out),
dplyr::n_distinct(out[["parcel_id"]]),
dplyr::n_distinct(format(out[["date"]], "%Y")),
dplyr::n_distinct(out[["ens_id"]])
))
29 changes: 29 additions & 0 deletions workflows/fertilization-statewide/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Statewide fertilization events workflow

Builds an ensemble of synthetic N fertilization events for every California ag parcel in the LandIQ MSLSP matched product, for 2016 and 2018 to 2023. 2017 is skipped because LandIQ did not run a statewide survey that year. N rate envelopes come from `PEcAn.data.land::ca_n_application_rate`.

# Config

Everything tweakable lives in `config.yml`. Most setups only need to look at:

- `matched_dir`: the LandIQ MSLSP matched product directory
- `crosswalk_path`: the LandIQ to FREP to UC ANR crop name crosswalk TSV
- `output_dir`: where the parquet shards go
- `n_parcels`, `n_ensemble`, `batch_size`, `workers`: scale knobs per profile
- `nh4_fraction`: share of total synthetic N going to ammonium; the rest goes to nitrate (default 0.5 for a 50/50 split)

# Run

Pick a profile (`default`, `medium`, `all`) and run from the PEcAn project root:

```
FERT_PROJECT=default bash workflows/fertilization-statewide/run-statewide.sh
```

`default` is 1000 random parcels, single threaded. `all` is full statewide (~660k parcels), eight workers. The three R scripts chain together: 01 builds the design, 02 samples rates, 03 writes parquet. `check-result.R` reads the output back and prints a summary.

# Output

Parcel range sharded parquet at `<output_dir>/`. Columns: `parcel_id`, `ens_id` (`ens_NNN`, shared with ncc workflow), `date`, `nh4_n_kg_m2`, `no3_n_kg_m2`, `org_c_kg_m2` (zero), `org_n_kg_m2` (zero), `crop_code`.

Downstream, `workflows/preprocess-event-parquet/01c-clean-fertilization.R` renames `parcel_id` to `site_id` and `ens_id` to `event_member_id`, unions compost rows from `workflows/ncc-statewide/`, and writes JSON converter input.
Loading
Loading