Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 522 additions & 0 deletions modules/data.remote/inst/ccmmf/events/make_events_statewide.R

Large diffs are not rendered by default.

188 changes: 188 additions & 0 deletions modules/data.remote/inst/ccmmf/hls/build_hls_parcel_tile_map.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Build a parcel-year to HLS tile map for California agricultural parcels (LandIQ v4.1).
# Reads crops_all_years.parquet filtered by agricultural CLASS/SUBCLASS, loads parcel
# geometries, intersects with the HLS tile grid, and writes RDS plus tile parcel counts.
#
# Main inputs: CCMMF_LANDIQ_V4 (LandIQ root), CCMMF_MANAGEMENT (crop lookup, tile extent),
# optional CLI year_min year_max overwrite.
# Main outputs: hls_parcel_tile_map_v4.1_years=MIN-MAX.rds, hls_tile_parcel_counts CSV,
# optional removed parcel-years CSV when geometries fail QC.
# How to run: Rscript scripts/hls/build_hls_parcel_tile_map.R [year_min] [year_max] [overwrite]
# Workflow: upstream of tilewise MSLSP/NDTI drivers; run build_hls_tile_extent.R first.

suppressPackageStartupMessages({
library(sf)
library(data.table)
library(arrow)
library(dplyr)
})
sf::sf_use_s2(FALSE)

#### Configuration

# All paths are overridable via environment variables so the script can run on
# different systems without edits; defaults point at the shared project space.
path_landiq_v4 <- Sys.getenv("CCMMF_LANDIQ_V4", "/projectnb/dietzelab/ccmmf/LandIQ-harmonized-v4.1")
path_management <- Sys.getenv("CCMMF_MANAGEMENT", "/projectnb/dietzelab/ccmmf/management")
path_parcels <- file.path(path_landiq_v4, "parcels-consolidated.gpkg")  # parcel geometries (GPKG)
path_crops_parq <- file.path(path_landiq_v4, "crops_all_years.parq")  # parcel-year crop table (parquet)
path_cropcode_lookup <- file.path(path_management, "LandIQ_cropCode_lookup_table.csv")  # CLASS/SUBCLASS lookup
path_tiles <- file.path(path_management, "hls_tile_extent.rds")  # built by build_hls_tile_extent.R
path_out <- path_management

#### Parse arguments

# Positional CLI: [1] year_min, [2] year_max, [3] overwrite flag.
cli_args <- commandArgs(trailingOnly = TRUE)
year_min <- 2016L
year_max <- 2024L
if (length(cli_args) >= 1) year_min <- as.integer(cli_args[1])
if (length(cli_args) >= 2) year_max <- as.integer(cli_args[2])
truthy_flags <- c("overwrite", "true", "t", "1", "yes", "y")
overwrite <- length(cli_args) >= 3 && tolower(cli_args[3]) %in% truthy_flags

# Skip quietly when the output already exists and overwrite was not requested.
out_file <- file.path(path_out, sprintf("hls_parcel_tile_map_v4.1_years=%d-%d.rds", year_min, year_max))
if (file.exists(out_file) && !overwrite) quit(save = "no", status = 0)
if (!file.exists(path_tiles)) {
  stop("Tile extent not found. Run: Rscript scripts/hls/build_hls_tile_extent.R")
}

# Tile extent prep produced by build_hls_tile_extent.R: sf polygons of the HLS
# tile grid plus the CRS they were built in (reused for reprojection below).
tile_prep <- readRDS(path_tiles)
tile_extent <- tile_prep$tile_extent_sf
used_crs <- tile_prep$used_crs

#### Parcel-year rows (agricultural only via CLASS and SUBCLASS join)

# Join on CLASS+SUBCLASS so subclass-level PFT differences (e.g. T19 vs T28 woody) stay correct.
lookup <- fread(path_cropcode_lookup)
ag_pairs <- unique(lookup[is_agricultural == TRUE,
  .(CLASS = trimws(CLASS), SUBCLASS = as.character(SUBCLASS))])
