Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.data.store.{ContentDataStore, ContentEntry, ContentFeatureSource}
import org.geotools.feature.NameImpl
import org.locationtech.geomesa.fs.data.FileSystemDataStore.FileSystemDataStoreConfig
import org.locationtech.geomesa.fs.data.stats.FileSystemStats
import org.locationtech.geomesa.fs.storage.core.{FileSystemContext, FileSystemStorage, FileSystemStorageFactory, StorageMetadataCatalog}
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.index.GeoMesaSchemaValidator
Expand Down Expand Up @@ -45,7 +45,7 @@ class FileSystemDataStore(storageFactory: FileSystemStorageFactory, catalog: Sto

config.context.namespace.foreach(setNamespaceURI)

override val stats: GeoMesaStats = new UnoptimizedRunnableStats(this)
override val stats: GeoMesaStats = new FileSystemStats(this)

override def createTypeNames(): java.util.List[Name] = {
val names = new java.util.ArrayList[Name]()
Expand Down Expand Up @@ -100,6 +100,15 @@ class FileSystemDataStore(storageFactory: FileSystemStorageFactory, catalog: Sto
}

object FileSystemDataStore {

/**
* Config options
*
* @param context handle to the file system
* @param readThreads number of threads per read
* @param writeTimeout write timeout
* @param queryTimeout read timeout
*/
case class FileSystemDataStoreConfig(
context: FileSystemContext,
readThreads: Int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/***********************************************************************
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* https://www.apache.org/licenses/LICENSE-2.0
***********************************************************************/

package org.locationtech.geomesa.fs.data.stats

import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.geotools.util.factory.Hints
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.fs.data.FileSystemDataStore
import org.locationtech.geomesa.index.index.attribute.AttributeIndexKey
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
import org.locationtech.geomesa.index.stats.impl.MinMax
import org.locationtech.geomesa.index.stats.impl.MinMax.MinMaxDefaults
import org.locationtech.jts.geom.Point

/**
* Optimized stats using per-file bounds for non-exact cases
*
* @param ds datastore
*/
class FileSystemStats(ds: FileSystemDataStore) extends UnoptimizedRunnableStats(ds) {

import org.locationtech.geomesa.fs.storage.core.RichSimpleFeatureType

override def getCount(
sft: SimpleFeatureType,
filter: Filter,
exact: Boolean,
queryHints: Hints): Option[Long] = {
if (!exact || filter == Filter.INCLUDE) {
Some(ds.storage(sft.getTypeName).metadata.getFiles(filter).map(_.count).sum)
} else {
super.getCount(sft, filter, exact, queryHints)
}
}

override def getMinMax[T](
sft: SimpleFeatureType,
attribute: String,
filter: Filter,
exact: Boolean): Option[MinMax[T]] = {
if (!exact || filter == Filter.INCLUDE) {
val i = sft.indexOf(attribute)
if (sft.nonSpatialBounds().contains(i)) {
val binding = sft.getDescriptor(i).getType.getBinding
val minMax = new MinMax[T](sft, sft.getDescriptor(i).getLocalName)(MinMaxDefaults(binding))

var min: String = null
var max: String = null
ds.storage(sft.getTypeName).metadata.getFiles(filter).foreach { file =>
file.attributeBounds.filter(_.attribute == i).foreach { bounds =>
if (max == null || max < bounds.upper) {
max = bounds.upper
}
if (min == null || min > bounds.lower) {
min = bounds.lower
}
}
}
if (min != null) {
val alias = AttributeIndexKey.alias(binding)
val sf = new ScalaSimpleFeature(sft, "")
Seq(min, max).foreach { value =>
sf.setAttribute(i, AttributeIndexKey.decode(alias, value))
minMax.observe(sf)
}
}
Some(minMax)
} else if (sft.spatialBounds().contains(i) && sft.getDescriptor(i).getType.getBinding == classOf[Point]) {
val minMax = new MinMax[T](sft, sft.getDescriptor(i).getLocalName)(MinMaxDefaults(classOf[Point]))

var xmin, ymin, xmax, ymax: java.lang.Double = null
ds.storage(sft.getTypeName).metadata.getFiles(filter).foreach { file =>
file.spatialBounds.filter(_.attribute == i).foreach { bounds =>
xmin = if (xmin == null) { bounds.xmin } else { math.min(xmin, bounds.xmin) }
xmax = if (xmax == null) { bounds.xmax } else { math.max(xmax, bounds.xmax) }
ymin = if (ymin == null) { bounds.ymin } else { math.min(ymin, bounds.ymin) }
ymax = if (ymax == null) { bounds.ymax } else { math.max(ymax, bounds.ymax) }
}
}
if (xmin != null) {
val sf = new ScalaSimpleFeature(sft, "")
Seq(xmin -> ymin, xmax -> ymax).foreach { case (x, y) =>
sf.setAttribute(i, s"POINT ($x $y)")
minMax.observe(sf)
}
}
Some(minMax)
} else {
super.getMinMax(sft, attribute, filter, exact)
}
} else {
super.getMinMax(sft, attribute, filter, exact)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
fs2.getCount(Query.ALL) must beEqualTo(10)
fs2.getBounds must equalTo(new ReferencedEnvelope(10.0, 10.0, 10.0, 10.9, CRS_EPSG_4326))
}

// test stats queries
ds.stats.getCount(sft) must beSome(10L)
ds.stats.getCount(sft, exact = true) must beSome(10L)
val minMax = ds.stats.getMinMax[String](sft, "name").orNull
minMax must not(beNull)
minMax.min mustEqual "test0"
minMax.max mustEqual "test9"
ds.stats.getMinMax[Int](sft, "age") must beNone // only attributes with fs.bounds will return cached stats
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import java.util
import java.util.Collections
import scala.util.Try

class GeoMesaFeatureSource(val ds: GeoMeasBaseStore, val sft: SimpleFeatureType)
class GeoMesaFeatureSource(val ds: GeoMesaBaseStore, val sft: SimpleFeatureType)
extends SimpleFeatureSource with LazyLogging {

lazy private val hints = Collections.unmodifiableSet(Collections.emptySet[Key])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.locationtech.geomesa.utils.io.WithClose
import java.util.Collections
import scala.collection.mutable.ArrayBuffer

class GeoMesaFeatureStore(ds: GeoMeasBaseStore, sft: SimpleFeatureType)
class GeoMesaFeatureStore(ds: GeoMesaBaseStore, sft: SimpleFeatureType)
extends GeoMesaFeatureSource(ds, sft) with SimpleFeatureStore {

private var transaction: Transaction = Transaction.AUTO_COMMIT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ import org.locationtech.geomesa.index.stats.HasGeoMesaStats

package object geotools {

type GeoMeasBaseStore = DataStore with HasGeoMesaFeatureReader with HasGeoMesaStats
type GeoMesaBaseStore = DataStore with HasGeoMesaFeatureReader with HasGeoMesaStats
}
Loading