Skip to content

Commit a8d32fe

Browse files
committed
feat(query): Support Geometry aggregate functions
1 parent 4d7997e commit a8d32fe

29 files changed

Lines changed: 4405 additions & 3010 deletions

File tree

Cargo.lock

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ futures = "0.3.24"
267267
futures-async-stream = { version = "0.2.7" }
268268
futures-util = "0.3.24"
269269
geo = { version = "0.32.0", features = ["use-serde"] }
270-
geo-index = "0.3.2"
270+
geo-index = "0.3.4"
271271
geohash = "0.13.1"
272272
geozero = { version = "0.15.1", features = ["with-geo", "with-geojson", "with-wkb", "with-wkt"] }
273273
gimli = "0.31.0"
@@ -397,7 +397,7 @@ pprof = { git = "https://github.com/datafuse-extras/pprof-rs", rev = "edecd74",
397397
] }
398398
pretty_assertions = "1.3.0"
399399
procfs = { version = "0.17.0" }
400-
proj4rs = { version = "0.1.9", features = ["geo-types", "crs-definitions"] }
400+
proj4rs = { version = "0.1.10", features = ["geo-types", "crs-definitions"] }
401401
proptest = { version = "1", default-features = false, features = ["std"] }
402402
prost = { version = "0.13" }
403403
prost-build = { version = "0.13" }

src/common/io/src/geography.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,16 @@ use geozero::ToGeo;
2525
use geozero::ToWkb;
2626
use geozero::geojson::GeoJson;
2727
use geozero::wkb::Ewkb;
28+
use hex::encode_upper;
2829

2930
use crate::ewkb_to_geo;
31+
use crate::geometry::GeometryDataType;
3032
use crate::geometry::ewkt_str_to_geo;
33+
use crate::geometry::geo_to_ewkb;
34+
use crate::geometry::geo_to_ewkt;
35+
use crate::geometry::geo_to_json;
36+
use crate::geometry::geo_to_wkb;
37+
use crate::geometry::geo_to_wkt;
3138

3239
pub const LONGITUDE_MIN: f64 = -180.0;
3340
pub const LONGITUDE_MAX: f64 = 180.0;
@@ -69,6 +76,26 @@ pub fn geography_from_geojson(json_str: &str) -> Result<Vec<u8>> {
6976
.map_err(|e| ErrorCode::GeometryError(e.to_string()))
7077
}
7178