ag_classes_filter <- unique(ag_pairs$CLASS)

# Coarse CLASS filter is pushed into the arrow/parquet scan; the exact
# CLASS+SUBCLASS match is finished in-memory with data.table below.
parcel_year_raw <- arrow::open_dataset(path_crops_parq) |>
  dplyr::filter(year >= year_min, year <= year_max, CLASS %in% ag_classes_filter) |>
  dplyr::select(parcel_id, year, CLASS, SUBCLASS) |>
  dplyr::collect() |>
  as.data.table()
# Normalize join keys to match ag_pairs (trimmed character CLASS/SUBCLASS).
parcel_year_raw[, CLASS := trimws(as.character(CLASS))]
parcel_year_raw[, SUBCLASS := as.character(SUBCLASS)]
# The projection inside the merge already coerces parcel_id to character and
# year to integer, so no further re-conversion is needed after unique().
parcel_year <- merge(parcel_year_raw, ag_pairs, by = c("CLASS", "SUBCLASS"))[
  , .(parcel_id = as.character(parcel_id), year = as.integer(year))
] |> unique()
message("Parcel-year rows (agricultural, ", year_min, "-", year_max, "): ", nrow(parcel_year))

#### Load parcel geometry in chunks (avoid huge SQL IN lists)

all_ids <- unique(parcel_year$parcel_id)
first_layer <- st_layers(path_parcels)$name[1]
id_batches <- split(all_ids, ceiling(seq_along(all_ids) / 5000L))

# One SELECT per batch of <= 5000 ids; embedded single quotes are doubled
# (standard SQL escaping) before building the IN (...) list.
read_batch <- function(batch) {
  quoted <- paste0("'", gsub("'", "''", batch, fixed = TRUE), "'", collapse = ",")
  sql <- sprintf('SELECT * FROM "%s" WHERE parcel_id IN (%s)', first_layer, quoted)
  st_read(path_parcels, query = sql, quiet = TRUE)
}

parcels <- do.call(rbind, lapply(id_batches, read_batch))
parcels$parcel_id <- as.character(parcels$parcel_id)

#### QC: drop invalid or empty geometries (bad WKB can break OGR)

# Bulk emptiness check first; if corrupt WKB makes the vectorized call error,
# fall back to probing each row and treat unreadable rows as invalid.
geom_ok <- tryCatch(
  !sf::st_is_empty(sf::st_geometry(parcels)),
  error = function(e) {
    message("Bulk geometry check failed; checking row-by-row for corrupt geometries.")
    vapply(
      seq_len(nrow(parcels)),
      function(row) {
        tryCatch(!sf::st_is_empty(sf::st_geometry(parcels)[row]), error = function(e) FALSE)
      },
      logical(1)
    )
  }
)

# Record parcel-years whose geometry failed QC so they can be audited later.
if (any(!geom_ok)) {
  removed_log <- parcel_year[parcel_id %in% parcels$parcel_id[!geom_ok], .(parcel_id, year)]
} else {
  removed_log <- data.table(parcel_id = character(), year = integer())
}
parcels <- parcels[geom_ok, ]

#### Reproject to tile CRS (row-by-row fallback if bulk transform fails)

# Fast path: one bulk transform. A single corrupt geometry makes the whole
# call error, so on failure we fall back to chunked, then per-row, probing.
parcels_tr <- tryCatch(sf::st_transform(parcels, used_crs), error = function(e) NULL)
if (is.null(parcels_tr)) {
  message("Bulk st_transform failed; checking row-by-row.")
  chunk_size <- 5000L
  n <- nrow(parcels)
  good <- logical(n)
  for (start in seq(1L, n, by = chunk_size)) {
    end <- min(start + chunk_size - 1L, n)
    # Probe the whole chunk; only if it fails do we probe each row inside it.
    chk <- tryCatch(sf::st_transform(parcels[start:end, ], used_crs), error = function(e) NULL)
    if (!is.null(chk)) {
      good[start:end] <- TRUE
    } else {
      for (i in start:end) {
        # The per-row transform result is discarded on purpose: we only record
        # success/failure here and re-transform all survivors in one pass below.
        good[i] <- tryCatch({
          sf::st_transform(parcels[i, ], used_crs)
          TRUE
        }, error = function(e) FALSE)
      }
    }
  }
  # Parcel-years whose geometry cannot be reprojected are logged and dropped.
  drop_ids <- parcels$parcel_id[!good]
  if (length(drop_ids) > 0) {
    removed_log <- rbind(removed_log, parcel_year[parcel_id %in% drop_ids, .(parcel_id, year)])
  }
  parcels <- parcels[good, ]
  parcels <- sf::st_transform(parcels, used_crs)
} else {
  parcels <- parcels_tr
}

