From 45e4806e15d92b5a341de81ef7c8f23269ff7a61 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Fri, 15 May 2026 12:54:40 -0400 Subject: [PATCH] GEOMESA-3581 Parallelize spill-to-disk during sort operations --- .../geomesa/fs/storage/core/package.scala | 4 +- .../FsGeneratePartitionFiltersCommand.scala | 2 +- .../utils/SortingSimpleFeatureIterator.scala | 281 ++++++++++++------ .../SortingSimpleFeatureIteratorTest.scala | 5 +- .../utils/concurrent/CachedThreadPool.scala | 8 + 5 files changed, 206 insertions(+), 94 deletions(-) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/package.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/package.scala index d2fb6a75d5d3..bbe3b0b6715f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/package.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/package.scala @@ -130,7 +130,7 @@ package object core { /** * Json serializer for partitions */ - private[core] object PartitionSerializer extends JsonSerializer[Partition] with JsonDeserializer[Partition] { + object PartitionSerializer extends JsonSerializer[Partition] with JsonDeserializer[Partition] { override def serialize(src: Partition, typeOfSrc: Type, context: JsonSerializationContext): JsonElement = { val array = new JsonArray(src.values.size) @@ -180,7 +180,7 @@ package object core { /** * Json serializer for partition keys */ - private[core] object PartitionKeySerializer extends JsonSerializer[PartitionKey] with JsonDeserializer[PartitionKey] { + object PartitionKeySerializer extends JsonSerializer[PartitionKey] with JsonDeserializer[PartitionKey] { override def serialize(src: PartitionKey, typeOfSrc: Type, context: JsonSerializationContext): JsonElement = { val obj = new JsonObject() 
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsGeneratePartitionFiltersCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsGeneratePartitionFiltersCommand.scala index 8d48dd874a56..bf53c3b755b0 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsGeneratePartitionFiltersCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsGeneratePartitionFiltersCommand.scala @@ -28,7 +28,7 @@ class FsGeneratePartitionFiltersCommand extends FsDataStoreCommand { override def execute(): Unit = withDataStore { ds => if (params.cqlFilter == null && params.partitions.isEmpty) { - throw new ParameterException("At least one of --partitions or --cql must be specified") + throw new ParameterException("At least one of --partition or --cql must be specified") } val metadata = ds.storage(params.featureName).metadata diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIterator.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIterator.scala index eb9a98880bce..30d61b319f6b 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIterator.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIterator.scala @@ -9,11 +9,12 @@ package org.locationtech.geomesa.index.utils import com.typesafe.scalalogging.LazyLogging -import org.geotools.api.feature.simple.SimpleFeature +import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.locationtech.geomesa.features.kryo.KryoFeatureSerializer import org.locationtech.geomesa.features.{SerializationOption, SimpleFeatureSerializer} import org.locationtech.geomesa.index.conf.QueryProperties import 
org.locationtech.geomesa.utils.collection.CloseableIterator +import org.locationtech.geomesa.utils.concurrent.CachedThreadPool import org.locationtech.geomesa.utils.geotools.SimpleFeatureOrdering import org.locationtech.geomesa.utils.index.ByteArrays import org.locationtech.geomesa.utils.io.{CloseQuietly, Sizable, WithClose} @@ -21,9 +22,9 @@ import org.locationtech.geomesa.utils.io.{CloseQuietly, Sizable, WithClose} import java.io.{File, FileInputStream, FileOutputStream} import java.nio.file.Files import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable.ArrayBuffer -import scala.util.Random +import java.util.concurrent.{Callable, Future} import scala.util.control.NonFatal +import scala.util.{Random, Try} /** * In memory sorting of simple features, with optional spill to disk @@ -40,15 +41,19 @@ class SortingSimpleFeatureIterator(features: CloseableIterator[SimpleFeature], s private val closed = new AtomicBoolean(false) private lazy val sorted: CloseableIterator[SimpleFeature] = { - if (closed.get || !features.hasNext) { features } else { - val head = features.next() - if (closed.get || !features.hasNext) { CloseableIterator.single(head, features.close()) } else { - val ordering = SimpleFeatureOrdering(head.getFeatureType, sortBy) - QueryProperties.SortMemoryThreshold.toBytes match { - case None => sortInMemory(head, features, ordering, closed) - case Some(threshold) => sortWithSpillover(head, features, ordering, closed, threshold) + try { + if (closed.get || !features.hasNext) { CloseableIterator.empty } else { + val head = features.next() + if (!features.hasNext) { CloseableIterator.single(head) } else { + val ordering = SimpleFeatureOrdering(head.getFeatureType, sortBy) + QueryProperties.SortMemoryThreshold.toBytes match { + case None => sortInMemory(head, features, ordering, closed) + case Some(threshold) => sortWithSpillover(head, features, ordering, closed, threshold) + } } } + } finally { + features.close() } } @@ -65,8 +70,6 @@ 
class SortingSimpleFeatureIterator(features: CloseableIterator[SimpleFeature], s object SortingSimpleFeatureIterator extends LazyLogging { - import scala.collection.JavaConverters._ - /** * Sorts the iterator in memory * @@ -82,7 +85,7 @@ object SortingSimpleFeatureIterator extends LazyLogging { ordering: Ordering[SimpleFeature], closed: AtomicBoolean): CloseableIterator[SimpleFeature] = { // use ArrayList for sort-in-place of the underlying array - val list = new java.util.ArrayList[SimpleFeature](100) + val list = new java.util.ArrayList[SimpleFeature](1000) list.add(head) while (tail.hasNext && !closed.get) { @@ -91,10 +94,10 @@ object SortingSimpleFeatureIterator extends LazyLogging { if (closed.get) { // don't bother sorting, just return an empty iterator - CloseableIterator(Iterator.empty, tail.close()) + CloseableIterator.empty } else { - list.sort(ordering) - CloseableIterator(list.iterator.asScala, tail.close()) + sort(list, ordering) + CloseableIterator(list.iterator) } } @@ -114,104 +117,209 @@ object SortingSimpleFeatureIterator extends LazyLogging { ordering: Ordering[SimpleFeature], closed: AtomicBoolean, threshold: Long): CloseableIterator[SimpleFeature] = { + var sorter = new SpillToDiskSorter(head, ordering, threshold) + try { + while (tail.hasNext && !closed.get()) { + sorter = sorter + tail.next() + } + sorter.result(closed.get) + } catch { + case NonFatal(e) => + sorter.cleanup() + throw e + } + } - // grouping ID for tmp files, doesn't have to be unique - lazy val id = { - val name = head.getFeatureType.getTypeName.replaceAll("[^A-Za-z0-9_-]", "").take(20) - f"$name-${Random.nextInt(10000)}%04d" + /** + * Sort a list in-place + * + * @param list list + * @param ordering ordering + */ + private def sort(list: java.util.ArrayList[SimpleFeature], ordering: Ordering[SimpleFeature]): Unit = { + val start = System.nanoTime() + list.sort(ordering) + logger.debug(s"Sorted ${list.size()} features in ${(System.nanoTime() - start)/1000000}ms") + } + + /** 
+ * Delete a file + * + * @param file file to delete + */ + private def delete(file: File): Unit = { + try { + if (!file.delete()) { + logger.warn(s"Unable to delete tmp file '${file.getAbsolutePath}'") + } + } catch { + case NonFatal(e) => logger.warn(s"Unable to delete tmp file '${file.getAbsolutePath}'", e) } + } - val sizable: FeatureIsSizable[SimpleFeature] = head match { - case _: SimpleFeature with Sizable => - SizableFeatureIsSizable.asInstanceOf[FeatureIsSizable[SimpleFeature]] + /** + * Gets a sizeable implementation based on the feature class + * + * @param sf simple feature + * @return + */ + private def toSizeable(sf: SimpleFeature): FeatureIsSizable[SimpleFeature] = { + sf match { + case _: SimpleFeature with Sizable => SizableFeatureIsSizable.asInstanceOf[FeatureIsSizable[SimpleFeature]] case _ => // currently, all of our uses of this class pass in Sizable features... this means we missed one logger.warn( - s"Feature class '${head.getClass.getName}' doesn't implement Sizable - " + - "using estimated size for memory threshold calculations") + s"Feature class '${sf.getClass.getName}' doesn't implement Sizable - " + + "using estimated size for memory threshold calculations") UnSizableFeatureIsSizable } + } - val files = ArrayBuffer.empty[File] // tmp files we create when we exceed in-memory threshold - lazy val serializer = KryoFeatureSerializer(head.getFeatureType, SerializationOption.WithUserData) - // use ArrayList for sort-in-place of the underlying array - val list = new java.util.ArrayList[SimpleFeature](1000) - list.add(head) - var size = sizable.sizeOf(head) + /** + * Class for sorting with spill-to-disk + * + * @param spill write/read to disk + * @param sizer sizer for the features being sorted + * @param ordering sort ordering + * @param threshold memory threshold for dumping to disk, in bytes + * @param files list of files that have been dumped to disk + * @param pending a file that is asynchronously being dumped to disk, if any + */ + 
private class SpillToDiskSorter( + spill: SpillToDisk, + sizer: FeatureIsSizable[SimpleFeature], + ordering: Ordering[SimpleFeature], + threshold: Long, + files: Seq[File] = Seq.empty, + pending: Option[Future[File]] = None) extends Callable[File] { + + private val start = System.nanoTime() + private val mem = new java.util.ArrayList[SimpleFeature](1000) + private var size: Long = 0 + + /** + * Create a new sorter + * + * @param sf simple feature, representative of the features being chunked + * @param ordering ordering + * @param threshold threshold for triggering a spill-to-disk, in bytes + * @return + */ + def this(sf: SimpleFeature, ordering: Ordering[SimpleFeature], threshold: Long) = { + this(new SpillToDisk(sf.getFeatureType), toSizeable(sf), ordering, threshold) + mem.add(sf) + size += sizer.sizeOf(sf) + } - while (tail.hasNext && !closed.get) { + /** + * Add the simple feature to the chunk, returning either this chunk or a new chunk + * + * @param sf feature + * @return next chunk + */ + def +(sf: SimpleFeature): SpillToDiskSorter = { if (size >= threshold) { - // write out the sorted list to disk - list.sort(ordering) - val file = Files.createTempFile(s"gm-sort-$id-", ".kryo").toFile - files += file - logger.trace(s"Created temp sort file '${file.getAbsolutePath}'") - WithClose(new FileOutputStream(file)) { os => - var i = 0 - while (i < list.size()) { - val bytes = serializer.serialize(list.get(i)) - os.write(ByteArrays.toBytes(bytes.length)) - os.write(bytes) - i += 1 + logger.debug(s"Read ${mem.size()} features from the underlying store in ${(System.nanoTime() - start)/1000000}ms") + // only allow one sort at a time so we don't blow up memory with all the array lists + val last = pending.map { f => + if (f.isDone) { f.get() } else { + val start = System.nanoTime() + val file = f.get() + logger.debug(s"Waited ${(System.nanoTime() - start)/1000000}ms for last batch to be flushed to disk") + file } } - list.clear() - size = 0 + val future = 
CachedThreadPool.call(this) + new SpillToDiskSorter(spill, sizer, ordering, threshold, files ++ last, Some(future)) + sf + } else { + mem.add(sf) + size += sizer.sizeOf(sf) + this } - val next = tail.next() - list.add(next) - size += sizable.sizeOf(next) } - if (closed.get) { - files.foreach { file => - try { - if (!file.delete()) { - logger.warn(s"Unable to delete tmp file '${file.getAbsolutePath}''") - file.deleteOnExit() - } - } catch { - case NonFatal(e) => logger.warn(s"Unable to delete tmp file '${file.getAbsolutePath}''", e) + override def call(): File = { + sort(mem, ordering) + spill.write(mem) + } + + /** + * Get all the features added so far, in sorted order + * + * @return + */ + def result(closed: Boolean): CloseableIterator[SimpleFeature] = { + logger.debug(s"Read ${mem.size()} features from the underlying store in ${(System.nanoTime() - start)/1000000}ms") + if (closed) { + // don't bother sorting, just clean up and return an empty iterator + cleanup() + CloseableIterator.empty + } else { + sort(mem, ordering) + val memIter = CloseableIterator(mem.iterator) + if (files.isEmpty && pending.isEmpty) { + memIter + } else { + val filesIter = (files ++ pending.map(_.get())).map(spill.read) + new MergeSortingIterator(IndexedSeq(memIter) ++ filesIter, ordering) } } - // don't bother sorting, just return an empty iterator - CloseableIterator(Iterator.empty, tail.close()) - } else { - if (!list.isEmpty) { - list.sort(ordering) - } - if (files.isEmpty) { - CloseableIterator(list.iterator.asScala, tail.close()) - } else { - new MergeSortingIterator(files.toIndexedSeq, serializer, list.iterator.asScala, tail, ordering) + } + + /** + * Delete any underlying resources. 
Should only be called if not calling `result` due to an error/interruption + */ + def cleanup(): Unit = { + pending.foreach(_.cancel(true)) + files.foreach(delete) + Try(pending.map(_.get())).toOption.flatten.foreach(delete) + } + } + + /** + * Class for writing and reading back from disk + * + * @param sft feature type + */ + private class SpillToDisk(sft: SimpleFeatureType) { + + // grouping ID for tmp files, doesn't have to be unique + private val id =f"${sft.getTypeName.replaceAll("[^A-Za-z0-9_-]", "").take(20)}-${Random.nextInt(10000)}%04d" + + private val serializer = KryoFeatureSerializer(sft, SerializationOption.WithUserData) + + def write(features: java.util.ArrayList[SimpleFeature]): File = { + val start = System.nanoTime() + val file = Files.createTempFile(s"gm-sort-$id-", ".kryo").toFile + logger.trace(s"Created temp sort file '${file.getAbsolutePath}'") + file.deleteOnExit() + WithClose(new FileOutputStream(file)) { os => + features.forEach { sf => + val bytes = serializer.serialize(sf) + os.write(ByteArrays.toBytes(bytes.length)) + os.write(bytes) + } } + logger.debug(s"Wrote ${features.size()} features to disk in ${(System.nanoTime() - start)/1000000}ms") + file } + + def read(file: File): CloseableIterator[SimpleFeature] = new FileIterator(file, serializer) } /** * Does a merge sort on a group of locally sorted files and a left-over in-memory iterator * - * @param files files to merge - * @param serializer serializer - * @param mem left-over in-memory features, already sorted - * @param closeable link to the original (now-empty) feature iterator, for cleaning up on close + * @param merging list of feature iterators to merge, already sorted * @param ordering feature ordering */ - private class MergeSortingIterator( - files: IndexedSeq[File], - serializer: SimpleFeatureSerializer, - mem: Iterator[SimpleFeature], - closeable: CloseableIterator[_], - ordering: Ordering[SimpleFeature] - ) extends CloseableIterator[SimpleFeature] { - - // list of feature 
iterators to merge - private val merging = IndexedSeq(CloseableIterator(mem)) ++ files.map(new FileIterator(_, serializer)) + private class MergeSortingIterator(merging: IndexedSeq[CloseableIterator[SimpleFeature]], ordering: Ordering[SimpleFeature]) + extends CloseableIterator[SimpleFeature] { // priority queue containing the next feature from each file private val heads = { val o = Ordering.by[(SimpleFeature, Int), SimpleFeature](_._1)(ordering) - val q = new java.util.PriorityQueue[(SimpleFeature, Int)](files.size + 1, o) + val q = new java.util.PriorityQueue[(SimpleFeature, Int)](merging.size, o) var i = 0 while (i < merging.length) { val m = merging(i) @@ -234,7 +342,7 @@ object SortingSimpleFeatureIterator extends LazyLogging { f } - override def close(): Unit = CloseQuietly.raise(merging :+ closeable) + override def close(): Unit = CloseQuietly.raise(merging) } /** @@ -263,10 +371,7 @@ object SortingSimpleFeatureIterator extends LazyLogging { override def close(): Unit = { try { is.close() } finally { - if (!file.delete()) { - logger.warn(s"Unable to delete tmp file '${file.getAbsolutePath}''") - file.deleteOnExit() - } + delete(file) } } } diff --git a/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIteratorTest.scala b/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIteratorTest.scala index 273d55d46c78..dac873bdfc7a 100644 --- a/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIteratorTest.scala +++ b/geomesa-index-api/src/test/scala/org/locationtech/geomesa/index/utils/SortingSimpleFeatureIteratorTest.scala @@ -53,7 +53,7 @@ class SortingSimpleFeatureIteratorTest extends Specification with Mockito { there were exactly(4)(features).hasNext there were two(features).next - there were no(features).close + there was one(features).close test.close() there was one(features).close @@ -75,7 +75,7 @@ class SortingSimpleFeatureIteratorTest 
extends Specification with Mockito { there were exactly(4)(features).hasNext there were two(features).next - there were no(features).close + there was one(features).close test.close() there was one(features).close @@ -125,7 +125,6 @@ class SortingSimpleFeatureIteratorTest extends Specification with Mockito { } finally { QueryProperties.SortMemoryThreshold.threadLocalValue.remove() } - ok } } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/CachedThreadPool.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/CachedThreadPool.scala index 026eb0f1e2c7..aa577adbd0d3 100644 --- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/CachedThreadPool.scala +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/concurrent/CachedThreadPool.scala @@ -155,6 +155,14 @@ object CachedThreadPool { */ def submit(command: Runnable): Future[_] = pool.submit(command) + /** + * Submit a single command to run in a potentially cached thread + * + * @param command command + * @return + */ + def call[T](command: Callable[T]): Future[T] = pool.submit(command) + /** * Run commands in a executor with a fixed level of concurrency, potentially re-using threads. Will block * until any submitted tasks are complete.