Skip to content

Commit c85d83a

Browse files
authored
GEOMESA-3570 FSDS - Switch default parquet compression to zstd (#3537)
1 parent 5f88812 commit c85d83a

15 files changed

Lines changed: 30 additions & 38 deletions

File tree

build/dependencies.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ com.fasterxml.jackson.core:jackson-core:2.21.1:compile
88
com.fasterxml.jackson.core:jackson-databind:2.21.1:compile
99
com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.21.1:compile
1010
com.github.ben-manes.caffeine:caffeine:3.1.8:compile
11-
com.github.luben:zstd-jni:1.5.5-11:compile
11+
com.github.luben:zstd-jni:1.5.7-7:compile
1212
com.github.pureconfig:pureconfig-core_2.12:0.17.4:compile
1313
com.github.pureconfig:pureconfig-generic-base_2.12:0.17.4:compile
1414
com.github.pureconfig:pureconfig-generic_2.12:0.17.4:compile

build/test/resources/log4j.xml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
<param name="ConversionPattern" value="[%d] %5p %c{1}: %m%n"/>
77
</layout>
88
</appender>
9-
<category name="org.locationtech.geomesa.fs">
10-
<priority value="debug"/>
11-
</category>
9+
1210
<category name="org.locationtech.geomesa">
1311
<priority value="info"/>
1412
</category>

geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreTest.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,12 @@ class FileSystemDataStoreTest extends SpecificationWithJUnit with BeforeAfterAll
9090
|fs.metadata.jdbc.url=${container.getJdbcUrl}
9191
|fs.metadata.jdbc.user=${container.getUsername}
9292
|fs.metadata.jdbc.password=${container.getPassword}
93-
|parquet.compression=gzip
9493
|""".stripMargin
9594

9695
private lazy val dsParams = Seq(
9796
Map(
9897
"fs.path" -> s"${dir.getPath}/file",
9998
"fs.metadata.type" -> "file",
100-
"fs.config.properties" -> "parquet.compression=gzip",
10199
"geomesa.security.auths" -> "user",
102100
),
103101
Map(

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-jobs/src/main/scala/org/locationtech/geomesa/fs/storage/jobs/parquet/ParquetStorageConfiguration.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ trait ParquetStorageConfiguration extends StorageConfiguration with LazyLogging
3636
Option(job.getConfiguration.get(ParquetOutputFormat.COMPRESSION))
3737
.orElse(Option(sft.getUserData.get(ParquetOutputFormat.COMPRESSION).asInstanceOf[String]))
3838
.map(CompressionCodecName.valueOf)
39-
.getOrElse(CompressionCodecName.SNAPPY)
39+
.getOrElse(CompressionCodecName.ZSTD)
4040
ParquetOutputFormat.setCompression(job, compression)
4141
logger.debug(s"Parquet compression is $compression")
4242
}

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
<groupId>org.apache.hadoop</groupId>
2828
<artifactId>hadoop-common</artifactId>
2929
</dependency>
30+
<dependency>
31+
<groupId>com.github.luben</groupId>
32+
<artifactId>zstd-jni</artifactId>
33+
</dependency>
3034

3135
<!-- test dependencies -->
3236
<dependency>

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet-io/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/io/ParquetFileSystemWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ParquetFileSystemWriter(
6464
object ParquetFileSystemWriter extends LazyLogging {
6565

6666
def builder(fs: ObjectStore, path: URI, conf: ParquetConfiguration): Builder = {
67-
val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "SNAPPY"))
67+
val codec = CompressionCodecName.fromConf(conf.get("parquet.compression", "ZSTD"))
6868
logger.debug(s"Using Parquet Compression codec ${codec.name()}")
6969

7070
val file = fs match {

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class CompactionTest extends SpecificationWithJUnit {
3030
"compact partitions" in {
3131
val tempDir = Files.createTempDirectory("geomesa")
3232
try {
33-
val conf = Map("parquet.compression" -> "gzip", StorageMetadataCatalog.MetadataTypeConfig -> "file")
33+
val conf = Map(StorageMetadataCatalog.MetadataTypeConfig -> "file")
3434
val context = FileSystemContext.create(tempDir.toUri, conf)
3535

3636
val dtg = "2017-01-01"

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/GenerateParquetFiles.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ object GenerateParquetFiles extends StrictLogging {
6666
}
6767

6868
Seq(GeometryEncoding.GeoParquetNative, GeometryEncoding.GeoParquetWkb, GeometryEncoding.GeoMesaV1).foreach { encoding =>
69-
val conf = Map("parquet.compression" -> "gzip", SimpleFeatureParquetSchema.GeometryEncodingKey -> encoding.toString)
69+
val conf = Map(SimpleFeatureParquetSchema.GeometryEncodingKey -> encoding.toString)
7070
val dir = new Path(sys.props("java.io.tmpdir"))
7171
val file = new Path(dir, s"${encoding.toString.replace("GeoParquet", "geoparquet-").toLowerCase(Locale.US)}-test.parquet")
7272
WithClose(new ParquetFileSystemWriter(LocalObjectStore, conf, sft, file.toUri)) { writer =>

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ListMapTest.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,9 @@ class ListMapTest extends SpecificationWithJUnit {
5050
val sf2 = new ScalaSimpleFeature(sft, "2", Array(null, d2, gf.createPoint(new Coordinate(67.2363, 55.236))))
5151
val sf3 = new ScalaSimpleFeature(sft, "3", Array(List.empty[String].asJava, d3, gf.createPoint(new Coordinate(73.0, 73.0))))
5252

53-
// Use GZIP in tests but snappy in prod due to license issues
5453
val writeBuilder =
5554
ParquetFileSystemWriter.builder(LocalObjectStore, f.toUri, sftConf)
56-
.withCompressionCodec(CompressionCodecName.GZIP)
55+
.withCompressionCodec(CompressionCodecName.ZSTD)
5756

5857
WithClose(writeBuilder.build()) { writer =>
5958
writer.write(sf)
@@ -110,10 +109,9 @@ class ListMapTest extends SpecificationWithJUnit {
110109
val sf2 = new ScalaSimpleFeature(sft, "2", Array(null, d2, gf.createPoint(new Coordinate(67.2363, 55.236))))
111110
val sf3 = new ScalaSimpleFeature(sft, "3", Array(Map.empty[String, String].asJava, d3, gf.createPoint(new Coordinate(73.0, 73.0))))
112111

113-
// Use GZIP in tests but snappy in prod due to license issues
114112
val writeBuilder =
115113
ParquetFileSystemWriter.builder(LocalObjectStore, f.toUri, sftConf)
116-
.withCompressionCodec(CompressionCodecName.GZIP)
114+
.withCompressionCodec(CompressionCodecName.ZSTD)
117115

118116
WithClose(writeBuilder.build()) { writer =>
119117
writer.write(sf)
@@ -179,10 +177,9 @@ class ListMapTest extends SpecificationWithJUnit {
179177
val sf2 = new ScalaSimpleFeature(sft, "2", Array(null, null, d2, gf.createPoint(new Coordinate(67.2363, 55.236))))
180178
val sf3 = new ScalaSimpleFeature(sft, "3", Array(List.empty[UUID].asJava, Map.empty[Int, Double].asJava, d3, gf.createPoint(new Coordinate(73.0, 73.0))))
181179

182-
// Use GZIP in tests but snappy in prod due to license issues
183180
val writeBuilder =
184181
ParquetFileSystemWriter.builder(LocalObjectStore, f.toUri, sftConf)
185-
.withCompressionCodec(CompressionCodecName.GZIP)
182+
.withCompressionCodec(CompressionCodecName.ZSTD)
186183
WithClose(writeBuilder.build()) { writer =>
187184
writer.write(sf)
188185
writer.write(sf2)

geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetReadWriteTest.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@ package org.locationtech.geomesa.parquet
1313
import com.typesafe.scalalogging.LazyLogging
1414
import org.apache.parquet.conf.{ParquetConfiguration, PlainParquetConfiguration}
1515
import org.apache.parquet.filter2.compat.FilterCompat
16-
import org.apache.parquet.hadoop.metadata.CompressionCodecName
1716
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
1817
import org.geotools.data.DataUtilities
1918
import org.geotools.filter.text.ecql.ECQL
2019
import org.junit.runner.RunWith
2120
import org.locationtech.geomesa.features.ScalaSimpleFeature
2221
import org.locationtech.geomesa.fs.storage.core.fs.LocalObjectStore
2322
import org.locationtech.geomesa.fs.storage.parquet.FilterConverter
24-
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.{FileValidationObserver, ParquetCompressionOpt}
23+
import org.locationtech.geomesa.fs.storage.parquet.ParquetFileSystemStorage.FileValidationObserver
2524
import org.locationtech.geomesa.fs.storage.parquet.io.{ParquetFileSystemReader, ParquetFileSystemWriter, SimpleFeatureParquetSchema}
2625
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
2726
import org.locationtech.geomesa.utils.io.WithClose
@@ -54,8 +53,6 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL
5453
val sftConf = {
5554
val c = new PlainParquetConfiguration()
5655
SimpleFeatureParquetSchema.setSft(c, sft)
57-
// Use GZIP in tests but snappy in prod due to license issues
58-
c.set(ParquetCompressionOpt, CompressionCodecName.GZIP.toString)
5956
c
6057
}
6158

@@ -93,12 +90,12 @@ class ParquetReadWriteTest extends Specification with AllExpectations with LazyL
9390
features.foreach(writer.write)
9491
}
9592

96-
// Corrupt the file by writing an invalid byte somewhere
93+
// corrupt the file by writing invalid bytes somewhere
9794
val randomAccessFile = new RandomAccessFile(f.toFile, "rw")
9895
logger.debug(s"File length: ${randomAccessFile.length()}")
9996
Files.size(f) must beGreaterThan(50L)
100-
randomAccessFile.seek(50)
101-
randomAccessFile.writeByte(999)
97+
randomAccessFile.seek(40)
98+
randomAccessFile.writeBytes("abcdefghij")
10299
randomAccessFile.close()
103100

104101
// Validate the file

0 commit comments

Comments
 (0)