#### Spatial join: parcel polygon intersects tile polygon (any overlap counts)

# Bulk intersect first; on failure probe each row, dropping rows whose
# geometry makes st_intersects itself error.
hits <- tryCatch(sf::st_intersects(parcels, tile_extent), error = function(e) NULL)
if (is.null(hits)) {
  message("Bulk st_intersects failed; checking row-by-row.")
  n <- nrow(parcels)
  good <- logical(n)
  for (i in seq_len(n)) {
    # Only success/failure of the call matters here; the probe's result is
    # discarded and the surviving rows are re-intersected in one pass below.
    # (Removed a dead `length(hi[[1]]) >= 0` expression that was always TRUE
    # and whose value was never used.)
    good[i] <- tryCatch({
      sf::st_intersects(parcels[i, ], tile_extent)
      TRUE
    }, error = function(e) FALSE)
  }
  drop_ids <- parcels$parcel_id[!good]
  if (length(drop_ids) > 0) {
    removed_log <- rbind(removed_log, parcel_year[parcel_id %in% drop_ids, .(parcel_id, year)])
  }
  parcels <- parcels[good, ]
  hits <- sf::st_intersects(parcels, tile_extent)
}
# Keep only parcels that overlap at least one HLS tile.
keep <- lengths(hits) > 0
parcels <- parcels[keep, ]
hits <- hits[keep]

# Persist the deduplicated QC drop log alongside the main outputs so dropped
# parcel-years remain auditable.
if (nrow(removed_log) > 0) {
  removed_log <- unique(removed_log)
  removed_name <- sprintf("hls_parcel_tile_map_removed_v4.1_years=%d-%d.csv", year_min, year_max)
  removed_log_file <- file.path(path_out, removed_name)
  dir.create(path_out, recursive = TRUE, showWarnings = FALSE)
  fwrite(removed_log, removed_log_file)
  message("Dropped ", nrow(removed_log), " parcel-years with invalid geometry; log: ", removed_log_file)
}

#### Build parcel to tiles table and join to parcel_year

# Comma-joined tile ids per parcel, in the same row order as `parcels`.
tile_id_strings <- vapply(
  hits,
  function(idx) paste(tile_extent$tile_id[idx], collapse = ","),
  character(1)
)
tile_by_parcel <- data.table(
  parcel_id = parcels$parcel_id,
  tileIDs = tile_id_strings,
  n_tiles = lengths(hits)
)
setkey(tile_by_parcel, parcel_id)
setkey(parcel_year, parcel_id)
# Keyed inner join: parcel-years without a mapped geometry are dropped.
out <- tile_by_parcel[parcel_year, nomatch = 0][, .(parcel_id, year, tileIDs, n_tiles)]

#### Tile to parcel counts (for scheduling)

# Explode the comma-joined tile ids to long form, then count parcels per
# (tile, year) so downstream scheduling can balance work across tiles.
tile_long <- out[, .(tile_id = unlist(strsplit(tileIDs, ",", fixed = TRUE))), by = .(parcel_id, year)]
tile_counts <- tile_long[, .(n_parcels = .N), by = .(tile_id, year)]
setorder(tile_counts, tile_id, year)

#### Write outputs

