From 98bfb9f24d263bbe6a017c0753d2d7bc24bc1511 Mon Sep 17 00:00:00 2001 From: LRydin Date: Fri, 16 Jan 2026 20:02:42 +0100 Subject: [PATCH] interpolating biomass and cost data between usual 5-year increments --- rules/retrieve.smk | 14 +- scripts/build_biomass_potentials.py | 101 +++++++++-- scripts/retrieve_cost_data_interpolated.py | 200 +++++++++++++++++++++ 3 files changed, 296 insertions(+), 19 deletions(-) create mode 100644 scripts/retrieve_cost_data_interpolated.py diff --git a/rules/retrieve.smk b/rules/retrieve.smk index 2b5c09a95c..893f5162cb 100755 --- a/rules/retrieve.smk +++ b/rules/retrieve.smk @@ -351,12 +351,18 @@ if (COSTS_DATASET := dataset_version("costs"))["source"] in [ ]: rule retrieve_cost_data: - input: - costs=storage(COSTS_DATASET["url"] + "/costs_{planning_horizons}.csv"), output: costs=COSTS_DATASET["folder"] + "/costs_{planning_horizons}.csv", - run: - copy2(input["costs"], output["costs"]) + log: + "logs/retrieve_cost_data/costs_{planning_horizons}.log", + retries: 2 + params: + costs_url=COSTS_DATASET["url"], + min_year=2020, + max_year=2050, + step=5, + script: + "../scripts/retrieve_cost_data_interpolated.py" if (POWERPLANTS_DATASET := dataset_version("powerplants"))["source"] in [ diff --git a/scripts/build_biomass_potentials.py b/scripts/build_biomass_potentials.py index a5373d10a0..ccba94eae2 100755 --- a/scripts/build_biomass_potentials.py +++ b/scripts/build_biomass_potentials.py @@ -4,6 +4,12 @@ """ Compute biogas and solid biomass potentials for each clustered model region using data from JRC ENSPRESO. + +Potentials are taken from the discrete ENSPRESO reporting years and, if +requested, interpolated linearly between the nearest available years. +Additional (assumed unsustainable) potentials are derived by comparison with +Eurostat primary production data and scaled by year-dependent shares from the +configuration. """ import logging @@ -19,6 +25,55 @@ AVAILABLE_BIOMASS_YEARS = [2010, 2020, 2030, 2040, 2050] +def _year_piecewise_linear(param_by_year, year: int) -> float: + """ + Return a piecewise-linear interpolation of a year-indexed parameter map. + + The function interpolates linearly between the two anchor years bracketing + ``year``. If ``year`` coincides with an anchor year, the corresponding value + is returned. If ``year`` lies outside the anchor range, the function clamps + to the nearest endpoint value. + + Parameters + ---------- + param_by_year : Mapping[int, float] + Mapping from year to parameter value (e.g. loaded from the YAML config). + Keys are interpreted as years and cast to integers. + year : int + Target year for which to obtain the interpolated parameter value. + + Returns + ------- + float + Interpolated (or clamped) parameter value for the given year. + """ + + if param_by_year is None or len(param_by_year) == 0: + raise ValueError("Parameter mapping is empty or None.") + + # ensure int keys + float values + xs = np.array(sorted(int(k) for k in param_by_year.keys()), dtype=int) + ys = np.array([float(param_by_year[int(x)]) for x in xs], dtype=float) + + # exact hit + if year in set(xs.tolist()): + return float(param_by_year[int(year)]) + + # clamp outside range + if year <= xs[0]: + return float(ys[0]) + if year >= xs[-1]: + return float(ys[-1]) + + # bracket year by nearest anchors + i_right = int(np.searchsorted(xs, year, side="right")) + x0, x1 = xs[i_right - 1], xs[i_right] + y0, y1 = ys[i_right - 1], ys[i_right] + + # linear interpolation within this interval + return float(y0 + (year - x0) * (y1 - y0) / (x1 - x0)) + + def _calc_unsustainable_potential(df, df_unsustainable, share_unsus, resource_type): """ Calculate the unsustainable biomass potential for a given resource type or @@ -303,8 +358,9 @@ def add_unsustainable_potentials(df): df_unsustainable = df_unsustainable[bio_carriers] - # Phase out unsustainable biomass potentials linearly from 2020 to 2035 while phasing in sustainable potentials - share_unsus = params.get("share_unsustainable_use_retained").get(investment_year) + # Scale sustainable/unsustainable potentials according to year-dependent shares + # specified in the biomass configuration (with piecewise-linear interpolation). + share_unsus = _year_piecewise_linear(params.get("share_unsustainable_use_retained"), investment_year) df_wo_ch = df.drop(df.filter(regex=r"CH\d*", axis=0).index) @@ -326,7 +382,8 @@ def add_unsustainable_potentials(df): resource_type="gasoline|diesel|kerosene|liquid", ) - share_sus = params.get("share_sustainable_potential_available").get(investment_year) + share_sus = _year_piecewise_linear(params.get("share_sustainable_potential_available"), investment_year) + df.loc[df_wo_ch.index] *= share_sus df = df.join(df_wo_ch.filter(like="unsustainable")).fillna(0) @@ -353,24 +410,38 @@ def add_unsustainable_potentials(df): year = params["year"] if overnight else investment_year scenario = params["scenario"] - if year > 2050: - logger.info("No biomass potentials for years after 2050, using 2050.") - max_year = max(AVAILABLE_BIOMASS_YEARS) + max_year = max(AVAILABLE_BIOMASS_YEARS) + + if year > max_year: + logger.info(f"No biomass potentials for years after {max_year}, using {max_year}.") enspreso = enspreso_biomass_potentials(max_year, scenario) elif year not in AVAILABLE_BIOMASS_YEARS: - before = int(np.floor(year / 10) * 10) - after = int(np.ceil(year / 10) * 10) - logger.info( - f"No biomass potentials for {year}, interpolating linearly between {before} and {after}." - ) + years = np.array(sorted(AVAILABLE_BIOMASS_YEARS), dtype=int) + + if year <= years.min(): + before = after = int(years.min()) + logger.info(f"No biomass potentials for {year}, using {before}.") + enspreso = enspreso_biomass_potentials(before, scenario) + + elif year >= years.max(): + before = after = int(years.max()) + logger.info(f"No biomass potentials for {year}, using {before}.") + enspreso = enspreso_biomass_potentials(before, scenario) + + else: + before = int(years[years < year].max()) + after = int(years[years > year].min()) - enspreso_before = enspreso_biomass_potentials(before, scenario) - enspreso_after = enspreso_biomass_potentials(after, scenario) + logger.info( + f"No biomass potentials for {year}, interpolating linearly between {before} and {after}." + ) - fraction = (year - before) / (after - before) + enspreso_before = enspreso_biomass_potentials(before, scenario) + enspreso_after = enspreso_biomass_potentials(after, scenario) - enspreso = enspreso_before + fraction * (enspreso_after - enspreso_before) + fraction = (year - before) / (after - before) + enspreso = enspreso_before + fraction * (enspreso_after - enspreso_before) else: logger.info(f"Using biomass potentials for {year}.") diff --git a/scripts/retrieve_cost_data_interpolated.py b/scripts/retrieve_cost_data_interpolated.py new file mode 100644 index 0000000000..302048978d --- /dev/null +++ b/scripts/retrieve_cost_data_interpolated.py @@ -0,0 +1,200 @@ +# SPDX-FileCopyrightText: Contributors to PyPSA-Eur +# +# SPDX-License-Identifier: MIT +""" +Retrieve technology cost data from the technology-data repository. + +The upstream dataset provides costs in 5-year increments. For intermediate +planning years, this script linearly interpolates between the nearest bracketing +anchor years and writes a synthetic costs_{planning_horizons}.csv. +""" + +import logging +from pathlib import Path +import tempfile + +import numpy as np +import pandas as pd +import requests + +logger = logging.getLogger(__name__) + + +def _anchor_years(year: int, *, min_year: int = 2020, max_year: int = 2050, step: int = 5) -> tuple[int, int]: + """ + Return bracketing anchor years for the given target year. + + Parameters + ---------- + year : int + Target year. + min_year, max_year : int + Lower/upper bounds of the dataset coverage. Years outside the range are + clamped to the nearest endpoint. + step : int + Spacing of anchor years (default: 5 years). + + Returns + ------- + tuple[int, int] + (before, after) anchor years. If year is on an anchor, returns (year, year). + """ + y = int(year) + + if y <= min_year: + return min_year, min_year + if y >= max_year: + return max_year, max_year + if y % step == 0: + return y, y + + before = (y // step) * step + after = before + step + return before, after + + +def _download(url: str, path: Path, *, timeout: int = 120) -> None: + """ + Download a URL to a local file and fail fast on HTTP errors. + """ + logger.info("Downloading %s", url) + r = requests.get(url, stream=True, timeout=timeout) + r.raise_for_status() + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "wb") as f: + for chunk in r.iter_content(chunk_size=1024 * 1024): + if chunk: + f.write(chunk) + + +def _interpolate_costs_csv( + path_before: Path, + path_after: Path, + year_before: int, + year_after: int, + year_target: int, + path_out: Path, +) -> None: + """ + Interpolate technology-data cost tables (long format) for a target year. + + The technology-data cost tables are expected to contain a numeric 'value' + column and a set of identifier columns (e.g. technology/parameter/unit). + The interpolation is performed on rows aligned by common identifier columns. + + Parameters + ---------- + path_before, path_after : Path + Input CSV paths for the two anchor years. + year_before, year_after : int + Anchor years. + year_target : int + Target year. + path_out : Path + Output CSV path. + """ + y0 = int(year_before) + y1 = int(year_after) + yt = int(year_target) + + if y0 == y1: + df = pd.read_csv(path_before) + df.to_csv(path_out, index=False) + return + + frac = (yt - y0) / (y1 - y0) + + df0 = pd.read_csv(path_before) + df1 = pd.read_csv(path_after) + + if "value" not in df0.columns or "value" not in df1.columns: + raise ValueError("Expected a 'value' column in technology-data cost tables.") + + # Prefer stable identifier columns if present; otherwise fall back to shared non-'value' columns. + preferred_keys = ["technology", "parameter", "unit"] + key_cols = [c for c in preferred_keys if c in df0.columns and c in df1.columns] + if not key_cols: + shared = [c for c in df0.columns if c in df1.columns and c != "value"] + if not shared: + raise ValueError("Could not determine common identifier columns to align cost tables.") + key_cols = shared + + df0i = df0.set_index(key_cols) + df1i = df1.set_index(key_cols) + + all_idx = df0i.index.union(df1i.index) + + v0 = pd.to_numeric(df0i.reindex(all_idx)["value"], errors="coerce") + v1 = pd.to_numeric(df1i.reindex(all_idx)["value"], errors="coerce") + + # Interpolate where both endpoints exist; otherwise carry the available endpoint. + v = v0 + frac * (v1 - v0) + v = v.where(~(v0.isna() & ~v1.isna()), v1) + v = v.where(~(v1.isna() & ~v0.isna()), v0) + + meta0 = df0i.reindex(all_idx).drop(columns=["value"], errors="ignore") + meta1 = df1i.reindex(all_idx).drop(columns=["value"], errors="ignore") + meta = meta0.combine_first(meta1) + + out = meta.copy() + out["value"] = v.astype(float) + + path_out.parent.mkdir(parents=True, exist_ok=True) + + out_df = out.reset_index() + + # Reorder columns to match the original file column order (use df0 as template). + template_cols = list(df0.columns) + desired_cols = [c for c in template_cols if c in out_df.columns] + [ + c for c in out_df.columns if c not in template_cols + ] + out_df = out_df[desired_cols] + + out_df.to_csv(path_out, index=False) + + +if __name__ == "__main__": + # Minimal logging setup compatible with Snakemake. + log_file = None + if "snakemake" in globals() and getattr(snakemake, "log", None): + # snakemake.log may be a list-like container + try: + log_file = snakemake.log[0] + except Exception: + log_file = None + + logging.basicConfig( + filename=log_file, + level=logging.INFO, + format="%(levelname)s:%(name)s:%(message)s", + ) + + year = int(snakemake.wildcards.planning_horizons) + base_url = str(snakemake.params.costs_url).rstrip("/") + + min_year = int(getattr(snakemake.params, "min_year", 2020)) + max_year = int(getattr(snakemake.params, "max_year", 2050)) + step = int(getattr(snakemake.params, "step", 5)) + + before, after = _anchor_years(year, min_year=min_year, max_year=max_year, step=step) + + out_path = Path(snakemake.output.costs) + out_path.parent.mkdir(parents=True, exist_ok=True) + + if before == after: + url = f"{base_url}/costs_{before}.csv" + _download(url, out_path) + logger.info("Wrote %s (no interpolation needed).", out_path) + else: + with tempfile.TemporaryDirectory() as tmp: + tmp = Path(tmp) + f0 = tmp / f"costs_{before}.csv" + f1 = tmp / f"costs_{after}.csv" + + _download(f"{base_url}/costs_{before}.csv", f0) + _download(f"{base_url}/costs_{after}.csv", f1) + + _interpolate_costs_csv(f0, f1, before, after, year, out_path) + + logger.info("Wrote %s (interpolated between %s and %s).", out_path, before, after) +