diff --git a/common/src/main/java/org/apache/sedona/common/geometryObjects/Box2D.java b/common/src/main/java/org/apache/sedona/common/geometryObjects/Box2D.java new file mode 100644 index 00000000000..b691e6d7c92 --- /dev/null +++ b/common/src/main/java/org/apache/sedona/common/geometryObjects/Box2D.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.common.geometryObjects; + +import java.io.Serializable; +import java.util.Objects; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.geom.Geometry; + +/** + * Planar 2D bounding box with min/max X and Y. Always a valid finite bbox; absence of a bbox (e.g. + * bbox of an empty geometry, extent over zero rows) is represented by SQL NULL at the column level + * rather than by an in-band sentinel. This matches PostGIS behavior and leaves {@code xmin > xmax} + * free for a future antimeridian-wraparound semantics on geography bboxes (cf. sedona-db's {@code + * WraparoundInterval}). 
+ */ +public final class Box2D implements Serializable { + + private final double xmin; + private final double ymin; + private final double xmax; + private final double ymax; + + public Box2D(double xmin, double ymin, double xmax, double ymax) { + this.xmin = xmin; + this.ymin = ymin; + this.xmax = xmax; + this.ymax = ymax; + } + + /** Returns the bbox of {@code geometry}, or {@code null} for null/empty geometry. */ + public static Box2D fromGeometry(Geometry geometry) { + if (geometry == null || geometry.isEmpty()) { + return null; + } + Envelope env = geometry.getEnvelopeInternal(); + return new Box2D(env.getMinX(), env.getMinY(), env.getMaxX(), env.getMaxY()); + } + + public double getXMin() { + return xmin; + } + + public double getYMin() { + return ymin; + } + + public double getXMax() { + return xmax; + } + + public double getYMax() { + return ymax; + } + + /** + * Returns the union of {@code this} and {@code other}. {@code other == null} is treated as a + * no-op, returning {@code this}, so callers can fold over a stream that may include nulls. 
+ */ + public Box2D expandToInclude(Box2D other) { + if (other == null) { + return this; + } + return new Box2D( + Math.min(xmin, other.xmin), + Math.min(ymin, other.ymin), + Math.max(xmax, other.xmax), + Math.max(ymax, other.ymax)); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Box2D)) return false; + Box2D other = (Box2D) o; + return Double.compare(xmin, other.xmin) == 0 + && Double.compare(ymin, other.ymin) == 0 + && Double.compare(xmax, other.xmax) == 0 + && Double.compare(ymax, other.ymax) == 0; + } + + @Override + public int hashCode() { + return Objects.hash(xmin, ymin, xmax, ymax); + } + + @Override + public String toString() { + return "BOX(" + xmin + " " + ymin + ", " + xmax + " " + ymax + ")"; + } +} diff --git a/python/sedona/spark/__init__.py b/python/sedona/spark/__init__.py index 707aa24c95e..70abbaf6907 100644 --- a/python/sedona/spark/__init__.py +++ b/python/sedona/spark/__init__.py @@ -55,7 +55,12 @@ from sedona.spark.sql.st_constructors import * from sedona.spark.sql.st_functions import * from sedona.spark.sql.st_predicates import * -from sedona.spark.sql.types import GeometryType, GeographyType, RasterType +from sedona.spark.sql.types import ( + Box2DType, + GeometryType, + GeographyType, + RasterType, +) from sedona.spark.stac import Client from sedona.spark.stac.collection_client import CollectionClient from sedona.spark.stats.clustering.dbscan import dbscan diff --git a/python/sedona/spark/core/geom/box2d.py b/python/sedona/spark/core/geom/box2d.py new file mode 100644 index 00000000000..1a1748e5385 --- /dev/null +++ b/python/sedona/spark/core/geom/box2d.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
class Box2D:
    """Axis-aligned planar bounding box with ``xmin``, ``ymin``, ``xmax``, ``ymax``.

    Each ordinate is coerced with ``float()`` when the box is built. The
    constructor does not validate or reorder the ordinates. A missing box is
    meant to be expressed as ``None`` (SQL NULL) at the column level instead
    of an in-band sentinel value.
    """

    __slots__ = ("xmin", "ymin", "xmax", "ymax")

    def __init__(self, xmin: float, ymin: float, xmax: float, ymax: float):
        # Coerce and assign in slot order: xmin, ymin, xmax, ymax.
        for slot, ordinate in zip(Box2D.__slots__, (xmin, ymin, xmax, ymax)):
            setattr(self, slot, float(ordinate))

    def __eq__(self, other: object) -> bool:
        # Defer to the other operand for anything that is not a Box2D.
        if isinstance(other, Box2D):
            # Element-wise ``==`` keeps plain float semantics (NaN != NaN).
            return all(
                getattr(self, slot) == getattr(other, slot)
                for slot in Box2D.__slots__
            )
        return NotImplemented

    def __hash__(self) -> int:
        # Hash the ordinates as a 4-tuple in slot order.
        return hash(tuple(getattr(self, slot) for slot in Box2D.__slots__))

    def __repr__(self) -> str:
        return "Box2D({}, {}, {}, {})".format(
            self.xmin, self.ymin, self.xmax, self.ymax
        )
