Skip to content

Commit 67edb98

Browse files
authored
GEOMESA-3577 FSDS - Add z-value column for partition pruning (#3543)
1 parent 557ab2c commit 67edb98

21 files changed

Lines changed: 400 additions & 420 deletions

File tree

docs/user/filesystem/configuration.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ are:
7676
* ``GeoParquetNative`` - This schema uses `GeoParquet 1.1.0 <https://geoparquet.org/releases/v1.1.0/>`__ with geometries
7777
encoded "natively". This format doesn't require special libraries to read, but isn't as widely supported as WKB.
7878

79+
geomesa.parquet.z-value-column
80+
++++++++++++++++++++++++++++++
81+
82+
Setting this property to ``false`` skips writing an additional Z-value column for geometry-type columns when using Parquet.
83+
By default, each geometry will include an int-64-type column containing the index value from a Z2 or XZ2
84+
space-filling curve (as appropriate), which can be used to accelerate queries through partition pruning.
85+
7986
AWS S3 Configuration
8087
--------------------
8188

geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/AvroReadSupport.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import org.apache.parquet.schema.Type.Repetition
2121
import org.apache.parquet.schema._
2222
import org.locationtech.geomesa.convert.parquet.AvroReadSupport.AvroRecordMaterializer
2323
import org.locationtech.geomesa.curve.BinnedTime
24-
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureReadSupport._
25-
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureParquetSchema, SimpleFeatureReadSupport}
24+
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureParquetSchema
25+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.SimpleFeatureReadSupport
26+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.SimpleFeatureReadSupport._
2627

2728
import java.util.{Collections, Date}
2829

geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetConverterFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.locationtech.geomesa.convert2.TypeInference.{FunctionTransform, Infer
2727
import org.locationtech.geomesa.convert2.transforms.Expression
2828
import org.locationtech.geomesa.convert2.{AbstractConverterFactory, TypeInference}
2929
import org.locationtech.geomesa.fs.storage.parquet.io.GeoParquetMetadata.{ColumnMetadata, GeoParquetColumnEncoding, GeoParquetColumnType}
30-
import org.locationtech.geomesa.fs.storage.parquet.io.GeometrySchema.GeometryEncoding.GeoParquetNative
30+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.GeometrySchema.GeometryEncoding.GeoParquetNative
3131
import org.locationtech.geomesa.fs.storage.parquet.io.{GeoParquetMetadata, SimpleFeatureParquetSchema}
3232
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
3333
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}

