-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprepare_co2stop.py
More file actions
371 lines (307 loc) · 12.1 KB
/
prepare_co2stop.py
File metadata and controls
371 lines (307 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
"""Prepare the CO2Stop data so it fits our schemas.
Dataset-wide imputations happen here.
"""
import re
import sys
from typing import TYPE_CHECKING, Any
from warnings import warn
import _schemas
import geopandas as gpd
import numpy as np
import pandas as pd
from _utils import CDR_GROUP, CDRGroup, get_padded_bounds
from cmap import Colormap
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from pyproj import CRS
if TYPE_CHECKING:
snakemake: Any
# Translate readable config names to CO2Stop columns
MINIMUMS_CO2STOP = {
"porosity_ratio": "POROSITY_MEAN",
"depth_m": "DEPTH_MEAN",
"reservoir_thickness_m": "GROSS_THICK_MEAN",
"seal_thickness_m": "MIN_SEAL_THICK",
"permeability_md": "PERM_MEAN",
}
def _surface_issues(df: pd.DataFrame) -> pd.Series:
"""Detect surface issues per row.
Columns used:
- SURF_ISSUES: Only empty or 'None' cases are considered safe.
- REMARKS: additional remarks by CO2Stop authors (e.g., land ownership issues).
Args:
df (pd.DataFrame): dataframe with CO2Stop data.
Returns:
pd.Series: True if issue is present. False otherwise.
"""
issues = (
df["SURF_ISSUES"]
.fillna("")
.astype(str)
.str.strip()
.str.lower()
.replace({"none": np.nan, "": np.nan})
)
unsafe = issues.notna()
problems = ["surface issue ="]
pattern = "|".join(re.escape(i) for i in problems)
flagged_in_remarks = (
df["REMARKS_DATA"]
.fillna("")
.astype(str)
.str.lower()
.str.contains(pattern, regex=True)
)
unsafe = unsafe | flagged_in_remarks
return unsafe
def _subsurface_interference_issues(df: pd.DataFrame) -> pd.Series:
"""Detect subsurface issues per row.
Columns used:
- SUBSURF_INTERF: Only empty or 'No' cases are considered safe.
- REMARKS: additional remarks by CO2Stop authors (e.g., groundwater source).
Args:
df (pd.DataFrame): dataframe with CO2Stop data.
Returns:
pd.Series: True if issue is present. False otherwise.
"""
subsurface_interf = (
df["SUBSURF_INTERF"]
.fillna("")
.astype(str)
.str.strip()
.str.lower()
.replace({"no": np.nan, "": np.nan})
)
unsafe = subsurface_interf.notna()
problems = ["subsurface issue =", "geothermal", "groundwater", "potable water"]
pattern = "|".join(re.escape(i) for i in problems)
flagged_in_remarks = (
df["REMARKS_DATA"]
.fillna("")
.astype(str)
.str.lower()
.str.contains(pattern, regex=True)
)
unsafe |= flagged_in_remarks
return unsafe
def _artificial_polygon_issues(df: pd.DataFrame) -> pd.Series:
"""Detect cases where the polygon is artificial.
Uses:
- REMARKS (from polygons)
- REMARKS_DATA (from CSV merge, if present)
"""
checks = [
(
"REMARKS",
[
"polygon does not represent",
"polygon in no way represents",
"arbitrary storage unit polygon",
],
),
(
"REMARKS_DATA",
[
"fictive saline aquifer",
"polygon not available",
"aproximated polygon",
"polygon aproximated",
],
),
]
fake = pd.Series(False, index=df.index)
for col, problems in checks:
pattern = "|".join(re.escape(p) for p in problems)
flagged = (
df[col].fillna("").astype(str).str.contains(pattern, case=False, regex=True)
)
fake |= flagged
return fake
def _ambiguous_duplicate_issues(df: pd.DataFrame, id_col: str) -> pd.Series:
"""Detect ambiguous cases with repeated IDs.
All duplicates are eliminated because attribution is uncertain.
"""
return df[id_col].duplicated(keep=False)
def _removal_warning(mask: pd.Series, name: str, cnf_value) -> None:
"""Issue a warning about the number of 'dropped' elements."""
if mask.any():
drops = mask.value_counts()[True] / len(mask)
warn(f"{name!r}={cnf_value} resulted in {drops:.1%} drops.")
def identify_removals(
df: pd.DataFrame, remarks: dict, minimums: dict[str, float], id_col: str
) -> pd.Series:
"""Get a mask highlighting rows with problematic qualities as `True`.
Args:
df (pd.DataFrame): harmonised CO2Stop dataframe.
remarks (dict): specifies removal settings that rely on CO2Stop remarks.
minimums (dict[str, float]): numeric minimums for specific columns.
id_col (str): column holding a unique per-row identifier.
Duplicates will be removed.
Returns:
pd.Series: resulting mask.
"""
mask = pd.Series(False, index=df.index)
# Try to catch problematic remarks
for remark, setting in remarks.items():
if setting:
match remark:
case "surface_issues":
dropped = _surface_issues(df)
case "subsurface_issues":
dropped = _subsurface_interference_issues(df)
case "artificial_polygons":
dropped = _artificial_polygon_issues(df)
case _:
raise KeyError(f"{remark!r} is not valid.")
_removal_warning(dropped, remark, True)
mask |= dropped
# Not optional: these are two small shapes, and skipping it breaks schema validation.
dropped = _ambiguous_duplicate_issues(df, id_col)
_removal_warning(dropped, "ambiguous duplicates", "obligatory")
mask |= dropped
# Mark rows that violate minimum values
for config_name, co2stop_col in MINIMUMS_CO2STOP.items():
min_cnf = minimums[config_name]
if min_cnf > 0:
dropped = df[co2stop_col] < min_cnf
_removal_warning(dropped, config_name, min_cnf)
mask |= dropped
return mask
def estimate_storage_scenarios(
df: pd.DataFrame, cdr_group: CDRGroup, *, lower=float, upper=float("inf")
) -> pd.DataFrame:
"""Get minimum, mean and maximum CO2 capacity per storage unit.
- Data for minimum, mean, and maximum CO2 capacity is taken from the primary column.
- If no primary data is present, the fallback column will be used.
- If at least one value is present (i.e., only mean is given), other categories will
be filled with it.
Smaller values are given priority over larger ones.
E.g.: if min and max are present, mean will be filled with min first.
- Corrections are applied to ensure monotonic behaviour per row
(i.e., `conservative <= neutral <= optimistic`).
"""
if len(cdr_group.primary) != 3:
raise ValueError(
"`primary_cols` must have length 3, ordered as min < mean < max."
)
out = pd.DataFrame(index=df.index)
for name, col in cdr_group.primary.items():
s = df[col].replace(0, np.nan)
s = s.fillna(df[cdr_group.fallback[name]].replace(0, np.nan))
out[name] = s
# Bidirectional propagation within each row
out = out.ffill(axis="columns").bfill(axis="columns")
# Enforce lo <= mid <= hi (preserving NaNs)
lo, mid, hi = list(cdr_group.primary.keys())
m = out[lo].notna() & out[mid].notna() & (out[lo] > out[mid])
out[lo] = out[lo].where(~m, out[mid])
m = out[hi].notna() & out[mid].notna() & (out[hi] < out[mid])
out[hi] = out[hi].where(~m, out[mid])
# Set bounds
out = out.clip(upper=upper)
out = out.mask(out < lower, 0)
return out
def harmonise_stopco2_dataset(
map_file: str, data_file: str, id_col: str, crs: str | int, suffix: str = "_DATA"
) -> gpd.GeoDataFrame:
"""Open and combine paired CO2Stop datasets."""
gdf = gpd.read_file(map_file).rename({"id": id_col}, axis="columns").to_crs(crs)
gdf.geometry = gdf.geometry.force_2d().make_valid()
return gdf.merge(
pd.read_csv(data_file), how="inner", on=id_col, suffixes=("", suffix)
)
def plot_kept_polygons(
countries: gpd.GeoDataFrame,
all_polygons: gpd.GeoDataFrame,
kept_polygons: pd.Series,
*,
cmap: str = "tol:high_contrast_alt_r",
) -> tuple[Figure, Axes]:
"""Show a visual summary of kept/removed cases."""
fig, ax = plt.subplots(layout="constrained")
countries.plot(color="grey", alpha=0.5, ax=ax)
countries.boundary.plot(color="black", lw=0.5, ax=ax)
polygons = all_polygons.copy()
polygons["kept"] = polygons[kept_polygons.name].isin(kept_polygons)
polygons.plot("kept", legend=True, ax=ax, cmap=Colormap(cmap).to_mpl())
x_lim, y_lim = get_padded_bounds(all_polygons, pad_frac=0.02)
ax.set_xlim(*x_lim)
ax.set_ylim(*y_lim)
ax.set_axis_off()
return fig, ax
def plot_scenarios(data: pd.DataFrame) -> tuple[Figure, list[Axes]]:
"""Show a quick comparison between each scenario."""
axes: list[Axes]
fig, axes = plt.subplots(1, 2, figsize=(8, 4), layout="constrained")
scen_names = data.columns.str.split("_", n=1).str[0].tolist()
axes[0].bar(x=scen_names, height=data.sum().values)
axes[0].set_title("Aggregate")
tmp = data.T.copy()
tmp.index = scen_names
tmp.plot(ax=axes[1], legend=False, color="grey", alpha=0.5, marker="o")
axes[1].set_title("Per polygon")
for ax in axes:
ax.set_ylabel("$MtCO_2$")
ax.tick_params(axis="x", which="both", length=0)
return fig, axes
def main() -> None:
"""Main snakemake process."""
geo_crs = snakemake.params.geo_crs
if not CRS.from_user_input(geo_crs).is_geographic:
raise ValueError(f"Expected geographic CRS, got {geo_crs!r}.")
dataset_name = snakemake.params.dataset
cdr_group = snakemake.params.cdr_group
config = snakemake.params.cdr_group_config
match dataset_name:
case "storage_units":
data_id = "STORAGE_UNIT_ID"
id_columns = {data_id: "storage_unit_id"}
validation_method = _schemas.StorageUnitsSchema.validate
case "traps":
data_id = "TRAP_ID"
id_columns = {data_id: "trap_id", "STORAGE_UNIT_ID": "storage_unit_id"}
validation_method = _schemas.TrapsSchema.validate
case _:
raise ValueError(f"Invalid dataset requested: {dataset_name!r}.")
dataset = harmonise_stopco2_dataset(
snakemake.input.polygons, snakemake.input.table, data_id, geo_crs
)
# Keep a copy of the full dataset, to help display what has been dropped.
all_polygons = dataset[[data_id, "geometry"]].copy()
# Identify and remove 'bad apples', depending on the configuration.
mask_issues = identify_removals(
dataset, config["remove_remarks"], config["minimums"], data_id
)
dataset = dataset[~mask_issues]
# Estimate storage capacity, keeping only rows with tangible values.
capacity_scenarios = estimate_storage_scenarios(
dataset, CDR_GROUP[cdr_group], **config["bounds_mtco2"]
)
capacity_cols = capacity_scenarios.columns
dataset = dataset.merge(
capacity_scenarios, how="inner", right_index=True, left_index=True
)
dataset = dataset.dropna(subset=capacity_cols, how="all")
# Plot omissions
countries = gpd.read_file(snakemake.input.countries).to_crs(geo_crs)
fig, ax = plot_kept_polygons(countries, all_polygons, dataset[data_id])
ax.set_title(f"Kept polygons for '{dataset_name}:{cdr_group}'.")
fig.savefig(snakemake.output.plot_kept, dpi=300, bbox_inches="tight")
# Plot scenarios
fig, _ = plot_scenarios(dataset[capacity_cols])
fig.suptitle(
f"Full CO2Stop dataset scenario comparison for '{dataset_name}:{cdr_group}'"
)
fig.savefig(snakemake.output.plot_scenarios, dpi=300, bbox_inches="tight")
# Remove unnecessary columns, add extra metadata, validate, save
final_cols = list(id_columns.keys()) + capacity_cols.to_list() + ["geometry"]
dataset = dataset[final_cols].copy()
dataset = dataset.rename(id_columns, axis="columns").reset_index(drop=True)
dataset["dataset"] = dataset_name
dataset["cdr_group"] = cdr_group
dataset = validation_method(dataset)
dataset.to_parquet(snakemake.output.mtco2)
if __name__ == "__main__":
sys.stderr = open(snakemake.log[0], "w")
main()