class Box2DType(UserDefinedType):
    """Spark SQL user-defined type that stores a ``Box2D`` as a struct of four doubles."""

    @classmethod
    def sqlType(cls):
        # Storage layout: four non-nullable doubles in xmin, ymin, xmax, ymax order.
        ordinate_names = ("xmin", "ymin", "xmax", "ymax")
        return StructType(
            [
                StructField(ordinate, DoubleType(), nullable=False)
                for ordinate in ordinate_names
            ]
        )

    def serialize(self, obj):
        # Positional order matches the struct fields declared in sqlType().
        xmin, ymin, xmax, ymax = obj.xmin, obj.ymin, obj.xmax, obj.ymax
        return (xmin, ymin, xmax, ymax)

    def deserialize(self, datum):
        # Read the four ordinates by position and rebuild the Python value.
        xmin, ymin, xmax, ymax = (datum[position] for position in range(4))
        return Box2D(xmin, ymin, xmax, ymax)

    @classmethod
    def module(cls):
        # Python module path under which this type is exposed.
        return "sedona.spark.sql.types"

    def needConversion(self):
        return True

    @classmethod
    def scalaUDT(cls):
        # Fully qualified name of the JVM-side UDT class paired with this type.
        return "org.apache.spark.sql.sedona_sql.UDT.Box2DUDT"
b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/Box2DUDT.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.sedona_sql.UDT + +import org.apache.sedona.common.geometryObjects.Box2D +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ + +/** + * UDT for [[Box2D]]. Stored as a Spark struct of four non-nullable doubles (`xmin`, `ymin`, + * `xmax`, `ymax`) so values round-trip natively to Parquet and align with GeoParquet 1.1 bbox + * covering columns. 
/**
 * Spark SQL user-defined type for [[Box2D]]. A value is stored as a struct of four non-nullable
 * doubles named `xmin`, `ymin`, `xmax` and `ymax`, in that order.
 */
class Box2DUDT extends UserDefinedType[Box2D] {

  override def sqlType: DataType = {
    val ordinateFields = Seq("xmin", "ymin", "xmax", "ymax")
      .map(name => StructField(name, DoubleType, nullable = false))
    StructType(ordinateFields)
  }

  override def pyUDT: String = "sedona.spark.sql.types.Box2DType"

  override def userClass: Class[Box2D] = classOf[Box2D]

  /** Writes the four ordinates into a new row, in the order declared by `sqlType`. */
  override def serialize(obj: Box2D): InternalRow = {
    val ordinates = Array(obj.getXMin, obj.getYMin, obj.getXMax, obj.getYMax)
    val row = new GenericInternalRow(ordinates.length)
    ordinates.zipWithIndex.foreach { case (value, ordinal) =>
      row.setDouble(ordinal, value)
    }
    row
  }

  /** Rebuilds a [[Box2D]] from a row; any other input fails the pattern match. */
  override def deserialize(datum: Any): Box2D = datum match {
    case row: InternalRow =>
      val xmin = row.getDouble(0)
      val ymin = row.getDouble(1)
      val xmax = row.getDouble(2)
      val ymax = row.getDouble(3)
      new Box2D(xmin, ymin, xmax, ymax)
  }

