Skip to content

Commit 091dfc1

Browse files
committed
GEOMESA-3575 FSDS - Switch to lexicoded z columns
* Allows for 'truncate' iceberg transforms to map to z/xz partition schemes
1 parent 2350a7f commit 091dfc1

14 files changed

Lines changed: 144 additions & 192 deletions

File tree

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/FileSystemStorage.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,8 +498,8 @@ object FileSystemStorage {
498498

499499
private val columnBounds = sft.columnBounds().flatMap { i =>
500500
val binding = sft.getDescriptor(i).getType.getBinding
501-
val encoders = StorageMetadata.TypeRegistry.getAllEncoders.asScala
502-
encoders.find(_.resolves() == binding).orElse(encoders.find(_.resolves().isAssignableFrom(binding))) match {
501+
val alias = StorageMetadata.typeAlias(binding)
502+
StorageMetadata.TypeRegistry.getAllEncoders.asScala.find(_.getAlias == alias) match {
503503
case Some(encoder) => Some(ColumnBoundsBuilder(i, encoder.asInstanceOf[TypeEncoder[AnyRef, String]]))
504504
case None =>
505505
logger.warn(

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,7 @@ object StorageMetadata {
171171
*/
172172
object Z2Encoder extends TypeEncoder[Point, String] {
173173

174-
val sfc: Z2SFC = Z2SFC
175-
174+
private val sfc = Z2SFC
176175
private val longEncoder = new LongEncoder()
177176
private val factory = new GeometryFactory()
178177

@@ -187,21 +186,27 @@ object StorageMetadata {
187186
longEncoder.encode(sfc.index(value.getX, value.getY))
188187
}
189188

190-
def encode(z: Long): String = longEncoder.encode(z)
191-
192189
override def decode(value: String): Point = {
193190
val (x, y) = sfc.invert(longEncoder.decode(value))
194191
factory.createPoint(new Coordinate(x, y))
195192
}
193+
194+
/**
195+
* Calculate the z-value ranges that cover the given query windows, returned as (lower, upper) pairs of encoded strings
196+
*
197+
* @param queries a sequence of OR'd windows to cover. Each window is in the form (xmin, ymin, xmax, ymax)
198+
* @param maxRanges rough upper bound on the number of ranges to return
199+
*/
200+
def ranges(queries: Seq[(Double, Double, Double, Double)], maxRanges: Option[Int] = None): Seq[(String, String)] =
201+
sfc.ranges(queries, maxRanges = maxRanges).map(r => longEncoder.encode(r.lower) -> longEncoder.encode(r.upper))
196202
}
197203

198204
/**
199205
* Encoder for geometries, indexed by their envelope
200206
*/
201207
object XZ2Encoder extends TypeEncoder[Geometry, String] {
202208

203-
val sfc: XZ2SFC = XZ2SFC
204-
209+
private val sfc = XZ2SFC
205210
private val longEncoder = new LongEncoder()
206211
private val factory = new GeometryFactory()
207212

@@ -220,8 +225,6 @@ object StorageMetadata {
220225
longEncoder.encode(sfc.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY))
221226
}
222227

223-
def encode(z: Long): String = longEncoder.encode(z)
224-
225228
override def decode(value: String): Geometry = {
226229
val (xmin, ymin, xmax, ymax) = sfc.invert(longEncoder.decode(value))
227230
val ring = Array(
@@ -233,5 +236,14 @@ object StorageMetadata {
233236
)
234237
factory.createPolygon(ring)
235238
}
239+
240+
/**
241+
* Calculate the xz-value ranges that cover the given query windows, returned as (lower, upper) pairs of encoded strings
242+
*
243+
* @param queries a sequence of OR'd windows to cover. Each window is in the form (xmin, ymin, xmax, ymax)
244+
* @param maxRanges a rough upper limit on the number of ranges to generate
245+
*/
246+
def ranges(queries: Seq[(Double, Double, Double, Double)], maxRanges: Option[Int] = None): Seq[(String, String)] =
247+
sfc.ranges(queries, maxRanges).map(r => longEncoder.encode(r.lower) -> longEncoder.encode(r.upper))
236248
}
237249
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/SchemeFilterExtraction.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,13 @@ trait SchemeFilterExtraction extends AnyLogging {
6868
if (classOf[Geometry].isAssignableFrom(d.getType.getBinding)) {
6969
val ors = FilterHelper.extractGeometries(filter, sft.getDescriptor(i).getLocalName).values.flatMap { g =>
7070
val env = g.getEnvelopeInternal
71-
if (d.getType.getBinding == classOf[Point]) {
72-
// TODO make max ranges configurable
73-
Z2Encoder.sfc.ranges(Seq((env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)), maxRanges = Some(8)).map { range =>
74-
ColumnBound(Z2Encoder.encode(range.lower), Z2Encoder.encode(range.upper))
75-
}
71+
// TODO make max ranges configurable
72+
val ranges = if (d.getType.getBinding == classOf[Point]) {
73+
Z2Encoder.ranges(Seq((env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)), Some(8))
7674
} else {
77-
XZ2Encoder.sfc.ranges((env.getMinX, env.getMinY, env.getMaxX, env.getMaxY), Some(8)).map { range =>
78-
ColumnBound(XZ2Encoder.encode(range.lower), XZ2Encoder.encode(range.upper))
79-
}
75+
XZ2Encoder.ranges(Seq((env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)), Some(8))
8076
}
77+
ranges.map { case (lower, upper) => ColumnBound(lower, upper) }
8178
}
8279
if (ors.isEmpty) { None } else { Some(ColumnOr(i, ors)) }
8380
} else {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/geometry/ZValues.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88

99
package org.locationtech.geomesa.fs.storage.parquet.io.geometry
1010

11-
import org.apache.iceberg.types.Types.{LongType, NestedField}
11+
import org.apache.iceberg.types.Types.{LongType, NestedField, StringType}
1212
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
13-
import org.apache.parquet.schema.{PrimitiveType, Types}
13+
import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType, Types}
1414
import org.geotools.api.feature.simple.SimpleFeatureType
1515
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.ZValues.ZValueField
1616
import org.locationtech.geomesa.utils.text.StringSerialization.alphaNumericSafeString
@@ -104,7 +104,7 @@ object ZValues {
104104
* @return
105105
*/
106106
def schema(zValue: String, fieldIds: AtomicInteger): PrimitiveType =
107-
Types.optional(PrimitiveTypeName.INT64).id(fieldIds.getAndIncrement()).named(zValue)
107+
Types.optional(PrimitiveTypeName.BINARY).id(fieldIds.getAndIncrement()).as(LogicalTypeAnnotation.stringType()).named(zValue)
108108

109109
/**
110110
* The iceberg schema for a z-value field
@@ -113,6 +113,6 @@ object ZValues {
113113
* @return
114114
*/
115115
def icebergSchema(zValue: String, fieldIds: AtomicInteger): NestedField =
116-
NestedField.optional(zValue).withId(fieldIds.getAndIncrement()).ofType(LongType.get()).build()
116+
NestedField.optional(zValue).withId(fieldIds.getAndIncrement()).ofType(StringType.get()).build()
117117
}
118118
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/rw/SimpleFeatureReadSupport.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ object SimpleFeatureReadSupport {
120120
// note: bboxes are excluded from our read schema, they're only present to conform with geoparquet
121121
// note: zValues have to be present for filtering, but we don't do anything with them on read
122122
if (schema.zValues.get(descriptor.getLocalName).isDefined) {
123-
builder += new LongConverter()
123+
builder += new StringConverter()
124124
}
125125
i += 1
126126
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/rw/SimpleFeatureWriteSupport.scala

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import org.apache.parquet.hadoop.api.WriteSupport.{FinalizedWriteContext, WriteC
1515
import org.apache.parquet.io.api.{Binary, RecordConsumer}
1616
import org.geotools.api.feature.`type`.AttributeDescriptor
1717
import org.geotools.api.feature.simple.SimpleFeature
18-
import org.locationtech.geomesa.curve.{XZ2SFC, XZSFC, Z2SFC}
18+
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.{XZ2Encoder, Z2Encoder}
1919
import org.locationtech.geomesa.fs.storage.parquet.io.GeoParquetMetadata.GeoParquetObserver
2020
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema
2121
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.BoundingBoxes.BoundingBoxField
@@ -81,8 +81,6 @@ object SimpleFeatureWriteSupport {
8181

8282
import StringSerialization.alphaNumericSafeString
8383

84-
private val xz = XZ2SFC(XZSFC.DefaultPrecision)
85-
8684
private class SimpleFeatureWriter(schema: SimpleFeatureParquetSchema) {
8785

8886
private val fids = new FidWriter(0) // ID is the 1st field
@@ -295,7 +293,7 @@ object SimpleFeatureWriteSupport {
295293
private abstract class GeometryWriter[T <: Geometry](name: String, index: Int, bbox: Option[String], zValue: Option[String])
296294
extends AttributeWriter[T](name, index, 1 + bbox.size + zValue.size) {
297295

298-
protected def z(geom: T): Long
296+
protected def z(geom: T): String
299297

300298
/**
301299
* Writes a value to the current record
@@ -329,7 +327,7 @@ object SimpleFeatureWriteSupport {
329327
}
330328
zValue.foreach { name =>
331329
consumer.startField(name, index + 1 + bbox.size)
332-
consumer.addLong(z(value))
330+
consumer.addBinary(Binary.fromString(z(value)))
333331
consumer.endField(name, index + 1 + bbox.size)
334332
}
335333
}
@@ -338,15 +336,12 @@ object SimpleFeatureWriteSupport {
338336

339337
private abstract class GeometryZWriter(name: String, index: Int, bbox: Option[String], zValue: Option[String])
340338
extends GeometryWriter[Point](name, index, bbox, zValue) {
341-
override protected def z(geom: Point): Long = Z2SFC.index(geom.getX, geom.getY)
339+
override protected def z(geom: Point): String = Z2Encoder.encode(geom)
342340
}
343341

344342
private abstract class GeometryXZWriter[T <: Geometry](name: String, index: Int, bbox: Option[String], zValue: Option[String])
345-
extends GeometryWriter[T](name, index, bbox, zValue) {
346-
override protected def z(geom: T): Long = {
347-
val env = geom.getEnvelopeInternal
348-
xz.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)
349-
}
343+
extends GeometryWriter[T](name, index, bbox, zValue) {
344+
override protected def z(geom: T): String = XZ2Encoder.encode(geom)
350345
}
351346

352347
private class PointWriter(name: String, index: Int, bbox: Option[String], zValue: Option[String])

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/FilterConverter.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,18 @@ object FilterConverter {
7979
val (field, ranges) =
8080
if (typed == ObjectType.POINT) {
8181
val field = ZValueField.z2(col, encoded = true).zValue
82-
val ranges = Z2Encoder.sfc.ranges(bounds, 64, Some(8)) // TODO make configurable
82+
// TODO make max ranges configurable
83+
val ranges = Z2Encoder.ranges(bounds, Some(8))
8384
(field, ranges)
8485
} else {
8586
val field = ZValueField.xz2(col, encoded = true).zValue
86-
val ranges = XZ2Encoder.sfc.ranges(bounds, Some(8))
87+
val ranges = XZ2Encoder.ranges(bounds, Some(8))
8788
(field, ranges)
8889
}
89-
90-
val zcol = FilterApi.longColumn(field)
91-
val filters = ranges.map(r => FilterApi.and(FilterApi.gtEq(zcol, Long.box(r.lower)), FilterApi.ltEq(zcol, Long.box(r.upper))))
90+
val zcol = FilterApi.binaryColumn(field)
91+
val filters = ranges.map { case (lower, upper) =>
92+
FilterApi.and(FilterApi.gtEq(zcol, Binary.fromString(lower)), FilterApi.ltEq(zcol, Binary.fromString(upper)))
93+
}
9294
filters.reduce(FilterApi.or)
9395
}
9496
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/iceberg/IcebergMapper.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@
99
package org.locationtech.geomesa.fs.storage.parquet.iceberg
1010

1111
import com.typesafe.scalalogging.LazyLogging
12-
import org.apache.iceberg.parquet.ParquetUtil
1312
import org.apache.iceberg._
13+
import org.apache.iceberg.parquet.ParquetUtil
1414
import org.apache.parquet.ParquetReadOptions
1515
import org.apache.parquet.hadoop.ParquetFileReader
1616
import org.apache.parquet.hadoop.metadata.ParquetMetadata
1717
import org.calrissian.mango.types.{LexiTypeEncoders, TypeEncoder}
1818
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.StorageFile
1919
import org.locationtech.geomesa.fs.storage.core.schemes.AttributeScheme.{IntegralBucketing, WidthBucketing}
20-
import org.locationtech.geomesa.fs.storage.core.schemes.{AttributeScheme, DateTimeScheme, HashScheme}
20+
import org.locationtech.geomesa.fs.storage.core.schemes._
2121
import org.locationtech.geomesa.fs.storage.core.{FileSystemStorage, Partition, PartitionScheme}
2222
import org.locationtech.geomesa.fs.storage.parquet.iceberg.IcebergMapper.SchemeMapper
23+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.ZValues.ZValueField
2324
import org.locationtech.geomesa.fs.storage.parquet.io.{ParquetFileSystemReader, SimpleFeatureParquetSchema}
2425
import org.locationtech.geomesa.utils.io.WithClose
2526

@@ -144,6 +145,10 @@ object IcebergMapper {
144145
case s: DateTimeScheme if s.step == 1 && s.unit == ChronoUnit.DAYS => Some(DayMapper(s))
145146
case s: DateTimeScheme if s.step == 1 && s.unit == ChronoUnit.MONTHS => Some(MonthMapper(s))
146147
case s: DateTimeScheme if s.step == 1 && s.unit == ChronoUnit.YEARS => Some(YearMapper(s))
148+
149+
case s: Z2Scheme if s.bits % 4 == 0 => Some(Z2Mapper(s))
150+
case s: XZ2Scheme if s.bits % 4 == 0 => Some(XZ2Mapper(s))
151+
147152
case s: HashScheme[_] => Some(HashMapper(s))
148153

149154
case s: AttributeScheme[_] if classOf[String].isAssignableFrom(binding) =>
@@ -188,6 +193,18 @@ object IcebergMapper {
188193
override def toIceberg(key: String): String = LexiTypeEncoders.integerEncoder().decode(key).toString
189194
}
190195

196+
private case class Z2Mapper(scheme: Z2Scheme) extends SchemeMapper {
197+
override def spec(b: PartitionSpec.Builder): PartitionSpec.Builder =
198+
b.truncate(ZValueField.z2(scheme.attribute).zValue, scheme.bits / 4)
199+
override def toIceberg(partitionValue: String): String = partitionValue
200+
}
201+
202+
private case class XZ2Mapper(scheme: XZ2Scheme) extends SchemeMapper {
203+
override def spec(b: PartitionSpec.Builder): PartitionSpec.Builder =
204+
b.truncate(ZValueField.xz2(scheme.attribute).zValue, scheme.bits / 4)
205+
override def toIceberg(partitionValue: String): String = partitionValue
206+
}
207+
191208
private case class HashMapper(scheme: HashScheme[_]) extends SchemeMapper {
192209
override def spec(b: PartitionSpec.Builder): PartitionSpec.Builder = b.bucket(scheme.attribute, scheme.buckets)
193210
override def toIceberg(key: String): String = key

0 commit comments

Comments
 (0)