|
| 1 | +# SPDX-FileCopyrightText: 2025-2026 Contributors to the MeteoForge project |
| 2 | +# SPDX-License-Identifier: MPL-2.0 |
| 3 | + |
| 4 | +"""Hybrid location module: easy use, CRS-flexible, fuzzy equality, and robust containment.""" |
| 5 | + |
| 6 | +from collections.abc import Iterable |
| 7 | +from typing import Any |
| 8 | + |
| 9 | +from pyproj import CRS, Transformer |
| 10 | +from shapely.geometry import Point, Polygon |
| 11 | + |
| 12 | +from meteoforge.spatial_temporal.validators import validate_mf_location |
| 13 | + |
| 14 | + |
| 15 | +def _crs_to_obj(crs_like: int | str | CRS) -> CRS: |
| 16 | + """Convert an int, str, or CRS to a CRS object.""" |
| 17 | + if isinstance(crs_like, CRS): |
| 18 | + return crs_like |
| 19 | + return CRS.from_user_input(value=crs_like) |
| 20 | + |
| 21 | + |
| 22 | +def _transform_point(x: float, y: float, from_crs: CRS, to_crs: CRS) -> tuple[float, float]: |
| 23 | + """Transform a point (x, y) from one CRS to another.""" |
| 24 | + if from_crs == to_crs: |
| 25 | + return x, y |
| 26 | + transformer = Transformer.from_crs(from_crs, to_crs, always_xy=True) |
| 27 | + x2, y2 = transformer.transform(x, y) |
| 28 | + return float(x2), float(y2) |
| 29 | + |
| 30 | + |
| 31 | +class MFLocation: |
| 32 | + """A geographic location with CRS, supporting transformation and fuzzy equality.""" |
| 33 | + |
| 34 | + def __init__(self, x: float, y: float, crs: int | str | CRS = 4326): |
| 35 | + """Create a location with coordinates (x, y) and a CRS.""" |
| 36 | + self.crs = _crs_to_obj(crs) |
| 37 | + self.x = float(x) |
| 38 | + self.y = float(y) |
| 39 | + self.point = Point(self.x, self.y) |
| 40 | + validate_mf_location(self.x, self.y, self.crs) |
| 41 | + |
| 42 | + def to(self, target_crs: int | str | CRS) -> "MFLocation": |
| 43 | + """Return a new MFLocation transformed to the target CRS.""" |
| 44 | + target_crs_obj = _crs_to_obj(target_crs) |
| 45 | + x2, y2 = _transform_point(self.x, self.y, self.crs, target_crs_obj) |
| 46 | + return MFLocation(x2, y2, target_crs_obj) |
| 47 | + |
| 48 | + def equals(self, other: "MFLocation", tol: float = 1e-6, crs: int | str | CRS = 4326) -> bool: |
| 49 | + """Check if two locations are close enough in a common CRS.""" |
| 50 | + # Compare in a common CRS (default: WGS84) |
| 51 | + crs_obj = _crs_to_obj(crs) |
| 52 | + a = self.to(crs_obj) |
| 53 | + b = other.to(crs_obj) |
| 54 | + return abs(a.x - b.x) < tol and abs(a.y - b.y) < tol |
| 55 | + |
| 56 | + def __eq__(self, other: Any) -> bool: |
| 57 | + """Check equality with another MFLocation, using fuzzy equality in a common CRS.""" |
| 58 | + if not isinstance(other, MFLocation): |
| 59 | + return NotImplemented |
| 60 | + return self.equals(other) |
| 61 | + |
| 62 | + def __repr__(self) -> str: |
| 63 | + """Get a string representation of the location.""" |
| 64 | + return f"MFLocation(x={self.x}, y={self.y}, crs={self.crs.to_string()})" |
| 65 | + |
| 66 | + |
| 67 | +class MFLocationList: |
| 68 | + """A class representing a list of MFLocation objects, with fuzzy containment and CRS handling.""" |
| 69 | + |
| 70 | + def __init__(self, locations: Iterable[MFLocation] | None = None, crs: int | str | CRS = 4326): |
| 71 | + """Create a list of locations, converting all to the given CRS.""" |
| 72 | + self.crs = _crs_to_obj(crs) |
| 73 | + self.locations: list[MFLocation] = [] |
| 74 | + if locations: |
| 75 | + for loc in locations: |
| 76 | + self.append(loc) |
| 77 | + |
| 78 | + def append(self, location: Any) -> None: |
| 79 | + """Add a location to the list, converting to the list's CRS if needed.""" |
| 80 | + if not isinstance(location, MFLocation): |
| 81 | + raise TypeError("Only MFLocation instances can be added.") |
| 82 | + # Accept any CRS, but store as self.crs |
| 83 | + loc_in_crs = location.to(self.crs) |
| 84 | + self.locations.append(loc_in_crs) |
| 85 | + |
| 86 | + def __getitem__(self, idx: int) -> MFLocation: |
| 87 | + """Get a location by index.""" |
| 88 | + return self.locations[idx] |
| 89 | + |
| 90 | + def __setitem__(self, idx: int, value: MFLocation) -> None: |
| 91 | + """Set a location by index, converting to the list's CRS if needed.""" |
| 92 | + self.locations[idx] = value.to(self.crs) |
| 93 | + |
| 94 | + def __delitem__(self, idx: int) -> None: |
| 95 | + """Delete a location by index.""" |
| 96 | + del self.locations[idx] |
| 97 | + |
| 98 | + def __len__(self) -> int: |
| 99 | + """Return the number of locations in the list.""" |
| 100 | + return len(self.locations) |
| 101 | + |
| 102 | + def __contains__(self, item: MFLocation) -> bool: |
| 103 | + """Check if a location is 'fuzzily' in the list, CRS-aware.""" |
| 104 | + # Fuzzy containment: is any location in the list 'close enough' to item? |
| 105 | + return any(loc.equals(item) for loc in self.locations) |
| 106 | + |
| 107 | + def find_nearby(self, item: MFLocation, tol: float = 1e-6) -> MFLocation | None: |
| 108 | + """Return the first location in the list close to the given item, or None.""" |
| 109 | + for loc in self.locations: |
| 110 | + if loc.equals(item, tol=tol): |
| 111 | + return loc |
| 112 | + return None |
| 113 | + |
| 114 | + def __repr__(self) -> str: |
| 115 | + """Return a string representation of the MFLocationList instance.""" |
| 116 | + return f"MFLocationList({self.locations}, crs={self.crs.to_string()})" |
| 117 | + |
| 118 | + |
| 119 | +class MFLocationVector: |
| 120 | + """A vector (polygon) of MFLocation objects, with fuzzy containment and CRS handling.""" |
| 121 | + |
| 122 | + def __init__(self, locations: Iterable[MFLocation] | None = None, crs: int | str | CRS = 4326): |
| 123 | + """Create a vector (polygon) from locations, converting all to the given CRS.""" |
| 124 | + self.crs = _crs_to_obj(crs) |
| 125 | + self.locations: list[MFLocation] = [] |
| 126 | + self.polygon: Polygon | None = None |
| 127 | + if locations: |
| 128 | + for loc in locations: |
| 129 | + self.append(loc) |
| 130 | + self._update_polygon() |
| 131 | + |
| 132 | + def append(self, location: Any) -> None: |
| 133 | + """Add a location to the vector, converting to the vector's CRS if needed.""" |
| 134 | + if not isinstance(location, MFLocation): |
| 135 | + raise TypeError("Only MFLocation instances can be added.") |
| 136 | + loc_in_crs = location.to(self.crs) |
| 137 | + self.locations.append(loc_in_crs) |
| 138 | + self._update_polygon() |
| 139 | + |
| 140 | + def _update_polygon(self) -> None: |
| 141 | + """Update the internal Shapely polygon from the current locations.""" |
| 142 | + coords = [loc.to(self.crs).point for loc in self.locations] |
| 143 | + # Only create a polygon if there are at least 3 unique points (4 for closure) |
| 144 | + if len(coords) >= 3: |
| 145 | + xy = [(pt.x, pt.y) for pt in coords] |
| 146 | + # Ensure closure: first == last |
| 147 | + if xy[0] != xy[-1]: |
| 148 | + xy.append(xy[0]) |
| 149 | + if len(xy) >= 4: |
| 150 | + self.polygon = Polygon(xy) |
| 151 | + else: |
| 152 | + self.polygon = None |
| 153 | + else: |
| 154 | + self.polygon = None |
| 155 | + |
| 156 | + def contains(self, location: MFLocation, tol: float = 1e-6) -> bool: |
| 157 | + """Check if the vector contains a location (fuzzy, CRS-aware, and near boundary).""" |
| 158 | + pt = location.to(self.crs).point |
| 159 | + # 1. True Shapely containment |
| 160 | + if self.polygon and self.polygon.contains(pt): |
| 161 | + return True |
| 162 | + # 2. Fuzzy: check if any vertex is close to the point |
| 163 | + if any(pt.distance(vertex.point) < tol for vertex in self.locations): |
| 164 | + return True |
| 165 | + # 3. Fuzzy: check if point is near the polygon boundary (within tol) |
| 166 | + return bool(self.polygon and self.polygon.boundary.distance(pt) < tol) |
| 167 | + |
| 168 | + def __contains__(self, item: MFLocation) -> bool: |
| 169 | + """Check if a location is in the vector, using fuzzy containment.""" |
| 170 | + return self.contains(item) |
| 171 | + |
| 172 | + def __getitem__(self, idx: int) -> MFLocation: |
| 173 | + """Get a location by index.""" |
| 174 | + return self.locations[idx] |
| 175 | + |
| 176 | + def __setitem__(self, idx: int, value: MFLocation) -> None: |
| 177 | + """Set a location by index, converting to the vector's CRS if needed.""" |
| 178 | + self.locations[idx] = value.to(self.crs) |
| 179 | + self._update_polygon() |
| 180 | + |
| 181 | + def __delitem__(self, idx: int) -> None: |
| 182 | + """Delete a location by index.""" |
| 183 | + del self.locations[idx] |
| 184 | + self._update_polygon() |
| 185 | + |
| 186 | + def __len__(self) -> int: |
| 187 | + """Return the number of locations in the vector.""" |
| 188 | + return len(self.locations) |
| 189 | + |
| 190 | + def __repr__(self) -> str: |
| 191 | + """Return a string representation of the MFLocationVector instance.""" |
| 192 | + return f"MFLocationVector({self.locations}, crs={self.crs.to_string()})" |
| 193 | + |
| 194 | + |
| 195 | +# Utility: fuzzy membership for a location in a list/vector |
| 196 | + |
| 197 | + |
| 198 | +def fuzzy_in(item: MFLocation, container: Iterable[MFLocation], tol: float = 1e-6, crs: int | str | CRS = 4326) -> bool: |
| 199 | + """Check if a location is 'fuzzily' in a container (list/vector), CRS-aware.""" |
| 200 | + crs_obj = _crs_to_obj(crs) |
| 201 | + item_in_crs = item.to(crs_obj) |
| 202 | + return any(loc.to(crs_obj).equals(item_in_crs, tol=tol, crs=crs_obj) for loc in container) |
0 commit comments