|
| 1 | +import pandas as pd |
| 2 | +import geopandas as gpd |
| 3 | +import shapely |
| 4 | +import math |
| 5 | +import numpy as np |
| 6 | +from shapely.geometry import LineString, Point |
| 7 | +from . import parser |
| 8 | + |
| 9 | + |
| 10 | +def calculate_angle(p1, p2): |
| 11 | + """Calculate the angle (in degrees) of the line segment defined by two points (p1, p2).""" |
| 12 | + dx = p2[0] - p1[0] |
| 13 | + dy = p2[1] - p1[1] |
| 14 | + angle = math.degrees(math.atan2(dy, dx)) |
| 15 | + return angle |
| 16 | + |
| 17 | + |
| 18 | +def find_segment_and_angle(line, distance, normalized=True): |
| 19 | + """Find the segment of the line where the distance lies and calculate the angle of that segment.""" |
| 20 | + coords = list(line.coords) # Extract coordinates from the LineString |
| 21 | + total_distance = 0 |
| 22 | + if normalized: |
| 23 | + distance = distance * line.length |
| 24 | + for i in range(len(coords) - 1): |
| 25 | + p1 = coords[i] |
| 26 | + p2 = coords[i + 1] |
| 27 | + |
| 28 | + # Calculate the length of the current segment |
| 29 | + segment_length = Point(p1).distance(Point(p2)) |
| 30 | + |
| 31 | + if ( |
| 32 | + total_distance + segment_length >= distance |
| 33 | + ): # Check if the distance lies within this segment |
| 34 | + angle = calculate_angle(p1, p2) |
| 35 | + return { |
| 36 | + "segment_start": p1, |
| 37 | + "segment_end": p2, |
| 38 | + "angle": angle, |
| 39 | + "distance_within_segment": distance - total_distance, |
| 40 | + } |
| 41 | + |
| 42 | + total_distance += segment_length |
| 43 | + |
| 44 | + return None # If the distance is longer than the length of the line |
| 45 | + |
| 46 | + |
| 47 | +import click |
| 48 | + |
| 49 | + |
| 50 | +@click.command() |
| 51 | +@click.argument("channel_line_geojson_file") |
| 52 | +@click.argument("hydro_echo_file") |
| 53 | +@click.option("--channel_orient_file", default="channel_orient.inp") |
| 54 | +def generate_channel_orientation( |
| 55 | + channel_line_geojson_file, hydro_echo_file, channel_orient_file="channel_orient.inp" |
| 56 | +): |
| 57 | + from pydsm.analysis import dsm2study |
| 58 | + |
| 59 | + hydro_tables = dsm2study.load_echo_file(hydro_echo_file) |
| 60 | + channel_lines = dsm2study.load_dsm2_flowline_shapefile(channel_line_geojson_file) |
| 61 | + channels = dsm2study.join_channels_info_with_dsm2_channel_line( |
| 62 | + channel_lines, hydro_tables |
| 63 | + ) |
| 64 | + deltax = hydro_tables["SCALAR"].query('NAME=="deltax"').iloc[0].VALUE |
| 65 | + deltax = float(deltax) |
| 66 | + |
| 67 | + rows = [] |
| 68 | + for i, row in channels.iterrows(): |
| 69 | + line = row["geometry"] |
| 70 | + line = shapely.ops.linemerge(line) |
| 71 | + chan_length = float(row["LENGTH"]) |
| 72 | + nsegments = math.ceil(chan_length / deltax) |
| 73 | + for norm_distance in np.linspace(0, 1, nsegments + 1): |
| 74 | + result = find_segment_and_angle(line, norm_distance, normalized=True) |
| 75 | + rows.append([row["CHAN_NO"], norm_distance, result["angle"]]) |
| 76 | + |
| 77 | + df = pd.DataFrame(rows, columns=["CHAN_NO", "NORM_DISTANCE", "ANGLE"]) |
| 78 | + df["NORM_DISTANCE"] = round(df["NORM_DISTANCE"], 3) |
| 79 | + df["ANGLE"] = round(df["ANGLE"], 2) |
| 80 | + with open(channel_orient_file, "w") as f: |
| 81 | + parser.write(f, {"CHANNEL_ORIENTATION": df}) |
0 commit comments