Skip to content

Commit ed56d1b

Browse files
authored
GEOMESA-3578 FSDS - Standardize target file size (#3545)
* Target file size must be set explicitly
* Avoids updating metadata for one-off commands
* Store configs in separate rows in JdbcMetadata to reduce update cost
* Fix potential conflict between feature types in JdbcMetadata
1 parent 291790f commit ed56d1b

16 files changed

Lines changed: 227 additions & 167 deletions

File tree

docs/user/filesystem/commandline.rst

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ Argument Description
4444
``-p, --path *`` The filesystem root path used to store data
4545
``-f, --feature-name *`` The name of the schema
4646
``--partitions`` Partitions to compact (omit to compact all partitions)
47-
``--target-file-size`` Target size for data files (e.g. 500MB or 1GB)
4847
``--mode`` One of ``local`` or ``distributed`` (to use map/reduce)
4948
``--temp-path`` Path to a temp directory used for working files
5049
======================== =========================================================
@@ -113,7 +112,6 @@ Argument Description
113112
``-p, --path *`` The filesystem root path used to store data
114113
``--partition-scheme`` Partition schemes
115114
``--num-reducers`` Number of reducers to use (required for distributed ingest)
116-
``--target-file-size`` Target size for data files (e.g. 500MB or 1GB)
117115
``--temp-path`` Path to a temp directory used for working files
118116
``--storage-opt`` Additional storage options to set as SimpleFeatureType user data, in the form ``key=value``
119117
======================== =============================================================
@@ -137,6 +135,7 @@ sub-commands:
137135

138136
* ``register`` - create a new metadata entry for an existing data file
139137
* ``unregister`` - remove a metadata entry for an existing data file
138+
* ``configure`` - set or unset metadata configuration values
140139
* ``migrate`` - migrate metadata from one type to another
141140
* ``check-consistency`` - check consistency between the metadata and data files
142141

@@ -176,6 +175,17 @@ Argument Description
176175
``<file> *`` The path of the file to unregister, relative to the storage root path
177176
======================== =====================================================================================================
178177

178+
``configure``
179+
~~~~~~~~~~~~~
180+
181+
The ``configure`` sub-command lets you set storage-level configuration options.
182+
183+
======================== =====================================================================================================
184+
Argument Description
185+
======================== =====================================================================================================
186+
``--set *`` The configuration to set, in the form ``key=value``. To remove a key, use an empty value
187+
======================== =====================================================================================================
188+
179189
``migrate``
180190
~~~~~~~~~~~~~~
181191

docs/user/filesystem/index_config.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ the user data key ``geomesa.fs.file-size``:
7575
// or set directly in the user data as a string
7676
sft.getUserData.put("geomesa.fs.file-size", "1GB")
7777

78-
Note that target file size can also be specified in some operations, which will override any default configured
79-
in the feature type. See :ref:`fsds_compact_command` and :ref:`fsds_ingest_command` for details. See
80-
:ref:`fsds_size_threshold_prop` for controlling the file size error margin.
78+
Once the schema has been created, the file size can be configured through the storage metadata key ``target-file-size``. See
79+
:ref:`fsds_manage_metadata_command` for setting metadata keys, and see :ref:`fsds_size_threshold_prop` for controlling the file
80+
size error margin.
8181

8282
Configuring Visibility Persistence
8383
----------------------------------

geomesa-features/geomesa-feature-exporters/src/main/scala/org/locationtech/geomesa/features/exporters/GeoParquetExporter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class GeoParquetExporter(path: String) extends FeatureExporter with LazyLogging
5454
Some(i)
5555
}
5656

57-
override def bytes: Long = if (writer == null) { 0 } else { writer.getDataSize }
57+
override def bytes: Long = if (writer == null) { 0 } else { writer.size }
5858

5959
override def close(): Unit = CloseWithLogging(Seq(writer, fs).filter(_ != null))
6060
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class ConverterStorage(
4444
new ConverterFileSystemReader(fs, context.root, converter, filter, transform, pathFiltering)
4545
}
4646

47-
override def compact(partition: Partition, fileSize: Option[Long], threads: Int): Unit =
47+
override def compact(partition: Partition, threads: Int): Unit =
4848
throw new UnsupportedOperationException("Converter storage does not support compactions")
4949