  override private[sql] def jsonValue: JValue = {
    // Strip the trailing "$" so the case object reports the plain class name.
    val udtClassName = this.getClass.getName.stripSuffix("$")
    super.jsonValue mapField {
      case ("class", _) => "class" -> udtClassName
      case other: Any => other
    }
  }

  /** Any `Box2DUDT` (class instance or case object) equals any other. */
  override def equals(other: Any): Boolean = other.isInstanceOf[Box2DUDT]

  override def hashCode(): Int = userClass.hashCode()

  override def toString: String = "Box2DUDT"
}

/** Singleton form of [[Box2DUDT]]; `apply()` returns a fresh class instance. */
case object Box2DUDT
    extends org.apache.spark.sql.sedona_sql.UDT.Box2DUDT
    with scala.Serializable {
  def apply(): Box2DUDT = new Box2DUDT()
}
+import org.apache.sedona.common.geometryObjects.Box2D import org.apache.spark.sql.types.UDTRegistration import org.locationtech.jts.geom.Geometry import org.locationtech.jts.index.SpatialIndex @@ -28,6 +29,7 @@ object UdtRegistratorWrapper { def registerAll(): Unit = { registerIfNotExists(classOf[Geometry].getName, classOf[GeometryUDT].getName) registerIfNotExists(classOf[Geography].getName, classOf[GeographyUDT].getName) + registerIfNotExists(classOf[Box2D].getName, classOf[Box2DUDT].getName) registerIfNotExists(classOf[SpatialIndex].getName, classOf[IndexUDT].getName) } diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/Box2DUDTSuite.scala b/spark/common/src/test/scala/org/apache/sedona/sql/Box2DUDTSuite.scala new file mode 100644 index 00000000000..0779d675ff5 --- /dev/null +++ b/spark/common/src/test/scala/org/apache/sedona/sql/Box2DUDTSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Tests for [[Box2DUDT]]: registration, JSON schema round-trip, serialize/deserialize round-trip,
 * equality between the class and its case object, and a Parquet write/read round-trip.
 */
class Box2DUDTSuite extends TestBaseScala with BeforeAndAfter {

  // Scratch directory used by the Parquet test; created before and deleted after each test.
  val tempFolder: TemporaryFolder = new TemporaryFolder

  before {
    tempFolder.create()
  }

  after {
    tempFolder.delete()
  }

  describe("Box2DUDT") {
    it("registers Box2D via UdtRegistratorWrapper") {
      val userClassName = classOf[Box2D].getName
      assert(UDTRegistration.exists(userClassName))
    }

    it("renders and parses a JSON schema round-trip") {
      val original = new StructType().add("box", new Box2DUDT())
      val reparsed = DataType.fromJson(original.json).asInstanceOf[StructType]
      assert(reparsed == original)
    }

    it("serializes and deserializes a Box2D round-trip") {
      val udt = new Box2DUDT()
      val expected = new Box2D(-10.0, -20.0, 30.0, 40.0)
      val restored = udt.deserialize(udt.serialize(expected))
      assert(restored == expected)
    }

    it("case object equals a fresh instance") {
      val fresh = new Box2DUDT()
      assert(Box2DUDT == Box2DUDT)
      assert(fresh.equals(fresh))
      assert(fresh.equals(Box2DUDT))
      assert(Box2DUDT.equals(fresh))
      assert(fresh.hashCode() == Box2DUDT.hashCode())
    }

    it("writes and reads a Box2D column via Parquet") {
      val expected = new Box2D(1.0, 2.0, 3.0, 4.0)
      val schema = new StructType()
        .add("id", IntegerType, nullable = false)
        .add("bbox", new Box2DUDT(), nullable = false)
      val rows = sparkSession.sparkContext.parallelize(Seq(Row(1, expected)))
      val written = sparkSession.createDataFrame(rows, schema)

      val path = tempFolder.getRoot.getPath + "/box2d-parquet"
      written.write.parquet(path)

      val collected = sparkSession.read.parquet(path).collect()
      val first = collected(0)
      assert(first.getAs[Int]("id") == 1)
      assert(first.getAs[Box2D]("bbox") == expected)
    }
  }
}