Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions rules/retrieve.smk
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,18 @@ if (COSTS_DATASET := dataset_version("costs"))["source"] in [
]:

rule retrieve_cost_data:
input:
costs=storage(COSTS_DATASET["url"] + "/costs_{planning_horizons}.csv"),
output:
costs=COSTS_DATASET["folder"] + "/costs_{planning_horizons}.csv",
run:
copy2(input["costs"], output["costs"])
log:
"logs/retrieve_cost_data/costs_{planning_horizons}.log",
retries: 2
params:
costs_url=COSTS_DATASET["url"],
min_year=2020,
max_year=2050,
step=5,
script:
"../scripts/retrieve_cost_data_interpolated.py"


if (POWERPLANTS_DATASET := dataset_version("powerplants"))["source"] in [
Expand Down
101 changes: 86 additions & 15 deletions scripts/build_biomass_potentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
"""
Compute biogas and solid biomass potentials for each clustered model region
using data from JRC ENSPRESO.

Potentials are taken from the discrete ENSPRESO reporting years and, if
requested, interpolated linearly between the nearest available years.
Additional (assumed unsustainable) potentials are derived by comparison with
Eurostat primary production data and scaled by year-dependent shares from the
configuration.
"""

import logging
Expand All @@ -19,6 +25,55 @@
AVAILABLE_BIOMASS_YEARS = [2010, 2020, 2030, 2040, 2050]


def _year_piecewise_linear(param_by_year, year: int) -> float:
"""
Return a piecewise-linear interpolation of a year-indexed parameter map.

The function interpolates linearly between the two anchor years bracketing
``year``. If ``year`` coincides with an anchor year, the corresponding value
is returned. If ``year`` lies outside the anchor range, the function clamps
to the nearest endpoint value.

Parameters
----------
param_by_year : Mapping[int, float]
Mapping from year to parameter value (e.g. loaded from the YAML config).
Keys are interpreted as years and cast to integers.
year : int
Target year for which to obtain the interpolated parameter value.

Returns
-------
float
Interpolated (or clamped) parameter value for the given year.
"""

if param_by_year is None or len(param_by_year) == 0:
raise ValueError("Parameter mapping is empty or None.")

# ensure int keys + float values
xs = np.array(sorted(int(k) for k in param_by_year.keys()), dtype=int)
ys = np.array([float(param_by_year[int(x)]) for x in xs], dtype=float)

# exact hit
if year in set(xs.tolist()):
return float(param_by_year[int(year)])

# clamp outside range
if year <= xs[0]:
return float(ys[0])
if year >= xs[-1]:
return float(ys[-1])

# bracket year by nearest anchors
i_right = int(np.searchsorted(xs, year, side="right"))
x0, x1 = xs[i_right - 1], xs[i_right]
y0, y1 = ys[i_right - 1], ys[i_right]

# linear interpolation within this interval
return float(y0 + (year - x0) * (y1 - y0) / (x1 - x0))


def _calc_unsustainable_potential(df, df_unsustainable, share_unsus, resource_type):
"""
Calculate the unsustainable biomass potential for a given resource type or
Expand Down Expand Up @@ -303,8 +358,9 @@ def add_unsustainable_potentials(df):

df_unsustainable = df_unsustainable[bio_carriers]

# Phase out unsustainable biomass potentials linearly from 2020 to 2035 while phasing in sustainable potentials
share_unsus = params.get("share_unsustainable_use_retained").get(investment_year)
# Scale sustainable/unsustainable potentials according to year-dependent shares
# specified in the biomass configuration (with piecewise-linear interpolation).
share_unsus = _year_piecewise_linear(params.get("share_unsustainable_use_retained"), investment_year)

df_wo_ch = df.drop(df.filter(regex=r"CH\d*", axis=0).index)

Expand All @@ -326,7 +382,8 @@ def add_unsustainable_potentials(df):
resource_type="gasoline|diesel|kerosene|liquid",
)

share_sus = params.get("share_sustainable_potential_available").get(investment_year)
share_sus = _year_piecewise_linear(params.get("share_sustainable_potential_available"), investment_year)

df.loc[df_wo_ch.index] *= share_sus

df = df.join(df_wo_ch.filter(like="unsustainable")).fillna(0)
Expand All @@ -353,24 +410,38 @@ def add_unsustainable_potentials(df):
year = params["year"] if overnight else investment_year
scenario = params["scenario"]

if year > 2050:
logger.info("No biomass potentials for years after 2050, using 2050.")
max_year = max(AVAILABLE_BIOMASS_YEARS)
max_year = max(AVAILABLE_BIOMASS_YEARS)

if year > max_year:
logger.info(f"No biomass potentials for years after {max_year}, using {max_year}.")
enspreso = enspreso_biomass_potentials(max_year, scenario)

elif year not in AVAILABLE_BIOMASS_YEARS:
before = int(np.floor(year / 10) * 10)
after = int(np.ceil(year / 10) * 10)
logger.info(
f"No biomass potentials for {year}, interpolating linearly between {before} and {after}."
)
years = np.array(sorted(AVAILABLE_BIOMASS_YEARS), dtype=int)

if year <= years.min():
before = after = int(years.min())
logger.info(f"No biomass potentials for {year}, using {before}.")
enspreso = enspreso_biomass_potentials(before, scenario)

elif year >= years.max():
before = after = int(years.max())
logger.info(f"No biomass potentials for {year}, using {before}.")
enspreso = enspreso_biomass_potentials(before, scenario)

else:
before = int(years[years < year].max())
after = int(years[years > year].min())

enspreso_before = enspreso_biomass_potentials(before, scenario)
enspreso_after = enspreso_biomass_potentials(after, scenario)
logger.info(
f"No biomass potentials for {year}, interpolating linearly between {before} and {after}."
)

fraction = (year - before) / (after - before)
enspreso_before = enspreso_biomass_potentials(before, scenario)
enspreso_after = enspreso_biomass_potentials(after, scenario)

enspreso = enspreso_before + fraction * (enspreso_after - enspreso_before)
fraction = (year - before) / (after - before)
enspreso = enspreso_before + fraction * (enspreso_after - enspreso_before)

else:
logger.info(f"Using biomass potentials for {year}.")
Expand Down
200 changes: 200 additions & 0 deletions scripts/retrieve_cost_data_interpolated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# SPDX-FileCopyrightText: Contributors to PyPSA-Eur <https://github.com/pypsa/pypsa-eur>
#
# SPDX-License-Identifier: MIT
"""
Retrieve technology cost data from the technology-data repository.

The upstream dataset provides costs in 5-year increments. For intermediate
planning years, this script linearly interpolates between the nearest bracketing
anchor years and writes a synthetic costs_{planning_horizons}.csv.
"""

import logging
from pathlib import Path
import tempfile

import numpy as np
import pandas as pd
import requests

logger = logging.getLogger(__name__)


def _anchor_years(year: int, *, min_year: int = 2020, max_year: int = 2050, step: int = 5) -> tuple[int, int]:
"""
Return bracketing anchor years for the given target year.

Parameters
----------
year : int
Target year.
min_year, max_year : int
Lower/upper bounds of the dataset coverage. Years outside the range are
clamped to the nearest endpoint.
step : int
Spacing of anchor years (default: 5 years).

Returns
-------
tuple[int, int]
(before, after) anchor years. If year is on an anchor, returns (year, year).
"""
y = int(year)

if y <= min_year:
return min_year, min_year
if y >= max_year:
return max_year, max_year
if y % step == 0:
return y, y

before = (y // step) * step
after = before + step
return before, after


def _download(url: str, path: Path, *, timeout: int = 120) -> None:
    """
    Stream a URL to a local file, raising immediately on HTTP error status.
    """
    logger.info("Downloading %s", url)
    response = requests.get(url, stream=True, timeout=timeout)
    response.raise_for_status()
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as fh:
        # skip keep-alive chunks, which arrive as empty bytes
        for block in filter(None, response.iter_content(chunk_size=1024 * 1024)):
            fh.write(block)


def _interpolate_costs_csv(
path_before: Path,
path_after: Path,
year_before: int,
year_after: int,
year_target: int,
path_out: Path,
) -> None:
"""
Interpolate technology-data cost tables (long format) for a target year.

The technology-data cost tables are expected to contain a numeric 'value'
column and a set of identifier columns (e.g. technology/parameter/unit).
The interpolation is performed on rows aligned by common identifier columns.

Parameters
----------
path_before, path_after : Path
Input CSV paths for the two anchor years.
year_before, year_after : int
Anchor years.
year_target : int
Target year.
path_out : Path
Output CSV path.
"""
y0 = int(year_before)
y1 = int(year_after)
yt = int(year_target)

if y0 == y1:
df = pd.read_csv(path_before)
df.to_csv(path_out, index=False)
return

frac = (yt - y0) / (y1 - y0)

df0 = pd.read_csv(path_before)
df1 = pd.read_csv(path_after)

if "value" not in df0.columns or "value" not in df1.columns:
raise ValueError("Expected a 'value' column in technology-data cost tables.")

# Prefer stable identifier columns if present; otherwise fall back to shared non-'value' columns.
preferred_keys = ["technology", "parameter", "unit"]
key_cols = [c for c in preferred_keys if c in df0.columns and c in df1.columns]
if not key_cols:
shared = [c for c in df0.columns if c in df1.columns and c != "value"]
if not shared:
raise ValueError("Could not determine common identifier columns to align cost tables.")
key_cols = shared

df0i = df0.set_index(key_cols)
df1i = df1.set_index(key_cols)

all_idx = df0i.index.union(df1i.index)

v0 = pd.to_numeric(df0i.reindex(all_idx)["value"], errors="coerce")
v1 = pd.to_numeric(df1i.reindex(all_idx)["value"], errors="coerce")

# Interpolate where both endpoints exist; otherwise carry the available endpoint.
v = v0 + frac * (v1 - v0)
v = v.where(~(v0.isna() & ~v1.isna()), v1)
v = v.where(~(v1.isna() & ~v0.isna()), v0)

meta0 = df0i.reindex(all_idx).drop(columns=["value"], errors="ignore")
meta1 = df1i.reindex(all_idx).drop(columns=["value"], errors="ignore")
meta = meta0.combine_first(meta1)

out = meta.copy()
out["value"] = v.astype(float)

path_out.parent.mkdir(parents=True, exist_ok=True)

out_df = out.reset_index()

# Reorder columns to match the original file column order (use df0 as template).
template_cols = list(df0.columns)
desired_cols = [c for c in template_cols if c in out_df.columns] + [
c for c in out_df.columns if c not in template_cols
]
out_df = out_df[desired_cols]

out_df.to_csv(path_out, index=False)


if __name__ == "__main__":
    # Minimal logging setup compatible with Snakemake.
    # NOTE: ``snakemake`` is a global injected by Snakemake's ``script:``
    # directive; it only exists when this file runs inside a rule.
    log_file = None
    if "snakemake" in globals() and getattr(snakemake, "log", None):
        # snakemake.log may be a list-like container
        try:
            log_file = snakemake.log[0]
        except Exception:
            # best-effort: fall back to stderr logging when the log target
            # cannot be resolved
            log_file = None

    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(levelname)s:%(name)s:%(message)s",
    )

    # Target planning year from the rule wildcard; base_url is normalized so
    # the f-string joins below don't produce a double slash.
    year = int(snakemake.wildcards.planning_horizons)
    base_url = str(snakemake.params.costs_url).rstrip("/")

    # Dataset coverage and anchor spacing, with defaults matching the rule's
    # params (min_year=2020, max_year=2050, step=5).
    min_year = int(getattr(snakemake.params, "min_year", 2020))
    max_year = int(getattr(snakemake.params, "max_year", 2050))
    step = int(getattr(snakemake.params, "step", 5))

    before, after = _anchor_years(year, min_year=min_year, max_year=max_year, step=step)

    out_path = Path(snakemake.output.costs)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    if before == after:
        # Year is on (or clamped to) an anchor: download that table verbatim.
        url = f"{base_url}/costs_{before}.csv"
        _download(url, out_path)
        logger.info("Wrote %s (no interpolation needed).", out_path)
    else:
        # Intermediate year: fetch both bracketing tables into a scratch
        # directory and write the linearly interpolated result.
        with tempfile.TemporaryDirectory() as tmp:
            tmp = Path(tmp)
            f0 = tmp / f"costs_{before}.csv"
            f1 = tmp / f"costs_{after}.csv"

            _download(f"{base_url}/costs_{before}.csv", f0)
            _download(f"{base_url}/costs_{after}.csv", f1)

            _interpolate_costs_csv(f0, f1, before, after, year, out_path)

        logger.info("Wrote %s (interpolated between %s and %s).", out_path, before, after)