# Ensure the output directory exists, then write the parcel->tile map (RDS)
# and the tile->parcel counts (CSV), both keyed by the requested year range.
dir.create(path_out, recursive = TRUE, showWarnings = FALSE)
saveRDS(out, out_file)
tile_counts_file <- file.path(path_out, sprintf("hls_tile_parcel_counts_v4.1_years=%d-%d.csv", year_min, year_max))
fwrite(tile_counts, tile_counts_file)
message("Wrote tile->parcel counts: ", tile_counts_file)
69 changes: 69 additions & 0 deletions modules/data.remote/inst/ccmmf/hls/combine_mslsp_tilepieces.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env Rscript
# Combine per-tile MSLSP tilepieces into one annual Parquet for a year.
# Aggregates duplicate parcels at tile boundaries with weighted means and QA modes.
#
# Main inputs: prep from mslsp_prep_static_tilewise(year); tilepieces under tilepieces_year=.
# Main outputs: mslsp_year=Y.parquet in the year output directory.
# How to run: Rscript combine_mslsp_tilepieces.R <year> [overwrite], or via driver combine.
# Workflow: monitoring workflow merge step for MSLSP.

# Locate this script's directory from Rscript's --file= argument so the
# sibling helper files can be sourced regardless of the caller's working
# directory. Fixes: commandArgs() was called twice, the condition relied on
# integer truthiness, and the assignment was buried inside the condition.
raw_args <- commandArgs(trailingOnly = FALSE)
file_arg <- raw_args[grepl("^--file=", raw_args)]
script_dir <- if (length(file_arg) > 0) {
  dirname(sub("^--file=", "", file_arg[1]))
} else {
  # Interactive/sourced fallback: assume helpers sit in the working directory.
  "."
}
source(file.path(script_dir, "extract_summary_core.R"))
source(file.path(script_dir, "tilewise_mslsp_implementation.R"))

#### Combine function
# Read all tilepieces for a year, aggregate cross-tile duplicates, write Parquet.
# Weighted aggregation across tiles (when a parcel spans multiple tiles):
# metric mean: SUM(w_valid * metric_mean) / SUM(w_valid)
# metric sd: derived from parallel variance identity
# QA mode: weighted mode (tile with greatest coverage area wins)
# na_frac: weighted mean of per-tile na_frac values
#
# @param prep list from mslsp_prep_static_tilewise(); must carry $year and $out_dir
# @param time_key currently unused; kept for signature parity with other combine steps
# @param overwrite overwrite an existing annual parquet?
# @param verbose emit progress messages?
# @return (invisibly) the output parquet path
mslsp_combine <- function(prep, time_key, overwrite = FALSE, verbose = TRUE) {
  year <- prep$year

  tilepieces_dir <- file.path(prep$out_dir, sprintf("tilepieces_year=%d", year))
  out_path <- file.path(prep$out_dir, sprintf("mslsp_year=%d.parquet", year))

  # Idempotent: skip when the annual parquet already exists.
  if (file.exists(out_path) && !overwrite) {
    if (verbose) message("[combine] skip (exists): ", out_path)
    return(invisible(out_path))
  }

  tile_files <- list.files(tilepieces_dir, "^tile=.*\\.csv\\.gz$", full.names = TRUE)
  if (length(tile_files) == 0) stop("No tilepieces found in: ", tilepieces_dir)
  if (verbose) message("[combine] ", length(tile_files), " tilepieces -> ", out_path)

  # Fix: unreadable tilepieces were silently skipped; each read failure is now
  # surfaced as a warning so partial merges are visible to the operator.
  dt <- rbindlist(lapply(tile_files, function(f) {
    tryCatch(fread(f, showProgress = FALSE), error = function(e) {
      warning("Failed to read tilepiece ", f, ": ", conditionMessage(e), call. = FALSE)
      NULL
    })
  }), fill = TRUE, use.names = TRUE)

  if (nrow(dt) == 0) {
    if (verbose) message("[combine] no rows - writing empty parquet")
    arrow::write_parquet(dt, out_path)
    return(invisible(out_path))
  }

  # Cross-tile duplicate aggregation lives in tilewise_mslsp_implementation.R.
  agg <- mslsp_aggregate_tilepieces(dt, year)
  arrow::write_parquet(agg, out_path)
  if (verbose) message("[combine] wrote ", nrow(agg), " rows")
  invisible(out_path)
}

