Skip to content

Commit 48f9476

Browse files
committed
feat: add xsection module for adjusting channel cross-section properties
1 parent 753def8 commit 48f9476

3 files changed

Lines changed: 296 additions & 5 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
pydsm/_version.py # ignore auto generated file
12
#DSS Files and catalogs
23
**/*.ds[cdk]
34

pydsm/hydroh5.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ def create_catalog(self):
179179
self.filename, dfc, "area", "ft^2", updown=False
180180
)
181181
cat_stage = dsm2h5.create_catalog_entry(self.filename, dfc, "stage", "ft")
182-
# %%
183-
# %%
184182
dfr = self.get_reservoirs()
185183
cat_res_height = dsm2h5.create_catalog_entry(
186184
self.filename,
@@ -191,7 +189,6 @@ def create_catalog(self):
191189
prefix="RES_",
192190
id_column="name",
193191
)
194-
# %%
195192
dfrn = self.get_reservoir_node_connections()
196193
self.get_input_table("/hydro/input/reservoir_connection")
197194
dfrn["id"] = (
@@ -220,7 +217,6 @@ def create_catalog(self):
220217
prefix="QEXT_",
221218
id_column="name",
222219
)
223-
# %%
224220
dft = self.get_transfer_names()
225221
cat_transfer_flow = dsm2h5.create_catalog_entry(
226222
self.filename,
@@ -231,7 +227,7 @@ def create_catalog(self):
231227
id_column=0,
232228
prefix="TRANSFER_",
233229
)
234-
# %%
230+
235231
catalog = pd.concat(
236232
[
237233
cat_flow,

pydsm/input/xsection.py

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
import numpy as np
2+
import pandas as pd
3+
from io import StringIO
4+
from pathlib import Path
5+
from .parser import read_input, write_input
6+
7+
8+
def adjust_xsection_wetted_perimeter(df, channel_no, elev_above, pct_change):
9+
"""
10+
Adjust the wetted perimeter of a channel cross-section above the specified elevation range
11+
Simply increases the wetted perimeter by a percentage change at the layers above the elevation range.
12+
13+
Parameters:
14+
channel_no : int
15+
Channel number to adjust
16+
elev_above : float
17+
Elevation above which to apply wetted perimeter adjustment
18+
pct_change : float
19+
Percentage change to apply to wetted perimeter (e.g., 0.1 for 10% increase)
20+
21+
Returns:
22+
--------
23+
pandas.DataFrame or None
24+
Modified cross-section data if output_path is None, otherwise None
25+
"""
26+
# Filter by channel number
27+
all_channel_data = df[df["CHAN_NO"] == channel_no].copy()
28+
if all_channel_data.empty:
29+
raise ValueError(f"Channel number {channel_no} not found in the data")
30+
31+
# channels with same channel number may have different distances, so loop over all distances available
32+
distances = all_channel_data["DIST"].unique()
33+
for dist in distances:
34+
channel_data = all_channel_data[all_channel_data["DIST"] == dist].copy()
35+
if channel_data.empty:
36+
continue
37+
38+
# Sort by elevation
39+
channel_data = channel_data.sort_values(by="ELEV")
40+
41+
# Find rows to adjust (at or above the specified elevation)
42+
if elev_above > channel_data["ELEV"].max():
43+
raise ValueError(
44+
f"Elevation {elev_above} is above the maximum elevation in the data, channel {channel_no}, distance {dist}, max elevation {channel_data['ELEV'].max()}"
45+
)
46+
# Find the rows just below and above the adjustment elevation
47+
above_idx = channel_data[channel_data["ELEV"] > elev_above].index[0]
48+
49+
# Interpolate width at the adjustment elevation
50+
above_indices = channel_data[channel_data["ELEV"] > elev_above].index
51+
# Adjust wetted perimeter for elevations above the range
52+
for idx in above_indices:
53+
current_wp = channel_data.loc[idx, "WET_PERIM"]
54+
channel_data.loc[idx, "WET_PERIM"] = current_wp * (1 + pct_change)
55+
all_channel_data.update(channel_data)
56+
# Update the original dataframe with the modified channel data
57+
df.update(all_channel_data)
58+
59+
return df
60+
61+
62+
def adjust_xsection_width(df, channel_no, elev_adjust, width_pct_change):
63+
"""
64+
Adjust the width of a channel cross-section at a specific elevation and recalculate area and
65+
wetted perimeter for all elevations above the adjustment point.
66+
67+
Parameters:
68+
channel_no : int
69+
Channel number to adjust
70+
elev_adjust : float
71+
Elevation at which to apply width adjustment
72+
width_pct_change : float
73+
Percentage change to apply to width (e.g., 0.1 for 10% increase)
74+
75+
Returns:
76+
--------
77+
pandas.DataFrame or None
78+
Modified cross-section data if output_path is None, otherwise None
79+
"""
80+
# Filter by channel number
81+
all_channel_data = df[df["CHAN_NO"] == channel_no].copy()
82+
if all_channel_data.empty:
83+
raise ValueError(f"Channel number {channel_no} not found in the data")
84+
# channels with same channel number may have different distances, so loop over all distances available
85+
distances = all_channel_data["DIST"].unique()
86+
for dist in distances:
87+
channel_data = all_channel_data[all_channel_data["DIST"] == dist].copy()
88+
if channel_data.empty:
89+
continue
90+
# Sort by elevation
91+
channel_data = channel_data.sort_values(by="ELEV")
92+
93+
# Find rows to adjust (at or above the specified elevation)
94+
if (
95+
elev_adjust < channel_data["ELEV"].min()
96+
or elev_adjust > channel_data["ELEV"].max()
97+
):
98+
raise ValueError(
99+
f"Adjustment elevation {elev_adjust} is outside the range of the data"
100+
)
101+
102+
# If the exact elevation is not in the data, we need to interpolate
103+
exact_match = channel_data[channel_data["ELEV"] == elev_adjust]
104+
if exact_match.empty:
105+
# Find the rows just below and above the adjustment elevation
106+
below_idx = channel_data[channel_data["ELEV"] < elev_adjust].index[-1]
107+
above_idx = channel_data[channel_data["ELEV"] > elev_adjust].index[0]
108+
109+
# Interpolate width at the adjustment elevation
110+
below_elev = channel_data.loc[below_idx, "ELEV"]
111+
above_elev = channel_data.loc[above_idx, "ELEV"]
112+
below_width = channel_data.loc[below_idx, "WIDTH"]
113+
above_width = channel_data.loc[above_idx, "WIDTH"]
114+
max_elev = channel_data["ELEV"].max()
115+
# Linear interpolation
116+
ratio = (elev_adjust - below_elev) / (above_elev - below_elev)
117+
interp_width = below_width + ratio * (above_width - below_width)
118+
119+
# Calculate adjusted width
120+
adjusted_width = interp_width * (1 + width_pct_change)
121+
122+
# Calculate width adjustment factor
123+
width_factor = adjusted_width / interp_width
124+
125+
# Apply adjustment to all elevations above the adjustment point
126+
adjust_indices = channel_data[channel_data["ELEV"] >= elev_adjust].index
127+
128+
# Interpolate adjustment factor for each elevation above
129+
for idx in adjust_indices:
130+
current_elev = channel_data.loc[idx, "ELEV"]
131+
# Linear scaling of the adjustment factor based on elevation
132+
if current_elev == elev_adjust:
133+
# Direct adjustment at the above elevation
134+
factor = width_factor
135+
else:
136+
# Scale the elev_ratio from 0 to 1
137+
elev_ratio = (current_elev - elev_adjust) / (max_elev - elev_adjust)
138+
# Gradually reduce adjustment effect as we move up
139+
factor = 1 + (width_factor - 1) * max(0, (1 - 0.5 * elev_ratio))
140+
141+
channel_data.loc[idx, "WIDTH"] *= factor
142+
else:
143+
# Direct adjustment at the exact elevation
144+
adjust_idx = exact_match.index[0]
145+
channel_data.loc[adjust_idx, "WIDTH"] *= 1 + width_pct_change
146+
147+
# Apply to all elevations above
148+
adjust_indices = channel_data[channel_data["ELEV"] > elev_adjust].index
149+
for idx in adjust_indices:
150+
channel_data.loc[idx, "WIDTH"] *= 1 + width_pct_change
151+
152+
# Recalculate areas based on trapezoid formula
153+
elevs = channel_data["ELEV"].values
154+
widths = channel_data["WIDTH"].values
155+
156+
# Skip the first row since we need a previous row to calculate area
157+
for i in range(1, len(channel_data)):
158+
h = elevs[i] - elevs[i - 1] # Height difference
159+
avg_width = (widths[i] + widths[i - 1]) / 2 # Average width for trapezoid
160+
channel_data.iloc[i, channel_data.columns.get_loc("AREA")] = (
161+
channel_data.iloc[i - 1, channel_data.columns.get_loc("AREA")]
162+
+ h * avg_width
163+
)
164+
165+
# Recalculate wetted perimeter using pythagoras for each segment
166+
for i in range(1, len(channel_data)):
167+
h = elevs[i] - elevs[i - 1] # Height difference
168+
w_diff = abs(widths[i] - widths[i - 1]) / 2 # Half-width difference
169+
segment_length = np.sqrt(h**2 + w_diff**2) * 2 # Both sides
170+
171+
# Update wetted perimeter - this is approximate and could be refined
172+
prev_wet_perim = channel_data.iloc[
173+
i - 1, channel_data.columns.get_loc("WET_PERIM")
174+
]
175+
new_wet_perim = prev_wet_perim + segment_length
176+
channel_data.iloc[i, channel_data.columns.get_loc("WET_PERIM")] = (
177+
new_wet_perim
178+
)
179+
# final check to ensure that area and widths and wetted perimeters are increasing with elevation or set to previous elelvation value
180+
for i in range(1, len(channel_data)):
181+
if (
182+
channel_data.iloc[i, channel_data.columns.get_loc("ELEV")]
183+
< channel_data.iloc[i - 1, channel_data.columns.get_loc("ELEV")]
184+
):
185+
raise ValueError("Elevation values must be strictly increasing.")
186+
if (
187+
channel_data.iloc[i, channel_data.columns.get_loc("AREA")]
188+
< channel_data.iloc[i - 1, channel_data.columns.get_loc("AREA")]
189+
):
190+
channel_data.iloc[i, channel_data.columns.get_loc("AREA")] = (
191+
channel_data.iloc[i - 1, channel_data.columns.get_loc("AREA")]
192+
)
193+
if (
194+
channel_data.iloc[i, channel_data.columns.get_loc("WIDTH")]
195+
< channel_data.iloc[i - 1, channel_data.columns.get_loc("WIDTH")]
196+
):
197+
channel_data.iloc[i, channel_data.columns.get_loc("WIDTH")] = (
198+
channel_data.iloc[i - 1, channel_data.columns.get_loc("WIDTH")]
199+
)
200+
if (
201+
channel_data.iloc[i, channel_data.columns.get_loc("WET_PERIM")]
202+
< channel_data.iloc[i - 1, channel_data.columns.get_loc("WET_PERIM")]
203+
):
204+
channel_data.iloc[i, channel_data.columns.get_loc("WET_PERIM")] = (
205+
channel_data.iloc[i - 1, channel_data.columns.get_loc("WET_PERIM")]
206+
)
207+
all_channel_data.update(channel_data)
208+
# Update the original dataframe with the modified channel data
209+
df.update(all_channel_data)
210+
211+
return df
212+
213+
214+
def adjust_xsection_bottom_elevation(df, channel_no, minimum_elevation):
215+
"""
216+
Adjust the bottom elevation of a channel cross-section to a specified minimum elevation.
217+
This will raise the bottom elevation for all layers in the specified channel.
218+
219+
Parameters:
220+
channel_no : int
221+
Channel number to adjust
222+
minimum_elevation : float
223+
Minimum elevation to set for the channel
224+
225+
Returns:
226+
--------
227+
pandas.DataFrame or None
228+
Modified cross-section data if output_path is None, otherwise None
229+
"""
230+
# Filter by channel number
231+
all_channel_data = df[df["CHAN_NO"] == channel_no].copy()
232+
if all_channel_data.empty:
233+
raise ValueError(f"Channel number {channel_no} not found in the data")
234+
235+
# Adjust all elevations below the minimum elevation and set to zero all areas and wetted perimeters
236+
elev_below_min = all_channel_data["ELEV"] < minimum_elevation
237+
if not elev_below_min.any():
238+
print(f"No elevations below {minimum_elevation} found for channel {channel_no}")
239+
return df
240+
all_channel_data.loc[elev_below_min, "AREA"] = 0.0
241+
all_channel_data.loc[elev_below_min, "WET_PERIM"] = 0.0
242+
all_channel_data.loc[elev_below_min, "WIDTH"] = 0.0
243+
elev_above_min = all_channel_data["ELEV"] >= minimum_elevation
244+
# recalculate areas and wetted perimeters for elevations above the minimum elevation
245+
for idx in all_channel_data[elev_above_min].index:
246+
# recalculate areas based on trapezoid formula
247+
if idx == 0:
248+
continue
249+
h = all_channel_data.loc[idx, "ELEV"] - all_channel_data.loc[idx - 1, "ELEV"]
250+
avg_width = (
251+
all_channel_data.loc[idx, "WIDTH"] + all_channel_data.loc[idx - 1, "WIDTH"]
252+
) / 2
253+
all_channel_data.loc[idx, "ELEV"] = max(
254+
all_channel_data.loc[idx, "ELEV"], minimum_elevation
255+
)
256+
all_channel_data.loc[idx, "AREA"] = (
257+
all_channel_data.loc[idx - 1, "AREA"] + h * avg_width
258+
)
259+
# recalculate wetted perimeter using pythagoras for each segment
260+
w_diff = (
261+
all_channel_data.loc[idx, "WIDTH"] - all_channel_data.loc[idx - 1, "WIDTH"]
262+
) / 2
263+
segment_length = np.sqrt(h**2 + w_diff**2) * 2
264+
prev_wet_perim = all_channel_data.loc[idx - 1, "WET_PERIM"]
265+
new_wet_perim = prev_wet_perim + segment_length
266+
all_channel_data.loc[idx, "WET_PERIM"] = new_wet_perim
267+
# Update the original dataframe with the modified channel data
268+
df.update(all_channel_data)
269+
270+
return df
271+
272+
273+
# Example usage:
274+
if __name__ == "__main__":
275+
input_file = "tests/data/test_xsection_data_all_dist.inp"
276+
output_file = "tests/data/test_xsection_data_all_dist_modified.inp"
277+
278+
dflist = read_input(input_file)
279+
df = dflist["XSECT_LAYER"]
280+
# Adjust channel 130 at elevation 1.5 by increasing width 10%
281+
dfadj = adjust_xsection_width(
282+
df,
283+
channel_no=124,
284+
elev_adjust=4,
285+
width_pct_change=0.1,
286+
)
287+
dflist["XSECT_LAYER"] = dfadj
288+
# Write the modified data back to a file
289+
write_input(output_file, dflist, append=False)
290+
dfwpadj = adjust_xsection_wetted_perimeter(
291+
dfadj, channel_no=124, elev_range=(2, 5), pct_change=0.2
292+
)
293+
dflist["XSECT_LAYER"] = dfwpadj
294+
write_input(output_file, dflist, append=False)

0 commit comments

Comments
 (0)