geomesa-convert/geomesa-convert-parquet/src/main/scala/org/locationtech/geomesa/convert/parquet/ParquetFunctionFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class ParquetFunctionFactory extends TransformerFunctionFactory {
6060
abstract class ParquetGeometryFn[T <: Geometry, U](name: String, path: Option[AvroPath])
6161
extends NamedTransformerFunction(Seq(name), pure = true) {
6262

63-
import org.locationtech.geomesa.fs.storage.parquet.io.GeometrySchema.{GeometryColumnX, GeometryColumnY}
63+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.GeometrySchema.{GeometryColumnX, GeometryColumnY}
6464

6565
override def apply(args: Array[AnyRef]): AnyRef = {
6666
val attribute = path match {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-jobs/src/main/scala/org/locationtech/geomesa/fs/storage/jobs/parquet/ParquetSimpleFeatureInputFormat.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import org.geotools.api.filter.Filter
1919
import org.locationtech.geomesa.features.TransformSimpleFeature
2020
import org.locationtech.geomesa.fs.storage.jobs.StorageConfiguration
2121
import org.locationtech.geomesa.fs.storage.jobs.parquet.ParquetSimpleFeatureInputFormat.{ParquetSimpleFeatureInputFormatBase, ParquetSimpleFeatureRecordReaderBase, ParquetSimpleFeatureTransformRecordReaderBase}
22-
import org.locationtech.geomesa.fs.storage.parquet.io.SimpleFeatureReadSupport
22+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.SimpleFeatureReadSupport
2323
import org.locationtech.geomesa.fs.storage.parquet.{ReadFilter, ReadSchema}
2424
import org.locationtech.geomesa.index.planning.QueryRunner
2525

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-jobs/src/main/scala/org/locationtech/geomesa/fs/storage/jobs/parquet/ParquetStorageConfiguration.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
1414
import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetOutputFormat}
1515
import org.geotools.api.feature.simple.SimpleFeatureType
1616
import org.locationtech.geomesa.fs.storage.jobs.StorageConfiguration
17-
import org.locationtech.geomesa.fs.storage.parquet.io.{SimpleFeatureReadSupport, SimpleFeatureWriteSupport}
17+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.{SimpleFeatureReadSupport, SimpleFeatureWriteSupport}
1818

1919
trait ParquetStorageConfiguration extends StorageConfiguration with LazyLogging {
2020
override def configureOutput(sft: SimpleFeatureType, job: Job): Unit = {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/GeoParquetMetadata.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import org.geotools.api.feature.`type`.GeometryDescriptor
1313
import org.geotools.api.feature.simple.SimpleFeature
1414
import org.locationtech.geomesa.fs.storage.core.observer.FileSystemObserver
1515
import org.locationtech.geomesa.fs.storage.parquet.io.GeoParquetMetadata.ColumnMetadata
16-
import org.locationtech.geomesa.fs.storage.parquet.io.GeometrySchema.GeometryEncoding
16+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.GeometrySchema.GeometryEncoding
1717
import org.locationtech.geomesa.utils.geotools.ObjectType
1818
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
1919
import org.locationtech.geomesa.utils.text.StringSerialization

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/ParquetFileSystemReader.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import org.locationtech.geomesa.features.TransformSimpleFeature
1919
import org.locationtech.geomesa.fs.storage.core.FileSystemContext
2020
import org.locationtech.geomesa.fs.storage.core.FileSystemStorage.FileSystemPathReader
2121
import org.locationtech.geomesa.fs.storage.core.fs.{LocalObjectStore, ObjectStore, S3ObjectStore}
22+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.SimpleFeatureReadSupport
2223
import org.locationtech.geomesa.fs.storage.parquet.io.s3.S3InputFile
2324
import org.locationtech.geomesa.utils.collection.CloseableIterator
2425
import org.locationtech.geomesa.utils.geotools.Transform.Transforms

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/ParquetFileSystemWriter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.locationtech.geomesa.fs.storage.core.FileSystemStorage.FileSystemWrit
2121
import org.locationtech.geomesa.fs.storage.core.fs.{LocalObjectStore, ObjectStore, S3ObjectStore}
2222
import org.locationtech.geomesa.fs.storage.core.observer.FileSystemObserver
2323
import org.locationtech.geomesa.fs.storage.core.observer.FileSystemObserverFactory.NoOpObserver
24+
import org.locationtech.geomesa.fs.storage.parquet.io.rw.SimpleFeatureWriteSupport
2425
import org.locationtech.geomesa.fs.storage.parquet.io.s3.S3OutputFile
2526
import org.locationtech.geomesa.utils.io.CloseQuietly
2627

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/SimpleFeatureParquetSchema.scala

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,20 @@
1010
package org.locationtech.geomesa.fs.storage.parquet.io
1111

1212
import com.typesafe.scalalogging.LazyLogging
13-
import org.apache.hadoop.conf.Configuration
1413
import org.apache.iceberg.Schema
1514
import org.apache.iceberg.types.Types._
16-
import org.apache.parquet.conf.{HadoopParquetConfiguration, ParquetConfiguration, PlainParquetConfiguration}
15+
import org.apache.parquet.conf.{ParquetConfiguration, PlainParquetConfiguration}
1716
import org.apache.parquet.hadoop.api.InitContext
1817
import org.apache.parquet.hadoop.metadata.FileMetaData
1918
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit
2019
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
2120
import org.apache.parquet.schema.Type.Repetition
2221
import org.apache.parquet.schema._
2322
import org.geotools.api.feature.simple.SimpleFeatureType
24-
import org.locationtech.geomesa.fs.storage.parquet.io.GeometrySchema.{BoundingBoxField, BoundingBoxes, GeometryEncoding}
23+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.BoundingBoxes.BoundingBoxField
24+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.GeometrySchema.GeometryEncoding
25+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.ZValues.ZValueField
26+
import org.locationtech.geomesa.fs.storage.parquet.io.geometry.{BoundingBoxes, GeometrySchema, ZValues}
2527
import org.locationtech.geomesa.utils.geotools.ObjectType.ObjectType
2628
import org.locationtech.geomesa.utils.geotools.{ObjectType, SimpleFeatureTypes}
2729
import org.locationtech.geomesa.utils.text.StringSerialization
@@ -35,6 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger
3537
* @param encodings type encoding
3638
* @param hasVisibilities whether the schema encodes visibilities, or not
3739
* @param bboxes fields with bounding boxes
40+
* @param zValues fields with z-value columns
3841
* @param metadata file metadata
3942
* @param schema parquet message schema
4043
*/
@@ -43,6 +46,7 @@ case class SimpleFeatureParquetSchema(
4346
encodings: Encodings,
4447
hasVisibilities: Boolean,
4548
bboxes: BoundingBoxes,
49+
zValues: ZValues,
4650
metadata: java.util.Map[String, String],
4751
schema: MessageType) {
4852

@@ -71,6 +75,7 @@ object SimpleFeatureParquetSchema extends LazyLogging {
7175
val SftReadSpecKey = "geomesa.fs.sft.read.spec"
7276
val GeometryEncodingKey = "geomesa.parquet.geometries"
7377
val BBoxEncodingKey = "geomesa.parquet.bounding-boxes"
78+
val ZValueColumKey = "geomesa.parquet.z-value-column"
7479
val VisibilityEncodingKey = "geomesa.fs.visibilities"
7580
val PartitionKey = "geomesa.fs.partition"
7681

@@ -148,6 +153,7 @@ object SimpleFeatureParquetSchema extends LazyLogging {
148153
// only include bboxes if they help with push-down filters
149154
val bboxes =
150155
if (Option(conf.get(BBoxEncodingKey)).forall(_.toBoolean)) { BoundingBoxes(sft, geometries) } else { BoundingBoxes(Seq.empty) }
156+
val zCols = if (Option(conf.get(ZValueColumKey)).forall(_.toBoolean)) { ZValues(sft) } else { ZValues(Seq.empty) }
151157
val visibilities =
152158
Option(sft.getUserData.get(VisibilityEncodingKey)).orElse(Option(conf.get(VisibilityEncodingKey))).forall(_.toString.toBoolean)
153159
val metadata = Map(
@@ -156,8 +162,8 @@ object SimpleFeatureParquetSchema extends LazyLogging {
156162
GeometryEncodingKey -> geometries.toString,
157163
) ++ Option(conf.get(PartitionKey)).map(PartitionKey -> _)
158164
val encodings = Encodings(geometries)
159-
val messageType = schema(sft, None, encodings, bboxes, visibilities)
160-
SimpleFeatureParquetSchema(sft, encodings, visibilities, bboxes, metadata.asJava, messageType)
165+
val messageType = schema(sft, encodings, bboxes, zCols, visibilities).toMessageType(sft)
166+
SimpleFeatureParquetSchema(sft, encodings, visibilities, bboxes, zCols, metadata.asJava, messageType)
161167
}
162168
}
163169

@@ -192,10 +198,11 @@ object SimpleFeatureParquetSchema extends LazyLogging {
192198
throw new UnsupportedOperationException("GeoMesaV0/GeoMesaV1 encoding is no longer supported")
193199
}
194200
val encodings = Encodings(geometries)
195-
val bboxes = BoundingBoxes(fileSchema.getFields.asScala.map(_.getName).flatMap(BoundingBoxField.fromBoundingBox).toSeq)
201+
val bboxes = BoundingBoxes(fileSchema.getFields.asScala.map(_.getName).flatMap(BoundingBoxField.fromFieldName).toSeq)
202+
val zCols = ZValues(fileSchema.getFields.asScala.map(_.getName).flatMap(ZValueField.fromFieldName).toSeq)
196203
val visibilities = fileSchema.containsField(VisibilitiesField)
197-
val schema = readSchema(fileSchema, sft, readSft, encodings, bboxes, visibilities)
198-
SimpleFeatureParquetSchema(readSft.getOrElse(sft), encodings, visibilities, bboxes, metadata.asJava, schema)
204+
val schema = readSchema(fileSchema, sft, readSft, encodings, bboxes, zCols, visibilities)
205+
SimpleFeatureParquetSchema(readSft.getOrElse(sft), encodings, visibilities, bboxes, zCols, metadata.asJava, schema)
199206
}
200207
}
201208

@@ -216,43 +223,27 @@ object SimpleFeatureParquetSchema extends LazyLogging {
216223
readSft: Option[SimpleFeatureType],
217224
encodings: Encodings,
218225
bboxes: BoundingBoxes,
226+
zValues: ZValues,
219227
visibilities: Boolean): MessageType = {
220-
val consistentSchema = schema(sft, readSft, encodings, bboxes, visibilities) // current schema
221-
if (fileSchema.getFieldName(0) == SimpleFeatureParquetSchema.FeatureIdField) {
222-
consistentSchema
223-
} else {
224-
def getMappedField(name: String): Type = consistentSchema.getFields.get(consistentSchema.getFieldIndex(name))
225-
// old files - attributes are first, then fid, vis, bboxes
226-
val attributes = readSft.getOrElse(sft).getAttributeDescriptors.asScala.map { d =>
227-
getMappedField(alphaNumericSafeString(d.getLocalName))
228-
}
229-
val id = getMappedField(FeatureIdField)
230-
val vis = if (!visibilities) { None } else { Some(getMappedField(VisibilitiesField)) }
231-
val boxes = sft.getAttributeDescriptors.asScala.flatMap { d =>
232-
bboxes.get(d.getLocalName).map(getMappedField)
233-
}
234-
val fields = attributes ++ Seq(id) ++ vis ++ boxes
235-
new MessageType(alphaNumericSafeString(sft.getTypeName), fields.asJava)
236-
}
228+
schema(sft, encodings, bboxes, zValues, visibilities).toMessageType(readSft.getOrElse(sft), excludeZValues = true)
237229
}
238230

239231
/**
240232
* Get the message type for a simple feature type. We need the full sft in order to ensure field ids are
241233
* consistent, but the schema may be reduced (e.g. on read) based on the filter sft
242234
*
243235
* @param sft simple feature type
244-
* @param readSft optional feature type used to filter the fields in the schema
245236
* @param encodings field type encoding
246237
* @param bboxes include bounding boxes for each row
247238
* @param visibilities include visibilities
248239
* @return
249240
*/
250241
private def schema(
251242
sft: SimpleFeatureType,
252-
readSft: Option[SimpleFeatureType],
253243
encodings: Encodings,
254244
bboxes: BoundingBoxes,
255-
visibilities: Boolean): MessageType = {
245+
zValues: ZValues,
246+
visibilities: Boolean): SchemaFields = {
256247
// note: for iceberg compatibility, field ids need to start at one and increment (without gaps) across all top-level fields.
257248
// All nested fields (structs, lists) get ids *after* all the top-level fields, once again incrementing without gaps
258249
val fieldIds = new AtomicInteger(1)
@@ -271,23 +262,18 @@ object SimpleFeatureParquetSchema extends LazyLogging {
271262
}
272263
val attributes = {
273264
val builder = Map.newBuilder[String, Seq[Type]]
274-
val nestedFieldIds = new AtomicInteger(fieldIds.get() + sft.getAttributeCount + bboxes.fields.size)
265+
val nestedFieldIds = new AtomicInteger(fieldIds.get() + sft.getAttributeCount + bboxes.fields.size + zValues.fields.size)
275266
sft.getAttributeDescriptors.asScala.foreach { d =>
276267
val name = alphaNumericSafeString(d.getLocalName)
277268
val types =
278269
Seq(buildType(name, ObjectType.selectType(d), encodings, fieldIds, nestedFieldIds)) ++
279-
bboxes.get(d.getLocalName).map(bbox => BoundingBoxField.schema(bbox, fieldIds, nestedFieldIds))
270+
bboxes.get(d.getLocalName).map(bbox => BoundingBoxField.schema(bbox, fieldIds, nestedFieldIds)) ++
271+
zValues.get(d.getLocalName).map(zValue => ZValueField.schema(zValue, fieldIds))
280272
builder += name -> types
281273
}
282274
builder.result()
283275
}
284-
val attributeFields = readSft.getOrElse(sft).getAttributeDescriptors.asScala.flatMap { d =>
285-
attributes(alphaNumericSafeString(d.getLocalName))
286-
}
287-
// note: id field goes at the front of the record, then vis, then attributes and bounding boxes
288-
val fields = Seq(id) ++ vis ++ attributeFields
289-
val name = alphaNumericSafeString(sft.getTypeName)
290-
new MessageType(name, fields.asJava)
276+
SchemaFields(id, vis, attributes)
291277
}
292278

293279
private def icebergSchema(schema: SimpleFeatureParquetSchema): Schema = {
@@ -305,7 +291,8 @@ object SimpleFeatureParquetSchema extends LazyLogging {
305291
fields.add(NestedField.optional(fieldIds.getAndIncrement(), VisibilitiesField, StringType.get()))
306292
}
307293

308-
val nestedFieldIds = new AtomicInteger(fieldIds.get() + schema.sft.getAttributeCount + schema.bboxes.fields.size)
294+
val nestedFieldIds =
295+
new AtomicInteger(fieldIds.get() + schema.sft.getAttributeCount + schema.bboxes.fields.size + schema.zValues.fields.size)
309296
schema.sft.getAttributeDescriptors.asScala.foreach { d =>
310297
val name = alphaNumericSafeString(d.getLocalName)
311298
aliases.put(name, fieldIds.get())
@@ -314,6 +301,10 @@ object SimpleFeatureParquetSchema extends LazyLogging {
314301
aliases.put(bbox, fieldIds.get())
315302
fields.add(BoundingBoxField.icebergSchema(bbox, fieldIds, nestedFieldIds))
316303
}
304+
schema.zValues.get(d.getLocalName).foreach { zValue =>
305+
aliases.put(zValue, fieldIds.get())
306+
fields.add(ZValueField.icebergSchema(zValue, fieldIds))
307+
}
317308
}
318309
new Schema(fields, aliases, java.util.Set.of[Integer](1))
319310
}
@@ -423,4 +414,18 @@ object SimpleFeatureParquetSchema extends LazyLogging {
423414
}
424415
typed.build()
425416
}
417+
418+
private case class SchemaFields(fid: Type, vis: Option[Type], attributes: Map[String, Seq[Type]]) {
419+
def toMessageType(sft: SimpleFeatureType, excludeZValues: Boolean = false): MessageType = {
420+
// note: id field goes at the front of the record, then vis, then attributes and bounding boxes
421+
val fields = Seq(fid) ++ vis ++ {
422+
val all = sft.getAttributeDescriptors.asScala.flatMap { d =>
423+
attributes(alphaNumericSafeString(d.getLocalName))
424+
}
425+
if (!excludeZValues) { all } else { all.filter(f => ZValueField.fromFieldName(f.getName).isEmpty) }
426+
}
427+
val name = alphaNumericSafeString(sft.getTypeName)
428+
new MessageType(name, fields.asJava)
429+
}
430+
}
426431
}

0 commit comments

Comments (0)