79+
pub fn geography_format(ewkb: &[u8], format_type: GeometryDataType) -> Result<String> {
80+
let geo = Ewkb(ewkb)
81+
.to_geo()
82+
.map_err(|e| ErrorCode::GeometryError(e.to_string()))?;
83+
84+
match format_type {
85+
GeometryDataType::WKB => {
86+
let bytes = geo_to_wkb(geo)?;
87+
Ok(encode_upper(bytes))
88+
}
89+
GeometryDataType::EWKB => {
90+
let bytes = geo_to_ewkb(geo, Some(GEOGRAPHY_SRID))?;
91+
Ok(encode_upper(bytes))
92+
}
93+
GeometryDataType::WKT => geo_to_wkt(geo),
94+
GeometryDataType::EWKT => geo_to_ewkt(geo, Some(GEOGRAPHY_SRID)),
95+
GeometryDataType::GEOJSON => geo_to_json(geo),
96+
}
97+
}
98+
7299
pub fn check_srid(srid: Option<i32>) -> Result<()> {
73100
if let Some(srid) = srid
74101
&& srid != GEOGRAPHY_SRID

src/common/io/src/geometry.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
2020
use geo::BoundingRect;
2121
use geo::Geometry;
22+
use geo::LineString;
2223
use geo::Point;
24+
use geo::Polygon;
25+
use geo::Rect;
2326
use geohash::encode;
2427
use geozero::CoordDimensions;
2528
use geozero::GeomProcessor;
@@ -31,6 +34,7 @@ use geozero::ToWkt;
3134
use geozero::geo_types::GeoWriter;
3235
use geozero::geojson::GeoJson;
3336
use geozero::wkb::Ewkb;
37+
use hex::encode_upper;
3438
use serde::Deserialize;
3539
use serde::Serialize;
3640
use wkt::TryFromWkt;
@@ -193,21 +197,12 @@ impl<B: AsRef<[u8]>> GeometryFormatOutput for Ewkb<B> {
193197
match format_type {
194198
GeometryDataType::WKB => self
195199
.to_wkb(CoordDimensions::xy())
196-
.map(|bytes| {
197-
bytes
198-
.iter()
199-
.map(|b| format!("{:02X}", b))
200-
.collect::<Vec<_>>()
201-
.join("")
202-
})
200+
.map(encode_upper)
201+
.map_err(|e| ErrorCode::GeometryError(e.to_string())),
202+
GeometryDataType::EWKB => self
203+
.to_ewkb(CoordDimensions::xy(), self.srid())
204+
.map(encode_upper)
203205
.map_err(|e| ErrorCode::GeometryError(e.to_string())),
204-
GeometryDataType::EWKB => Ok(self
205-
.0
206-
.as_ref()
207-
.iter()
208-
.map(|b| format!("{:02X}", b))
209-
.collect::<Vec<_>>()
210-
.join("")),
211206
GeometryDataType::WKT => self
212207
.to_wkt()
213208
.map_err(|e| ErrorCode::GeometryError(e.to_string())),
@@ -258,6 +253,19 @@ pub fn geo_to_ewkt(geo: Geometry, srid: Option<i32>) -> Result<String> {
258253
.map_err(|e| ErrorCode::GeometryError(e.to_string()))
259254
}
260255

256+
pub fn rect_to_polygon(rect: Rect<f64>) -> Polygon<f64> {
257+
let min = rect.min();
258+
let max = rect.max();
259+
let exterior = LineString::from(vec![
260+
(min.x, min.y),
261+
(max.x, min.y),
262+
(max.x, max.y),
263+
(min.x, max.y),
264+
(min.x, min.y),
265+
]);
266+
Polygon::new(exterior, vec![])
267+
}
268+
261269
/// Process EWKB input and return SRID.
262270
pub fn read_srid<B: AsRef<[u8]>>(ewkb: &mut Ewkb<B>) -> Option<i32> {
263271
let mut srid_processor = SridProcessor::new();

src/common/io/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ pub use decimal::display_decimal_256_trimmed;
6262
pub use escape::escape_string;
6363
pub use escape::escape_string_with_quote;
6464
pub use geography::GEOGRAPHY_SRID;
65+
pub use geography::geography_format;
6566
pub use geometry::Axis;
6667
pub use geometry::Extremum;
6768
pub use geometry::GeometryDataType;

src/query/expression/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ micromarshal = { workspace = true }
5252
num-bigint = { workspace = true }
5353
num-derive = { workspace = true }
5454
num-traits = { workspace = true }
55+
proj4rs = { workspace = true }
5556
rand = { workspace = true }
5657
rand_distr = { workspace = true }
5758
recursive = { workspace = true }
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_exception::ErrorCode;
16+
use databend_common_exception::Result;
17+
use databend_common_io::geometry::rect_to_polygon;
18+
use geo::BoundingRect;
19+
use geo::Coord;
20+
use geo::Geometry;
21+
use geo::GeometryCollection;
22+
use geo::LineString;
23+
use geo::MultiLineString;
24+
use geo::MultiPoint;
25+
use geo::MultiPolygon;
26+
use geo::Point;
27+
use geo::Polygon;
28+
use geo::Rect;
29+
30+
use crate::geographic::GeometryOverlay;
31+
use crate::geographic::OverlayMode;
32+
33+
pub trait GeoAggOp: Send + Sync + 'static {
34+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>>;
35+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>>;
36+
}
37+
38+
pub struct GeometryUnionAggOp;
39+
40+
impl GeoAggOp for GeometryUnionAggOp {
41+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
42+
apply_geometry_overlay(geos, OverlayMode::Union)
43+
}
44+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
45+
apply_binary_geometry_overlay(l_geo, r_geo, OverlayMode::Union)
46+
}
47+
}
48+
49+
pub struct GeometryIntersectionAggOp;
50+
51+
impl GeoAggOp for GeometryIntersectionAggOp {
52+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
53+
apply_geometry_overlay(geos, OverlayMode::Intersection)
54+
}
55+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
56+
apply_binary_geometry_overlay(l_geo, r_geo, OverlayMode::Intersection)
57+
}
58+
}
59+
60+
pub struct GeometryDifferenceAggOp;
61+
62+
impl GeoAggOp for GeometryDifferenceAggOp {
63+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
64+
apply_geometry_overlay(geos, OverlayMode::Difference)
65+
}
66+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
67+
apply_binary_geometry_overlay(l_geo, r_geo, OverlayMode::Difference)
68+
}
69+
}
70+
71+
pub struct GeometrySymDifferenceAggOp;
72+
73+
impl GeoAggOp for GeometrySymDifferenceAggOp {
74+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
75+
apply_geometry_overlay(geos, OverlayMode::SymDifference)
76+
}
77+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
78+
apply_binary_geometry_overlay(l_geo, r_geo, OverlayMode::SymDifference)
79+
}
80+
}
81+
82+
pub struct CollectAggOp;
83+
84+
impl GeoAggOp for CollectAggOp {
85+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
86+
if geos.is_empty() {
87+
return Ok(None);
88+
}
89+
90+
if geos.iter().all(|geo| matches!(geo, Geometry::Point(_))) {
91+
let points: Vec<Point<f64>> = geos
92+
.into_iter()
93+
.map(|geo| geo.try_into().expect("point geometry"))
94+
.collect();
95+
let multi_point = MultiPoint::from_iter(points);
96+
return Ok(Some(Geometry::MultiPoint(multi_point)));
97+
}
98+
if geos
99+
.iter()
100+
.all(|geo| matches!(geo, Geometry::LineString(_)))
101+
{
102+
let lines: Vec<LineString<f64>> = geos
103+
.into_iter()
104+
.map(|geo| geo.try_into().expect("linestring geometry"))
105+
.collect();
106+
let multi_line_string = MultiLineString::from_iter(lines);
107+
return Ok(Some(Geometry::MultiLineString(multi_line_string)));
108+
}
109+
if geos.iter().all(|geo| matches!(geo, Geometry::Polygon(_))) {
110+
let polygons: Vec<Polygon<f64>> = geos
111+
.into_iter()
112+
.map(|geo| geo.try_into().expect("polygon geometry"))
113+
.collect();
114+
let multi_polygon = MultiPolygon::from_iter(polygons);
115+
return Ok(Some(Geometry::MultiPolygon(multi_polygon)));
116+
}
117+
118+
let collection = GeometryCollection::from_iter(geos);
119+
Ok(Some(Geometry::GeometryCollection(collection)))
120+
}
121+
122+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
123+
todo!();
124+
}
125+
}
126+
127+
pub struct EnvelopeAggOp;
128+
129+
impl GeoAggOp for EnvelopeAggOp {
130+
fn compute(geos: Vec<Geometry<f64>>) -> Result<Option<Geometry<f64>>> {
131+
let mut has_rect = false;
132+
let mut min_x = 0.0_f64;
133+
let mut min_y = 0.0_f64;
134+
let mut max_x = 0.0_f64;
135+
let mut max_y = 0.0_f64;
136+
137+
for geo in geos {
138+
if let Some(rect) = geo.bounding_rect() {
139+
let min = rect.min();
140+
let max = rect.max();
141+
if !has_rect {
142+
min_x = min.x;
143+
min_y = min.y;
144+
max_x = max.x;
145+
max_y = max.y;
146+
has_rect = true;
147+
} else {
148+
min_x = min_x.min(min.x);
149+
min_y = min_y.min(min.y);
150+
max_x = max_x.max(max.x);
151+
max_y = max_y.max(max.y);
152+
}
153+
}
154+
}
155+
156+
if !has_rect {
157+
return Ok(None);
158+
}
159+
160+
let rect = Rect::new(Coord { x: min_x, y: min_y }, Coord { x: max_x, y: max_y });
161+
Ok(Some(Geometry::Polygon(rect_to_polygon(rect))))
162+
}
163+
164+
fn binary_compute(l_geo: Geometry<f64>, r_geo: Geometry<f64>) -> Result<Option<Geometry<f64>>> {
165+
todo!();
166+
}
167+
}
168+
169+
fn apply_geometry_overlay(
170+
geos: Vec<Geometry<f64>>,
171+
mode: OverlayMode,
172+
) -> Result<Option<Geometry<f64>>> {
173+
let overlay = GeometryOverlay::new(mode);
174+
overlay.apply_iter(geos)
175+
}
176+
177+
fn apply_binary_geometry_overlay(
178+
l_geo: Geometry<f64>,
179+
r_geo: Geometry<f64>,
180+
mode: OverlayMode,
181+
) -> Result<Option<Geometry<f64>>> {
182+
let overlay = GeometryOverlay::new(mode);
183+
overlay.apply(&l_geo, &r_geo)
184+
}

0 commit comments

Comments
 (0)