Skip to content

Commit 0a7bb4f

Browse files
authored
feat(query): replace Geometry deserialization with streaming bbox extraction (#19944)
Spatial index writing, statistics collection, join runtime-filter build, and predicate pushdown now compute bounding boxes by streaming over EWKB coordinates directly, avoiding per-row geo::Geometry heap allocations.
1 parent 5af4195 commit 0a7bb4f

10 files changed

Lines changed: 262 additions & 72 deletions

File tree

src/common/io/src/geometry.rs

Lines changed: 127 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,6 @@ pub fn rect_to_polygon(rect: Rect<f64>) -> Polygon<f64> {
244244
Polygon::new(exterior, vec![])
245245
}
246246

247-
/// Process EWKB input and return SRID.
248-
pub fn read_srid<B: AsRef<[u8]>>(ewkb: &mut Ewkb<B>) -> Option<i32> {
249-
let mut srid_processor = SridProcessor::new();
250-
ewkb.process_geom(&mut srid_processor).ok()?;
251-
252-
srid_processor.srid
253-
}
254-
255247
/// Process EWKB input and return Geometry object and SRID.
256248
pub fn ewkb_to_geo<B: AsRef<[u8]>>(ewkb: &mut Ewkb<B>) -> Result<(Geometry<f64>, Option<i32>)> {
257249
let mut ewkb_processor = EwkbProcessor::new();
@@ -265,23 +257,6 @@ pub fn ewkb_to_geo<B: AsRef<[u8]>>(ewkb: &mut Ewkb<B>) -> Result<(Geometry<f64>,
265257
Ok((geo, srid))
266258
}
267259

268-
struct SridProcessor {
269-
srid: Option<i32>,
270-
}
271-
272-
impl SridProcessor {
273-
fn new() -> Self {
274-
Self { srid: None }
275-
}
276-
}
277-
278-
impl GeomProcessor for SridProcessor {
279-
fn srid(&mut self, srid: Option<i32>) -> geozero::error::Result<()> {
280-
self.srid = srid;
281-
Ok(())
282-
}
283-
}
284-
285260
struct EwkbProcessor {
286261
geo_writer: GeoWriter,
287262
srid: Option<i32>,
@@ -449,3 +424,130 @@ pub fn point_to_geohash(ewkb: &[u8], precision: Option<i32>) -> Result<String> {
449424
encode(point.0, precision.map_or(12, |p| p as usize))
450425
.map_err(|e| ErrorCode::GeometryError(e.to_string()))
451426
}
427+
428+
/// Axis-aligned 2D bounding box, stored as its minimum and maximum corners.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Bbox {
    min_x: f64,
    min_y: f64,
    max_x: f64,
    max_y: f64,
}

impl Bbox {
    /// Creates a degenerate box that covers only the point `(x, y)`.
    pub fn new(x: f64, y: f64) -> Self {
        Self {
            min_x: x,
            min_y: y,
            max_x: x,
            max_y: y,
        }
    }

    /// Builds a box from explicit corner values.
    ///
    /// The values are stored exactly as given; no validation or reordering
    /// is performed.
    pub fn from_corners(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Self {
        Self {
            min_x,
            min_y,
            max_x,
            max_y,
        }
    }

    /// Returns the corners as `(min_x, min_y, max_x, max_y)`.
    pub fn corners(&self) -> (f64, f64, f64, f64) {
        (self.min_x, self.min_y, self.max_x, self.max_y)
    }

    /// Grows the box so that it also covers the point `(x, y)`.
    ///
    /// Bounds are merged with `f64::min` / `f64::max`, which return the
    /// non-NaN operand when exactly one operand is NaN. A NaN coordinate
    /// therefore never overwrites a finite bound, and a NaN bound (for
    /// example from `Bbox::new(f64::NAN, f64::NAN)`) is replaced by the first
    /// finite coordinate. The result no longer depends on the order in which
    /// coordinates are visited.
    pub fn extend(&mut self, x: f64, y: f64) {
        self.min_x = self.min_x.min(x);
        self.max_x = self.max_x.max(x);
        self.min_y = self.min_y.min(y);
        self.max_y = self.max_y.max(y);
    }

    /// Pushes every edge outwards by `distance` (inwards if it is negative).
    pub fn expand(&mut self, distance: f64) {
        self.min_x -= distance;
        self.min_y -= distance;
        self.max_x += distance;
        self.max_y += distance;
    }

    /// Returns `true` when the two boxes overlap on both axes.
    ///
    /// Edges are inclusive, so boxes that merely touch count as intersecting.
    /// Any NaN bound makes the comparisons fail and the result `false`.
    pub fn intersects(&self, other: &Self) -> bool {
        self.max_x >= other.min_x
            && self.min_x <= other.max_x
            && self.max_y >= other.min_y
            && self.min_y <= other.max_y
    }
}
488+
489+
#[derive(Debug, Clone, Copy, PartialEq)]
490+
pub struct EwkbBbox {
491+
pub bbox: Option<Bbox>,
492+
pub srid: Option<i32>,
493+
}
494+
495+
pub fn ewkb_to_bbox(ewkb: &[u8]) -> Option<EwkbBbox> {
496+
let mut processor = BboxProcessor::new();
497+
Ewkb(ewkb).process_geom(&mut processor).ok()?;
498+
Some(processor.into_ewkb_bbox())
499+
}
500+
501+
struct BboxProcessor {
502+
bbox: Option<Bbox>,
503+
srid: Option<i32>,
504+
}
505+
506+
impl BboxProcessor {
507+
fn new() -> Self {
508+
Self {
509+
bbox: None,
510+
srid: None,
511+
}
512+
}
513+
514+
fn extend(&mut self, x: f64, y: f64) {
515+
if let Some(bbox) = self.bbox.as_mut() {
516+
bbox.extend(x, y);
517+
} else {
518+
self.bbox = Some(Bbox::new(x, y));
519+
}
520+
}
521+
522+
fn into_ewkb_bbox(self) -> EwkbBbox {
523+
EwkbBbox {
524+
bbox: self.bbox,
525+
srid: self.srid,
526+
}
527+
}
528+
}
529+
530+
impl GeomProcessor for BboxProcessor {
531+
fn srid(&mut self, srid: Option<i32>) -> geozero::error::Result<()> {
532+
self.srid = srid;
533+
Ok(())
534+
}
535+
536+
fn xy(&mut self, x: f64, y: f64, _idx: usize) -> geozero::error::Result<()> {
537+
self.extend(x, y);
538+
Ok(())
539+
}
540+
541+
fn coordinate(
542+
&mut self,
543+
x: f64,
544+
y: f64,
545+
_z: Option<f64>,
546+
_m: Option<f64>,
547+
_t: Option<f64>,
548+
_tm: Option<u64>,
549+
idx: usize,
550+
) -> geozero::error::Result<()> {
551+
self.xy(x, y, idx)
552+
}
553+
}

src/common/io/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,11 @@ pub use escape::escape_string_with_quote;
6464
pub use geography::GEOGRAPHY_SRID;
6565
pub use geography::geography_format;
6666
pub use geometry::Axis;
67+
pub use geometry::Bbox;
68+
pub use geometry::EwkbBbox;
6769
pub use geometry::Extremum;
6870
pub use geometry::GeometryDataType;
71+
pub use geometry::ewkb_to_bbox;
6972
pub use geometry::ewkb_to_geo;
7073
pub use geometry::geo_to_ewkb;
7174
pub use geometry::geo_to_ewkt;
@@ -76,5 +79,4 @@ pub use geometry::geometry_format;
7679
pub use geometry::geometry_from_ewkt;
7780
pub use geometry::geometry_type_name;
7881
pub use geometry::parse_bytes_to_ewkb;
79-
pub use geometry::read_srid;
8082
pub use interval::Interval;

src/common/io/tests/it/geometry.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use databend_common_io::geometry::Bbox;
16+
use databend_common_io::geometry::ewkb_to_bbox;
17+
use databend_common_io::geometry::geometry_from_str;
18+
use databend_common_io::wkb::make_point;
19+
20+
/// A single point yields a degenerate box located at that point.
#[test]
fn test_extract_bbox_point() {
    let ewkb = make_point(-74.0, 40.7);
    let extracted = ewkb_to_bbox(&ewkb).and_then(|result| result.bbox);
    assert_eq!(
        extracted,
        Some(Bbox::from_corners(-74.0, 40.7, -74.0, 40.7))
    );
}
26+
27+
/// The SRID embedded in the EWKB is reported alongside the box.
#[test]
fn test_extract_bbox_point_with_srid() {
    let ewkb = geometry_from_str("SRID=4326;POINT(-122.35 37.55)", None).unwrap();
    let extracted = ewkb_to_bbox(&ewkb).unwrap();
    let expected_bbox = Bbox::from_corners(-122.35, 37.55, -122.35, 37.55);
    assert_eq!(extracted.bbox, Some(expected_bbox));
    assert_eq!(extracted.srid, Some(4326));
}
37+
38+
/// The box of a linestring spans the extremes of all of its vertices.
#[test]
fn test_extract_bbox_linestring() {
    let ewkb = geometry_from_str("LINESTRING(0 0, 10 5, 20 3)", None).unwrap();
    let extracted = ewkb_to_bbox(&ewkb).and_then(|result| result.bbox);
    assert_eq!(extracted, Some(Bbox::from_corners(0.0, 0.0, 20.0, 5.0)));
}
44+
45+
/// For a polygon with a hole, the box is that of the exterior ring.
#[test]
fn test_extract_bbox_polygon() {
    let wkt = "POLYGON((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 8 2, 8 8, 2 8, 2 2))";
    let ewkb = geometry_from_str(wkt, None).unwrap();
    let extracted = ewkb_to_bbox(&ewkb).and_then(|result| result.bbox);
    assert_eq!(extracted, Some(Bbox::from_corners(0.0, 0.0, 10.0, 10.0)));
}
55+
56+
#[test]
fn test_extract_bbox_point_z() {
    // Hand-built little-endian EWKB for PointZ(1 2 3); the Z flag is
    // 0x80000000. Databend never stores 3D geometries today (all ingest paths
    // funnel through to_ewkb(xy)), but the EWKB parser must still derive a
    // correct 2D bbox if one ever appears.
    let mut ewkb: Vec<u8> = vec![0x01]; // byte order: little-endian
    ewkb.extend_from_slice(&0x8000_0001u32.to_le_bytes()); // type = Point | Z flag
    for ordinate in [1.0f64, 2.0, 3.0] {
        // x, y, z in that order
        ewkb.extend_from_slice(&ordinate.to_le_bytes());
    }

    let extracted = ewkb_to_bbox(&ewkb).and_then(|result| result.bbox);
    assert_eq!(extracted, Some(Bbox::from_corners(1.0, 2.0, 1.0, 2.0)));
}

src/common/io/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ mod bincode_serialization;
2323
mod borsh_serialization;
2424
mod cursor_ext;
2525
mod escape;
26+
mod geometry;
2627
mod interval;
2728
mod serialization_format_compatibility;

src/query/expression/src/types/geometry.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use std::ops::Range;
1717

1818
use databend_common_exception::ErrorCode;
1919
use databend_common_exception::Result;
20+
use databend_common_io::Bbox;
2021
use databend_common_io::GEOGRAPHY_SRID;
22+
use databend_common_io::ewkb_to_bbox;
2123
use databend_common_io::ewkb_to_geo;
2224
use geo::Geometry;
2325
use geozero::ToGeo;
@@ -242,3 +244,21 @@ pub fn extract_geo_and_srid(value: ScalarRef) -> Result<Option<(Geometry<f64>, i
242244
};
243245
Ok(Some((geo, srid)))
244246
}
247+
248+
/// Compute the bounding box of a geometry/geography scalar by streaming over its
249+
/// EWKB coordinates, avoiding the cost of materializing a `geo::Geometry`.
250+
///
251+
/// SRID follows the same rule as [`extract_geo_and_srid`]: geometries carry their
252+
/// own SRID (0 when absent), geographies are always [`GEOGRAPHY_SRID`].
253+
pub fn extract_bbox_and_srid(value: ScalarRef) -> Result<Option<(Option<Bbox>, i32)>> {
254+
let (ewkb, geography_srid) = match value {
255+
ScalarRef::Geometry(buf) => (buf, None),
256+
ScalarRef::Geography(buf) => (buf.0, Some(GEOGRAPHY_SRID)),
257+
_ => return Ok(None),
258+
};
259+
let Some(result) = ewkb_to_bbox(ewkb) else {
260+
return Err(ErrorCode::GeometryError("invalid EWKB input"));
261+
};
262+
let srid = geography_srid.unwrap_or_else(|| result.srid.unwrap_or(0));
263+
Ok(Some((result.bbox, srid)))
264+
}

src/query/functions/src/scalars/geographic/src/geometry.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use databend_common_expression::vectorize_with_builder_4_arg;
4141
use databend_common_io::Axis;
4242
use databend_common_io::Extremum;
4343
use databend_common_io::GEOGRAPHY_SRID;
44+
use databend_common_io::ewkb_to_bbox;
4445
use databend_common_io::ewkb_to_geo;
4546
use databend_common_io::geo_to_ewkb;
4647
use databend_common_io::geo_to_ewkt;
@@ -56,7 +57,6 @@ use databend_common_io::geometry::st_extreme;
5657
use databend_common_io::geometry_format;
5758
use databend_common_io::geometry_from_ewkt;
5859
use databend_common_io::geometry_type_name;
59-
use databend_common_io::read_srid;
6060
use geo::Area;
6161
use geo::BoundingRect;
6262
use geo::Buffer;
@@ -751,7 +751,9 @@ pub fn register(registry: &mut FunctionRegistry) {
751751
}
752752
}
753753

754-
let srid = read_srid(&mut Ewkb(ewkb)).unwrap_or_default();
754+
let srid = ewkb_to_bbox(ewkb)
755+
.and_then(|result| result.srid)
756+
.unwrap_or_default();
755757
output.push(srid);
756758
}),
757759
);
@@ -771,7 +773,7 @@ pub fn register(registry: &mut FunctionRegistry) {
771773

772774
// None of the geo type representations provided by crates under the GeoRust organization implement srid().
773775
// Currently, the srid() of all types returns the default value `None`, so we need to parse it manually here.
774-
let Some(from_srid) = read_srid(&mut Ewkb(original)) else {
776+
let Some(from_srid) = ewkb_to_bbox(original).and_then(|result| result.srid) else {
775777
ctx.set_error(row, "input geometry must has the correct SRID".to_string());
776778
builder.commit_row();
777779
return;

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ use databend_common_expression::HashMethodKind;
2424
use databend_common_expression::RawExpr;
2525
use databend_common_expression::Scalar;
2626
use databend_common_expression::types::DataType;
27-
use databend_common_expression::types::geometry::extract_geo_and_srid;
27+
use databend_common_expression::types::geometry::extract_bbox_and_srid;
2828
use databend_common_functions::BUILTIN_FUNCTIONS;
29-
use geo::algorithm::bounding_rect::BoundingRect;
3029

3130
use crate::physical_plans::SpatialRuntimeFilterMode;
3231
use crate::pipelines::processors::transforms::hash_join::desc::RuntimeFilterDesc;
@@ -168,7 +167,7 @@ impl SingleFilterBuilder {
168167
}
169168

170169
for value in column.iter() {
171-
let Some((geo, srid)) = extract_geo_and_srid(value)? else {
170+
let Some((bbox, srid)) = extract_bbox_and_srid(value)? else {
172171
continue;
173172
};
174173

@@ -182,10 +181,9 @@ impl SingleFilterBuilder {
182181
self.spatial_srid = Some(srid);
183182
}
184183

185-
if let Some(rect) = geo.bounding_rect() {
186-
let min = rect.min();
187-
let max = rect.max();
188-
self.spatial_rects.push((min.x, min.y, max.x, max.y));
184+
if let Some(bbox) = bbox {
185+
let (min_x, min_y, max_x, max_y) = bbox.corners();
186+
self.spatial_rects.push((min_x, min_y, max_x, max_y));
189187
}
190188
}
191189

src/query/storages/common/index/src/spatial_predicate.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ use databend_common_expression::TableSchemaRef;
2626
use databend_common_expression::types::DataType;
2727
use databend_common_expression::visit_expr;
2828
use databend_common_functions::SPATIAL_INDEX_FUNCTIONS;
29-
use databend_common_io::ewkb_to_geo;
30-
use geo::BoundingRect;
29+
use databend_common_io::ewkb_to_bbox;
3130
use geo::Rect;
32-
use geozero::wkb::Ewkb;
3331
use unicase::Ascii;
3432

3533
use crate::scalar_to_distance_threshold;
@@ -139,10 +137,12 @@ impl<'a> SpatialPredicateVisitor<'a> {
139137
fn scalar_to_query(scalar: &Scalar) -> Option<(Option<Rect<f64>>, i32)> {
140138
match scalar {
141139
Scalar::Geometry(buffer) => {
142-
let mut ewkb = Ewkb(buffer.as_slice());
143-
let (geom, srid) = ewkb_to_geo(&mut ewkb).ok()?;
144-
let rect = geom.bounding_rect();
145-
Some((rect, srid.unwrap_or(0)))
140+
let result = ewkb_to_bbox(buffer.as_slice())?;
141+
let rect = result.bbox.map(|bbox| {
142+
let (min_x, min_y, max_x, max_y) = bbox.corners();
143+
Rect::new((min_x, min_y), (max_x, max_y))
144+
});
145+
Some((rect, result.srid.unwrap_or(0)))
146146
}
147147
_ => None,
148148
}

0 commit comments

Comments
 (0)