diff --git a/docker/depends/pecan_package_dependencies.csv b/docker/depends/pecan_package_dependencies.csv index 32a5c8d700..298a3161b6 100644 --- a/docker/depends/pecan_package_dependencies.csv +++ b/docker/depends/pecan_package_dependencies.csv @@ -6,6 +6,7 @@ "abind",">= 1.4.5","modules/data.atmosphere","Imports",FALSE "amerifluxr","*","modules/data.atmosphere","Imports",FALSE "arrow","*","modules/data.atmosphere","Imports",FALSE +"arrow","*","modules/data.land","Imports",FALSE "assertthat","*","models/ed","Imports",FALSE "assertthat","*","modules/data.atmosphere","Imports",FALSE "BayesianTools","*","modules/assim.batch","Imports",FALSE @@ -133,6 +134,7 @@ "httr","*","modules/data.atmosphere","Imports",FALSE "httr","*","modules/data.land","Suggests",FALSE "httr","*","modules/data.remote","Suggests",FALSE +"httr2",">= 1.1.0","modules/data.land","Imports",FALSE "IDPmisc","*","modules/assim.batch","Imports",FALSE "imager","*","models/peprmt","Suggests",FALSE "itertools","*","modules/assim.sequential","Suggests",FALSE @@ -668,6 +670,7 @@ "tibble","*","models/fates","Imports",FALSE "tibble","*","models/lpjguess","Imports",FALSE "tibble","*","modules/data.atmosphere","Imports",FALSE +"tibble","*","modules/data.land","Imports",FALSE "tibble","*","modules/data.remote","Suggests",FALSE "tibble","*","modules/meta.analysis","Suggests",FALSE "tictoc","*","modules/assim.sequential","Suggests",FALSE diff --git a/modules/data.land/DESCRIPTION b/modules/data.land/DESCRIPTION index 13691a4327..7f52937bb0 100644 --- a/modules/data.land/DESCRIPTION +++ b/modules/data.land/DESCRIPTION @@ -25,6 +25,7 @@ URL: https://pecanproject.github.io BugReports: https://github.com/PecanProject/pecan/issues Depends: R (>= 4.1.0) Imports: + arrow, coda, curl, dplyr, @@ -32,6 +33,7 @@ Imports: fs, future, furrr, + httr2 (>= 1.1.0), lubridate, magrittr, ncdf4 (>= 1.15), @@ -44,6 +46,7 @@ Imports: sf, stringr, terra, + tibble, tidyr, tidyselect, XML (>= 3.98-1.4) diff --git a/modules/data.land/NAMESPACE b/modules/data.land/NAMESPACE index ba47162bfc..0c701cc961 100644 --- a/modules/data.land/NAMESPACE +++ b/modules/data.land/NAMESPACE @@ -9,9 +9,13 @@ export(InventoryGrowthFusionDiagnostics) export(Read.IC.info.BADM) export(Read_Tucson) export(Soilgrids_SoilC_prep) +export(apply_water_balance) export(buildJAGSdata_InventoryRings) +export(calc_water_balance) +export(calc_water_balance_rice) export(clip_and_save_raster_file) export(cohort2pool) +export(create_event_file) export(dataone_download) export(download.SM_CDS) export(download_package_rm) @@ -23,6 +27,7 @@ export(extract.stringCode) export(extract_FIA) export(extract_NEON_veg) export(extract_SM_CDS) +export(extract_openet_daily) export(extract_soil_gssurgo) export(extract_soil_nc) export(extract_veg) @@ -46,6 +51,7 @@ export(matchInventoryRings) export(match_pft) export(match_species_id) export(mpot2smoist) +export(mslsp_to_canopycover) export(ndti_to_sipnet_tillage) export(netcdf.writer.BADM) export(om2soc) @@ -69,6 +75,9 @@ export(soil_process) export(soilgrids_ic_process) export(soilgrids_soilC_extract) export(soilgrids_texture_extraction) +export(ssurgo_mukeys_bbox) +export(ssurgo_mukeys_bigbbox) +export(ssurgo_mukeys_point) export(subset_layer) export(to.Tag) export(to.TreeCode) diff --git a/modules/data.land/R/data.R b/modules/data.land/R/data.R index a49b3eb76c..34f5c931c7 100644 --- a/modules/data.land/R/data.R +++ b/modules/data.land/R/data.R @@ -80,8 +80,8 @@ "soil_class" #' Fertilizer Nutrient Composition Table -#' -#' A dataset of fertilizer and organic matter addition types +#' +#' A dataset of fertilizer and organic matter addition types #' and their nitrogen and carbon composition, based on the SWAT model's #' `fertilizer.frt` table and DayCent model defaults for organic matter #' C:N ratio parameters. @@ -101,9 +101,9 @@ #' } #' #' @details -#' This table is based on SWAT model's \code{fertilizer.frt} file, and uses +#' This table is based on SWAT model's \code{fertilizer.frt} file, and uses #' C:N ratios (\code{cn_ratio}) from DayCent model default parameter files. -#' \code{fraction_nh3_n} and \code{fraction_no3_n} represent the fraction of +#' \code{fraction_nh3_n} and \code{fraction_no3_n} represent the fraction of #' fertilizer by mass that is ammonium-N and nitrate-N, respectively. This is different from #' the SWAT model's definition of \code{fraction_nh3_n} as a fraction of the total mineral N. #' @@ -123,7 +123,7 @@ #' \item{SUBCLASS}{LandIQ subclass code.} #' \item{subclass_name}{LandIQ subclass name.} #' } -#' @source California Department of Water Resources. (2023). Statewide Crop Mapping—California +#' @source California Department of Water Resources. (2023). Statewide Crop Mapping—California #' Natural Resources Agency Open Data. Metadata retrieved from https://data.cnra.ca.gov/dataset/statewide-crop-mapping and manually extracted into `data-raw/landiq_crop_mapping_codes.tsv`. "landiq_crop_mapping_codes" @@ -132,7 +132,7 @@ #' Crop and growth stage specific coefficients (Kc) from the Basic Irrigation Scheduling #' (BIS) Excel workbook (Snyder et. al., 2014). #' The dataset is an export of the BISm.xlsx workbook's `CropRef` worksheet, with columns renamed -#' and columns added that map to LandIQ CADWR land use dataset +#' and columns added that map to LandIQ CADWR land use dataset #' (\code{\link{landiq_crop_mapping_codes}}; California Department of Water Resources, 2023). #' This dataset provides the information needed to reconstruct a stage-based daily Kc curve when #' combined with grass-reference evapotranspiration (ETo), such as that provided @@ -287,3 +287,31 @@ #' \code{\link{look_up_fertilizer_components}} for fertilizer nutrient #' composition (N/C fractions) from the SWAT/DayCent database. "ca_compost_amendment" + +#' Crop-specific rooting depths and water-depletion thresholds +#' +#' Maximum effective rooting depth and minimum soil water content thresholds +#' for various crops. The `whc_min_frac` column represents the fraction of +#' total available water (TAW) that should remain in the root zone to avoid +#' moisture stress (equivalent to 1 - p, where p is the depletion fraction +#' from FAO-56). +#' +#' @format A tibble with one row per crop and the following columns: +#' \describe{ +#' \item{crop_number}{BIS crop number (character). Blank for crops not in BIS.} +#' \item{crop_name}{Crop name.} +#' \item{Category}{Crop category (e.g., Woody Perennial, Annual (Hardy)).} +#' \item{rooting_depth_m}{Maximum effective rooting depth in meters.} +#' \item{whc_min_frac}{Minimum soil water as fraction of available water-holding capacity (0-1).} +#' \item{whc_notes}{Rationale or source for the minimum WHC value.} +#' \item{rooting_depth_notes}{Rationale or source for the rooting depth value.} +#' } +#' @source Allen, R. G., Pereira, L. S., Raes, D., & Smith, M. +#' \emph{FAO Irrigation and Drainage Paper No. 56: Crop evapotranspiration}. Chapter 8. Table 22. +#' https://www.fao.org/4/x0490e/x0490e0e.htm#chapter%208%20%20%20etc%20under%20soil%20water%20stress%20conditions +#' @examples +#' data(crop_whc) +#' head(crop_whc) +#' +#' @keywords datasets +"crop_whc" diff --git a/modules/data.land/R/gSSURGO_Query.R b/modules/data.land/R/gSSURGO_Query.R index d90d1e02c0..83050d9e91 100644 --- a/modules/data.land/R/gSSURGO_Query.R +++ b/modules/data.land/R/gSSURGO_Query.R @@ -30,10 +30,10 @@ #' | `chorizon.om_r` | Organic matter (<2 mm soil) | % | #' | `chorizon.hzdept_r` | Horizon top depth | cm | #' | `chfrags.fragvol_r` | Rock fragments | % (by volume)| -#' | `chorizon.dbthirdbar_r`| Bulk density at field capacity | g/cm³ | +#' | `chorizon.dbthirdbar_r`| Bulk density at field capacity | g/cm3 | #' | `chorizon.ph1to1h2o_r` | Soil pH (1:1 H2O) | pH (unitless)| -#' | `chorizon.cokey` | Component key (identifier) | — | -#' | `chorizon.chkey` | Horizon key (identifier) | — | +#' | `chorizon.cokey` | Component key (identifier) | - | +#' | `chorizon.chkey` | Horizon key (identifier) | - | #' #' **API stability:** The NRCS occasionally modifies the API schema. If queries fail, #' adjustments may be required here to align with the updated structure. @@ -138,4 +138,281 @@ gSSURGO.Query <- function(mukeys, } +#' Maximum area for SSURGO API requests +SSURGO_API_MAX_AREA_M2 <- 10100000000 # nolint: object_name_linter +#' Get map unit keys (mukeys) from gSSURGO +#' +#' These functions query the NRCS gSSURGO Web Feature Service to retrieve map +#' unit keys based on different spatial filters. +#' +#' @param bbox Numeric vector of length 4: c(xmin, ymin, xmax, ymax) in WGS84 +#' (EPSG:4326). Features that intersect the bounding box are returned. +#' @param point Numeric vector of length 2: c(lon, lat) in WGS84 (EPSG:4326). +#' @param distance Numeric. Distance in meters from the point. Use 0 for exact +#' point intersection. +#' +#' @return Character vector of unique map unit keys (mukeys). +#' +#' @details +#' These functions use the NRCS SDM Data Access Web Feature Service: +#' \url{https://sdmdataaccess.nrcs.usda.gov/SpatialFilterHelp.htm} +#' +#' The total extent cannot exceed 10,100,000,000 square meters (~3,900 square +#' miles). Use `ssurgo_mukeys_bigbbox()` for large bounding boxes. +#' +#' @examples +#' \dontrun{ +#' # Bounding box query +#' mukeys <- ssurgo_mukeys_bbox(bbox = c(-114.006, 32.1823, -113.806, 32.2823)) +#' +#' # Point with distance (600m radius) +#' mukeys <- ssurgo_mukeys_point(point = c(-91.22, 38.46), distance = 600) +#' +#' # Large bounding box +#' mukeys <- ssurgo_mukeys_bigbbox(bbox = c(-120, 35, -110, 45)) +#' } +#' @name ssurgo_mukeys +NULL + +#' @rdname ssurgo_mukeys +#' @export +ssurgo_mukeys_bbox <- function(bbox) { + if (!is.numeric(bbox) || length(bbox) != 4) { + stop("bbox must be a numeric vector of length 4: c(xmin, ymin, xmax, ymax)") + } + + xmin <- bbox[1] + ymin <- bbox[2] + xmax <- bbox[3] + ymax <- bbox[4] + + if (xmin >= xmax || ymin >= ymax) { + stop("bbox must have xmin < xmax and ymin < ymax") + } + + wgs84_crs <- sf::st_crs(4326) + + # Calculate the area of the bbox to make sure that it's smaller than the + # SSURGO limit (`SSURGO_API_MAX_AREA_M2`). + bbox_matrix <- rbind( + c(xmin, ymin), + c(xmax, ymin), + c(xmax, ymax), + c(xmin, ymax), + c(xmin, ymin) + ) + bbox_poly <- sf::st_polygon(list(bbox_matrix)) + bbox_sf <- sf::st_sfc(bbox_poly, crs = wgs84_crs) + area <- as.numeric(sf::st_area(bbox_sf)) + + if (area > SSURGO_API_MAX_AREA_M2) { + stop( + paste0( + "Bounding box area (", format(area, scientific = FALSE), + " m2) exceeds maximum allowed area (", format(SSURGO_API_MAX_AREA_M2, scientific = FALSE), + " m2). Use ssurgo_mukeys_bigbbox() for large bounding boxes." + ) + ) + } + + base_url <- "https://sdmdataaccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs" + + query <- list( + SERVICE = "WFS", + VERSION = "1.1.0", + REQUEST = "GetFeature", + TYPENAME = "MapunitPoly", + BBOX = paste(bbox, collapse = ","), + OUTPUTFORMAT = "XMLMukeyList" + ) + + resp <- httr2::request(base_url) |> + httr2::req_url_query(!!!query) |> + httr2::req_perform() + + httr2::resp_check_status(resp) + + mukeys <- unique(parse_mukey_response(resp)) + + mukeys +} + +#' @rdname ssurgo_mukeys +#' @export +ssurgo_mukeys_point <- function(point, distance) { + if (length(point) != 2) { + stop("point must be a numeric vector of length 2: c(lon, lat)") + } + + if (!is.numeric(distance) || distance < 0) { + stop("distance must be a non-negative numeric value") + } + + lon <- point[1] + lat <- point[2] + + circle_area <- pi * (distance^2) + if (circle_area > SSURGO_API_MAX_AREA_M2) { + stop( + paste0( + "Search radius area (", format(circle_area, scientific = FALSE), + " m2) exceeds maximum allowed area (", format(SSURGO_API_MAX_AREA_M2, scientific = FALSE), + " m2)." + ) + ) + } + + filter_xml <- paste0( + "", + "", + "Geometry", + "", + "", lon, ",", lat, "", + "", + "", distance, "", + "", + "" + ) + + base_url <- "https://sdmdataaccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs" + + query <- list( + SERVICE = "WFS", + VERSION = "1.1.0", + REQUEST = "GetFeature", + TYPENAME = "MapunitPoly", + FILTER = filter_xml, + OUTPUTFORMAT = "XMLMukeyList" + ) + + resp <- httr2::request(base_url) |> + httr2::req_url_query(!!!query) |> + httr2::req_perform() + + httr2::resp_check_status(resp) + + mukeys <- unique(parse_mukey_response(resp)) + + mukeys +} + +#' @rdname ssurgo_mukeys +#' @export +ssurgo_mukeys_bigbbox <- function(bbox) { + if (!is.numeric(bbox) || length(bbox) != 4) { + stop("bbox must be a numeric vector of length 4: c(xmin, ymin, xmax, ymax)") + } + + xmin <- bbox[1] + ymin <- bbox[2] + xmax <- bbox[3] + ymax <- bbox[4] + + if (xmin >= xmax || ymin >= ymax) { + stop("bbox must have xmin < xmax and ymin < ymax") + } + + wgs84_crs <- sf::st_crs(4326) + + # Get the total bbox area. + bbox_matrix <- rbind( + c(xmin, ymin), + c(xmax, ymin), + c(xmax, ymax), + c(xmin, ymax), + c(xmin, ymin) + ) + bbox_poly <- sf::st_polygon(list(bbox_matrix)) + bbox_sf <- sf::st_sfc(bbox_poly, crs = wgs84_crs) + + bbox_area <- as.numeric(sf::st_area(bbox_sf)) + + width_deg <- xmax - xmin + height_deg <- ymax - ymin + + aspect_ratio <- width_deg / height_deg + + n_cells <- ceiling(bbox_area / SSURGO_API_MAX_AREA_M2) + cells_per_side <- sqrt(n_cells) + + ncol_cells <- ceiling(cells_per_side * sqrt(aspect_ratio)) + nrow_cells <- ceiling(cells_per_side / sqrt(aspect_ratio)) + + if (ncol_cells < 1) ncol_cells <- 1 + if (nrow_cells < 1) nrow_cells <- 1 + + grid_wgs84 <- sf::st_make_grid( + bbox_sf, + n = c(ncol_cells, nrow_cells), + crs = wgs84_crs + ) + + cell_bboxes <- purrr::map(grid_wgs84, sf::st_bbox) + + base_url <- "https://sdmdataaccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs" + + reqs <- purrr::map(cell_bboxes, function(cell_bbox) { + cell_vec <- c( + cell_bbox["xmin"], cell_bbox["ymin"], + cell_bbox["xmax"], cell_bbox["ymax"] + ) + query <- list( + SERVICE = "WFS", + VERSION = "1.1.0", + REQUEST = "GetFeature", + TYPENAME = "MapunitPoly", + BBOX = paste(cell_vec, collapse = ","), + OUTPUTFORMAT = "XMLMukeyList" + ) + httr2::request(base_url) |> + httr2::req_url_query(!!!query) + }) + + reqs_throttled <- reqs |> + # max 10 tries per minute + purrr::map(httr2::req_throttle, capacity = 10) |> + # keep trying for 2 minutes before giving up + purrr::map(httr2::req_retry, max_seconds = 120) + + resps <- httr2::req_perform_parallel( + reqs_throttled, + on_error = "continue", + max_active = 10, + progress = TRUE + ) + + parse_mukeys <- function(resp) { + if (inherits(resp, "httr2_response")) { + tryCatch({ + parse_mukey_response(resp) + }, error = function(e) { + character(0) + }) + } else { + character(0) + } + } + + mukeys_list <- purrr::map(resps, parse_mukeys) + + unique(unlist(mukeys_list, use.names = FALSE)) +} + +#' Parse responses from the mukey WFS service +#' +#' @param resp `httr2` response object from SSURGO mukey WFS API +#' @return character vector of mukeys +parse_mukey_response <- function(resp) { + resp_text <- httr2::resp_body_string(resp) + resp_xml <- XML::xmlParse(resp_text) + mukey_nodes <- XML::getNodeSet(resp_xml, "//MapUnitKeyList") + if (length(mukey_nodes) == 0) { + return(character(0)) + } + mukey_str <- XML::xmlValue(mukey_nodes[[1]]) + if (is.null(mukey_str) || nchar(trimws(mukey_str)) == 0) { + return(character(0)) + } + strsplit(trimws(mukey_str), ",")[[1]] +} diff --git a/modules/data.land/R/irrigation-event-files.R b/modules/data.land/R/irrigation-event-files.R new file mode 100644 index 0000000000..e7eb347541 --- /dev/null +++ b/modules/data.land/R/irrigation-event-files.R @@ -0,0 +1,23 @@ +#' Create SIPNET event files from water balance data +#' +#' Aggregates irrigation to weekly values and formats for SIPNET. +#' Irrigation is summed by week and reported on the first day of each week. +#' Units are converted from mm to cm. +#' +#' @param df Data frame with columns: date, year, week, day_of_year, irr +#' @return Data frame with columns: loc, year, doy, event_type, irr_cm, type +#' @export +create_event_file <- function(df) { + df |> + dplyr::summarize( + loc = 0, + year = dplyr::first(.data$year), + doy = dplyr::first(.data$day_of_year), + event_type = "irrig", + irr_mm_week = sum(.data$irr, na.rm = TRUE), + type = 1, + .by = c(.data$year, .data$week) + ) |> + dplyr::mutate(irr_cm = .data$irr_mm_week / 10) |> + dplyr::select("loc", "year", "doy", "event_type", "irr_cm", "type") +} diff --git a/modules/data.land/R/mslsp_to_canopycover.R b/modules/data.land/R/mslsp_to_canopycover.R new file mode 100644 index 0000000000..074b49e77e --- /dev/null +++ b/modules/data.land/R/mslsp_to_canopycover.R @@ -0,0 +1,79 @@ +#' Convert MSLSP phenology data to tidy canopy cover +#' +#' @param mslsp_path Path to directory containing MSLSP outputs (in parquet format) +#' @param parcel_ids Vector of parcel IDs for filtering. If `NULL`, use all parcels. +#' @param years Vector of years for filtering. If `NULL`, use all years. +#' +#' @return `data.frame` of `parcel_id`, `year`, `season`, `date`, and +#' `canopy_cover` (fraction). `date` is a sequence from the MSLSP greenness +#' onset (growing season start) to the greenness minimum (growing season end). +#' `canopy_cover` is the fractional canpoy cover (0 to 1), suitable for ingest +#' into [eto_to_etc_bism()] in "canopy cover mode". +#' @export +mslsp_to_canopycover <- function(mslsp_path, parcel_ids = NULL, years = NULL) { + stopifnot(dir.exists(mslsp_path)) + mslsp_files <- list.files(mslsp_path, "\\.parquet", full.names = TRUE) + mslsp_dat <- arrow::open_dataset(mslsp_files) + if (!is.null(parcel_ids)) { + mslsp_dat <- mslsp_dat |> + dplyr::filter(.data[["parcel_id"]] %in% unique(parcel_ids)) + } + if (!is.null(years)) { + mslsp_dat <- mslsp_dat |> + dplyr::filter(.data[["year"]] %in% years) + } + common_cols <- c( + "parcel_id", + "year", + "season", + "mslsp_cycle", + "landiq_PFT", + "landiq_CLASS", + "landiq_SUBCLASS" + ) + mslsp_tbl <- mslsp_dat |> + dplyr::filter(!is.na(.data[["mslsp_cycle"]])) |> + dplyr::select( + dplyr::all_of(common_cols), + dplyr::all_of(.MSLSP_DATE_MAPPING[["date_name"]]) + ) |> + dplyr::collect() |> + tibble::as_tibble() + + result <- mslsp_tbl |> + dplyr::mutate( + cc_nested = purrr::pmap( + dplyr::pick(dplyr::all_of(.MSLSP_DATE_MAPPING[["date_name"]])), + \(...) expand_mslsp_cycle(list(...)), + .progress = TRUE + ) + ) |> + dplyr::select(dplyr::all_of(common_cols), "cc_nested") |> + tidyr::unnest("cc_nested") + + result +} + +expand_mslsp_cycle <- function(mslsp_row) { + dates <- as.Date(unlist(mslsp_row[.MSLSP_DATE_MAPPING[["date_name"]]])) + all_dates <- seq(min(dates), max(dates), by = "1 day") + tibble::tibble( + date = all_dates, + canopy_cover = stats::approx( + x = dates, + y = .MSLSP_DATE_MAPPING$canopy_cover, + xout = all_dates + )[["y"]] + ) +} + +.MSLSP_DATE_MAPPING <- tibble::tribble( + ~date_name, ~canopy_cover, + "mslsp_OGI", 0.15, + "mslsp_50PCGI", 0.5, + "mslsp_OGMx", 0.9, + "mslsp_Peak", 1.0, + "mslsp_OGD", 0.9, + "mslsp_50PCGD", 0.5, + "mslsp_OGMn", 0.15 +) diff --git a/modules/data.land/R/openET.R b/modules/data.land/R/openET.R new file mode 100644 index 0000000000..66e39b6291 --- /dev/null +++ b/modules/data.land/R/openET.R @@ -0,0 +1,82 @@ +#' Extract daily ET data from OpenET +#' +#' Note that this requires the environment variable `OPENET_API_KEY` to be set. +#' A convenient way to do this is via a `.Renviron`, either globally +#' (`~/.Renviron`) or in the current working directory (`./.Renviron`), with +#' contents like: +#' +#' ``` +#' OPENET_API_KEY="abcdefg123456" +#' ``` +#' +#' You can obtain an OpenET API key from the OpenET data portal. +#' +#' @param design_points `data.frame` of design points with columns `lat` and `lon` +#' @param start_date Start date for data extraction +#' @param end_date End date for data extraction +#' +#' @return `design_points` `data.frame` with additional columns `date`, and +#' `et_mm_day` (ET, mm/day) +#' @export +extract_openet_daily <- function(design_points, start_date, end_date) { + api_key <- Sys.getenv("OPENET_API_KEY") + if (api_key == "") { + stop("OPENET_API_KEY environment variable is not set") + } + + start_date_str <- format(start_date, "%Y-%m-%d") + end_date_str <- format(end_date, "%Y-%m-%d") + + request_body_template <- list( + date_range = c(start_date_str, end_date_str), + interval = "daily", + model = "Ensemble", + variable = "ET", + reference_et = "gridMET", + units = "mm", + file_format = "JSON" + ) + + prep_request <- function(lon, lat) { + request_body <- request_body_template + request_body$geometry <- c(lon, lat) + + httr2::request("https://openet-api.org/raster/timeseries/point") |> + httr2::req_headers(Authorization = api_key) |> + httr2::req_body_json(request_body) |> + httr2::req_throttle(capacity = 10, fill_time_s = 1) |> + httr2::req_retry(max_tries = 3) |> + httr2::req_timeout(seconds = 150) + } + + raw_results <- design_points |> + dplyr::mutate( + reqs = purrr::map2(.data$lon, .data$lat, prep_request), + resps = httr2::req_perform_parallel( + raw_results[["reqs"]], + max_active = 10, + on_error = "continue" + ) + ) + + parse_response <- function(resp) { + if (!inherits(resp, "httr2_response")) { + return(NULL) + } + data <- httr2::resp_body_json(resp) + if (length(data) == 0 || is.null(data[[1]]$time)) { + return(NULL) + } + tibble::tibble( + date = as.Date(purrr::map_chr(data, "time")), + et_mm_day = purrr::map_dbl(data, "et") + ) + } + + results <- raw_results |> + dplyr::mutate(results = purrr::map(.data$resps, parse_response)) |> + dplyr::select(-c("reqs", "resps")) |> + tidyr::unnest("results", keep_empty = TRUE) + + results +} diff --git a/modules/data.land/R/water_balance.R b/modules/data.land/R/water_balance.R new file mode 100644 index 0000000000..12a266e7c5 --- /dev/null +++ b/modules/data.land/R/water_balance.R @@ -0,0 +1,413 @@ +#' Calculate water balance for a time series at a single site +#' +#' This is the core water balance calculation that operates on primitive +#' numeric vectors for easy testing and debugging. Each input is a time series +#' of daily values for a single location (one date per row). The units for all +#' quantities are arbitrary, but they should be consistent (distance / time; +#' e.g., most commonly, mm/day). +#' +#' This function operates in *relative* WHC space, where: +#' - `w = 0` represents wilting point (no plant-available water) +#' - `w = whc` represents field capacity (maximum plant-available water) +#' - `whc = (field_capacity - wilting_point) * rooting_depth` (the plant-available range) +#' +#' Although this function can be used as a crude approximation of rice +#' irrigation (by setting `whc_min_frac = 1.0`), we recommend using +#' [calc_water_balance_rice()] instead, which explicitly tracks rice pond +#' depth, implements seepage, etc. +#' +#' @param et Vector of evapotranspiration values (distance / time) +#' @param precip Vector of precipitation values (distance / time) +#' @param whc Water holding capacity (distance); the plant-available range from +#' wilting point to field capacity (i.e., `whc = field_capacity - wilting_point`). +#' Can be a single value or a vector of the same length as `et`. +#' @param whc_min_frac Fraction of WHC for minimum water level (irrigation +#' trigger); unused if `w_min` is explicitly specified. +#' Can be a single value or a vector of the same length as `et`. +#' @param W_initial Initial soil water content at start of time series +#' (distance); defaults to `whc[1]` (field capacity) if NULL +#' @param w_min Minimum water level threshold (distance); irrigation is +#' triggered when soil water falls below this level; defaults to +#' `whc_min_frac * whc` if NULL. +#' Can be a single value or a vector of the same length as `et`. +#' @param irrigation_max If set, maximum amount of irrigation to apply at a +#' time (distance). +#' @return List with vectors: W_t (soil water), irr (irrigation), runoff +#' @examples +#' # Calculate WHC from field capacity, wilting point, and rooting depth +#' field_capacity <- 0.30 # volumetric (m3/m3) +#' wilting_point <- 0.10 # volumetric (m3/m3) +#' rooting_depth <- 1000 # mm +#' whc <- (field_capacity - wilting_point) * rooting_depth # mm +#' +#' # Run water balance with 5 days of ET and precip data +#' et <- c(4, 5, 6, 4, 3) # mm/day +#' precip <- c(0, 0, 10, 0, 0) # mm/day +#' result <- calc_water_balance(et, precip, whc = whc, whc_min_frac = 0.5) +#' str(result) +#' @export +calc_water_balance <- function( + et, + precip, + whc, + whc_min_frac, + W_initial = NULL, #nolint: object_name_linter + w_min = NULL, + irrigation_max = NULL +) { + + # nolint start: object_name_linter + n <- length(et) + + if (!length(precip) == n) { + PEcAn.logger::logger.severe( + "Precip and ET have different lengths. ", + "length(precip) = ", length(precip), " ", + "length(et) = ", n + ) + } + + ensure_vec <- function(x, n, name) { + if (length(x) == 1) { + rep(x, n) + } else if (length(x) == n) { + x + } else { + PEcAn.logger::logger.severe( + sprintf( + "%s must have length 1 or %d; actual length = %d", + name, + n, + length(x) + ) + ) + } + } + + whc <- ensure_vec(whc, n, "whc") + if (!is.null(whc_min_frac)) { + whc_min_frac <- ensure_vec(whc_min_frac, n, "whc_min_frac") + } + + if (is.null(w_min)) { + if (is.null(whc_min_frac)) { + PEcAn.logger::logger.severe( + "Either whc_min_frac or w_min must be provided" + ) + } + w_min <- whc_min_frac * whc + } else { + w_min <- ensure_vec(w_min, n, "w_min") + } + + if (is.null(W_initial)) { + # Initialize at field capacity (i.e., full WHC) + W_prev <- whc[1] + } else { + W_prev <- W_initial + } + + W_t <- numeric(n) + irr <- numeric(n) + runoff <- numeric(n) + + for (t in seq_len(n)) { + # Potential state after precip and ET + W0 <- W_prev + precip[t] - et[t] + + # If W0 falls below w_min (e.g., high ET; low precip), irrigate + # to field capacity (i.e., full WHC), but no more than irrigation_max. + if (W0 < w_min[t]) { + irr[t] <- min(whc[t] - W0, irrigation_max) + W0 <- W0 + irr[t] + } else { + irr[t] <- 0 + } + + # If W0 exceeds field capacity (i.e., whc), the difference is runoff. + if (W0 > whc[t]) { + runoff[t] <- W0 - whc[t] + W_t[t] <- whc[t] + } else { + runoff[t] <- 0 + W_t[t] <- max(W0, w_min[t]) + } + + W_prev <- W_t[t] + } + + # nolint end: object_name_linter + + list(W_t = W_t, irr = irr, runoff = runoff) +} + +#' Calculate water balance for a flooded rice paddy +#' +#' Models the water balance of a flooded rice system with a two-layer +#' structure: a ponded water layer above a saturated soil profile. This is +#' physically distinct from the upland soil water balance in +#' [calc_water_balance()]. Water is managed to maintain a target flood depth, +#' with support for mid-season drainage events. +#' +#' The soil profile is assumed to be continuously saturated during flooded +#' periods, so plant-available soil water is not tracked separately. ET is +#' applied directly to the pond layer (open-water ET during flooded periods). +#' +#' Irrigation is triggered when pond_depth falls below flood_min. Farmers +#' refill to flood_target. Runoff (bund overflow) occurs when pond_depth +#' exceeds flood_max. +#' +#' Mid-season drainage is specified as a logical vector (`drain[t] = TRUE` +#' means the field is intentionally drained on day t). During drainage days, +#' the pond is drawn down to pond_depth = 0 and irrigation is suppressed. This +#' represents practices such as weed control or pre-harvest drainage. +#' +#' @param et Numeric vector. Daily reference ET. During flooded +#' periods this is treated as open-water ET; no crop +#' coefficient is applied here but you can pre-multiply if +#' needed. +#' @param precip Numeric vector. Daily precipitation. +#' @param flood_target Numeric scalar. Target ponded water depth. +#' Irrigation refills to this level. +#' @param flood_min Numeric scalar. Minimum acceptable pond depth before +#' irrigation is triggered. +#' @param flood_max Numeric scalar. Maximum pond depth before bund +#' overflow / runoff occurs. +#' @param seepage Numeric scalar. Daily seepage + percolation loss +#' Represents losses through the bund and downward percolation +#' through the hardpan (if any). Typical range: 1-5 mm/day +#' for well-puddled California soils. +#' @param drain Logical vector (same length as et). TRUE on days when an +#' intentional drainage event occurs (e.g., mid-season drain, +#' pre-harvest drawdown). Pond is set to 0 on these days and +#' irrigation is suppressed. +#' @param pond_init Numeric scalar. Initial pond depth at t = 1. +#' Defaults to flood_target. +#' +#' @return A list with numeric vectors of length n: +#' \item{pond_depth}{Ponded water depth at end of each day} +#' \item{irr}{Irrigation applied} +#' \item{runoff}{Bund overflow / surface runoff} +#' +#' @export +calc_water_balance_rice <- function( + et, + precip, + flood_target, + flood_min, + flood_max, + seepage, + drain = NULL, + pond_init = flood_target +) { + n <- length(et) + + if (length(precip) != n) { + PEcAn.logger::logger.severe("et and precip must be the same length") + } + if (is.null(drain)) { + drain <- rep(FALSE, n) + } + if (length(drain) != n) { + PEcAn.logger::logger.severe("drain must be the same length as et") + } + if (flood_min >= flood_target) { + PEcAn.logger::logger.severe("flood_min must be less than flood_target") + } + if (flood_target >= flood_max) { + PEcAn.logger::logger.severe("flood_target must be less than flood_max") + } + if (seepage < 0) { + PEcAn.logger::logger.severe("seepage must be non-negative") + } + + pond_depth <- numeric(n) + irr <- numeric(n) + runoff <- numeric(n) + + pond_prev <- pond_init + + for (t in seq_len(n)) { + # --- Intentional drainage day ----------------------------------------- + # The field is deliberately drained (mid-season weed control, pre-harvest, + # etc.). All water in the pond is released as managed drainage, counted + # as runoff. Irrigation is suppressed for the day. + if (drain[t]) { + runoff[t] <- pond_prev + precip[t] # drain existing pond + any rain + irr[t] <- 0 + pond_depth[t] <- 0 + pond_prev <- 0 + next + } + + # --- Normal flooded day ----------------------------------------------- + + # 1. Fluxes: precip adds, ET and seepage remove. + # Seepage is capped at available pond depth so we don't go below zero + # before irrigation is assessed. + actual_seepage <- min(seepage, max(0, pond_prev)) + pond0 <- pond_prev + precip[t] - et[t] - actual_seepage + + # 2. Irrigation: refill to flood_target if pond drops below flood_min. + # Note that pond0 can be negative if ET is very high (e.g., early + # season before the pond is established). Irrigation covers the full + # deficit back to the target. + if (pond0 < flood_min) { + irr[t] <- flood_target - pond0 + pond0 <- flood_target + } else { + irr[t] <- 0 + } + + # 3. Runoff (bund overflow): any depth exceeding flood_max spills over. + if (pond0 > flood_max) { + runoff[t] <- pond0 - flood_max + pond_depth[t] <- flood_max + } else { + runoff[t] <- 0 + pond_depth[t] <- max(pond0, 0) + } + + pond_prev <- pond_depth[t] + } + + list(pond_depth = pond_depth, irr = irr, runoff = runoff) +} + +#' Apply water balance calculations to a data frame with multiple sites +#' +#' Groups by location and applies calc_water_balance to each group. Unlike +#' `calc_water_balance`, the units here *do* matter -- they should be `mm_day`. +#' +#' @param df Data frame with columns: `date`, `location_id`, `etc_mm_day`, +#' `precip_mm_day`, `crop_name`, and `whc_min_frac` (optional, defaults to +#' 0.375). If a `whc_mm` column is present, it is used as the water holding +#' capacity. +#' @param idcol Column name for grouping (typically, `location_id`, `parcel_id` +#' or similar). +#' @param whc_mm Water holding capacity (mm); ignored if `whc_mm` is a column +#' in `df`. +#' @param irrigation_max_mm Maximum irrigation to be applied at a time. See +#' `irrigation_max` argument of [calc_water_balance()]. Ignored if +#' `irrigation_max_mm` is a column of `df`. +#' @inheritParams calc_water_balance_rice +#' @return Data frame with added columns: `W_t` / `pond_depth`, `irr`, `runoff` +#' @export +apply_water_balance <- function( + df, + idcol, + whc_mm = 500, + irrigation_max_mm = 150, + flood_target = 125, + flood_min = 62.5, + flood_max = 175, + seepage = 2.5 +) { + need_cols <- c("etc_mm_day", "precip_mm_day", "date", "crop_name") + missing_cols <- need_cols[!(need_cols %in% colnames(df))] + default_whc_min_frac <- 0.375 + if (length(missing_cols) > 0) { + PEcAn.logger::logger.severe( + "Missing the following required columns: ", + paste(missing_cols, collapse = ", ") + ) + } + + if ("whc_min_frac" %in% colnames(df)) { + n_na <- sum(is.na(df$whc_min_frac)) + if (n_na > 0) { + PEcAn.logger::logger.warn( + sprintf( + "whc_min_frac has %d NA values. Replacing with default = %.3f", + n_na, + default_whc_min_frac + ) + ) + } + } else { + PEcAn.logger::logger.warn( + "whc_min_frac column not found in input data. Using default = ", + default_whc_min_frac + ) + df[["whc_min_frac"]] <- default_whc_min_frac + } + + if (!("whc_mm" %in% colnames(df))) { + df[["whc_mm"]] <- whc_mm + } + + if (!("irrigation_max_mm" %in% colnames(df))) { + df[["irrigation_max_mm"]] <- irrigation_max_mm + } + + try_wb_rice <- function(...) { + tryCatch( + calc_water_balance_rice(...), + error = function(e) { + warning("Hit the following error: \n\n", e$message) + list( + pond_depth = NA_real_, + irr = NA_real_, + runoff = NA_real_ + ) + } + ) + } + + rice <- df |> + dplyr::filter(.data$crop_name == "Rice") |> + dplyr::arrange(.data[[idcol]], .data$date) |> # nolint: object_usage_linter + dplyr::mutate( + year = as.integer(format(.data$date, "%Y")), + week = as.integer(format(.data$date, "%U")), + day_of_year = as.integer(format(.data$date, "%j")), + results = tibble::as_tibble(try_wb_rice( + et = .data$etc_mm_day, + precip = .data$precip_mm_day, + flood_target = .env$flood_target, + flood_min = .env$flood_min, + flood_max = .env$flood_max, + seepage = .env$seepage + )), + .by = dplyr::all_of(idcol) + ) |> + tidyr::unpack(.data$results) + + try_wb <- function(...) { + tryCatch( + calc_water_balance(...), + error = function(e) { + warning("Hit the following error: \n\n", e$message) + list( + W_t = NA_real_, + irr = NA_real_, + runoff = NA_real_ + ) + } + ) + } + + others <- df |> + dplyr::filter(.data$crop_name != "Rice") |> + dplyr::arrange(.data[[idcol]], .data$date) |> # nolint: object_usage_linter + dplyr::mutate( + year = as.integer(format(.data$date, "%Y")), + week = as.integer(format(.data$date, "%U")), + day_of_year = as.integer(format(.data$date, "%j")), + whc_min_frac = tidyr::replace_na( + .data$whc_min_frac, + default_whc_min_frac + ), + results = tibble::as_tibble(try_wb( + et = .data$etc_mm_day, + precip = .data$precip_mm_day, + whc = .data$whc_mm, + whc_min_frac = .data$whc_min_frac, + irrigation_max = .data$irrigation_max_mm + )), + .by = dplyr::all_of(idcol) + ) |> + tidyr::unpack(.data$results) + + dplyr::bind_rows(rice, others) +} diff --git a/modules/data.land/data-raw/crop_whc.R b/modules/data.land/data-raw/crop_whc.R new file mode 100644 index 0000000000..05fcf4fcd3 --- /dev/null +++ b/modules/data.land/data-raw/crop_whc.R @@ -0,0 +1,25 @@ +#!/usr/bin/env Rscript + +library(readr) + +# Convert warnings to errors +options(warn = 2) + +raw_csv <- file.path("data-raw", "crop_whc.csv") +stopifnot(file.exists(raw_csv)) + +crop_whc <- read_csv( + raw_csv, + col_types = cols( + crop_number = col_character(), + crop_name = col_character(), + Category = col_character(), + rooting_depth_m = col_double(), + whc_min_frac = col_double(), + whc_notes = col_character(), + rooting_depth_notes = col_character() + ), + progress = FALSE +) + +usethis::use_data(crop_whc, overwrite = TRUE) diff --git a/modules/data.land/data-raw/crop_whc.csv b/modules/data.land/data-raw/crop_whc.csv new file mode 100644 index 0000000000..766fbf01ea --- /dev/null +++ b/modules/data.land/data-raw/crop_whc.csv @@ -0,0 +1,67 @@ +crop_number,crop_name,Category,rooting_depth_m,whc_min_frac,whc_notes,rooting_depth_notes +2.01,Alfalfa (annual),Non-woody Perennial,1.5,0.45,FAO-56 Table 22 (hay),FAO-56 Table 22 +1.01,Alfalfa (cycle),Non-woody Perennial,2,0.4,FAO-56 Table 22 (seed),FAO-56 Table 22 +3.01,Almonds,Woody Perennial,1.5,0.6,FAO-56 Table 22,FAO-56 Table 22 +3.02,Apple,Woody Perennial,1.5,0.5,FAO-56 Table 22,FAO-56 Table 22 +1.02,Artichokes,Non-woody Perennial,0.75,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.03,Asparagus,Non-woody Perennial,1.5,0.55,FAO-56 Table 22,FAO-56 Table 22 +4.01,Avocado,Woody Perennial,0.75,0.3,FAO-56 Table 22,FAO-56 Table 22 +1.04,Barley,Annual (Hardy),1.25,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.06,Beans (dry),Annual (Hardy),0.75,0.55,FAO-56 Table 22,FAO-56 Table 22 +,Beans (Fresh),Annual (Sensitive),0.6,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.05,Beans (pinto),Annual (Hardy),0.6,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.08,Beets (table),Annual (Sensitive),0.8,0.5,FAO-56 Table 22,FAO-56 Table 22 +1.09,Broccoli,Annual (Sensitive),0.5,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.1,Cabbage,Annual (Sensitive),0.65,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.11,Carrots,Annual (Sensitive),0.75,0.65,FAO-56 Table 22,FAO-56 Table 22 +1.12,Celery,Annual (Sensitive),0.4,0.80,FAO-56 Table 22,FAO-56 Table 22 +4.03,Citrus (<3.0 m tall),Woody Perennial,0.95,0.50,FAO-56 Table 22,FAO-56 Table 22 +4.02,Citrus (>3.8 m tall),Woody Perennial,1.35,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.13,Corn (grain),Annual (Hardy),1.35,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.14,Corn (silage),Annual (Hardy),1.0,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.15,Cotton,Annual (Hardy),1.35,0.35,FAO-56 Table 22,FAO-56 Table 22 +1.16,Cucumber,Annual (Sensitive),0.95,0.50,FAO-56 Table 22,FAO-56 Table 22 +4.04,Date Palm,Woody Perennial,2.0,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.17,Eggplant,Annual (Hardy),0.95,0.55,FAO-56 Table 22,FAO-56 Table 22 +4.05,Evergreen,Woody Perennial,1.25,0.35,FAO-56 Table 22,FAO-56 Table 22 +1.18,Flax,Annual (Hardy),1.25,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.19,Grains (small),Annual (Hardy),1.25,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.2,Grains (winter),Annual (Hardy),1.65,0.45,FAO-56 Table 22,FAO-56 Table 22 +3.05,Grapes (raisin),Woody Perennial,1.5,0.65,FAO-56 Table 22,FAO-56 Table 22 +3.04,Grapes (table),Woody Perennial,1.5,0.35,FAO-56 Table 22,FAO-56 Table 22 +3.03,Grapes (wine),Woody Perennial,1.5,0.55,FAO-56 Table 22,FAO-56 Table 22 +2.02,Improved Pasture,Non-woody Perennial,1.0,0.40,FAO-56 Table 22,FAO-56 Table 22 +3.06,Kiwifruit,Woody Perennial,1.0,0.65,FAO-56 Table 22,FAO-56 Table 22 +1.21,Lentil,Annual (Hardy),0.7,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.22,Lettuce,Annual (Sensitive),0.4,0.70,FAO-56 Table 22,FAO-56 Table 22 +1.23,Melon,Annual (Hardy),1.2,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.24,Millet,Annual (Hardy),1.5,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.25,Mustard,Annual (Hardy),1.2,0.45,Common value for other hardy annuals,"Dharmasri, Jong, Cowell. 1993. https://harvest.usask.ca/server/api/core/bitstreams/fc5e68a2-951c-4d34-9cfe-b5d2a383d449/content" +1.26,Oats,Annual (Hardy),1.25,0.45,FAO-56 Table 22,FAO-56 Table 22 +4.06,Olives,Woody Perennial,1.45,0.35,FAO-56 Table 22,FAO-56 Table 22 +1.27,Onion (dry),Annual (Sensitive),0.45,0.70,FAO-56 Table 22,FAO-56 Table 22 +1.28,Onion (green),Annual (Sensitive),0.45,0.70,FAO-56 Table 22,FAO-56 Table 22 +1.29,Peas,Annual (Sensitive),0.8,0.65,FAO-56 Table 22,FAO-56 Table 22 +1.3,Peppers,Annual (Sensitive),0.75,0.70,FAO-56 Table 22,FAO-56 Table 22 +1.31,Potato,Annual (Sensitive),0.5,0.65,FAO-56 Table 22,FAO-56 Table 22 +1.32,Radishes,Annual (Sensitive),0.4,0.70,FAO-56 Table 22,FAO-56 Table 22 +1.33,Rice,Rice,0.75,1.0,FAO-56 Table 22,FAO value is 0.8 but WHC logic should be bypassed +1.34,Safflower,Annual (Hardy),1.5,0.40,FAO-56 Table 22,FAO-56 Table 22 +1.35,Sisal,Annual (Hardy),0.75,0.20,FAO-56 Table 22,FAO-56 Table 22 +1.36,Sorghum,Annual (Hardy),1.5,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.37,Spinach,Annual (Sensitive),0.4,0.80,FAO-56 Table 22,FAO-56 Table 22 +1.38,Squash,Annual (Hardy),0.8,0.50,FAO-56 Table 22,FAO-56 Table 22 +3.07,Stone fruits,Woody Perennial,1.5,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.39,Strawberries w/mulch,Annual (Sensitive),0.25,0.80,FAO-56 Table 22,FAO-56 Table 22 +1.47,Sudan grass,Annual (Hardy),1.25,0.45,FAO-56 Table 22,FAO-56 Table 22 +1.4,Sugarbeet,Annual (Hardy),0.95,0.45,FAO-56 Table 22,FAO-56 Table 22 +3.48,Sugarcane,Non-woody Perennial,1.6,0.35,FAO-56 Table 22,FAO-56 Table 22 +1.41,Sunflower,Annual (Hardy),1.15,0.55,FAO-56 Table 22,FAO-56 Table 22 +1.42,Sweet Potatoes,Annual (Hardy),1.25,0.35,FAO-56 Table 22,FAO-56 Table 22 +1.43,Tomato,Annual (Sensitive),1.1,0.60,FAO-56 Table 22,FAO-56 Table 22 +2.03,Turfgrass (cool-season),Non-woody Perennial,0.75,0.60,FAO-56 Table 22,FAO-56 Table 22 +2.04,Turfgrass (warm-season),Non-woody Perennial,0.75,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.44,Vegetables,Annual (Sensitive),0.4,0.70,FAO-56 Table 22,FAO-56 Table 22 +3.08,Walnuts,Woody Perennial,2.05,0.50,FAO-56 Table 22,FAO-56 Table 22 +1.46,Watermelon,Annual (Hardy),1.15,0.60,FAO-56 Table 22,FAO-56 Table 22 +1.45,Wheat,Annual (Hardy),1.25,0.45,FAO-56 Table 22,FAO-56 Table 22 diff --git a/modules/data.land/data/crop_whc.rda b/modules/data.land/data/crop_whc.rda new file mode 100644 index 0000000000..db521d6444 Binary files /dev/null and b/modules/data.land/data/crop_whc.rda differ diff --git a/modules/data.land/inst/compare-cimis-openet.R b/modules/data.land/inst/compare-cimis-openet.R new file mode 100644 index 0000000000..2f81c123c7 --- /dev/null +++ b/modules/data.land/inst/compare-cimis-openet.R @@ -0,0 +1,41 @@ +#!/usr/bin/env Rscript + +if (interactive()) { + devtools::load_all("modules/data.land") +} else { + library(PEcAn.data.land) +} + +design_points_path <- "~/projects/cimis-to-irrigation/design_points.csv" +cimis_eto_cog_path <- "~/data/CIMIS-ETo-COG" + +dates <- seq.Date(as.Date("2020-03-01"), as.Date("2020-11-30"), "day") +design_points <- readr::read_csv(design_points_path) |> + head(10) + +cimis_et <- extract_cimis_dates( + design_points, + dates, + cimis_eto_cog_path, + .progress = TRUE +) + +openet_et <- extract_openet_daily( + design_points, + min(dates), + max(dates) +) + +combined <- cimis_et |> + dplyr::rename(cimis = etref_mm_day) |> + dplyr::full_join(openet_et |> dplyr::rename(openet = et_mm_day)) + +combined_long <- combined |> + tidyr::pivot_longer(c(cimis, openet), names_to = "source", values_to = "et_mm_day") + +library(ggplot2) +ggplot(combined_long) + + aes(x = date, y = et_mm_day, color = source) + + geom_line() + + facet_wrap(~id) + + theme_bw() diff --git a/modules/data.land/inst/ssurgo-soil-inputs.R b/modules/data.land/inst/ssurgo-soil-inputs.R new file mode 100644 index 0000000000..04edd40a20 --- /dev/null +++ b/modules/data.land/inst/ssurgo-soil-inputs.R @@ -0,0 +1,34 @@ +# Example of querying available water content from SSURGO + +devtools::load_all("~/projects/pecan/cimis-et/modules/data.land") + +design_points <- readr::read_csv("~/projects/cimis-to-irrigation/design_points.csv") |> + head(10) + +mukeys_list <- purrr::map2( + design_points$lon, design_points$lat, + ~ PEcAn.data.land::ssurgo_mukeys_point(point = c(.x, .y), distance = 20) +) + +all_mukeys <- unique(unlist(mukeys_list)) + +soil_data <- PEcAn.data.land::gSSURGO.Query( + mukeys = all_mukeys, + fields = c( + "chorizon.hzdept_r", + "chorizon.hzdepb_r", + "chorizon.awc_r" + ) +) + +result <- design_points |> + dplyr::mutate(mukey = mukeys_list) |> + tidyr::unnest(mukey) |> + dplyr::mutate(mukey = as.numeric(mukey)) |> + dplyr::left_join( + soil_data |> + dplyr::select(mukey, awc_r), + by = "mukey" + ) + +readr::write_csv(result, "_test_soil_inputs.csv") diff --git a/modules/data.land/man/SSURGO_API_MAX_AREA_M2.Rd b/modules/data.land/man/SSURGO_API_MAX_AREA_M2.Rd new file mode 100644 index 0000000000..c07c2d0249 --- /dev/null +++ b/modules/data.land/man/SSURGO_API_MAX_AREA_M2.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/gSSURGO_Query.R +\docType{data} +\name{SSURGO_API_MAX_AREA_M2} +\alias{SSURGO_API_MAX_AREA_M2} +\title{Maximum area for SSURGO API requests} +\format{ +An object of class \code{numeric} of length 1. +} +\usage{ +SSURGO_API_MAX_AREA_M2 +} +\description{ +Maximum area for SSURGO API requests +} +\keyword{datasets} diff --git a/modules/data.land/man/apply_water_balance.Rd b/modules/data.land/man/apply_water_balance.Rd new file mode 100644 index 0000000000..dd27e9963f --- /dev/null +++ b/modules/data.land/man/apply_water_balance.Rd @@ -0,0 +1,54 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/water_balance.R +\name{apply_water_balance} +\alias{apply_water_balance} +\title{Apply water balance calculations to a data frame with multiple sites} +\usage{ +apply_water_balance( + df, + idcol, + whc_mm = 500, + irrigation_max_mm = 150, + flood_target = 125, + flood_min = 62.5, + flood_max = 175, + seepage = 2.5 +) +} +\arguments{ +\item{df}{Data frame with columns: `date`, `location_id`, `etc_mm_day`, +`precip_mm_day`, `crop_name`, and `whc_min_frac` (optional, defaults to +0.375). If a `whc_mm` column is present, it is used as the water holding +capacity.} + +\item{idcol}{Column name for grouping (typically, `location_id`, `parcel_id` +or similar).} + +\item{whc_mm}{Water holding capacity (mm); ignored if `whc_mm` is a column +in `df`.} + +\item{irrigation_max_mm}{Maximum irrigation to be applied at a time. See +`irrigation_max` argument of [calc_water_balance()]. Ignored if +`irrigation_max_mm` is a column of `df`.} + +\item{flood_target}{Numeric scalar. Target ponded water depth. +Irrigation refills to this level.} + +\item{flood_min}{Numeric scalar. Minimum acceptable pond depth before +irrigation is triggered.} + +\item{flood_max}{Numeric scalar. Maximum pond depth before bund +overflow / runoff occurs.} + +\item{seepage}{Numeric scalar. Daily seepage + percolation loss +Represents losses through the bund and downward percolation +through the hardpan (if any). Typical range: 1-5 mm/day +for well-puddled California soils.} +} +\value{ +Data frame with added columns: `W_t` / `pond_depth`, `irr`, `runoff` +} +\description{ +Groups by location and applies calc_water_balance to each group. Unlike +`calc_water_balance`, the units here *do* matter -- they should be `mm_day`. +} diff --git a/modules/data.land/man/bism_kc_by_crop.Rd b/modules/data.land/man/bism_kc_by_crop.Rd index 3edcb76830..c6771fb564 100644 --- a/modules/data.land/man/bism_kc_by_crop.Rd +++ b/modules/data.land/man/bism_kc_by_crop.Rd @@ -36,7 +36,7 @@ bism_kc_by_crop Crop and growth stage specific coefficients (Kc) from the Basic Irrigation Scheduling (BIS) Excel workbook (Snyder et. al., 2014). The dataset is an export of the BISm.xlsx workbook's `CropRef` worksheet, with columns renamed -and columns added that map to LandIQ CADWR land use dataset +and columns added that map to LandIQ CADWR land use dataset (\code{\link{landiq_crop_mapping_codes}}; California Department of Water Resources, 2023). This dataset provides the information needed to reconstruct a stage-based daily Kc curve when combined with grass-reference evapotranspiration (ETo), such as that provided diff --git a/modules/data.land/man/calc_water_balance.Rd b/modules/data.land/man/calc_water_balance.Rd new file mode 100644 index 0000000000..0a6eb20d64 --- /dev/null +++ b/modules/data.land/man/calc_water_balance.Rd @@ -0,0 +1,74 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/water_balance.R +\name{calc_water_balance} +\alias{calc_water_balance} +\title{Calculate water balance for a time series at a single site} +\usage{ +calc_water_balance( + et, + precip, + whc, + whc_min_frac, + W_initial = NULL, + w_min = NULL, + irrigation_max = NULL +) +} +\arguments{ +\item{et}{Vector of evapotranspiration values (distance / time)} + +\item{precip}{Vector of precipitation values (distance / time)} + +\item{whc}{Water holding capacity (distance); the plant-available range from +wilting point to field capacity (i.e., `whc = field_capacity - wilting_point`). +Can be a single value or a vector of the same length as `et`.} + +\item{whc_min_frac}{Fraction of WHC for minimum water level (irrigation +trigger); unused if `w_min` is explicitly specified. +Can be a single value or a vector of the same length as `et`.} + +\item{W_initial}{Initial soil water content at start of time series +(distance); defaults to `whc[1]` (field capacity) if NULL} + +\item{w_min}{Minimum water level threshold (distance); irrigation is +triggered when soil water falls below this level; defaults to +`whc_min_frac * whc` if NULL. +Can be a single value or a vector of the same length as `et`.} + +\item{irrigation_max}{If set, maximum amount of irrigation to apply at a +time (distance).} +} +\value{ +List with vectors: W_t (soil water), irr (irrigation), runoff +} +\description{ +This is the core water balance calculation that operates on primitive +numeric vectors for easy testing and debugging. Each input is a time series +of daily values for a single location (one date per row). The units for all +quantities are arbitrary, but they should be consistent (distance / time; +e.g., most commonly, mm/day). +} +\details{ +This function operates in *relative* WHC space, where: +- `w = 0` represents wilting point (no plant-available water) +- `w = whc` represents field capacity (maximum plant-available water) +- `whc = (field_capacity - wilting_point) * rooting_depth` (the plant-available range) + +Although this function can be used as a crude approximation of rice +irrigation (by setting `whc_min_frac = 1.0`), we recommend using +[calc_water_balance_rice()] instead, which explicitly tracks rice pond +depth, implements seepage, etc. +} +\examples{ +# Calculate WHC from field capacity, wilting point, and rooting depth +field_capacity <- 0.30 # volumetric (m3/m3) +wilting_point <- 0.10 # volumetric (m3/m3) +rooting_depth <- 1000 # mm +whc <- (field_capacity - wilting_point) * rooting_depth # mm + +# Run water balance with 5 days of ET and precip data +et <- c(4, 5, 6, 4, 3) # mm/day +precip <- c(0, 0, 10, 0, 0) # mm/day +result <- calc_water_balance(et, precip, whc = whc, whc_min_frac = 0.5) +str(result) +} diff --git a/modules/data.land/man/calc_water_balance_rice.Rd b/modules/data.land/man/calc_water_balance_rice.Rd new file mode 100644 index 0000000000..249c874f01 --- /dev/null +++ b/modules/data.land/man/calc_water_balance_rice.Rd @@ -0,0 +1,74 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/water_balance.R +\name{calc_water_balance_rice} +\alias{calc_water_balance_rice} +\title{Calculate water balance for a flooded rice paddy} +\usage{ +calc_water_balance_rice( + et, + precip, + flood_target, + flood_min, + flood_max, + seepage, + drain = NULL, + pond_init = flood_target +) +} +\arguments{ +\item{et}{Numeric vector. Daily reference ET. During flooded +periods this is treated as open-water ET; no crop +coefficient is applied here but you can pre-multiply if +needed.} + +\item{precip}{Numeric vector. Daily precipitation.} + +\item{flood_target}{Numeric scalar. Target ponded water depth. +Irrigation refills to this level.} + +\item{flood_min}{Numeric scalar. Minimum acceptable pond depth before +irrigation is triggered.} + +\item{flood_max}{Numeric scalar. Maximum pond depth before bund +overflow / runoff occurs.} + +\item{seepage}{Numeric scalar. Daily seepage + percolation loss +Represents losses through the bund and downward percolation +through the hardpan (if any). Typical range: 1-5 mm/day +for well-puddled California soils.} + +\item{drain}{Logical vector (same length as et). TRUE on days when an +intentional drainage event occurs (e.g., mid-season drain, +pre-harvest drawdown). Pond is set to 0 on these days and +irrigation is suppressed.} + +\item{pond_init}{Numeric scalar. Initial pond depth at t = 1. +Defaults to flood_target.} +} +\value{ +A list with numeric vectors of length n: + \item{pond_depth}{Ponded water depth at end of each day} + \item{irr}{Irrigation applied} + \item{runoff}{Bund overflow / surface runoff} +} +\description{ +Models the water balance of a flooded rice system with a two-layer +structure: a ponded water layer above a saturated soil profile. This is +physically distinct from the upland soil water balance in +[calc_water_balance()]. Water is managed to maintain a target flood depth, +with support for mid-season drainage events. +} +\details{ +The soil profile is assumed to be continuously saturated during flooded +periods, so plant-available soil water is not tracked separately. ET is +applied directly to the pond layer (open-water ET during flooded periods). + +Irrigation is triggered when pond_depth falls below flood_min. Farmers +refill to flood_target. Runoff (bund overflow) occurs when pond_depth +exceeds flood_max. + +Mid-season drainage is specified as a logical vector (`drain[t] = TRUE` +means the field is intentionally drained on day t). During drainage days, +the pond is drawn down to pond_depth = 0 and irrigation is suppressed. This +represents practices such as weed control or pre-harvest drainage. +} diff --git a/modules/data.land/man/create_event_file.Rd b/modules/data.land/man/create_event_file.Rd new file mode 100644 index 0000000000..1031fb712e --- /dev/null +++ b/modules/data.land/man/create_event_file.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/irrigation-event-files.R +\name{create_event_file} +\alias{create_event_file} +\title{Create SIPNET event files from water balance data} +\usage{ +create_event_file(df) +} +\arguments{ +\item{df}{Data frame with columns: date, year, week, day_of_year, irr} +} +\value{ +Data frame with columns: loc, year, doy, event_type, irr_cm, type +} +\description{ +Aggregates irrigation to weekly values and formats for SIPNET. +Irrigation is summed by week and reported on the first day of each week. +Units are converted from mm to cm. +} diff --git a/modules/data.land/man/crop_whc.Rd b/modules/data.land/man/crop_whc.Rd new file mode 100644 index 0000000000..875dfb16b1 --- /dev/null +++ b/modules/data.land/man/crop_whc.Rd @@ -0,0 +1,39 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/data.R +\docType{data} +\name{crop_whc} +\alias{crop_whc} +\title{Crop-specific rooting depths and water-depletion thresholds} +\format{ +A tibble with one row per crop and the following columns: +\describe{ + \item{crop_number}{BIS crop number (character). Blank for crops not in BIS.} + \item{crop_name}{Crop name.} + \item{Category}{Crop category (e.g., Woody Perennial, Annual (Hardy)).} + \item{rooting_depth_m}{Maximum effective rooting depth in meters.} + \item{whc_min_frac}{Minimum soil water as fraction of available water-holding capacity (0-1).} + \item{whc_notes}{Rationale or source for the minimum WHC value.} + \item{rooting_depth_notes}{Rationale or source for the rooting depth value.} +} +} +\source{ +Allen, R. G., Pereira, L. S., Raes, D., & Smith, M. +\emph{FAO Irrigation and Drainage Paper No. 56: Crop evapotranspiration}. Chapter 8. Table 22. +https://www.fao.org/4/x0490e/x0490e0e.htm#chapter%208%20%20%20etc%20under%20soil%20water%20stress%20conditions +} +\usage{ +crop_whc +} +\description{ +Maximum effective rooting depth and minimum soil water content thresholds +for various crops. The `whc_min_frac` column represents the fraction of +total available water (TAW) that should remain in the root zone to avoid +moisture stress (equivalent to 1 - p, where p is the depletion fraction +from FAO-56). +} +\examples{ +data(crop_whc) +head(crop_whc) + +} +\keyword{datasets} diff --git a/modules/data.land/man/extract_openet_daily.Rd b/modules/data.land/man/extract_openet_daily.Rd new file mode 100644 index 0000000000..7099a0d504 --- /dev/null +++ b/modules/data.land/man/extract_openet_daily.Rd @@ -0,0 +1,32 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/openET.R +\name{extract_openet_daily} +\alias{extract_openet_daily} +\title{Extract daily ET data from OpenET} +\usage{ +extract_openet_daily(design_points, start_date, end_date) +} +\arguments{ +\item{design_points}{`data.frame` of design points with columns `lat` and `lon`} + +\item{start_date}{Start date for data extraction} + +\item{end_date}{End date for data extraction} +} +\value{ +`design_points` `data.frame` with additional columns `date`, and +`et_mm_day` (ET, mm/day) +} +\description{ +Note that this requires the environment variable `OPENET_API_KEY` to be set. +A convenient way to do this is via a `.Renviron`, either globally +(`~/.Renviron`) or in the current working directory (`./.Renviron`), with +contents like: +} +\details{ +``` +OPENET_API_KEY="abcdefg123456" +``` + +You can obtain an OpenET API key from the OpenET data portal. +} diff --git a/modules/data.land/man/fertilizer_composition_data.Rd b/modules/data.land/man/fertilizer_composition_data.Rd index 4a7c41a23b..50954070e8 100644 --- a/modules/data.land/man/fertilizer_composition_data.Rd +++ b/modules/data.land/man/fertilizer_composition_data.Rd @@ -28,15 +28,15 @@ DayCent model default parameter file: `omad.100` obtained from the Soil Carbon S fertilizer_composition_data } \description{ -A dataset of fertilizer and organic matter addition types +A dataset of fertilizer and organic matter addition types and their nitrogen and carbon composition, based on the SWAT model's `fertilizer.frt` table and DayCent model defaults for organic matter C:N ratio parameters. } \details{ -This table is based on SWAT model's \code{fertilizer.frt} file, and uses +This table is based on SWAT model's \code{fertilizer.frt} file, and uses C:N ratios (\code{cn_ratio}) from DayCent model default parameter files. -\code{fraction_nh3_n} and \code{fraction_no3_n} represent the fraction of +\code{fraction_nh3_n} and \code{fraction_no3_n} represent the fraction of fertilizer by mass that is ammonium-N and nitrate-N, respectively. This is different from the SWAT model's definition of \code{fraction_nh3_n} as a fraction of the total mineral N. } diff --git a/modules/data.land/man/gSSURGO.Query.Rd b/modules/data.land/man/gSSURGO.Query.Rd index 51d6def7dc..336bcd20e5 100644 --- a/modules/data.land/man/gSSURGO.Query.Rd +++ b/modules/data.land/man/gSSURGO.Query.Rd @@ -44,10 +44,10 @@ for full list):\tabular{lll}{ \code{chorizon.om_r} \tab Organic matter (<2 mm soil) \tab \% \cr \code{chorizon.hzdept_r} \tab Horizon top depth \tab cm \cr \code{chfrags.fragvol_r} \tab Rock fragments \tab \% (by volume) \cr - \code{chorizon.dbthirdbar_r} \tab Bulk density at field capacity \tab g/cm³ \cr + \code{chorizon.dbthirdbar_r} \tab Bulk density at field capacity \tab g/cm3 \cr \code{chorizon.ph1to1h2o_r} \tab Soil pH (1:1 H2O) \tab pH (unitless) \cr - \code{chorizon.cokey} \tab Component key (identifier) \tab — \cr - \code{chorizon.chkey} \tab Horizon key (identifier) \tab — \cr + \code{chorizon.cokey} \tab Component key (identifier) \tab - \cr + \code{chorizon.chkey} \tab Horizon key (identifier) \tab - \cr } diff --git a/modules/data.land/man/landiq_crop_mapping_codes.Rd b/modules/data.land/man/landiq_crop_mapping_codes.Rd index 8cbd354c43..7de715dfdc 100644 --- a/modules/data.land/man/landiq_crop_mapping_codes.Rd +++ b/modules/data.land/man/landiq_crop_mapping_codes.Rd @@ -15,7 +15,7 @@ A data frame with 203 rows and 4 columns: } } \source{ -California Department of Water Resources. (2023). Statewide Crop Mapping—California +California Department of Water Resources. (2023). Statewide Crop Mapping—California Natural Resources Agency Open Data. Metadata retrieved from https://data.cnra.ca.gov/dataset/statewide-crop-mapping and manually extracted into `data-raw/landiq_crop_mapping_codes.tsv`. } \usage{ diff --git a/modules/data.land/man/mslsp_to_canopycover.Rd b/modules/data.land/man/mslsp_to_canopycover.Rd new file mode 100644 index 0000000000..9998f68cb0 --- /dev/null +++ b/modules/data.land/man/mslsp_to_canopycover.Rd @@ -0,0 +1,25 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/mslsp_to_canopycover.R +\name{mslsp_to_canopycover} +\alias{mslsp_to_canopycover} +\title{Convert MSLSP phenology data to tidy canopy cover} +\usage{ +mslsp_to_canopycover(mslsp_path, parcel_ids = NULL, years = NULL) +} +\arguments{ +\item{mslsp_path}{Path to directory containing MSLSP outputs (in parquet format)} + +\item{parcel_ids}{Vector of parcel IDs for filtering. If `NULL`, use all parcels.} + +\item{years}{Vector of years for filtering. If `NULL`, use all years.} +} +\value{ +`data.frame` of `parcel_id`, `year`, `season`, `date`, and +`canopy_cover` (fraction). `date` is a sequence from the MSLSP greenness +onset (growing season start) to the greenness minimum (growing season end). +`canopy_cover` is the fractional canpoy cover (0 to 1), suitable for ingest +into [eto_to_etc_bism()] in "canopy cover mode". +} +\description{ +Convert MSLSP phenology data to tidy canopy cover +} diff --git a/modules/data.land/man/parse_mukey_response.Rd b/modules/data.land/man/parse_mukey_response.Rd new file mode 100644 index 0000000000..596f532e33 --- /dev/null +++ b/modules/data.land/man/parse_mukey_response.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/gSSURGO_Query.R +\name{parse_mukey_response} +\alias{parse_mukey_response} +\title{Parse responses from the mukey WFS service} +\usage{ +parse_mukey_response(resp) +} +\arguments{ +\item{resp}{`httr2` response object from SSURGO mukey WFS API} +} +\value{ +character vector of mukeys +} +\description{ +Parse responses from the mukey WFS service +} diff --git a/modules/data.land/man/ssurgo_mukeys.Rd b/modules/data.land/man/ssurgo_mukeys.Rd new file mode 100644 index 0000000000..de370953f5 --- /dev/null +++ b/modules/data.land/man/ssurgo_mukeys.Rd @@ -0,0 +1,50 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/gSSURGO_Query.R +\name{ssurgo_mukeys} +\alias{ssurgo_mukeys} +\alias{ssurgo_mukeys_bbox} +\alias{ssurgo_mukeys_point} +\alias{ssurgo_mukeys_bigbbox} +\title{Get map unit keys (mukeys) from gSSURGO} +\usage{ +ssurgo_mukeys_bbox(bbox) + +ssurgo_mukeys_point(point, distance) + +ssurgo_mukeys_bigbbox(bbox) +} +\arguments{ +\item{bbox}{Numeric vector of length 4: c(xmin, ymin, xmax, ymax) in WGS84 +(EPSG:4326). Features that intersect the bounding box are returned.} + +\item{point}{Numeric vector of length 2: c(lon, lat) in WGS84 (EPSG:4326).} + +\item{distance}{Numeric. Distance in meters from the point. Use 0 for exact +point intersection.} +} +\value{ +Character vector of unique map unit keys (mukeys). +} +\description{ +These functions query the NRCS gSSURGO Web Feature Service to retrieve map +unit keys based on different spatial filters. +} +\details{ +These functions use the NRCS SDM Data Access Web Feature Service: +\url{https://sdmdataaccess.nrcs.usda.gov/SpatialFilterHelp.htm} + +The total extent cannot exceed 10,100,000,000 square meters (~3,900 square +miles). Use `ssurgo_mukeys_bigbbox()` for large bounding boxes. +} +\examples{ +\dontrun{ +# Bounding box query +mukeys <- ssurgo_mukeys_bbox(bbox = c(-114.006, 32.1823, -113.806, 32.2823)) + +# Point with distance (600m radius) +mukeys <- ssurgo_mukeys_point(point = c(-91.22, 38.46), distance = 600) + +# Large bounding box +mukeys <- ssurgo_mukeys_bigbbox(bbox = c(-120, 35, -110, 45)) +} +} diff --git a/modules/data.land/tests/Rcheck_reference.log b/modules/data.land/tests/Rcheck_reference.log index 5452a7e02c..84df7871a3 100644 --- a/modules/data.land/tests/Rcheck_reference.log +++ b/modules/data.land/tests/Rcheck_reference.log @@ -13,7 +13,7 @@ * package encoding: UTF-8 * checking package namespace information ... OK * checking package dependencies ... NOTE -Imports includes 22 non-default packages. +Imports includes 25 non-default packages. Importing from so many packages makes the package vulnerable to any of them becoming unavailable. Move as many as possible to Suggests and use conditionally. diff --git a/modules/data.land/tests/testthat/test-ssurgo.R b/modules/data.land/tests/testthat/test-ssurgo.R new file mode 100644 index 0000000000..2c13a0e2f3 --- /dev/null +++ b/modules/data.land/tests/testthat/test-ssurgo.R @@ -0,0 +1,125 @@ +context("ssurgo_mukeys") + +test_that("ssurgo_mukeys_bbox validates bbox input", { + expect_error(ssurgo_mukeys_bbox("not numeric"), "numeric vector of length 4") + expect_error(ssurgo_mukeys_bbox(c(1, 2)), "numeric vector of length 4") + expect_error(ssurgo_mukeys_bbox(c(3, 2, 1, 4)), "xmin < xmax") + expect_error(ssurgo_mukeys_bbox(c(1, 4, 3, 2)), "ymin < ymax") +}) + +test_that("ssurgo_mukeys_point validates point and distance", { + expect_error(ssurgo_mukeys_point(c(1, 2, 3)), "length 2") + expect_error(ssurgo_mukeys_point(c(1, 2)), "missing") + expect_error(ssurgo_mukeys_point(point = c(1, 2), distance = -10), "non-negative") + expect_error(ssurgo_mukeys_point(point = c(1, 2), distance = "100"), "non-negative") +}) + +test_that("ssurgo_mukeys_bbox returns mukeys for valid location", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_bbox(bbox = c(-114.006, 32.1823, -113.806, 32.2823)) + + expect_type(mukeys, "character") + expect_gt(length(mukeys), 0) +}) + +test_that("ssurgo_mukeys_point with distance returns mukeys", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_point(point = c(-91.22, 38.46), distance = 600) + + expect_type(mukeys, "character") + expect_gt(length(mukeys), 0) +}) + +test_that("ssurgo_mukeys_point with zero distance returns mukeys", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_point(point = c(-91.22, 38.46), distance = 0) + + expect_type(mukeys, "character") +}) + +test_that("ssurgo_mukeys_bbox returns unique mukeys", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_bbox(bbox = c(-114.006, 32.1823, -113.806, 32.2823)) + + expect_equal(length(mukeys), length(unique(mukeys))) +}) + +test_that("ssurgo_mukeys_point returns unique mukeys", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_point(point = c(-91.22, 38.46), distance = 600) + + expect_equal(length(mukeys), length(unique(mukeys))) +}) + +test_that("ssurgo_mukeys_bbox handles area with no soil data gracefully", { + skip_on_cran() + skip_on_ci() + + mukeys <- ssurgo_mukeys_bbox(bbox = c(0, 0, 0.001, 0.001)) + + expect_type(mukeys, "character") + expect_equal(length(mukeys), 0) +}) + +test_that("ssurgo_mukeys_bbox and ssurgo_mukeys_point return consistent results for same area", { + skip_on_cran() + skip_on_ci() + + center_lon <- -91.22 + center_lat <- 38.46 + distance <- 600 + + bbox_mukeys <- ssurgo_mukeys_bbox( + bbox = c( + center_lon - 0.01, + center_lat - 0.01, + center_lon + 0.01, + center_lat + 0.01 + ) + ) + point_mukeys <- ssurgo_mukeys_point( + point = c(center_lon, center_lat), + distance = distance + ) + + expect_type(bbox_mukeys, "character") + expect_type(point_mukeys, "character") + expect_gt(length(bbox_mukeys), length(point_mukeys)) +}) + +test_that("big bounding boxes exceed area limit", { + skip_on_cran() + skip_on_ci() + + bbox_01 <- c(-123.569131, 39.638344, -121.234281, 41.461763) + bbox_02 <- c(-124.064177, 38.994921, -120.102514, 42.088592) + + expect_error(ssurgo_mukeys_bbox(bbox = bbox_01), "exceeds maximum allowed area") + expect_error(ssurgo_mukeys_bbox(bbox = bbox_02), "exceeds maximum allowed area") +}) + +test_that("ssurgo_mukeys_bigbbox returns mukeys for large bounding boxes", { + skip_on_cran() + skip_on_ci() + + bbox_01 <- c(-123.569131, 39.638344, -121.234281, 41.461763) + bbox_02 <- c(-124.064177, 38.994921, -120.102514, 42.088592) + + mukeys_01 <- ssurgo_mukeys_bigbbox(bbox_01) + expect_type(mukeys_01, "character") + expect_gt(length(mukeys_01), 0) + + mukeys_02 <- ssurgo_mukeys_bigbbox(bbox_02) + expect_type(mukeys_02, "character") + expect_gt(length(mukeys_02), 0) +}) diff --git a/modules/data.land/tests/testthat/test-water_balance.R b/modules/data.land/tests/testthat/test-water_balance.R new file mode 100644 index 0000000000..998c4e9dda --- /dev/null +++ b/modules/data.land/tests/testthat/test-water_balance.R @@ -0,0 +1,86 @@ +context("Water balance calculations") + +expect_nonnegative <- function(result) { + testthat::expect_true(all(result$W_t >= 0)) + testthat::expect_true(all(result$irr >= 0)) + testthat::expect_true(all(result$runoff >= 0)) +} + +test_that("calc_water_balance: more precip leads to more runoff", { + n <- 10 + et <- rep(5, n) + whc <- 100 + whc_min_frac <- 0.5 + + precip_low <- c(rep(5, 5), rep(0, 5)) + precip_high <- c(rep(15, 5), rep(0, 5)) + + result_low <- calc_water_balance(et, precip_low, whc, whc_min_frac) + result_high <- calc_water_balance(et, precip_high, whc, whc_min_frac) + + expect_true(sum(result_high$runoff) > sum(result_low$runoff)) + expect_nonnegative(result_low) + expect_nonnegative(result_high) +}) + +test_that("calc_water_balance: more ET leads to less runoff", { + n <- 10 + precip <- c(rep(10, 5), rep(0, 5)) + whc <- 100 + whc_min_frac <- 0.5 + + et_low <- rep(2, n) + et_high <- rep(8, n) + + result_low <- calc_water_balance(et_low, precip, whc, whc_min_frac) + result_high <- calc_water_balance(et_high, precip, whc, whc_min_frac) + + expect_true(sum(result_high$runoff) < sum(result_low$runoff)) + expect_nonnegative(result_low) + expect_nonnegative(result_high) +}) + +test_that("calc_water_balance: more ET leads to more irrigation", { + n <- 60 + precip <- rep(0, n) + whc <- 100 + whc_min_frac <- 0.5 + + et_low <- rep(1, n) + et_high <- rep(5, n) + + result_low <- calc_water_balance(et_low, precip, whc, whc_min_frac) + result_high <- calc_water_balance(et_high, precip, whc, whc_min_frac) + + expect_true(sum(result_high$irr) > sum(result_low$irr)) + expect_nonnegative(result_low) + expect_nonnegative(result_high) +}) + +test_that("calc_water_balance: vector parameters work", { + n <- 10 + et <- rep(5, n) + precip <- rep(0, n) + + # WHC decreases halfway through + whc_vec <- c(rep(100, 5), rep(50, 5)) + whc_min_frac <- 0.5 + + # Constant WHC for comparison + result_const100 <- calc_water_balance(et, precip, 100, whc_min_frac) + result_const50 <- calc_water_balance(et, precip, 50, whc_min_frac) + result_vec <- calc_water_balance(et, precip, whc_vec, whc_min_frac) + + # Irrigation should match the first half of 100 + expect_equal(result_vec$irr[1:5], result_const100$irr[1:5]) + # Step 6 might differ because it depends on W_t[5], which is different. + # But subsequent steps (7:10) should be identical because step 6 will force W_t[6] to whc[6] (50). + expect_equal(result_vec$irr[7:10], result_const50$irr[7:10]) + + # w_min vector + w_min_vec <- c(rep(50, 5), rep(25, 5)) + # whc_min_frac is required but ignored if w_min is provided + result_wmin_vec <- calc_water_balance(et, precip, 100, whc_min_frac = 0.5, w_min = w_min_vec) + expect_equal(result_wmin_vec$irr[1:5], result_const100$irr[1:5]) +}) + diff --git a/workflows/irrigation-statewide/.gitignore b/workflows/irrigation-statewide/.gitignore new file mode 100644 index 0000000000..42959cecde --- /dev/null +++ b/workflows/irrigation-statewide/.gitignore @@ -0,0 +1,3 @@ +_logs/ +_targets*/ +zz-* diff --git a/workflows/irrigation-statewide/R/calc_awc.R b/workflows/irrigation-statewide/R/calc_awc.R new file mode 100644 index 0000000000..dfd671a44f --- /dev/null +++ b/workflows/irrigation-statewide/R/calc_awc.R @@ -0,0 +1,94 @@ +#' Calculate effective available water capacity (mm) for a soil profile +#' clipped to a given rooting depth. +calc_effective_awc <- function( + hzdept_r_cm, + hzdepb_r_cm, + awc_r, + rooting_depth_cm +) { + effective_top <- pmin(hzdept_r_cm, rooting_depth_cm) + effective_bottom <- pmin(hzdepb_r_cm, rooting_depth_cm) + thickness_cm <- pmax(0, effective_bottom - effective_top) + # awc_r is cm water / cm soil; + # multiply by thickness -> cm water -> mm water + sum(awc_r * thickness_cm, na.rm = TRUE) * 10 +} + +add_soil_awc <- function( + crop_info, + ssurgo_weights_path, + ssurgo_gdb_path +) { + parcel_ids <- unique(crop_info[["parcel_id"]]) + message("Reading SSURGO weights") + weights <- arrow::open_dataset(ssurgo_weights_path) |> + dplyr::filter(.data$parcel_id %in% parcel_ids) |> + dplyr::collect() + + component <- sf::read_sf( + ssurgo_gdb_path, + layer = "component", + as_tibble = TRUE + ) |> + dplyr::semi_join(weights, by = "mukey") + + chorizon <- sf::read_sf( + ssurgo_gdb_path, + layer = "chorizon", + as_tibble = TRUE + ) |> + dplyr::semi_join(component, by = "cokey") + + combined <- weights |> + dplyr::left_join( + dplyr::select(crop_info, "parcel_id", "rooting_depth_m"), + by = "parcel_id", + relationship = "many-to-many" + ) |> + dplyr::left_join( + component, + by = "mukey", + relationship = "many-to-many" + ) |> + dplyr::left_join( + chorizon, + by = "cokey", + relationship = "many-to-many" + ) + + awc <- combined |> + dplyr::filter( + !is.na(.data$awc_r), + !is.na(.data$hzdept_r), + !is.na(.data$hzdepb_r) + ) |> + dplyr::mutate( + rooting_depth_cm = .data$rooting_depth_m * 100, + .keep = "unused" + ) |> + # Aggregate horizons (by component) + dplyr::summarize( + whc_mm_cmp = calc_effective_awc( + .data$hzdept_r, + .data$hzdepb_r, + .data$awc_r, + .data$rooting_depth_cm + ), + .by = c("parcel_id", "mukey", "cokey", "area_m2", "weight", "comppct_r") + ) |> + # Aggregate components (by mapping unit) + dplyr::summarize( + whc_mm_mu = sum( + .data$whc_mm_cmp * .data$comppct_r / sum(.data$comppct_r) + ), + .by = c("parcel_id", "mukey", "area_m2", "weight") + ) |> + # Aggregate mapping units (by parcel) + dplyr::summarize( + whc_mm = sum(.data$whc_mm_mu * .data$weight), + .by = "parcel_id" + ) + + crops_with_soil <- dplyr::left_join(crop_info, awc, by = "parcel_id") + crops_with_soil +} diff --git a/workflows/irrigation-statewide/R/crop_timeseries.R b/workflows/irrigation-statewide/R/crop_timeseries.R new file mode 100644 index 0000000000..d706558c0e --- /dev/null +++ b/workflows/irrigation-statewide/R/crop_timeseries.R @@ -0,0 +1,83 @@ +#!/usr/bin/env Rscript + +# Sys.setenv(TAR_PROJECT = "small") +# targets::tar_load(c(crops_with_soil, phenology, precip, etref)) + +make_crop_timeseries <- function(crops_with_soil, phenology, precip, etref) { + crop_cols <- c( + "parcel_id", + "year", + "crop_name", + "whc_min_frac", + "whc_mm", + "whc_min_frac" + ) + + crop_soil_timeseries <- crops_with_soil |> + dplyr::select(dplyr::all_of(crop_cols)) |> + dplyr::inner_join( + phenology, + by = c("parcel_id", "year"), + relationship = "many-to-many" + ) |> + dplyr::slice_max( + .data$canopy_cover, + n = 1, + by = c("parcel_id", "date") + ) + + check_unique <- crop_soil_timeseries |> + dplyr::group_by(.data$parcel_id, .data$date) |> + dplyr::count() |> + dplyr::filter(.data$n > 1) + if (nrow(check_unique) > 1) { + bad_parcels <- unique(check_unique[["parcel_id"]]) + warning( + "The parcels below have some non-unique values ", + "even after `slice_max(canopy_cover)`. ", + "This is likely because of non-unique ", + "landIQ crop --> crop_type mappings. ", + "Selecting only the first row in each of these cases.", + "\n", + paste(bad_parcels, collapse = ", ") + ) + crop_soil_timeseries <- crop_soil_timeseries |> + dplyr::slice_max( + .data$canopy_cover, + n = 1, + by = c("parcel_id", "date"), + with_ties = FALSE + ) + } + + start_date <- min(crop_soil_timeseries[["date"]]) + end_date <- max(crop_soil_timeseries[["date"]]) + + complete_crop_timeseries <- precip |> + dplyr::filter( + .data$date >= .env$start_date, + .data$date <= .env$end_date + ) |> + dplyr::left_join( + dplyr::select(etref, -"year"), + by = c("parcel_id", "date") + ) |> + dplyr::arrange(.data$parcel_id, .data$date) |> + tidyr::fill("etref_mm_day") |> + dplyr::left_join(crop_soil_timeseries, by = c("parcel_id", "date")) |> + tidyr::replace_na(list(canopy_cover = 0)) |> + tidyr::fill( + c("whc_min_frac", "whc_mm", "crop_name"), + .direction = "downup" + ) |> + dplyr::mutate( + etc_mm_day = eto_to_etc_bism( + eto = .data$etref_mm_day, + crop_name = .data$crop_name[[1]], + date = .data$date + ), + .by = "crop_name" + ) + + complete_crop_timeseries +} diff --git a/workflows/irrigation-statewide/R/events_df.R b/workflows/irrigation-statewide/R/events_df.R new file mode 100644 index 0000000000..085e38c51b --- /dev/null +++ b/workflows/irrigation-statewide/R/events_df.R @@ -0,0 +1,68 @@ +#!/usr/bin/env Rscript + +make_event_df_parquet <- function(output_dir, ..., out_file = NULL) { + result <- make_event_df(...) + if (is.null(out_file)) { + pid_min <- min(result[["parcel_id"]], na.rm = TRUE) + pid_max <- max(result[["parcel_id"]], na.rm = TRUE) + dir.create(output_dir, showWarnings = FALSE, recursive = TRUE) + out_file <- file.path( + output_dir, + sprintf("%d_%d.parquet", pid_min, pid_max) + ) + } + arrow::write_parquet(result, out_file) + invisible(out_file) +} + +make_event_df <- function( + parcel_waterbalance, + n_ensemble = NULL, + frac_uncertainty = 0.1 +) { + pw_sub <- parcel_waterbalance |> + dplyr::filter(.data$irr > 0) |> + dplyr::relocate("parcel_id", "date", "crop_name", "canopy_cover", "irr") + + irr_events <- pw_sub |> + dplyr::mutate( + crop_code = .data$crop_name, + method = dplyr::case_when( + .data$crop_code == "Rice" ~ "flood", + TRUE ~ "canopy" + ) + ) |> + dplyr::select("parcel_id", "date", "amount_mm" = "irr", "method") + + if (is.null(n_ensemble)) { + return(irr_events) + } + + # Crude uncertainty propagation. We apply a uniform uncertainty multiplier + # across the entire irrigation time series. + unc_table <- irr_events |> + dplyr::distinct(.data$parcel_id) |> + dplyr::mutate( + unc_multi = purrr::map( + .data$parcel_id, + ~rnorm(n_ensemble, 1.0, frac_uncertainty) + ), + ens_id = purrr::map(.data$unc_multi, seq_along) + ) |> + tidyr::unnest(c("unc_multi", "ens_id")) |> + dplyr::mutate(ens_id = sprintf("irr_ens_%03d", .data$ens_id)) + + irr_events_unc <- irr_events |> + dplyr::left_join( + unc_table, + by = "parcel_id", + relationship = "many-to-many" + ) |> + dplyr::mutate( + amount_mm = .data$amount_mm * .data$unc_multi, + .keep = "unused" + ) |> + dplyr::relocate("parcel_id", "ens_id", "date") + + irr_events_unc +} diff --git a/workflows/irrigation-statewide/R/get.R b/workflows/irrigation-statewide/R/get.R new file mode 100644 index 0000000000..c4b09f3806 --- /dev/null +++ b/workflows/irrigation-statewide/R/get.R @@ -0,0 +1,108 @@ +#!/usr/bin/env Rscript +# library(PEcAn.data.land) +# parcel_ids <- parcel_id_batches[[1]] + +get_parcel_ids <- function(crops_path, n_parcels = NULL) { + parcel_ids <- arrow::open_dataset(crops_path) |> + dplyr::distinct(.data$parcel_id) |> + dplyr::pull(as_vector = TRUE) + if (is.null(n_parcels)) return(parcel_ids) + sample(parcel_ids, n_parcels) +} + +get_phenology <- function(mslsp_path, parcel_ids = NULL) { + dat_raw <- mslsp_to_canopycover(mslsp_path, parcel_ids) + phenology_raw <- dat_raw |> + dplyr::mutate( + parcel_id = as.integer(parcel_id), + landiq_SUBCLASS = as.integer(.data$landiq_SUBCLASS) + ) + # Resolve overlapping canopy_cover values. If a single date has multiple + # rows, we take the row with the largest canopy cover. + phenology <- phenology_raw |> + dplyr::slice_max(.data$canopy_cover, n = 1, by = c("parcel_id", "date")) |> + dplyr::relocate("parcel_id", "year", "date") + phenology +} + +get_etref <- function(cimis_etref_path, parcel_ids = NULL) { + dat <- arrow::open_dataset(cimis_etref_path) + if (!is.null(parcel_ids)) { + dat <- dplyr::filter(dat, .data$parcel_id %in% parcel_ids) + } + etref <- dat |> + dplyr::arrange(.data$parcel_id, .data$date) |> + dplyr::collect() + etref +} + +get_precip <- function(chirps_precip_path, parcel_ids = NULL) { + dat <- arrow::open_dataset(chirps_precip_path) + if (!is.null(parcel_ids)) { + dat <- dplyr::filter(dat, .data$parcel_id %in% parcel_ids) + } + precip <- dat |> + dplyr::arrange(.data$parcel_id, .data$date) |> + dplyr::collect() + precip +} + +#' @importFrom PEcAn.data.land bism_kc_by_crop crop_whc +get_crop_info <- function(crops_path, parcel_ids = NULL) { + # parcel_ids <- parcel_id_batches[[1]] + dat <- arrow::open_dataset(crops_path) + if (!is.null(parcel_ids)) { + dat <- dplyr::filter(dat, .data$parcel_id %in% .env$parcel_ids) + } + dlocal <- dat |> + dplyr::collect() + + #' NOTE: Some LandIQ classes/subclasses map onto multiple BISM crop + #' types. HACK: select just the first crop per class/subclass group. + bism_crop_unique <- bism_kc_by_crop |> + dplyr::distinct( + .data$landiq_class, + .data$landiq_subclass, + .data$crop_name + ) |> + dplyr::slice(1, .by = c("landiq_class", "landiq_subclass")) + + crops <- dlocal |> + dplyr::left_join( + bism_crop_unique, + by = c( + "CLASS" = "landiq_class", + "SUBCLASS" = "landiq_subclass" + ) + ) + + missing_crops <- dplyr::filter(crops, is.na(.data$crop_name)) + if (nrow(missing_crops) > 0) { + missing_crop_strs <- missing_crops |> + dplyr::distinct(.data$CLASS, .data$SUBCLASS) |> + dplyr::mutate( + string = glue::glue( + "CLASS: {.data$CLASS} ", + "SUBCLASS: {.data$SUBCLASS}" + ) + ) |> + dplyr::pull(.data$string) + warning( + "Skipping ", + nrow(missing_crops), + " rows with no matching BIS crop. Relevant pairs are: [", + paste(missing_crop_strs, collapse = "; "), + "]" + ) + } + crop_whc_sub <- dplyr::select( + crop_whc, + "crop_name", + "whc_min_frac", + "rooting_depth_m" + ) + crop_info <- crops |> + dplyr::filter(!is.na(.data$crop_name)) |> + dplyr::left_join(crop_whc_sub, by = "crop_name") + crop_info +} diff --git a/workflows/irrigation-statewide/R/misc.R b/workflows/irrigation-statewide/R/misc.R new file mode 100644 index 0000000000..eaa4cdfed7 --- /dev/null +++ b/workflows/irrigation-statewide/R/misc.R @@ -0,0 +1,5 @@ +#!/usr/bin/env Rscript + +split_into_batches <- function(x, batch_size) { + split(x, ceiling(seq_along(x) / batch_size)) +} diff --git a/workflows/irrigation-statewide/README.md b/workflows/irrigation-statewide/README.md new file mode 100644 index 0000000000..9e172ef4c2 --- /dev/null +++ b/workflows/irrigation-statewide/README.md @@ -0,0 +1,36 @@ +# Statewide irrigation events workflow + +This generates PEcAn event files for irrigation events across all of California. +The spatial unit is harmonized LandIQ parcels. + +The workflow uses `targets` for reproducibility, scalability, and incremental execution. + +# Setup + +This code assumes that you are running from the PEcAn root directory. +To ensure discovery of the targets script and store directories, you will need to set the `TAR_CONFIG` environment variable to point to the `_targets.yaml` file in this directory. +One way to do this is to create a `.Renviron` file in the PEcAn project root with the following contents: + +``` +TAR_CONFIG=workflows/irrigation-statewide/_targets.yaml +``` + +If you are running this code on the BU SCC, all the paths have already been preconfigured for you. +If you are running the code on another system, or if the paths have changed, modify the `config_paths.yml` file accordingly. + +# Execution + +This code ships with three different configurations (defined in `config.yml`) of the irrigation pipeline: + +- `small` (default) --- 1000 randomly selected parcels split into batches of 100 parcels each. The code will run locally, using as many CPUs as are defined by the `NSLOTS` environment variable (or 1 CPU, if `NSLOTS` is unset). +- `medium` --- 10,000 randomly selected parcels split into batches of 1000 parcels each. This will run across `n_remote_workers` (set to 15) SGE array jobs. +- `all` --- This will run all (~600,000) parcels in California in batches of 5000 parcels each. This will run on 60 SGE array jobs. + +These values can be modified by modifying the `config.yml` file. + +To run a specific configuration, set the `TAR_PROJECT` environment variable accordingly and run `Rscript -e 'targets::tar_make()'`. +For example, to run the full statewide pipeline (all 600K parcels), you can use a command like: + +``` +TAR_PROJECT=all Rscript -e "targets::tar_make()" +``` diff --git a/workflows/irrigation-statewide/_targets.R b/workflows/irrigation-statewide/_targets.R new file mode 100644 index 0000000000..12dfaccfd4 --- /dev/null +++ b/workflows/irrigation-statewide/_targets.R @@ -0,0 +1,200 @@ +#!/usr/bin/env Rscript + +library(targets) +library(tarchetypes) +library(crew) +library(crew.cluster) + +root_dir <- here::here("workflows/irrigation-statewide") +logdir <- file.path(root_dir, "_logs") +dir.create(logdir, showWarnings = FALSE, recursive = TRUE) + +targets_config <- Sys.getenv("TAR_CONFIG", file.path(root_dir, "_targets.yaml")) +Sys.setenv(TAR_CONFIG = targets_config) + +project <- Sys.getenv("TAR_PROJECT", "small") +config_base <- config::get( + file = file.path(root_dir, "config.yml"), + config = project +) +config_paths <- config::get( + file = file.path(root_dir, "config_paths.yml"), + config = Sys.getenv("IRRIGATION_PATHS_CONFIG", "default") +) +config <- config::merge(config_base, config_paths) + +n_parcels <- config[["n_parcels"]] +batch_size <- config[["batch_size"]] +n_remote_workers <- config[["n_remote_workers"]] +n_local_workers <- as.integer(Sys.getenv("NSLOTS", 1)) +exec_type <- config[["exec_type"]] +stopifnot(exec_type %in% c("cluster", "local")) +event_filename <- config[["event_filename"]] +n_irr_ensemble <- config[["n_irr_ensemble"]] + +message(glue::glue( + "PROJECT: {project}\n", + "Running {n_parcels} parcels in batches of {batch_size} parcels each.\n", + "Execution type: {exec_type} with ", + if (exec_type == "local") { + "{n_local_workers} workers.\n" + } else { + "{n_remote_workers} workers.\n" + }, + "Output will be saved to ", + "{file.path(config[['event_output_dir']], event_filename)}\n", + "Targets output will be stored in ", tar_config_get("store") +)) + +ctrl_local <- crew_controller_local( + name = "local", + workers = n_local_workers +) + +ctrl_sge <- crew_controller_sge( + name = "sge", + workers = n_remote_workers, + # TLS causes weird allocator bugs. We're on an internal network, so no TLS is + # probably fine. + tls = crew::crew_tls(mode = "none"), + options_cluster = crew_options_sge( + log_output = logdir, + script_lines = c( + # Activate pixi + 'eval "$(pixi shell-hook -s bash)"', + # Diagnostics + "echo 'PIXI environment:'", + "env | grep PIXI", + "echo 'R .libPaths():'", + "Rscript -e '.libPaths()'", + # prevent arrow parallelism + "export OMP_NUM_THREADS=1" + ) + ) +) + +res_local <- tar_resources( + crew = tar_resources_crew(controller = "local") +) +res_sge <- tar_resources( + crew = tar_resources_crew(controller = "sge") +) + +res_default <- if (exec_type == "local") { + message("Running locally") + res_local +} else if (exec_type == "cluster") { + message("Running via SGE cluster") + res_sge +} else { + stop("Unknown exec_type ", shQuote(exec_type)) +} + +tar_option_set( + controller = crew_controller_group(ctrl_local, ctrl_sge), + resources = res_default, + packages = c("ggplot2", "rlang", "PEcAn.data.land"), + imports = c("PEcAn.data.land") +) + +if (exec_type == "cluster") { + tar_option_set(storage = "worker", retrieval = "worker") +} + +tar_source(file.path(root_dir, "R")) + +list( + tar_target(crops_path, path.expand(config[["crops_path"]])), + tar_target(mslsp_path, path.expand(config[["mslsp_path"]])), + tar_target(cimis_etref_path, path.expand(config[["cimis_etref_path"]])), + tar_target(chirps_precip_path, path.expand(config[["chirps_precip_path"]])), + tar_target(ssurgo_weights_path, path.expand(config[["ssurgo_weights_path"]])), + tar_target(ssurgo_gdb_path, path.expand(config[["ssurgo_gdb_path"]])), + + tar_target(event_output_dir, path.expand(config[["event_output_dir"]])), + + tar_target(validated_paths, { + stopifnot( + file.exists(crops_path), + dir.exists(mslsp_path), + length(list.files(mslsp_path, "\\.parquet")) == 7, + dir.exists(cimis_etref_path), + dir.exists(chirps_precip_path), + file.exists(ssurgo_weights_path), + dir.exists(ssurgo_gdb_path) + ) + dir.create(event_output_dir, showWarnings = FALSE, recursive = TRUE) + TRUE + }), + + tar_target(parcel_ids, get_parcel_ids(crops_path, n_parcels)), + + tar_target( + parcel_id_batches, + split_into_batches(parcel_ids, batch_size), + iteration = "list" + ), + + tar_target( + phenology, + get_phenology(mslsp_path, parcel_id_batches), + pattern = map(parcel_id_batches), + format = "parquet" + ), + + tar_target( + etref, + get_etref(cimis_etref_path, parcel_id_batches), + pattern = map(parcel_id_batches), + format = "parquet" + ), + + tar_target( + precip, + get_precip(chirps_precip_path, parcel_id_batches), + pattern = map(parcel_id_batches), + format = "parquet" + ), + + tar_target( + crop_info, + get_crop_info(crops_path, parcel_id_batches), + pattern = map(parcel_id_batches), + format = "parquet" + ), + + tar_target( + crops_with_soil, + add_soil_awc(crop_info, ssurgo_weights_path, ssurgo_gdb_path), + pattern = map(crop_info), + format = "parquet" + ), + + tar_target( + complete_crop_timeseries, + make_crop_timeseries(crops_with_soil, phenology, precip, etref), + pattern = map(crops_with_soil, phenology, precip, etref), + format = "parquet" + ), + + tar_target( + parcel_waterbalance, + apply_water_balance(complete_crop_timeseries, "parcel_id"), + pattern = map(complete_crop_timeseries), + format = "parquet" + ), + + tar_target( + irr_events_df, + make_event_df_parquet( + file.path(event_output_dir, event_filename), + parcel_waterbalance, + n_ensemble = n_irr_ensemble, + frac_uncertainty = 0.1 + ), + pattern = map(parcel_waterbalance), + format = "file" + ), + + NULL +) diff --git a/workflows/irrigation-statewide/_targets.yaml b/workflows/irrigation-statewide/_targets.yaml new file mode 100644 index 0000000000..5958a0c0f6 --- /dev/null +++ b/workflows/irrigation-statewide/_targets.yaml @@ -0,0 +1,12 @@ +main: + script: workflows/irrigation-statewide/_targets.R + store: workflows/irrigation-statewide/_targets/ +small: + inherits: main + store: workflows/irrigation-statewide/_targets_small/ +medium: + inherits: main + store: workflows/irrigation-statewide/_targets_medium/ +all: + inherits: main + store: workflows/irrigation-statewide/_targets_all/ diff --git a/workflows/irrigation-statewide/check-result.R b/workflows/irrigation-statewide/check-result.R new file mode 100644 index 0000000000..79bca2627d --- /dev/null +++ b/workflows/irrigation-statewide/check-result.R @@ -0,0 +1,16 @@ +#!/usr/bin/env Rscript + +fname <- "../../event-outputs/irrigation_all" +dat <- arrow::open_dataset(fname) + +dat |> + head(20) |> + dplyr::collect() + +dat |> + dplyr::filter(parcel_id == 3657) |> + dplyr::collect() + +pids <- dat |> + dplyr::distinct(.data$parcel_id) |> + dplyr::pull() diff --git a/workflows/irrigation-statewide/config.yml b/workflows/irrigation-statewide/config.yml new file mode 100644 index 0000000000..4483c4a5bf --- /dev/null +++ b/workflows/irrigation-statewide/config.yml @@ -0,0 +1,23 @@ +default: + n_parcels: 1000 + batch_size: 100 + n_remote_workers: 1 + exec_type: "local" + event_filename: "irrigation_1000" + n_irr_ensemble: 20 + +small: + +medium: + n_parcels: 10000 + batch_size: 1000 + n_remote_workers: 15 + exec_type: "cluster" + event_filename: "irrigation_10000" + +all: + n_parcels: null # `null` means "all" + batch_size: 5000 + n_remote_workers: 60 + exec_type: "cluster" + event_filename: "irrigation_all" diff --git a/workflows/irrigation-statewide/config_paths.yml b/workflows/irrigation-statewide/config_paths.yml new file mode 100644 index 0000000000..64e981f73d --- /dev/null +++ b/workflows/irrigation-statewide/config_paths.yml @@ -0,0 +1,8 @@ +default: + event_output_dir: "/projectnb/dietzelab/ccmmf/usr/ashiklom/event-outputs" + crops_path: "/projectnb/dietzelab/ccmmf/LandIQ-harmonized-v4.1/crops_all_years.parq" + mslsp_path: "/projectnb/dietzelab/ccmmf/management/phenology/matched_landiq_mslsp_v4.1" + cimis_etref_path: "/projectnb/dietzelab/ccmmf/data/cimis-extracted" + chirps_precip_path: "/projectnb/dietzelab/ccmmf/data/chirps-extracted" + ssurgo_weights_path: "/projectnb/dietzelab/ccmmf/data_raw/ssurgo/ssurgo-weights.parquet" + ssurgo_gdb_path: "/projectnb/dietzelab/ccmmf/data_raw/ssurgo/gSSURGO_CA.gdb" diff --git a/workflows/irrigation-statewide/preprocessing/.clustermq_sge.tmpl b/workflows/irrigation-statewide/preprocessing/.clustermq_sge.tmpl new file mode 100644 index 0000000000..881add9c82 --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/.clustermq_sge.tmpl @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +#$ -N {{ job_name }} # job name +##$ -q default # submit to queue named "default" +#$ -j y # combine stdout/error in one file +#$ -o {{ log_file | /dev/null }} # output file +#$ -cwd # use pwd as work dir +#$ -V # use environment variable +#$ -t 1-{{ n_jobs }} # submit jobs as array +#$ -pe smp {{ cores | 1 }} # number of cores to use per job +#$ -l h_rt={{ walltime | 01:00:00 }} + +# ulimit -v $(( 1024 * {{ memory | 4096 }} )) + +CMQ_AUTH={{ auth }} pixi run Rscript -e "clustermq:::worker('{{ master }}')" diff --git a/workflows/irrigation-statewide/preprocessing/README.md b/workflows/irrigation-statewide/preprocessing/README.md new file mode 100644 index 0000000000..d4c090af2c --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/README.md @@ -0,0 +1,24 @@ +# Data preprocessing workflows for irrigation inputs + +This directory uses `pixi` for dependency management and `clustermq` for parallelizing work automatically across an SGE cluster (like the BU SCC). + +## CIMIS reference ET + +Prerequisites: Raw CIMIS ETref data downloaded from spatialcimis.water.ca.gov. + +- `cimis-01-weights.R` --- Pre-calculate the area weights of CIMIS ETref pixels for each harmonized LandIQ polygon using `exactextractr` (slow) +- `cimis-02-extract.R` --- Apply the weights to each CIMITS ETref raster file (fast) +- `cimis-03-combine.sql` --- Recombine the results into a properly hive-partitioned parquet dataset. + +## CHIRPS + +Prerequisites: Raw CHIRPS v2 data downloaded from https://data.chc.ucsb.edu/products/CHIRPS-2.0/global_daily/netcdf/p05/. + +- `chirps-preprocess.R` --- Entire workflow. For each year, use `exactextractr::exact_extract` to pull out the area-weighted values of CHIRPS for each harmonized LandIQ parcel. + +## SSURGO soil data + +Prerequisites: A downloaded copy of gSSURGO for all of California (https://nrcs.app.box.com/v/soils/folder/233398887779). + +- `ssurgo-01-spatial-weights.R` --- Calculate the area weights of each SSURGO mapping unit for each harmonized LandIQ parcel. Parallelized across batches of parcels. +- `ssurgo-02-combine.R` --- Recombine the batches into a single parquet file. diff --git a/workflows/irrigation-statewide/preprocessing/chirps-preprocess.R b/workflows/irrigation-statewide/preprocessing/chirps-preprocess.R new file mode 100644 index 0000000000..87b03239a2 --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/chirps-preprocess.R @@ -0,0 +1,77 @@ +#!/usr/bin/env Rscript + +chirps_dir <- "/projectnb/dietzelab/ccmmf/management/irrigation/" +chirpsfiles <- list.files( + chirps_dir, + "chirps-v2.0.*.nc", + full.names = TRUE +) + +parcel_file <- "/projectnb/dietzelab/ccmmf/LandIQ-harmonized-v4.1/parcels.gpkg" + +extract_chirps <- function(fname, parcel_file, outdir = "_results_chirps") { + # fname <- chirpsfiles[[1]] + parcels_sf <- sf::read_sf(parcel_file, use_stream = TRUE) + # parcels_sf <- head(parcels_sf_full, 500) + + dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + + year <- basename(fname) |> + gsub( + pattern = "chirps-v2\\.0\\.(\\d+)\\.days_p05\\.nc", + replacement = "\\1" + ) |> + as.numeric() + + outfile <- file.path(outdir, paste0("chirps-", year, ".parquet")) + if (file.exists(outfile)) { + message("File exists ", outfile) + return(outfile) + } + + r <- terra::rast(fname) + terra::ext(r) <- c(-180, 180, -50, 50) + terra::crs(r) <- "EPSG:4326" + + parcels_proj <- sf::st_transform(parcels_sf, sf::st_crs(r)) + + vals <- exactextractr::exact_extract( + r, + parcels_proj, + "mean", + append_cols = "parcel_id" + ) + + date0 <- as.Date(paste0(year, "-01-01")) + vals_df <- dplyr::bind_cols(vals) |> + tibble::as_tibble() |> + tidyr::pivot_longer( + -c("parcel_id"), + names_to = "yday", + names_pattern = ".*\\.days_p05_(\\d+)$", + names_transform = as.integer, + values_to = "precip_mm_day" + ) |> + dplyr::mutate( + date = date0 + .data$yday, + .keep = "unused" + ) |> + dplyr::relocate("date", .after = "parcel_id") |> + dplyr::arrange(.data$parcel_id, .data$date) + + arrow::write_parquet(vals_df, outfile) + invisible(outfile) +} + +options( + clustermq.scheduler = "sge", + clustermq.template = ".clustermq_sge.tmpl" +) + +clustermq::Q( + fun = extract_chirps, + fname = chirpsfiles, + const = list(parcel_file = parcel_file), + n_jobs = length(chirpsfiles), + template = list(cores = 1, walltime = "06:00:00") +) diff --git a/workflows/irrigation-statewide/preprocessing/cimis-01-weights.R b/workflows/irrigation-statewide/preprocessing/cimis-01-weights.R new file mode 100644 index 0000000000..b652062a4c --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/cimis-01-weights.R @@ -0,0 +1,79 @@ +#!/usr/bin/env Rscript + +library(sf) +library(terra) +library(dplyr) +library(Matrix) +library(exactextractr) +library(arrow) + +parcel_file <- "/projectnb/dietzelab/ccmmf/LandIQ-harmonized-v4.1/parcels.gpkg" +cimis_dir <- "/projectnb/dietzelab/ccmmf/data_raw/cimis/cimis" + +outdir <- "_results_v2" +dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + +parcels_sf <- read_sf(parcel_file, use_stream = TRUE) + +cimis_f0 <- file.path(cimis_dir, "2019", "01", "01", "ETo.asc.gz") +stopifnot(file.exists(cimis_f0)) + +# A subset of CIMIS files, like this one, have slightly smaller dimensions for +# some reason. Compute their weights separately. +cimis_falt <- file.path(cimis_dir, "2015", "03", "01", "ETo.asc.gz") +stopifnot(file.exists(cimis_falt)) + +n_parcels <- nrow(parcels_sf) + +calc_weights <- function(fname, rdsfile, parqfile, overwrite = FALSE) { + if (!overwrite && file.exists(rdsfile) && file.exists(parqfile)) { + message("Files exist") + return(NULL) + } + + r <- rast(file.path("/vsigzip", fname)) + crs(r) <- "EPSG:3310" + + n_cells <- ncell(r) + + raw_weights <- exact_extract( + r, + parcels_sf, + fun = NULL, + include_cell = TRUE, + include_cols = "parcel_id", + progress = TRUE + ) + + weights_df <- raw_weights |> + bind_rows(.id = "parcel_idx") |> + mutate(parcel_idx = as.integer(.data$parcel_idx)) |> + as_tibble() |> + mutate( + w = .data$coverage_fraction / sum(.data$coverage_fraction), + .by = "parcel_idx" + ) |> + select(all_of(c("parcel_idx", "parcel_id", "cell", "w"))) + + W <- sparseMatrix( + i = weights_df[["parcel_idx"]], + j = weights_df[["cell"]], + x = weights_df[["w"]], + dims = c(n_parcels, n_cells), + repr = "C" + ) + rownames(W) <- parcels_sf[["parcel_id"]] + + saveRDS(W, rdsfile) + write_parquet(weights_df, parqfile) + TRUE +} + +rdsfile_0 <- file.path(outdir, "spatial_weights.rds") +parqfile_0 <- file.path(outdir, "weights_df.parquet") + +calc_weights(cimis_f0, rdsfile_0, parqfile_0) + +rdsfile_alt <- file.path(outdir, "spatial_weights_alt.rds") +parqfile_alt <- file.path(outdir, "weights_df_alt.parquet") +calc_weights(cimis_falt, rdsfile_alt, parqfle_alt) diff --git a/workflows/irrigation-statewide/preprocessing/cimis-02-extract.R b/workflows/irrigation-statewide/preprocessing/cimis-02-extract.R new file mode 100644 index 0000000000..b0fcd2c87c --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/cimis-02-extract.R @@ -0,0 +1,98 @@ +#!/usr/bin/env Rscript + +options( + clustermq.scheduler = "sge", + clustermq.template = ".clustermq_sge.tmpl" +) + +library(terra) +library(progress) +library(arrow) +library(clustermq) + +n_workers <- 20 +walltime <- "02:00:00" + +outdir <- "_results_v2/daily-raw" +dir.create(outdir, recursive = TRUE, showWarnings = FALSE) + +W <- readRDS("_results_v2/spatial_weights.rds") + +cimis_manifest <- "cimis_files.txt" +years <- seq(2015, 2024) +if (!file.exists(cimis_manifest)) { + get_cimis_files <- function(year) { + ydir <- file.path(cimis_root, year) + stopifnot(dir.exists(ydir)) + list.files( + ydir, + pattern = "ETo\\.asc\\.gz$", + full.names = TRUE, + recursive = TRUE + ) + } + ylist <- map(years, get_cimis_files, .progress = TRUE) + cimis_files <- sort(unlist(ylist)) + writeLines(cimis_files, cimis_manifest) +} else { + cimis_files <- readLines(cimis_manifest) +} + +# Extract +process_file <- function(fname, W, outdir) { + day <- basename(dirname(fname)) + month <- basename(dirname(dirname(fname))) + year <- basename(dirname(dirname(dirname(fname)))) + datestr <- paste(year, month, day, sep = "-") + outfile <- file.path(outdir, paste0(datestr, ".parquet")) + if (file.exists(outfile)) { + return(outfile) + } + r <- tryCatch({ + terra::rast(file.path("/vsigzip", fname)) + }, error = function(e) { + # Some files aren't actually zipped. Try them this way. + message("Error reading zipped. Trying unzipped. --- ", fname) + terra::rast(fname) + }) + rsize <- terra::size(r) + if (rsize == 285600) { + # Alternate size files -- use alternate weights + W <- readRDS("_results_v2/spatial_weights_alt.rds") + } else if (rsize != 276000) { + stop("File ", fname, " has unexpected size ", rsize) + } + terra::crs(r) <- "EPSG:3310" + v <- terra::values(r, mat = FALSE) + date <- as.Date(datestr) + na_mask <- is.na(v) + v[na_mask] <- 0 + valid_mask <- as.numeric(!na_mask) + weight_sums <- as.numeric(W %*% valid_mask) + et_vals <- as.numeric(W %*% v) + et_vals[weight_sums == 0] <- NA_real_ # All values are NA + result <- tibble::tibble( + parcel_id = as.numeric(rownames(W)), + date = date, + etref_mm_day = et_vals + ) + arrow::write_parquet(result, outfile) + invisible(outfile) +} + +# outfiles_raw <- purrr::map( +# cimis_files, +# process_file, +# W = W, +# outdir = outdir, +# .progress = TRUE +# ) + +cimis_long <- Q( + fun = process_file, + fname = cimis_files, + const = list(W = W, outdir = outdir), + n_jobs = n_workers, # SGE array size — persistent worker processes + template = list(cores = 1, walltime = walltime), + fail_on_error = FALSE +) diff --git a/workflows/irrigation-statewide/preprocessing/cimis-03-combine.sql b/workflows/irrigation-statewide/preprocessing/cimis-03-combine.sql new file mode 100644 index 0000000000..95805898c0 --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/cimis-03-combine.sql @@ -0,0 +1,27 @@ +-- DuckDB SQL script +-- Run with: +-- duckdb < 03-combine.sql +-- +-- Or, interactively: +-- 1. Launch duckdb -- `duckdb` +-- 2. From duckdb -- `.read 03-combine.sql` + +SET memory_limit = '32GB'; + +COPY ( + SELECT + parcel_id, + date, + etref_mm_day, + year(date) AS year + FROM read_parquet('_results_v2/daily-raw/*.parquet') + ORDER BY parcel_id ASC +) +TO '_results_v2/cimis-extracted' +( + FORMAT PARQUET, + PARTITION_BY (year), + OVERWRITE_OR_IGNORE, + COMPRESSION 'ZSTD' +); + diff --git a/workflows/irrigation-statewide/preprocessing/ssurgo-01-spatial-weights.R b/workflows/irrigation-statewide/preprocessing/ssurgo-01-spatial-weights.R new file mode 100644 index 0000000000..e96ade3734 --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/ssurgo-01-spatial-weights.R @@ -0,0 +1,130 @@ +#!/usr/bin/env Rscript + +options( + clustermq.scheduler = "sge", + clustermq.template = ".clustermq_sge.tmpl" +) + +library(sf) +library(dplyr) +library(arrow) # for efficient weight storage +library(clustermq) + +outdir <- "_results" +dir.create(outdir, showWarnings = FALSE, recursive = TRUE) + +# Number of parcels to process at a time +# Higher numbers mean a more intensive intersection. +# Lower numbers waste more cycles re-reading the (big) mupolygons data frame. +parcel_chunk_size <- 2000 + +# Transform ssurgo mupolygon column to parquet for faster reading +gdb_path <- "/projectnb/dietzelab/ccmmf/data_raw/ssurgo/gSSURGO_CA.gdb" +mupoly_path <- "./ssurgo_mupolygons.parquet" +if (!file.exists(mupoly_path)) { + message("Creating parquet version of mupolygons for faster reads") + message("01 - reading mupoly") + mupoly_raw <- sf::read_sf( + gdb_path, + layer = "MUPOLYGON", + use_stream = TRUE, + as_tibble = TRUE + ) + message("02 - validating and transforming") + mupoly <- mupoly_raw |> + sf::st_make_valid() |> + sf::st_transform(crs = "EPSG:3310") + message("03 - writing parquet") + sf::write_sf(mupoly, mupoly_path, driver = "Parquet") + rm(mupoly_raw, mupoly) +} + +parcels_path <- "/projectnb/dietzelab/ccmmf/LandIQ-harmonized-v4.1/parcels.gpkg" #nolint + +parcel_ids <- st_read( + parcels_path, + query = "SELECT DISTINCT parcel_id FROM parcels", + geometry_column = NULL, + use_stream = TRUE +)[["parcel_id"]] + +parcel_chunks <- split(parcel_ids, parcel_ids %/% parcel_chunk_size) +parcel_mins <- lapply(parcel_chunks, min) +parcel_maxs <- lapply(parcel_chunks, max) + +get_weights <- function( + parcel_min, + parcel_max, + parcels_path, + mupoly_path, + outdir +) { + # parcel_min <- parcel_mins[[1]] + # parcel_max <- parcel_maxs[[1]] + stopifnot( + file.exists(mupoly_path), + file.exists(parcels_path) + ) + outfile <- file.path( + outdir, + sprintf("%d-%d.parquet", parcel_min, parcel_max) + ) + if (file.exists(outfile)) { + message("File exists: ", outfile) + return(invisible(outfile)) + } + parcels <- sf::read_sf( + parcels_path, + query = sprintf( + paste( + "SELECT parcel_id, geom FROM parcels", + "WHERE parcel_id >= %d AND parcel_id <= %d" + ), + parcel_min, parcel_max + ), + use_stream = TRUE + ) + mupolygon <- sf::read_sf( + mupoly_path, + # use_stream = TRUE, + as_tibble = TRUE + )["MUKEY"] + intersect <- sf::st_intersection(parcels, mupolygon) + weights <- intersect |> + dplyr::mutate( + area_m2 = as.numeric(sf::st_area(.data$geom)), + mukey = as.character(.data$MUKEY) + ) |> + sf::st_drop_geometry() |> + dplyr::mutate( + weight = .data$area_m2 / sum(.data$area_m2), + .by = "parcel_id" + ) |> + dplyr::select("parcel_id", "mukey", "area_m2", "weight") + arrow::write_parquet(weights, outfile) + invisible(outfile) +} + +# for (i in seq_along(parcel_mins)) { +# message("Trying ", i) +# get_weights( +# parcel_mins[[i]], +# parcel_maxs[[i]], +# parcels_path, +# mupoly_path, +# outdir +# ) +# } + +Q( + get_weights, + parcel_min = parcel_mins, + parcel_max = parcel_maxs, + const = list( + parcels_path = parcels_path, + mupoly_path = mupoly_path, + outdir = outdir + ), + n_jobs = 30, + template = list(cores = 1, walltime = "05:00:00") +) diff --git a/workflows/irrigation-statewide/preprocessing/ssurgo-02-combine.R b/workflows/irrigation-statewide/preprocessing/ssurgo-02-combine.R new file mode 100644 index 0000000000..36c5763074 --- /dev/null +++ b/workflows/irrigation-statewide/preprocessing/ssurgo-02-combine.R @@ -0,0 +1,47 @@ +#!/usr/bin/env Rscript + +library(arrow) +library(duckdb) + +conn <- dbConnect(duckdb(), dbdir = ":memory:") + +dbExecute(conn, "SET memory_limit = '32GB'") + +dbExecute(conn, " + COPY ( + SELECT + * + FROM read_parquet('_results/*.parquet') + ORDER BY parcel_id ASC + ) + TO 'ssurgo-weights.parquet' + ( + FORMAT PARQUET, + OVERWRITE_OR_IGNORE, + COMPRESSION 'ZSTD' + ); + " +) + +dbDisconnect(conn, shutdown = TRUE) + +# Test to confirm we can open +message( + "Testing to confirm we can open the data ", + "and it produces valid weights." +) +dat <- open_dataset("ssurgo-weights.parquet") + +dsub <- dat |> + dplyr::filter(parcel_id %in% c(1, 100, 1000, 10000, 100000)) |> + dplyr::collect() +print(dsub) + +dat |> + dplyr::summarize( + wt = sum(weight), + delta = abs(wt - 1), + .by = "parcel_id" + ) |> + dplyr::arrange(dplyr::desc(delta)) |> + dplyr::collect() diff --git a/workflows/irrigation-statewide/push-to-carb.sh b/workflows/irrigation-statewide/push-to-carb.sh new file mode 100644 index 0000000000..ef0822b44f --- /dev/null +++ b/workflows/irrigation-statewide/push-to-carb.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +aws s3 sync --profile ccmmf \ + /projectnb/dietzelab/ccmmf/management/irrigation_event_files/ \ + s3://carb/management/irrigation/v1.0/