5050
override def register(file: URI): StorageFile =

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/FileSystemStorage.scala

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,11 @@ abstract class FileSystemStorage(val context: FileSystemContext, val metadata: S
171171
* multiple threads or storage instances attempt to compact the same partition simultaneously.
172172
*
173173
* @param partition partition to compact, or all partitions
174-
* @param fileSize approximate target size of files, in bytes
175174
* @param threads suggested threads to use for file system operations
176175
*/
177-
def compact(partition: Partition, fileSize: Option[Long] = None, threads: Int = 1): Unit = {
178-
val target = fileSize.orElse(this.sizer.targetSize)
176+
def compact(partition: Partition, threads: Int = 1): Unit = {
179177
val files = metadata.getFiles(partition)
180-
val toCompact = target match {
178+
val toCompact = sizer.targetSize match {
181179
case None => files
182180
case Some(t) =>
183181
files.filter { f =>
@@ -201,7 +199,7 @@ abstract class FileSystemStorage(val context: FileSystemContext, val metadata: S
201199
// tracks newly added files so we can register them atomically
202200
val fileTracker = new FileTracker(metadata.sft, metadata.schemes)
203201

204-
WithClose(createWriter(partition, StorageFileAction.Append, FileType.Compacted, target, fileTracker)) { writer =>
202+
WithClose(createWriter(partition, StorageFileAction.Append, FileType.Compacted, fileTracker)) { writer =>
205203
WithClose(FileSystemThreadedReader(reader, toCompact, threads)) { reader =>
206204
while (reader.hasNext) {
207205
val feature = reader.next()
@@ -265,7 +263,7 @@ abstract class FileSystemStorage(val context: FileSystemContext, val metadata: S
265263
case StorageFileAction.Modify => FileType.Modified
266264
case StorageFileAction.Delete => FileType.Deleted
267265
}
268-
createWriter(partition, action, fileType, sizer.targetSize, metadata)
266+
createWriter(partition, action, fileType, metadata)
269267
}
270268

271269
/**
@@ -274,30 +272,28 @@ abstract class FileSystemStorage(val context: FileSystemContext, val metadata: S
274272
* @param partition partition being written to
275273
* @param action write type
276274
* @param fileType file type
277-
* @param targetFileSize target file size
278275
* @param metadata metadata to track added files
279276
* @return
280277
*/
281278
private def createWriter(
282279
partition: Partition,
283280
action: StorageFileAction,
284281
fileType: FileType,
285-
targetFileSize: Option[Long],
286282
metadata: StorageMetadata): FileSystemWriter = {
287283

288-
def pathAndWriter: (URI, FileSystemWriter) = {
284+
def newWriter(): FileSystemWriter = {
289285
val file = FileSystemStorage.newFilePath(metadata.sft.getTypeName, fileType, encoding)
290286
val path = context.root.resolve(file)
291287
val updateObserver = new MetadataObserver(metadata, file, partition, action)
292288
val observer = if (observers.isEmpty) { updateObserver } else {
293289
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
294290
}
295-
(path, createWriter(path, partition, observer))
291+
createWriter(path, partition, observer)
296292
}
297293

298-
targetFileSize match {
299-
case None => pathAndWriter._2
300-
case Some(s) => new ChunkedFileSystemWriter(fs, Iterator.continually(pathAndWriter), sizer.estimator(s))
294+
sizer.targetSize match {
295+
case None => newWriter()
296+
case Some(s) => new ChunkedFileSystemWriter(Iterator.continually(newWriter()), sizer.estimator(s))
301297
}
302298
}
303299
}
@@ -345,47 +341,54 @@ object FileSystemStorage {
345341
* @param feature feature
346342
*/
347343
def write(feature: SimpleFeature): Unit
344+
345+
/**
346+
* Gets the size of the data written so far, in bytes. May not be accurate until the writer is
347+
* closed, due to buffering, etc
348+
*
349+
* @return
350+
*/
351+
def size: Long
348352
}
349353

350354
/**
351355
* Writes files up to a given size, then starts a new file
352356
*
353-
* @param fs file system
354357
* @param writers iterator of files to write
355358
* @param estimator target file size estimator
356359
*/
357-
private class ChunkedFileSystemWriter(
358-
fs: ObjectStore,
359-
writers: Iterator[(URI, FileSystemWriter)],
360-
estimator: UpdatingFileSizeEstimator
361-
) extends FileSystemWriter {
362-
363-
private var count = 0L // number of features written
364-
private var total = 0L // sum size of all finished chunks
365-
private var remaining = estimator.estimate(0L)
360+
class ChunkedFileSystemWriter(writers: Iterator[FileSystemWriter], estimator: UpdatingFileSizeEstimator)
361+
extends FileSystemWriter {
366362

367-
private var path: URI = _
363+
private var totalCount = 0L // total number of features written across all chunks
364+
private var totalBytes = 0L // sum size of all finished chunks
365+
private var remaining = estimator.estimate(0L)
368366
private var writer: FileSystemWriter = _
369367

370368
override def write(feature: SimpleFeature): Unit = {
371369
if (writer == null) {
372-
val (path, writer) = writers.next()
373-
this.path = path
374-
this.writer = writer
370+
writer = writers.next()
375371
}
376372
writer.write(feature)
377-
count += 1
373+
totalCount += 1
378374
remaining -= 1
379375
if (remaining == 0) {
380-
writer.close()
381-
writer = null
382-
// adjust our estimate to account for the actual bytes written
383-
total += fs.size(path)
384-
estimator.update(total, count)
385-
remaining = estimator.estimate(0L)
376+
val dataSize = writer.size
377+
if (estimator.done(dataSize)) {
378+
writer.close()
379+
totalBytes += writer.size // re-calculate now that writer is closed, so we get the final, accurate size
380+
writer = null
381+
// adjust our estimate to account for the actual bytes written
382+
estimator.update(totalBytes, totalCount)
383+
remaining = estimator.estimate(0L)
384+
} else {
385+
remaining = math.max(100L, estimator.estimate(dataSize))
386+
}
386387
}
387388
}
388389

390+
override def size: Long = totalBytes + Option(writer).fold(0L)(_.size)
391+
389392
override def flush(): Unit = if (writer != null) { writer.flush() }
390393

391394
override def close(): Unit = {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ trait StorageMetadata extends Closeable {
9898
* Set a key-value pair
9999
*
100100
* @param key key
101-
* @param value value
101+
* @param value value - may be null
102102
*/
103103
def set(key: String, value: String): Unit = throw new UnsupportedOperationException()
104104
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/metadata/FileBasedMetadata.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ class FileBasedMetadata(fs: ObjectStore, meta: Metadata, directory: URI)
6262

6363
// note: this isn't synchronized across jvms
6464
override def set(key: String, value: String): Unit = FileBasedMetadata.synchronized {
65-
kvs.put(key, value)
65+
if (value == null) {
66+
kvs.remove(key)
67+
} else {
68+
kvs.put(key, value)
69+
}
6670
WithClose(fs.overwrite(metadataFilePath)) { out =>
6771
MetadataSerialization.serialize(out, meta.copy(config = kvs.asScala.toMap))
6872
}

0 commit comments

Comments
 (0)