Commit 53602f9

[GH-2809] Support distance joins for raster predicates
Add `RS_DWithin(raster|geom, raster|geom, distance)` so distance joins can use raster operands, and route the join planner through the existing spatial-index machinery.

- `RS_DWithin` expression in `RasterPredicates.scala`, backed by new `RasterPredicates.rsDWithin` overloads (raster-geom, raster-raster) that reuse `convertCRSIfNeeded` and JTS `isWithinDistance`.
- `JoinQueryDetector` and `OptimizableJoinCondition` recognise `RS_DWithin` as a distance-join predicate; the relationship label collapses to `RS_DWithin` for all raster + distance cases.
- `BroadcastIndexJoinExec.createStreamShapes` and the new `TraitJoinQueryBase.toExpandedWGS84EnvelopeRDD` handle the raster stream and build sides for broadcast-index joins; `SpatialIndexExec` and `DistanceJoinExec` route to the same helper so non-broadcast distance joins work too.
- Drop the placeholder `UnsupportedOperationException` guards for distance + raster combinations; geography + raster + distance remains guarded since the geography refiner does not handle raster shapes.

Tests

- `BroadcastIndexJoinSuite`: `RS_DWithin` covers stream-raster / broadcast-raster / swapped-operand forms.
- `RasterJoinSuite`: new `RS_DWithin distance join` describe block covers `DistanceJoinExec` with both partition-side configs, swapped operands, and raster-raster.

Docs

- New `docs/api/sql/Raster-Predicates/RS_DWithin.md` page.
- `Raster-Functions.md` predicate table row.
- `Optimizer.md` raster-distance-join subsection.
1 parent 85270fe commit 53602f9

14 files changed

Lines changed: 411 additions & 12 deletions

common/src/main/java/org/apache/sedona/common/raster/RasterPredicates.java

Lines changed: 16 additions & 0 deletions
@@ -82,6 +82,22 @@ public static boolean rsContains(GridCoverage2D left, GridCoverage2D right) {
     return leftGeometry.contains(rightGeometry);
   }

+  /**
+   * Test if a raster is within {@code distance} of a geometry. Both shapes are converted to a
+   * common CRS (matching the rest of the RS_* predicate family); the distance is then measured in
+   * that CRS's units (degrees for WGS84, the projected unit otherwise).
+   */
+  public static boolean rsDWithin(GridCoverage2D raster, Geometry geometry, double distance) {
+    Pair<Geometry, Geometry> geometries = convertCRSIfNeeded(raster, geometry);
+    return geometries.getLeft().isWithinDistance(geometries.getRight(), distance);
+  }
+
+  /** Raster-raster variant of {@link #rsDWithin(GridCoverage2D, Geometry, double)}. */
+  public static boolean rsDWithin(GridCoverage2D left, GridCoverage2D right, double distance) {
+    Pair<Geometry, Geometry> geometries = convertCRSIfNeeded(left, right);
+    return geometries.getLeft().isWithinDistance(geometries.getRight(), distance);
+  }
+
   private static Pair<Geometry, Geometry> convertCRSIfNeeded(
       GridCoverage2D raster, Geometry queryWindow) {
     Geometry rasterGeometry;
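
As a rough illustration of the check that both `rsDWithin` overloads delegate to, the sketch below models each shape as an axis-aligned rectangle and tests whether the smallest gap between them is at most `distance`. It is a simplified stand-in, not the JTS implementation: `RectDistanceSketch`, `minDistance`, and `withinDistance` are hypothetical names, and real raster footprints are convex hulls rather than rectangles. The comparison is inclusive, matching the "less than or equal" contract of JTS `isWithinDistance`.

```java
// Hypothetical, simplified model: rectangles stand in for raster footprints and geometries.
final class RectDistanceSketch {
    // Rectangles are given as {minX, minY, maxX, maxY}.
    static double minDistance(double[] a, double[] b) {
        // Gap along each axis; zero when the projections overlap.
        double dx = Math.max(0.0, Math.max(a[0] - b[2], b[0] - a[2]));
        double dy = Math.max(0.0, Math.max(a[1] - b[3], b[1] - a[3]));
        return Math.sqrt(dx * dx + dy * dy);
    }

    // Inclusive comparison: a gap exactly equal to the distance still counts.
    static boolean withinDistance(double[] a, double[] b, double distance) {
        return minDistance(a, b) <= distance;
    }

    public static void main(String[] args) {
        double[] left = {0, 0, 10, 10};
        double[] right = {13, 14, 20, 20}; // gap of 3 in x and 4 in y
        System.out.println(minDistance(left, right));         // 5.0
        System.out.println(withinDistance(left, right, 5.0)); // true
        System.out.println(withinDistance(left, right, 4.9)); // false
    }
}
```

Because the unit of `distance` follows the CRS in which the comparison happens, the same numeric threshold means degrees in WGS84 and, for example, metres in a metric projected CRS.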

docs/api/sql/Optimizer.md

Lines changed: 16 additions & 0 deletions
@@ -282,6 +282,22 @@ These queries could be planned as RangeJoin or BroadcastIndexJoin. Here is an ex
 +- LocalTableScan [geom#24, id#25]
 ```

+### Raster distance join
+
+`RS_DWithin(left, right, distance)` is recognised as a distance-join predicate. Sedona converts each side to a WGS84 envelope, expands the envelope on the distance side by `distance`, and runs an R-tree filter before applying the per-row `RS_DWithin` check. The same plan is chosen as for `ST_DWithin`: `BroadcastIndexJoinExec` when one side is small enough to broadcast, otherwise `DistanceJoinExec`.
+
+```sql
+-- Raster-geometry distance join (broadcastable)
+SELECT /*+ BROADCAST(points) */ r.id, p.id
+FROM rasters r
+JOIN points p ON RS_DWithin(r.raster, p.geom, 0.01)
+
+-- Raster-raster distance join (partitioned spatial join)
+SELECT a.id, b.id
+FROM rasters a
+JOIN rasters b ON RS_DWithin(a.raster, b.raster, 0.01)
+```
+
 ## Google S2 based approximate equi-join

 If the performance of Sedona optimized join is not ideal, which is possibly caused by complicated and overlapping geometries, you can resort to Sedona built-in Google S2-based approximate equi-join. This equi-join leverages Spark's internal equi-join algorithm and might be performant given that you can opt to skip the refinement step by sacrificing query accuracy.
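
The raster distance join subsection added in this hunk describes a filter-then-refine plan. The sketch below shows that idea in miniature, under stated assumptions: envelopes are plain `double[]` rectangles, a nested loop stands in for the R-tree, and `FilterRefineSketch` with its methods is hypothetical rather than part of Sedona. It is meant only to show why the expanded-envelope filter can admit pairs that the exact per-row check then rejects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical filter-then-refine sketch; a nested loop stands in for the R-tree.
final class FilterRefineSketch {
    // Envelopes are {minX, minY, maxX, maxY}.
    static double[] expandBy(double[] env, double d) {
        return new double[] {env[0] - d, env[1] - d, env[2] + d, env[3] + d};
    }

    static boolean intersects(double[] a, double[] b) {
        return a[0] <= b[2] && b[0] <= a[2] && a[1] <= b[3] && b[1] <= a[3];
    }

    // Exact check used in the refine step (rectangles only in this sketch).
    static boolean withinDistance(double[] a, double[] b, double d) {
        double dx = Math.max(0.0, Math.max(a[0] - b[2], b[0] - a[2]));
        double dy = Math.max(0.0, Math.max(a[1] - b[3], b[1] - a[3]));
        return Math.sqrt(dx * dx + dy * dy) <= d;
    }

    // Returns {leftIndex, rightIndex} pairs that pass both steps.
    static List<int[]> distanceJoin(List<double[]> left, List<double[]> right, double d) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            double[] expanded = expandBy(left.get(i), d); // the "distance side"
            for (int j = 0; j < right.size(); j++) {
                if (!intersects(expanded, right.get(j))) {
                    continue; // coarse filter: cheap rejection
                }
                if (withinDistance(left.get(i), right.get(j), d)) {
                    result.add(new int[] {i, j}); // per-row refinement passed
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> rasters = new ArrayList<>();
        rasters.add(new double[] {0, 0, 10, 10});
        List<double[]> shapes = new ArrayList<>();
        shapes.add(new double[] {14, 14, 20, 20}); // passes the filter, fails the refine step
        shapes.add(new double[] {13, 10, 20, 20}); // passes both steps
        shapes.add(new double[] {30, 30, 40, 40}); // rejected by the filter
        for (int[] pair : distanceJoin(rasters, shapes, 5.0)) {
            System.out.println(pair[0] + " -> " + pair[1]); // 0 -> 1
        }
    }
}
```

The first candidate shows the false positive the refine step exists for: its corner lies inside the expanded square but is about 5.66 units from the original rectangle, which exceeds the threshold of 5.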

docs/api/sql/Raster-Functions.md

Lines changed: 1 addition & 0 deletions
@@ -102,6 +102,7 @@ These functions test spatial relationships involving raster objects.
 | Function | Return type | Description | Since |
 | :--- | :--- | :--- | :--- |
 | [RS_Contains](Raster-Predicates/RS_Contains.md) | Boolean | Returns true if the geometry or raster on the left side contains the geometry or raster on the right side. The convex hull of the raster is considered in the test. | v1.5.0 |
+| [RS_DWithin](Raster-Predicates/RS_DWithin.md) | Boolean | Returns true if the raster or geometry on the left side is within `distance` of the raster or geometry on the right side. The convex hull of the raster is considered in the test. | v1.9.1 |
 | [RS_Intersects](Raster-Predicates/RS_Intersects.md) | Boolean | Returns true if raster or geometry on the left side intersects with the raster or geometry on the right side. The convex hull of the raster is considered in the test. | v1.5.0 |
 | [RS_Within](Raster-Predicates/RS_Within.md) | Boolean | Returns true if the geometry or raster on the left side is within the geometry or raster on the right side. The convex hull of the raster is considered in the test. | v1.5.0 |

docs/api/sql/Raster-Predicates/RS_DWithin.md

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
+# RS_DWithin
+
+Introduction: Returns true if the raster or geometry on the left side is within `distance` of the raster or geometry on the right side. The convex hull of the raster is considered in the test. At least one of the two shape arguments must be a raster; for the geometry-only case, use [`ST_DWithin`](../Predicate/#st_dwithin) instead.
+
+Rules for testing spatial relationship:
+
+- If the raster or geometry does not have a defined SRID, it is assumed to be in WGS84.
+- If both sides are in the same CRS, then the distance is measured in the units of that CRS.
+- Otherwise, both sides will be transformed to WGS84 before the distance test and the distance is measured in degrees.
+
+When used as a join condition, Sedona plans the join as an optimized distance join (`BroadcastIndexJoinExec` or `DistanceJoinExec`): each side's WGS84 envelope is computed, the distance-side envelope is expanded by the radius, and the resulting envelopes drive an R-tree filter before the per-row `RS_DWithin` check refines the result.
+
+Format:
+
+`RS_DWithin(raster: Raster, geom: Geometry, distance: Double)`
+
+`RS_DWithin(geom: Geometry, raster: Raster, distance: Double)`
+
+`RS_DWithin(raster0: Raster, raster1: Raster, distance: Double)`
+
+Return type: `Boolean`
+
+Since: `v1.9.1`
+
+SQL Example
+
+```sql
+SELECT RS_DWithin(
+    RS_MakeEmptyRaster(1, 20, 20, 2, 22, 1),
+    ST_SetSRID(ST_PolygonFromEnvelope(30, 30, 40, 40), 4326),
+    15.0
+) AS within_15_degrees
+```
+
+Output:
+
+```
++-----------------+
+|within_15_degrees|
++-----------------+
+|             true|
++-----------------+
+```
+
+Using `RS_DWithin` as a distance-join condition:
+
+```sql
+SELECT r.id, p.id
+FROM rasters r
+JOIN points p ON RS_DWithin(r.raster, p.geom, 0.01)
+```
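
The three CRS rules listed on the new `RS_DWithin.md` page can be sketched as a small decision function. This is only a model under stated assumptions: SRIDs are plain integers, `0` is taken to mean "no SRID defined", and `CrsRuleSketch`, `effectiveSrid`, and `comparisonSrid` are hypothetical names. The actual implementation goes through `convertCRSIfNeeded`, whose body is not shown in this diff.

```java
// Hypothetical model of the CRS rules; SRIDs are plain ints and 0 is assumed to mean "undefined".
final class CrsRuleSketch {
    static final int WGS84 = 4326;

    // Rule 1: a shape without a defined SRID is assumed to be in WGS84.
    static int effectiveSrid(int srid) {
        return srid == 0 ? WGS84 : srid;
    }

    // Rules 2 and 3: keep a shared CRS, otherwise fall back to WGS84 (degrees).
    static int comparisonSrid(int leftSrid, int rightSrid) {
        int left = effectiveSrid(leftSrid);
        int right = effectiveSrid(rightSrid);
        return left == right ? left : WGS84;
    }

    public static void main(String[] args) {
        System.out.println(comparisonSrid(0, 4326));      // 4326
        System.out.println(comparisonSrid(3857, 3857));   // 3857
        System.out.println(comparisonSrid(3857, 32610));  // 4326
    }
}
```

Applied to the first SQL example on the page, the raster carries no SRID and the polygon uses 4326, so the threshold `15.0` is read as degrees. Assuming the empty raster spans 2 to 22 on both axes, the nearest corners (22, 22) and (30, 30) are about 11.3 degrees apart, which is consistent with the `true` result shown.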

spark/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala

Lines changed: 5 additions & 1 deletion
@@ -417,7 +417,11 @@ object Catalog extends AbstractCatalog with Logging {

   // Raster-Predicates
   val rasterPredicateExprs: Seq[FunctionDescription] =
-    Seq(function[RS_Contains](), function[RS_Intersects](), function[RS_Within]())
+    Seq(
+      function[RS_Contains](),
+      function[RS_DWithin](),
+      function[RS_Intersects](),
+      function[RS_Within]())

   // Raster-Geometry-Functions (raster → geometry derivations)
   val rasterGeometryExprs: Seq[FunctionDescription] =

spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/RasterPredicates.scala

Lines changed: 82 additions & 2 deletions
@@ -22,10 +22,10 @@ import org.apache.sedona.common.raster.RasterPredicates
 import org.apache.sedona.sql.utils.{GeometrySerializer, RasterSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ImplicitCastInputTypes}
 import org.apache.spark.sql.sedona_sql.UDT.{GeometryUDT, RasterUDT}
 import org.apache.spark.sql.sedona_sql.expressions.{FoldableExpression, NullIntolerantShim}
-import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType}
+import org.apache.spark.sql.types.{AbstractDataType, BooleanType, DataType, DoubleType}
 import org.geotools.coverage.grid.GridCoverage2D
 import org.locationtech.jts.geom.Geometry

@@ -155,3 +155,83 @@ private[apache] case class RS_Within(inputExpressions: Seq[Expression])
     copy(inputExpressions = newChildren)
   }
 }
+
+/**
+ * Distance predicate for rasters: `RS_DWithin(left, right, distance)`. `left` and `right` can each
+ * be a raster or a geometry (at least one must be a raster). Returns true when the shapes are
+ * within `distance` of each other, with both sides projected to a common CRS prior to the JTS
+ * distance check (mirroring [[RS_Intersects]]). This expression is recognised by
+ * [[org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector]] as a distance-join key, so a
+ * coarse R-tree filter over expanded WGS84 envelopes prunes candidates before the per-row check.
+ */
+private[apache] case class RS_DWithin(inputExpressions: Seq[Expression])
+    extends Expression
+    with FoldableExpression
+    with ImplicitCastInputTypes
+    with NullIntolerantShim
+    with CodegenFallback {
+
+  override def children: Seq[Expression] = inputExpressions
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def dataType: DataType = BooleanType
+
+  override def toString: String = s" **${getClass.getName}** "
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    if (inputExpressions.length != 3) {
+      throw new IllegalArgumentException(
+        s"RS_DWithin requires exactly 3 inputs, but got ${inputExpressions.length}")
+    }
+    val leftType = inputExpressions.head.dataType
+    val rightType = inputExpressions(1).dataType
+    (leftType, rightType) match {
+      case (_: RasterUDT, _: GeometryUDT) => Seq(RasterUDT(), GeometryUDT(), DoubleType)
+      case (_: GeometryUDT, _: RasterUDT) => Seq(GeometryUDT(), RasterUDT(), DoubleType)
+      case (_: RasterUDT, _: RasterUDT) => Seq(RasterUDT(), RasterUDT(), DoubleType)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"RS_DWithin requires at least one raster input; got: $leftType, $rightType")
+    }
+  }
+
+  override def eval(inputRow: InternalRow): Any = {
+    val leftValue = inputExpressions.head.eval(inputRow)
+    if (leftValue == null) return null
+    val rightValue = inputExpressions(1).eval(inputRow)
+    if (rightValue == null) return null
+    val distanceValue = inputExpressions(2).eval(inputRow)
+    if (distanceValue == null) return null
+
+    val leftBytes = leftValue.asInstanceOf[Array[Byte]]
+    val rightBytes = rightValue.asInstanceOf[Array[Byte]]
+    val distance = distanceValue.asInstanceOf[Double]
+    val leftType = inputExpressions.head.dataType
+    val rightType = inputExpressions(1).dataType
+    (leftType, rightType) match {
+      case (_: RasterUDT, _: GeometryUDT) =>
+        RasterPredicates.rsDWithin(
+          RasterSerializer.deserialize(leftBytes),
+          GeometrySerializer.deserialize(rightBytes),
+          distance)
+      case (_: GeometryUDT, _: RasterUDT) =>
+        RasterPredicates.rsDWithin(
+          RasterSerializer.deserialize(rightBytes),
+          GeometrySerializer.deserialize(leftBytes),
+          distance)
+      case (_: RasterUDT, _: RasterUDT) =>
+        RasterPredicates.rsDWithin(
+          RasterSerializer.deserialize(leftBytes),
+          RasterSerializer.deserialize(rightBytes),
+          distance)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"RS_DWithin requires at least one raster input; got: $leftType, $rightType")
+    }
+  }
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
+    copy(inputExpressions = newChildren)
+  }
+}
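
The `eval` method in this hunk does two things worth isolating: it returns null as soon as any argument is null, and it swaps the operands for the (geometry, raster) form so the raster-geometry overload can be reused. The Java sketch below mirrors that control flow with hypothetical stand-ins (`DWithinDispatchSketch`, `Shape`, `Kind`, `rasterWithin`); rectangles replace deserialized rasters and geometries, so it illustrates the dispatch only, not Sedona's types.

```java
// Hypothetical sketch of the null handling and operand dispatch; not Sedona's actual types.
final class DWithinDispatchSketch {
    enum Kind { RASTER, GEOMETRY }

    static final class Shape {
        final Kind kind;
        final double[] bounds; // {minX, minY, maxX, maxY}

        Shape(Kind kind, double[] bounds) {
            this.kind = kind;
            this.bounds = bounds;
        }
    }

    // Returns null when any argument is null, like the eval method in the diff.
    static Boolean dWithin(Shape left, Shape right, Double distance) {
        if (left == null || right == null || distance == null) {
            return null;
        }
        if (left.kind == Kind.GEOMETRY && right.kind == Kind.GEOMETRY) {
            throw new IllegalArgumentException("RS_DWithin requires at least one raster input");
        }
        // Distance is symmetric, so (geometry, raster) reuses the (raster, geometry) check.
        if (left.kind == Kind.GEOMETRY) {
            return rasterWithin(right, left, distance);
        }
        return rasterWithin(left, right, distance);
    }

    // Stand-in for the exact check: minimum gap between two rectangles.
    static boolean rasterWithin(Shape raster, Shape other, double distance) {
        double[] a = raster.bounds;
        double[] b = other.bounds;
        double dx = Math.max(0.0, Math.max(a[0] - b[2], b[0] - a[2]));
        double dy = Math.max(0.0, Math.max(a[1] - b[3], b[1] - a[3]));
        return Math.sqrt(dx * dx + dy * dy) <= distance;
    }

    public static void main(String[] args) {
        Shape raster = new Shape(Kind.RASTER, new double[] {0, 0, 10, 10});
        Shape geom = new Shape(Kind.GEOMETRY, new double[] {13, 14, 20, 20});
        System.out.println(dWithin(raster, geom, 5.0)); // true
        System.out.println(dWithin(geom, raster, 5.0)); // true
        System.out.println(dWithin(null, geom, 5.0));   // null
    }
}
```

Swapping the operands is safe here only because the predicate is symmetric; an asymmetric predicate such as containment could not reuse an overload this way.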

spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala

Lines changed: 22 additions & 2 deletions
@@ -122,8 +122,7 @@ case class BroadcastIndexJoinExec(
     case (None, _, false) => s"ST_$spatialPredicate($windowExpression, $objectExpression)"
     case (None, _, true) => s"RS_$spatialPredicate($windowExpression, $objectExpression)"
     case (Some(r), _, true) =>
-      throw new UnsupportedOperationException(
-        "Distance joins are not supported for raster predicates")
+      s"RS_Distance($windowExpression, $objectExpression) < $r"
   }

   override def simpleString(maxFields: Int): String =
@@ -321,6 +320,27 @@ case class BroadcastIndexJoinExec(
             (shape, row)
           }
         })
+      case Some(distanceExpression) if isRasterPredicate =>
+        val boundDistance =
+          BindReferences.bindReference(distanceExpression, streamed.output)
+        streamResultsRaw.map(row => {
+          val serialized = boundStreamShape.eval(row).asInstanceOf[Array[Byte]]
+          if (serialized == null) {
+            (null, row)
+          } else {
+            val baseShape = if (boundStreamShape.dataType.isInstanceOf[RasterUDT]) {
+              val raster = RasterSerializer.deserialize(serialized)
+              JoinedGeometryRaster.rasterToWGS84Envelope(raster)
+            } else {
+              val geom = GeometrySerializer.deserialize(serialized)
+              JoinedGeometryRaster.geometryToWGS84Envelope(geom)
+            }
+            val radius = boundDistance.eval(row).asInstanceOf[Double]
+            val envelope = baseShape.getEnvelopeInternal
+            envelope.expandBy(radius)
+            (baseShape.getFactory.toGeometry(envelope), row)
+          }
+        })
       case Some(distanceExpression) =>
         streamResultsRaw.map(row => {
           val geometry = TraitJoinQueryBase.shapeToGeometry(boundStreamShape, row)

spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/DistanceJoinExec.scala

Lines changed: 18 additions & 1 deletion
@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, UnsafeRow}
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.sedona_sql.UDT.RasterUDT
 import org.apache.spark.sql.sedona_sql.execution.SedonaBinaryExecNode
 import org.locationtech.jts.geom.Geometry

@@ -83,7 +84,23 @@ case class DistanceJoinExec(
       leftShapeExpr: Expression,
       rightRdd: RDD[UnsafeRow],
       rightShapeExpr: Expression): (SpatialRDD[Geometry], SpatialRDD[Geometry]) = {
-    if (distanceBoundToLeft) {
+    // Raster predicates project both sides to a WGS84 envelope before partitioning so the coarse
+    // R-tree filter applies regardless of the input CRS, mirroring the non-distance raster join
+    // (toWGS84EnvelopeRDD) and the broadcast-index raster distance path.
+    val isRasterPredicate =
+      leftShapeExpr.dataType.isInstanceOf[RasterUDT] ||
+        rightShapeExpr.dataType.isInstanceOf[RasterUDT]
+    if (isRasterPredicate) {
+      if (distanceBoundToLeft) {
+        (
+          toExpandedWGS84EnvelopeRDD(leftRdd, leftShapeExpr, boundRadius),
+          toWGS84EnvelopeRDD(rightRdd, rightShapeExpr))
+      } else {
+        (
+          toWGS84EnvelopeRDD(leftRdd, leftShapeExpr),
+          toExpandedWGS84EnvelopeRDD(rightRdd, rightShapeExpr, boundRadius))
+      }
+    } else if (distanceBoundToLeft) {
       (
         toExpandedEnvelopeRDD(leftRdd, leftShapeExpr, boundRadius, isGeography),
         toSpatialRDD(rightRdd, rightShapeExpr))
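
The raster branch in this hunk expands exactly one side, chosen by `distanceBoundToLeft`. A plausible reading, which the diff does not state outright, is that the radius expression is bound to the rows of one side, so only that side can evaluate it. The sketch below checks the geometric fact that makes this choice harmless for the coarse filter: with the same radius, expanding either envelope gives the same intersection answer, up to floating-point rounding. `ExpansionSideSketch` and `coarseMatch` are hypothetical names.

```java
// Hypothetical sketch: the coarse filter gives the same answer whichever side is expanded.
final class ExpansionSideSketch {
    // Envelopes are {minX, minY, maxX, maxY}.
    static double[] expandBy(double[] env, double d) {
        return new double[] {env[0] - d, env[1] - d, env[2] + d, env[3] + d};
    }

    static boolean intersects(double[] a, double[] b) {
        return a[0] <= b[2] && b[0] <= a[2] && a[1] <= b[3] && b[1] <= a[3];
    }

    // Mirrors the branch on distanceBoundToLeft: expand the side that owns the radius.
    static boolean coarseMatch(double[] left, double[] right, double d, boolean distanceBoundToLeft) {
        return distanceBoundToLeft
            ? intersects(expandBy(left, d), right)
            : intersects(left, expandBy(right, d));
    }

    public static void main(String[] args) {
        double[] left = {0, 0, 10, 10};
        double[] right = {14, 14, 20, 20};
        System.out.println(coarseMatch(left, right, 5.0, true));  // true
        System.out.println(coarseMatch(left, right, 5.0, false)); // true
        System.out.println(coarseMatch(left, right, 3.0, true));  // false
        System.out.println(coarseMatch(left, right, 3.0, false)); // false
    }
}
```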

spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala

Lines changed: 31 additions & 2 deletions
@@ -174,6 +174,32 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy {
         Some(condition)))
   }

+  /**
+   * Build a [[JoinQueryDetection]] for [[RS_DWithin]]. The coarse spatial join uses an expanded
+   * WGS84 envelope (driven by `distance`) and falls back to the per-row predicate for refinement,
+   * matching the pattern used for [[org.apache.spark.sql.sedona_sql.expressions.ST_DWithin]].
+   */
+  private def getRasterDistanceJoinDetection(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      predicate: RS_DWithin,
+      extraCondition: Option[Expression] = None): Option[JoinQueryDetection] = {
+    val leftShape = predicate.inputExpressions.head
+    val rightShape = predicate.inputExpressions(1)
+    val distance = predicate.inputExpressions(2)
+    val condition = extraCondition.map(And(_, predicate)).getOrElse(predicate)
+    Some(
+      JoinQueryDetection(
+        left,
+        right,
+        leftShape,
+        rightShape,
+        SpatialPredicate.INTERSECTS,
+        isGeography = false,
+        extraCondition = Some(condition),
+        distance = Some(distance)))
+  }
+
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case Join(left, right, joinType, condition, JoinHint(leftHint, rightHint))
         if optimizationEnabled(left, right, condition) =>
@@ -320,6 +346,8 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy {
               getJoinDetection(left, right, pred, extraCondition)
             case pred: RS_Predicate =>
               getRasterJoinDetection(left, right, pred, extraCondition)
+            case pred: RS_DWithin =>
+              getRasterDistanceJoinDetection(left, right, pred, extraCondition)
             case ST_DWithin(Seq(leftShape, rightShape, distance)) =>
               val geographyShape =
                 isGeographyInput(leftShape) || isGeographyInput(rightShape)
@@ -903,9 +931,10 @@ class JoinQueryDetector(sparkSession: SparkSession) extends SparkStrategy {
       case (None, _, true, _, true) =>
         throw new UnsupportedOperationException(
           "Geography joins are not yet supported for raster predicates")
-      case (Some(_), _, _, _, true) =>
+      case (Some(_), _, true, _, true) =>
         throw new UnsupportedOperationException(
-          "Distance joins are not supported for raster predicates")
+          "Geography distance joins are not yet supported for raster predicates")
+      case (Some(_), _, false, _, true) => "RS_DWithin"
     }
     val (distanceOnIndexSide, distanceOnStreamSide) = distance
       .map { distanceExpr =>

spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/OptimizableJoinCondition.scala

Lines changed: 3 additions & 1 deletion
@@ -21,7 +21,7 @@ package org.apache.spark.sql.sedona_sql.strategy.join
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan, LessThanOrEqual, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.sedona_sql.expressions._
-import org.apache.spark.sql.sedona_sql.expressions.raster.RS_Predicate
+import org.apache.spark.sql.sedona_sql.expressions.raster.{RS_DWithin, RS_Predicate}
 import org.apache.spark.sql.sedona_sql.optimization.ExpressionUtils

 case class OptimizableJoinCondition(left: LogicalPlan, right: LogicalPlan) {
@@ -73,6 +73,8 @@ case class OptimizableJoinCondition(left: LogicalPlan, right: LogicalPlan) {
       case ST_DWithin(Seq(leftShape, rightShape, distance, useSpheroid)) =>
         useSpheroid
           .isInstanceOf[Literal] && isDistanceJoinOptimizable(leftShape, rightShape, distance)
+      case RS_DWithin(Seq(leftShape, rightShape, distance)) =>
+        isDistanceJoinOptimizable(leftShape, rightShape, distance)

       case _: LessThan | _: LessThanOrEqual =>
         val (smaller, larger) = (expression.children.head, expression.children(1))
