Skip to content

Commit eb55b0f

Browse files
committed
fix(parquet): check compression codec availability
1 parent 0ca52ba commit eb55b0f

2 files changed

Lines changed: 67 additions & 6 deletions

File tree

src/iceberg/parquet/parquet_writer.cc

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include "iceberg/parquet/parquet_writer.h"
2121

2222
#include <memory>
23+
#include <string_view>
2324

2425
#include <arrow/c/bridge.h>
2526
#include <arrow/record_batch.h>
27+
#include <arrow/util/compression.h>
2628
#include <arrow/util/key_value_metadata.h>
2729
#include <parquet/arrow/schema.h>
2830
#include <parquet/arrow/writer.h>
@@ -45,21 +47,31 @@ Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
4547

4648
// Translates the configured Parquet compression name into the matching Arrow
// compression enum value.
//
// Returns InvalidArgument when the configured name is not one of the names
// listed in the table below.
Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) {
  struct NamedCodec {
    std::string_view name;
    ::arrow::Compression::type type;
  };
  // Order mirrors the sequence in which names were compared before; names are
  // matched exactly (case-sensitive).
  static constexpr NamedCodec kKnownCodecs[] = {
      {"uncompressed", ::arrow::Compression::UNCOMPRESSED},
      {"snappy", ::arrow::Compression::SNAPPY},
      {"gzip", ::arrow::Compression::GZIP},
      {"brotli", ::arrow::Compression::BROTLI},
      {"lz4", ::arrow::Compression::LZ4},
      {"zstd", ::arrow::Compression::ZSTD},
  };

  const auto& compression_name = properties.Get(WriterProperties::kParquetCompression);
  for (const auto& known : kKnownCodecs) {
    if (compression_name == known.name) {
      return known.type;
    }
  }
  return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name);
}
68+
69+
// Verifies that the Arrow library linked into this build can provide the
// requested compression codec.
//
// `compression_name` is only used to build the error message; `compression` is
// the value that is actually probed. Returns a default-constructed Status when
// the codec is available.
// NOTE(review): ICEBERG_PRECHECK is assumed to return an InvalidArgument error
// when its condition is false (the accompanying test expects
// ErrorKind::kInvalidArgument) -- confirm against the macro definition.
Status CheckCompressionAvailable(std::string_view compression_name,
                                 ::arrow::Compression::type compression) {
  const bool codec_available = ::arrow::util::Codec::IsAvailable(compression);
  ICEBERG_PRECHECK(codec_available,
                   "Parquet compression codec {} is not available in the current build",
                   compression_name);
  return {};
}
6476

6577
Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
@@ -98,6 +110,9 @@ class ParquetWriter::Impl {
98110
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
99111
schema_descriptor->schema_root());
100112

113+
ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
114+
options.properties.Get(WriterProperties::kParquetCompression), compression));
115+
101116
ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
102117
auto file_writer = ::parquet::ParquetFileWriter::Open(
103118
output_stream_, std::move(schema_node), std::move(writer_properties),

src/iceberg/test/parquet_test.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
*/
1919

2020
#include <optional>
21+
#include <string>
22+
#include <utility>
23+
#include <vector>
2124

2225
#include <arrow/array.h>
2326
#include <arrow/c/bridge.h>
@@ -26,6 +29,7 @@
2629
#include <arrow/record_batch.h>
2730
#include <arrow/table.h>
2831
#include <arrow/type.h>
32+
#include <arrow/util/compression.h>
2933
#include <arrow/util/key_value_metadata.h>
3034
#include <parquet/arrow/reader.h>
3135
#include <parquet/arrow/writer.h>
@@ -124,6 +128,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
124128
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
125129
}
126130

131+
struct ParquetCodec {
132+
std::string name;
133+
::arrow::Compression::type compression;
134+
};
135+
136+
std::optional<ParquetCodec> UnavailableParquetCodec() {
137+
const std::vector<ParquetCodec> codecs = {
138+
{"snappy", ::arrow::Compression::SNAPPY}, {"gzip", ::arrow::Compression::GZIP},
139+
{"brotli", ::arrow::Compression::BROTLI}, {"lz4", ::arrow::Compression::LZ4},
140+
{"zstd", ::arrow::Compression::ZSTD},
141+
};
142+
for (const auto& codec : codecs) {
143+
if (!::arrow::util::Codec::IsAvailable(codec.compression)) {
144+
return codec;
145+
}
146+
}
147+
return std::nullopt;
148+
}
149+
127150
} // namespace
128151

129152
class ParquetReaderTest : public TempFileTestBase {
@@ -461,6 +484,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) {
461484
IsError(ErrorKind::kNotImplemented));
462485
}
463486

487+
// Opening a Parquet writer configured with a codec that this Arrow build lacks
// must fail with InvalidArgument and name the codec in the error message.
TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) {
  const auto missing_codec = UnavailableParquetCodec();
  if (!missing_codec) {
    // Nothing to exercise when the build ships every optional codec.
    GTEST_SKIP() << "All optional Parquet compression codecs are available";
  }
  const std::string& codec_name = missing_codec->name;

  WriterProperties props;
  props.Set(WriterProperties::kParquetCompression, codec_name);

  std::vector<SchemaField> fields{SchemaField::MakeRequired(1, "id", int32())};
  auto schema = std::make_shared<Schema>(std::move(fields));

  auto writer = WriterFactoryRegistry::Open(
      FileFormatType::kParquet, {.path = "unavailable_codec.parquet",
                                 .schema = schema,
                                 .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(),
                                 .properties = std::move(props)});

  const std::string expected_message = "Parquet compression codec " + codec_name +
                                       " is not available in the current build";
  EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument));
  EXPECT_THAT(writer, HasErrorMessage(expected_message));
}
509+
464510
TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
465511
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
466512
SchemaField::MakeOptional(1, "a",

0 commit comments

Comments
 (0)