#### CLI

if (sys.nframe() == 0) {
  args <- commandArgs(trailingOnly = TRUE)
  is_ow <- function(x) tolower(x) %in% c("true", "t", "yes", "y", "overwrite")
  overwrite_flag <- any(sapply(args, is_ow))

  if (length(args) < 1) stop(
    "Usage: Rscript combine_mslsp_tilepieces.R <year> [overwrite]"
  )

  # Fix: reject non-numeric years instead of silently passing NA downstream
  # (mirrors the month validation in combine_ndti_tilepieces.R).
  year_arg <- suppressWarnings(as.integer(args[1]))
  if (is.na(year_arg)) stop("Year must be an integer, got: ", args[1])

  prep <- mslsp_prep_static_tilewise(year_arg)
  mslsp_combine(prep, 1L, overwrite = overwrite_flag)
}
118 changes: 118 additions & 0 deletions modules/data.remote/inst/ccmmf/hls/combine_ndti_tilepieces.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#!/usr/bin/env Rscript
# Combine per-tile NDTI tilepieces into one monthly Parquet for a year and month.
# Aggregates duplicate (parcel_id, date) rows with weighted mean and variance rules.
#
# Main inputs: prep from ndti_prep_static_tilewise(year); tilepieces for that month.
# Main outputs: ndti_year=Y_month=MM.parquet under the year directory.
# How to run: Rscript combine_ndti_tilepieces.R <year> <month> [overwrite], or via driver.
# Workflow: monitoring workflow merge step for NDTI.

# Locate this script's directory from Rscript's --file= argument so the
# sibling implementation file can be sourced regardless of the caller's
# working directory. Fixes: commandArgs() was called twice, the condition
# relied on integer truthiness, and the assignment was buried inside it.
raw_args <- commandArgs(trailingOnly = FALSE)
file_arg <- raw_args[grepl("^--file=", raw_args)]
script_dir <- if (length(file_arg) > 0) {
  dirname(sub("^--file=", "", file_arg[1]))
} else {
  # Interactive/sourced fallback: assume the helper sits in the working directory.
  "."
}
source(file.path(script_dir, "tilewise_ndti_implementation.R"))

