[GH-2830] Add broadcast spatial-join support for the Geography type with ST_Contains #2864
Pull request overview
Adds broadcast spatial-join planning for ST_Contains over GeographyUDT, enabling BroadcastIndexJoinExec (index on coarse lat/lng envelope + S2 refine) instead of falling back to BroadcastNestedLoopJoin. Also includes a fix for a Geography ST_Contains correctness issue in the WKB fast-path, plus new regression and planning tests.
Changes:
- Plan BroadcastIndexJoinExec for ST_Contains(geog, geog) when a broadcast hint (or Sedona auto-broadcast) applies, using a Geography-aware index/refine path.
- Index Geography rows by their S2 lat/lng bounding rectangle (full-longitude fallback for antimeridian-wrapping rects) and refine with org.apache.sedona.common.geography.Functions.contains.
- Fix WKB polygon ring handling in WkbS2Shape (open vs closed rings) and add tests covering the regression + join planning/correctness.
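The coarse-envelope step in the second bullet can be illustrated with a minimal sketch. It is not the PR's JoinedGeometry code: the class name, method name, and the `{minLng, maxLng, minLat, maxLat}` array layout are hypothetical, and it assumes the S2 convention that a longitude interval with lo > hi wraps across the antimeridian.

```java
// Hypothetical sketch; not the code added by this PR.
final class GeographyEnvelopeSketch {
  /** Returns a planar envelope in degrees as {minLng, maxLng, minLat, maxLat}. */
  static double[] coarseEnvelope(double lngLo, double lngHi, double latLo, double latHi) {
    // Assumption: lngLo > lngHi marks a rectangle that wraps the antimeridian
    // (for example, from 170 degrees east around to -170 degrees).
    boolean wraps = lngLo > lngHi;
    if (wraps) {
      // A single planar box cannot express the wrapped interval, so widen to the
      // full longitude range and leave false positives to the refine step.
      return new double[] {-180.0, 180.0, latLo, latHi};
    }
    return new double[] {lngLo, lngHi, latLo, latHi};
  }
}
```

Widening to the full longitude range keeps the index conservative: it can only add candidates, never drop a true match, which is why an exact refine step is still required afterwards.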
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| spark/common/src/test/scala/org/apache/sedona/sql/geography/BroadcastIndexJoinGeographySuite.scala | Adds planning + correctness tests for broadcast Geography ST_Contains, including antimeridian spanning polygons. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala | Adds toGeographySpatialRDD to build broadcast-side index entries from Geography bytes with coarse envelopes + attached payload. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/SpatialIndexExec.scala | Routes broadcast index build through toGeographySpatialRDD when geographyShape is set. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinedGeometry.scala | Introduces GeographyJoinShape payload and envelope construction from S2 rect bounds (antimeridian full-longitude fallback). |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala | Detects Geography ST_Contains and plans broadcast index join with a Geography-specific execution path; keeps non-broadcast fallback row-by-row. |
| spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala | Adds Geography refiner (S2 contains) and Geography stream-shape extraction; refactors JTS vs Geography refinement into JoinRefiner. |
| common/src/test/java/org/apache/sedona/common/S2Geography/WkbContainsRoundtripTest.java | Adds regression coverage for WKB round-trip contains correctness. |
| common/src/main/java/org/apache/sedona/common/S2Geography/WkbS2Shape.java | Fixes polygon ring edge counting for open vs closed rings; adds helper to detect closing duplicate coordinates. |
Comments suppressed due to low confidence (1)
common/src/main/java/org/apache/sedona/common/S2Geography/WkbS2Shape.java:155
WkbS2Shape computes containsOriginValue by calling computeContainsOrigin() unconditionally, but for an empty polygon WKB (numRings == 0) the vertexOffsets/chainLengths arrays are length 0 and computeContainsOrigin() will throw ArrayIndexOutOfBoundsException. Add a fast-path for numRings == 0 (and possibly for rings with < 3 unique vertices) to set containsOriginValue = false without calling computeContainsOrigin().
case 3: // Polygon
  {
    this.dim = 2;
    int numRings = buf.getInt(payloadOffset);
    this.chainStarts = new int[numRings];
    this.chainLengths = new int[numRings];
    this.vertexOffsets = new int[numRings];
    // First pass: count total vertices and compute offsets. Sedona's WKBWriter writes
    // open rings (n unique vertices, no closing duplicate); standard WKB writes closed
    // rings (n+1 coords with last == first). Detect the closing-duplicate case by
    // comparing the first and last (lon, lat) pair so we get the right edge count
    // either way: edges = uniqueVertices = closed ? ringCoords - 1 : ringCoords.
    int totalVerts = 0;
    int edgeCount = 0;
    int byteOffset = payloadOffset + 4;
    int[] ringCoordCounts = new int[numRings];
    int[] ringByteOffsets = new int[numRings];
    boolean[] ringClosed = new boolean[numRings];
    for (int r = 0; r < numRings; r++) {
      int ringCoords = buf.getInt(byteOffset);
      ringCoordCounts[r] = ringCoords;
      ringByteOffsets[r] = byteOffset + 4;
      boolean closed =
          ringCoords >= 2 && firstAndLastEqual(buf, ringByteOffsets[r], ringCoords);
      ringClosed[r] = closed;
      byteOffset += 4 + ringCoords * 16;
      int ringEdges = closed ? Math.max(0, ringCoords - 1) : ringCoords;
      int storedVerts = closed ? ringCoords : ringCoords;
      chainStarts[r] = edgeCount;
      chainLengths[r] = ringEdges;
      vertexOffsets[r] = totalVerts;
      edgeCount += ringEdges;
      totalVerts += storedVerts + (closed ? 0 : 1); // append closing duplicate for open rings
    }
    this.totalEdges = edgeCount;
    // Second pass: read all vertices, appending a closing duplicate for open rings so
    // the rest of the shape interface (getEdge, getChainEdge, computeContainsOrigin)
    // can index `vertexOffsets[r] + (i % chainLengths[r])` uniformly.
    this.vertices = new S2Point[totalVerts];
    int vi = 0;
    for (int r = 0; r < numRings; r++) {
      S2Point[] ringVerts = readVertices(buf, ringByteOffsets[r], ringCoordCounts[r]);
      System.arraycopy(ringVerts, 0, vertices, vi, ringVerts.length);
      vi += ringVerts.length;
      if (!ringClosed[r] && ringVerts.length > 0) {
        vertices[vi++] = ringVerts[0];
      }
    }
    // Eagerly compute containsOrigin from first ring
    this.containsOriginValue = computeContainsOrigin();
    break;
  }
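To make the open-versus-closed ring counting and the suggested empty-polygon guard concrete, here is a standalone sketch. It is not the WkbS2Shape implementation: the class, `polygonPayload`, and `canComputeContainsOrigin` are hypothetical names, the payload layout (ring count, then per ring a coordinate count followed by 16-byte lon/lat pairs) is assumed from the snippet above, and the guard is one possible reading of the review comment.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch; names and layout are assumptions, not the PR's code.
final class RingEdgeCountSketch {
  /** True when the ring's first and last (lon, lat) pairs are identical. */
  static boolean firstAndLastEqual(ByteBuffer buf, int coordsOffset, int numCoords) {
    int last = coordsOffset + (numCoords - 1) * 16;
    return buf.getDouble(coordsOffset) == buf.getDouble(last)
        && buf.getDouble(coordsOffset + 8) == buf.getDouble(last + 8);
  }

  /** Edge count per ring: a closed ring stores one coordinate more than it has edges. */
  static int[] ringEdgeCounts(ByteBuffer buf, int payloadOffset) {
    int numRings = buf.getInt(payloadOffset);
    int[] edges = new int[numRings];
    int byteOffset = payloadOffset + 4;
    for (int r = 0; r < numRings; r++) {
      int ringCoords = buf.getInt(byteOffset);
      boolean closed = ringCoords >= 2 && firstAndLastEqual(buf, byteOffset + 4, ringCoords);
      edges[r] = closed ? ringCoords - 1 : ringCoords;
      byteOffset += 4 + ringCoords * 16;
    }
    return edges;
  }

  /** Guard in the spirit of the review: no rings, or a degenerate first ring, means skip. */
  static boolean canComputeContainsOrigin(int[] edges) {
    return edges.length > 0 && edges[0] >= 3;
  }

  /** Test helper: encodes rings of {lon, lat} pairs into the assumed payload layout. */
  static ByteBuffer polygonPayload(double[][]... rings) {
    int size = 4;
    for (double[][] ring : rings) {
      size += 4 + ring.length * 16;
    }
    ByteBuffer buf = ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
    buf.putInt(rings.length);
    for (double[][] ring : rings) {
      buf.putInt(ring.length);
      for (double[] c : ring) {
        buf.putDouble(c[0]).putDouble(c[1]);
      }
    }
    return buf;
  }
}
```

With a guard like this, the constructor could set containsOriginValue to false whenever `canComputeContainsOrigin` returns false, instead of indexing into zero-length arrays.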
@zhangfengcdt Please fix these simple comments; otherwise this PR is ready to be merged.
    .selectExpr("pt_id", "ST_GeogFromWKT(wkt, 4326) AS pt_geog")
  }

  private def planUsesBroadcastIndexJoin(df: org.apache.spark.sql.DataFrame): Boolean =
I assume left join / right join (broadcast version) will work as well?
Yes — both LEFT OUTER and RIGHT OUTER broadcast variants are supported. The planner only matches them when the build side aligns with the join type:
LEFT OUTER ⇒ broadcast right
RIGHT OUTER ⇒ broadcast left
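The alignment rule in this reply can be written down as a small check. This is only a sketch with hypothetical enum and method names, not the logic in JoinQueryDetector; the outer-join cases follow the reply above, while the inner-join case (either side may be broadcast) is an assumption.

```java
// Hypothetical sketch of the build-side rule; not the planner's actual code.
final class BroadcastSideSketch {
  enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER }

  enum Side { LEFT, RIGHT }

  /** Returns true when broadcasting the given side is compatible with the join type. */
  static boolean broadcastSideAllowed(JoinType joinType, Side broadcastSide) {
    switch (joinType) {
      case LEFT_OUTER:
        // Every left row must be emitted, so the left side streams and the right is built.
        return broadcastSide == Side.RIGHT;
      case RIGHT_OUTER:
        // Mirror image: the right side streams and the left is built.
        return broadcastSide == Side.LEFT;
      default:
        // Assumption: an inner join can broadcast either side.
        return true;
    }
  }
}
```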
Did you read the Contributor Guide?
Is this PR related to a ticket?
[GH-XXX] my subject. Closes #<issue_number>
What changes were proposed in this PR?
Adds broadcast spatial-join support for the Geography type with ST_Contains. JoinQueryDetector now plans BroadcastIndexJoinExec for ST_Contains(geog, geog) with a broadcast() hint (previously fell back to a row-by-row BroadcastNestedLoopJoin). The build side is indexed by each Geography's lat/lng bounding rect (full-longitude fallback at the antimeridian) and refined with the existing S2 Functions.contains. Non-broadcast Geography joins still fall through to row-by-row.
Also fixes a Geography ST_Contains correctness bug uncovered during testing.
Out of scope: range/partition joins, distance/KNN joins, and true split-at-±180 indexing for antimeridian polygons.
EXPLAIN on SELECT … JOIN /*+ BROADCAST(b) */ b ON ST_Contains(a.geog, b.geog) now shows BroadcastIndexJoinExec.
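The filter-and-refine flow described above (coarse envelope lookup on the broadcast side, then an exact containment check) can be sketched in plain Java. Everything here is hypothetical: the `Entry` class, the `matches` method, and the linear scan that stands in for a real spatial index; the exact predicate is passed in, where the PR uses the S2-based Functions.contains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical sketch of filter-and-refine; not BroadcastIndexJoinExec itself.
final class FilterRefineSketch {
  /** A coarse envelope in degrees plus a payload standing in for the Geography value. */
  static final class Entry<T> {
    final double minLng, maxLng, minLat, maxLat;
    final T payload;

    Entry(double minLng, double maxLng, double minLat, double maxLat, T payload) {
      this.minLng = minLng;
      this.maxLng = maxLng;
      this.minLat = minLat;
      this.maxLat = maxLat;
      this.payload = payload;
    }

    boolean intersects(Entry<?> o) {
      return minLng <= o.maxLng && o.minLng <= maxLng
          && minLat <= o.maxLat && o.minLat <= maxLat;
    }
  }

  /** Returns build-side payloads that pass both the envelope filter and the exact test. */
  static <S, B> List<B> matches(
      Entry<S> stream, List<Entry<B>> buildSide, BiPredicate<S, B> exact) {
    List<B> out = new ArrayList<>();
    for (Entry<B> candidate : buildSide) { // a real index would avoid this linear scan
      if (candidate.intersects(stream) && exact.test(stream.payload, candidate.payload)) {
        out.add(candidate.payload);
      }
    }
    return out;
  }
}
```

The envelope test is cheap and may admit false positives; only the exact predicate decides the final result, so correctness does not depend on how tight the envelopes are.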
How was this patch tested?
New tests:
Regression (Spark 3.4 / Scala 2.12), 363/363 total:
Did this PR include necessary documentation updates?