Skip to content

Commit 2350a7f

Browse files
committed
GEOMESA-3565 FSDS - Standardize file metadata bounds against z-values
* Instead of storing spatial bounds separately, just store them as z-values
1 parent 4e02d00 commit 2350a7f

24 files changed

Lines changed: 674 additions & 505 deletions

File tree

geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.locationtech.geomesa.fs.storage.core.{CloseableFeatureIterator, FileS
2424
import org.locationtech.geomesa.index.geotools.{FastSettableFeatureWriter, GeoMesaFeatureWriter}
2525
import org.locationtech.geomesa.index.utils.ThreadManagement.{LowLevelScanner, ManagedScan, Timeout}
2626
import org.locationtech.geomesa.utils.io.{CloseQuietly, CloseWithLogging, FlushWithLogging}
27+
import org.locationtech.jts.geom.Geometry
2728

2829
import java.io.{Closeable, Flushable}
2930
import java.util.concurrent.TimeUnit
@@ -59,7 +60,11 @@ class FileSystemFeatureStore(
5960
Option(sft.getGeometryDescriptor).foreach { g =>
6061
val i = sft.indexOf(g.getLocalName)
6162
storage.metadata.getFiles(query.getFilter).foreach { file =>
62-
file.spatialBounds.find(_.attribute == i).foreach(b => envelope.expandToInclude(b.envelope))
63+
file.bounds.find(_.attribute == i).foreach { b =>
64+
b.decode(sft).productIterator.foreach {
65+
case g: Geometry => envelope.expandToInclude(g.getEnvelopeInternal)
66+
}
67+
}
6368
}
6469
}
6570
envelope

geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/stats/FileSystemStats.scala

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@ import org.geotools.api.filter.Filter
1313
import org.geotools.util.factory.Hints
1414
import org.locationtech.geomesa.features.ScalaSimpleFeature
1515
import org.locationtech.geomesa.fs.data.FileSystemDataStore
16-
import org.locationtech.geomesa.index.index.attribute.AttributeIndexKey
16+
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.ColumnBounds
1717
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
1818
import org.locationtech.geomesa.index.stats.impl.MinMax
1919
import org.locationtech.geomesa.index.stats.impl.MinMax.MinMaxDefaults
20-
import org.locationtech.jts.geom.Point
2120

2221
/**
2322
* Optimized stats using per-file bounds for non-exact cases
@@ -45,56 +44,29 @@ class FileSystemStats(ds: FileSystemDataStore) extends UnoptimizedRunnableStats(
4544
attribute: String,
4645
filter: Filter,
4746
exact: Boolean): Option[MinMax[T]] = {
48-
if (!exact || filter == Filter.INCLUDE) {
49-
val i = sft.indexOf(attribute)
50-
if (sft.nonSpatialBounds().contains(i)) {
51-
val binding = sft.getDescriptor(i).getType.getBinding
52-
val minMax = new MinMax[T](sft, sft.getDescriptor(i).getLocalName)(MinMaxDefaults(binding))
53-
54-
var min: String = null
55-
var max: String = null
56-
ds.storage(sft.getTypeName).metadata.getFiles(filter).foreach { file =>
57-
file.attributeBounds.filter(_.attribute == i).foreach { bounds =>
58-
if (max == null || max < bounds.upper) {
59-
max = bounds.upper
60-
}
61-
if (min == null || min > bounds.lower) {
62-
min = bounds.lower
63-
}
64-
}
65-
}
66-
if (min != null) {
67-
val alias = AttributeIndexKey.alias(binding)
68-
val sf = new ScalaSimpleFeature(sft, "")
69-
Seq(min, max).foreach { value =>
70-
sf.setAttribute(i, AttributeIndexKey.decode(alias, value))
71-
minMax.observe(sf)
47+
val i = sft.indexOf(attribute)
48+
if ((!exact || filter == Filter.INCLUDE) && sft.columnBounds().contains(i)) {
49+
var min: String = null
50+
var max: String = null
51+
ds.storage(sft.getTypeName).metadata.getFiles(filter).foreach { file =>
52+
file.bounds.filter(_.attribute == i).foreach { bounds =>
53+
if (max == null || max < bounds.upper) {
54+
max = bounds.upper
7255
}
73-
}
74-
Some(minMax)
75-
} else if (sft.spatialBounds().contains(i) && sft.getDescriptor(i).getType.getBinding == classOf[Point]) {
76-
val minMax = new MinMax[T](sft, sft.getDescriptor(i).getLocalName)(MinMaxDefaults(classOf[Point]))
77-
78-
var xmin, ymin, xmax, ymax: java.lang.Double = null
79-
ds.storage(sft.getTypeName).metadata.getFiles(filter).foreach { file =>
80-
file.spatialBounds.filter(_.attribute == i).foreach { bounds =>
81-
xmin = if (xmin == null) { bounds.xmin } else { math.min(xmin, bounds.xmin) }
82-
xmax = if (xmax == null) { bounds.xmax } else { math.max(xmax, bounds.xmax) }
83-
ymin = if (ymin == null) { bounds.ymin } else { math.min(ymin, bounds.ymin) }
84-
ymax = if (ymax == null) { bounds.ymax } else { math.max(ymax, bounds.ymax) }
56+
if (min == null || min > bounds.lower) {
57+
min = bounds.lower
8558
}
8659
}
87-
if (xmin != null) {
88-
val sf = new ScalaSimpleFeature(sft, "")
89-
Seq(xmin -> ymin, xmax -> ymax).foreach { case (x, y) =>
90-
sf.setAttribute(i, s"POINT ($x $y)")
91-
minMax.observe(sf)
92-
}
60+
}
61+
val minMax = new MinMax[T](sft, sft.getDescriptor(i).getLocalName)(MinMaxDefaults(sft.getDescriptor(i).getType.getBinding))
62+
if (min != null) {
63+
val sf = new ScalaSimpleFeature(sft, "")
64+
ColumnBounds(i, min, max).decode(sft).productIterator.foreach { value =>
65+
sf.setAttribute(i, value.asInstanceOf[AnyRef])
66+
minMax.observe(sf)
9367
}
94-
Some(minMax)
95-
} else {
96-
super.getMinMax(sft, attribute, filter, exact)
9768
}
69+
Some(minMax)
9870
} else {
9971
super.getMinMax(sft, attribute, filter, exact)
10072
}

geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreTest.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.locationtech.geomesa.utils.geotools.{CRS_EPSG_4326, FeatureUtils, Sim
2424
import org.locationtech.geomesa.utils.io.WithClose
2525
import org.locationtech.jts.geom.Geometry
2626
import org.slf4j.LoggerFactory
27-
import org.specs2.matcher.Matcher
27+
import org.specs2.matcher.{MatchResult, Matcher}
2828
import org.specs2.mutable.SpecificationWithJUnit
2929
import org.specs2.specification.BeforeAfterAll
3030
import org.testcontainers.containers.output.Slf4jLogConsumer
@@ -162,7 +162,7 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
162162

163163
// This shows that the FeatureSource doing the writing has an up-to-date view of the metadata
164164
fs.getCount(Query.ALL) must beEqualTo(10)
165-
fs.getBounds must equalTo(new ReferencedEnvelope(10.0, 10.0, 10.0, 10.9, CRS_EPSG_4326))
165+
compareBounds(fs.getBounds, new ReferencedEnvelope(10.0, 10.0, 10.0, 10.9, CRS_EPSG_4326))
166166

167167
val results = CloseableIterator(fs.getFeatures(new Query(sft.getTypeName)).features()).toList
168168
results must containTheSameElementsAs(features)
@@ -171,7 +171,7 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
171171
WithClose(DataStoreFinder.getDataStore(params.asJava)) { ds2 =>
172172
val fs2 = ds2.getFeatureSource(sft.getTypeName)
173173
fs2.getCount(Query.ALL) must beEqualTo(10)
174-
fs2.getBounds must equalTo(new ReferencedEnvelope(10.0, 10.0, 10.0, 10.9, CRS_EPSG_4326))
174+
compareBounds(fs2.getBounds, new ReferencedEnvelope(10.0, 10.0, 10.0, 10.9, CRS_EPSG_4326))
175175
}
176176

177177
// test stats queries
@@ -406,7 +406,7 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
406406
fs.getCount(Query.ALL) mustEqual 10
407407
val env = new ReferencedEnvelope(CRS_EPSG_4326)
408408
features.foreach(f => env.expandToInclude(f.getDefaultGeometry.asInstanceOf[Geometry].getEnvelopeInternal))
409-
fs.getBounds mustEqual env
409+
compareBounds(fs.getBounds, env, 10) // xz2 inversion is not very precise...
410410

411411
foreach(Seq("INCLUDE", s"bbox(geom,${env.getMinX},${env.getMinY},${env.getMaxX},${env.getMaxY})")) { filter =>
412412
val query = new Query(sft.getTypeName, ECQL.toFilter(filter))
@@ -424,4 +424,11 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
424424
}
425425
}
426426
}
427+
428+
private def compareBounds(bounds: ReferencedEnvelope, expected: ReferencedEnvelope, delta: Double = 0.01): MatchResult[_] = { // approximate, per-coordinate envelope comparison; delta defaults to 0.01
429+
def toSeq(b: ReferencedEnvelope): Seq[Double] = Seq(b.getMinX, b.getMinY, b.getMaxX, b.getMaxY) // flatten an envelope to (minX, minY, maxX, maxY)
430+
foreach(toSeq(bounds).zip(toSeq(expected))) { case (b, e) => // pair each actual coordinate with the expected one at the same position
431+
b must beCloseTo(e, delta) // tolerance check rather than exact equality; callers pass a larger delta where decoded bounds are imprecise
432+
}
433+
}
427434
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/FileSystemStorage.scala

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,16 @@ import org.geotools.filter.text.ecql.ECQL
1818
import org.locationtech.geomesa.fs.storage.core.FileSystemStorage.FileType.FileType
1919
import org.locationtech.geomesa.fs.storage.core.FileSystemStorage._
2020
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.StorageFileAction.StorageFileAction
21-
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.{AttributeBounds, SpatialBounds, StorageFile, StorageFileAction}
21+
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.{ColumnBounds, StorageFile, StorageFileAction}
2222
import org.locationtech.geomesa.fs.storage.core.fs.ObjectStore
2323
import org.locationtech.geomesa.fs.storage.core.observer.FileSystemObserverFactory.CompositeObserver
2424
import org.locationtech.geomesa.fs.storage.core.observer.{FileSystemObserver, FileSystemObserverFactory}
2525
import org.locationtech.geomesa.fs.storage.core.utils.FileSize.UpdatingFileSizeEstimator
2626
import org.locationtech.geomesa.fs.storage.core.utils.{FileSize, FileSystemThreadedReader}
27-
import org.locationtech.geomesa.index.index.attribute.AttributeIndexKey
2827
import org.locationtech.geomesa.index.planning.QueryRunner
2928
import org.locationtech.geomesa.index.utils.SortingSimpleFeatureIterator
3029
import org.locationtech.geomesa.utils.collection.CloseableIterator
3130
import org.locationtech.geomesa.utils.io.{CloseQuietly, CloseWithLogging, FlushQuietly, WithClose}
32-
import org.locationtech.jts.geom.{Envelope, Geometry}
3331

3432
import java.io.{Closeable, Flushable}
3533
import java.net.URI
@@ -363,6 +361,7 @@ object FileSystemStorage {
363361
* @param writers iterator of files to write
364362
* @param estimator target file size estimator
365363
*/
364+
// noinspection ScalaWeakerAccess
366365
class ChunkedFileSystemWriter(writers: Iterator[FileSystemWriter], estimator: UpdatingFileSizeEstimator)
367366
extends FileSystemWriter {
368367

@@ -497,12 +496,11 @@ object FileSystemStorage {
497496

498497
private var count: Long = 0L
499498

500-
private val spatialBounds = sft.spatialBounds().map(_ -> new Envelope())
501-
502-
private val nonSpatialBounds = sft.nonSpatialBounds().flatMap { i =>
499+
private val columnBounds = sft.columnBounds().flatMap { i =>
503500
val binding = sft.getDescriptor(i).getType.getBinding
504-
AttributeIndexKey.TypeRegistry.getAllEncoders.asScala.find(_.resolves().isAssignableFrom(binding)) match {
505-
case Some(encoder) => Some(AttributeBoundsBuilder(i, encoder.asInstanceOf[TypeEncoder[AnyRef, String]]))
501+
val encoders = StorageMetadata.TypeRegistry.getAllEncoders.asScala
502+
encoders.find(_.resolves() == binding).orElse(encoders.find(_.resolves().isAssignableFrom(binding))) match {
503+
case Some(encoder) => Some(ColumnBoundsBuilder(i, encoder.asInstanceOf[TypeEncoder[AnyRef, String]]))
506504
case None =>
507505
logger.warn(
508506
s"Can't find an encoder for attribute ${sft.getDescriptor(i).getLocalName} of type ${binding.getSimpleName} - " +
@@ -529,21 +527,14 @@ object FileSystemStorage {
529527
* @return
530528
*/
531529
def file(path: String, partition: Partition, action: StorageFileAction): StorageFile = {
532-
val spatial = spatialBounds.flatMap { case (i, env) => SpatialBounds(i, env) }
533-
val nonSpatial = nonSpatialBounds.flatMap(_.build())
530+
val bounds = columnBounds.flatMap(_.build())
534531
val sort = sorted.map(_._1)
535-
StorageFile(path, partition, count, action, spatial, nonSpatial, sort)
532+
StorageFile(path, partition, count, action, bounds, sort)
536533
}
537534

538535
override def apply(feature: SimpleFeature): Unit = {
539536
count += 1L
540-
spatialBounds.foreach { case (i, env) =>
541-
val geom = feature.getAttribute(i).asInstanceOf[Geometry]
542-
if (geom != null) {
543-
env.expandToInclude(geom.getEnvelopeInternal)
544-
}
545-
}
546-
nonSpatialBounds.foreach(_.apply(feature))
537+
columnBounds.foreach(_.apply(feature))
547538
if (sorted.nonEmpty) {
548539
sorted = sorted.flatMap { case (i, ordering, last) =>
549540
val next = feature.getAttribute(i)
@@ -584,7 +575,7 @@ object FileSystemStorage {
584575
* @param i attribute index
585576
* @param lexicoder lexicoder for the attribute type
586577
*/
587-
private case class AttributeBoundsBuilder(i: Int, lexicoder: TypeEncoder[AnyRef, String]) {
578+
private case class ColumnBoundsBuilder(i: Int, lexicoder: TypeEncoder[AnyRef, String]) {
588579

589580
private var lower: String = _
590581
private var upper: String = _
@@ -604,7 +595,7 @@ object FileSystemStorage {
604595
}
605596
}
606597

607-
def build(): Option[AttributeBounds] = if (lower == null) { None } else { Some(AttributeBounds(i, lower, upper)) }
598+
def build(): Option[ColumnBounds] = if (lower == null) { None } else { Some(ColumnBounds(i, lower, upper)) }
608599
}
609600

610601
/**

0 commit comments

Comments
 (0)