#### Combine function
# Read all tilepieces for one month, aggregate cross-tile duplicates, write parquet.
# Weighted aggregation across tiles:
# mean: sum(w * x) / sum(w)
# sd: via parallel variance identity: Var = E[X^2] - E[X]^2
# na_frac: recover w_total = w_valid / (1 - na_frac) per tile row, then
# re-derive na_frac = 1 - sum(w_valid) / sum(w_total) across tiles
#
# @param prep list from ndti_prep_static_tilewise(); must carry $year and $out_dir
# @param month calendar month 1-12
# @param overwrite overwrite an existing monthly parquet?
# @param verbose emit progress messages?
# @return (invisibly) the output parquet path
ndti_combine <- function(prep, month, overwrite = FALSE, verbose = TRUE) {
  # Local is named `yr` (not `year`) so `year := yr` below cannot be shadowed
  # by a `year` column if one ever appears in the input tilepieces
  # (data.table resolves bare symbols in j against columns first).
  yr <- prep$year
  month <- as.integer(month)

  out_path <- path_monthly_output(prep$out_dir, yr, month)
  # Idempotent: skip when the monthly parquet already exists.
  if (file.exists(out_path) && !overwrite) {
    if (verbose) message("[combine] skip (exists): ", out_path)
    return(invisible(out_path))
  }

  tilepieces_dir <- path_tilepieces(prep$out_dir, yr, month)
  tile_files <- list.files(tilepieces_dir, "^tile=.*\\.csv\\.gz$", full.names = TRUE)
  if (length(tile_files) == 0) stop("No tilepieces found in: ", tilepieces_dir)
  if (verbose) message("[combine] ", length(tile_files), " tilepieces -> ", out_path)

  col_types <- list(
    character = "parcel_id",
    integer = "n_valid",
    double = c("ndti_mean", "ndti_sd", "w_valid", "sum_w2", "na_frac")
    # date is read as character and cast to Date after rbindlist
  )

  # Read all tiles; fread handles .gz natively
  all_tiles <- rbindlist(
    lapply(tile_files, function(f) {
      dt <- tryCatch(fread(f, colClasses = col_types), error = function(e) NULL)
      if (is.null(dt) || nrow(dt) == 0) return(NULL)
      dt
    }),
    fill = TRUE
  )

  # Fix: guard before touching columns. If every tilepiece was empty or
  # unreadable, `all_tiles` has zero columns and referencing `ndti_mean`
  # would error before reaching the empty-result branch.
  if (nrow(all_tiles) > 0) {
    all_tiles <- all_tiles[!is.na(ndti_mean)]
    all_tiles[, date := as.Date(date)]
  }

  if (nrow(all_tiles) == 0) {
    if (verbose) message("[combine] no non-null rows - writing empty parquet")
    result <- data.table(
      parcel_id = character(), year = integer(),
      date = as.Date(integer(0), origin = "1970-01-01"),
      ndti_mean = double(), ndti_sd = double(),
      n_valid = integer(), w_valid = double(),
      sum_w2 = double(), na_frac = double()
    )
    arrow::write_parquet(result, out_path)
    return(invisible(out_path))
  }

  # Recover per-row w_total for na_frac re-derivation across tiles
  all_tiles[, w_total := fifelse(!is.na(na_frac) & na_frac < 1,
                                 w_valid / (1 - na_frac), 0)]

  result <- all_tiles[, {
    sw <- sum(w_valid)
    mu <- sum(w_valid * ndti_mean) / sw
    wt <- sum(w_total)
    .(
      ndti_mean = mu,
      # parallel variance: E[X^2] - E[X]^2
      ndti_sd = sqrt(pmax(0,
        sum(w_valid * (ndti_sd^2 + ndti_mean^2)) / sw - mu^2
      )),
      n_valid = sum(as.integer(n_valid)),
      w_valid = sw,
      sum_w2 = sum(sum_w2),
      na_frac = fifelse(wt > 0, 1 - sw / wt, NA_real_)
    )
  }, by = .(parcel_id, date)]

  setorder(result, parcel_id, date)
  result[, year := yr]
  setcolorder(result, c("parcel_id", "year", "date",
                        "ndti_mean", "ndti_sd", "n_valid", "w_valid", "sum_w2", "na_frac"))

  arrow::write_parquet(result, out_path)
  if (verbose) message("[combine] wrote ", nrow(result), " rows")
  invisible(out_path)
}

#### CLI

if (sys.nframe() == 0) {
  args <- commandArgs(trailingOnly = TRUE)
  is_ow <- function(x) tolower(x) %in% c("true", "t", "yes", "y", "overwrite")
  overwrite_flag <- any(sapply(args, is_ow))

  if (length(args) < 2) stop(
    "Usage: Rscript combine_ndti_tilepieces.R <year> <month 1-12> [overwrite]"
  )

  # Fix: validate the year like the month; an unparseable year previously
  # propagated NA into prep and failed downstream with an obscure error.
  year_arg <- suppressWarnings(as.integer(args[1]))
  if (is.na(year_arg)) stop("Year must be an integer, got: ", args[1])
  month_arg <- suppressWarnings(as.integer(args[2]))
  if (is.na(month_arg) || month_arg < 1L || month_arg > 12L) stop("Month must be 1-12")

  prep <- ndti_prep_static_tilewise(year_arg)
  ndti_combine(prep, month_arg, overwrite = overwrite_flag)
}
Loading
Loading