Skip to content

Commit 0862d64

Browse files
authored
feat: add compression config to writer properties (#524)

- refactor writer and reader properties not to use pointers
- bump avro-cpp to use codec enum
- adapt parquet and avro writers to respect codec config
1 parent f9f1449 commit 0862d64

18 files changed: +153 additions, −107 deletions

cmake_modules/IcebergThirdpartyToolchain.cmake

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,24 +191,16 @@ function(resolve_avro_dependency)
191191
NAMES
192192
avro-cpp
193193
CONFIG)
194-
elseif(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
195-
# Support custom git URL for mirrors
196-
fetchcontent_declare(avro-cpp
197-
${FC_DECLARE_COMMON_OPTIONS}
198-
GIT_REPOSITORY $ENV{ICEBERG_AVRO_GIT_URL}
199-
GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
200-
SOURCE_SUBDIR
201-
lang/c++
202-
FIND_PACKAGE_ARGS
203-
NAMES
204-
avro-cpp
205-
CONFIG)
206194
else()
207-
# Default to GitHub - uses unreleased version
195+
if(DEFINED ENV{ICEBERG_AVRO_GIT_URL})
196+
set(AVRO_GIT_REPOSITORY "$ENV{ICEBERG_AVRO_GIT_URL}")
197+
else()
198+
set(AVRO_GIT_REPOSITORY "https://github.com/apache/avro.git")
199+
endif()
208200
fetchcontent_declare(avro-cpp
209201
${FC_DECLARE_COMMON_OPTIONS}
210-
GIT_REPOSITORY https://github.com/apache/avro.git
211-
GIT_TAG e6c308780e876b4c11a470b9900995947f7b0fb5
202+
GIT_REPOSITORY ${AVRO_GIT_REPOSITORY}
203+
GIT_TAG 11fb55500bed9fbe9af53b85112cd13887f0ce80
212204
SOURCE_SUBDIR
213205
lang/c++
214206
FIND_PACKAGE_ARGS

src/iceberg/avro/avro_reader.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,17 +226,17 @@ class AvroReader::Impl {
226226
return InvalidArgument("Projected schema is required by Avro reader");
227227
}
228228

229-
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
229+
batch_size_ = options.properties.Get(ReaderProperties::kBatchSize);
230230
read_schema_ = options.projection;
231231

232232
// Open the input stream and adapt to the avro interface.
233233
ICEBERG_ASSIGN_OR_RAISE(
234234
auto input_stream,
235235
CreateInputStream(options,
236-
options.properties->Get(ReaderProperties::kAvroBufferSize)));
236+
options.properties.Get(ReaderProperties::kAvroBufferSize)));
237237

238238
// Create the appropriate backend based on configuration
239-
if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
239+
if (options.properties.Get(ReaderProperties::kAvroSkipDatum)) {
240240
backend_ = std::make_unique<DirectDecoderBackend>();
241241
} else {
242242
backend_ = std::make_unique<GenericDatumBackend>();

src/iceberg/avro/avro_writer.cc

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,41 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
5252
return std::make_unique<AvroOutputStream>(output, buffer_size);
5353
}
5454

55+
Result<::avro::Codec> ParseCodec(const WriterProperties& properties) {
56+
const auto& codec_name = properties.Get(WriterProperties::kAvroCompression);
57+
::avro::Codec codec;
58+
if (codec_name == "uncompressed") {
59+
codec = ::avro::NULL_CODEC;
60+
} else if (codec_name == "gzip") {
61+
codec = ::avro::DEFLATE_CODEC;
62+
} else if (codec_name == "snappy") {
63+
codec = ::avro::SNAPPY_CODEC;
64+
} else if (codec_name == "zstd") {
65+
codec = ::avro::ZSTD_CODEC;
66+
} else {
67+
return InvalidArgument("Unsupported Avro codec: {}", codec_name);
68+
}
69+
ICEBERG_PRECHECK(::avro::isCodecAvailable(codec),
70+
"Avro codec {} is not available in the current build", codec_name);
71+
return codec;
72+
}
73+
74+
Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
75+
auto level_str = properties.Get(WriterProperties::kAvroCompressionLevel);
76+
if (level_str.empty()) {
77+
return std::nullopt;
78+
}
79+
ICEBERG_ASSIGN_OR_RAISE(auto level, StringUtils::ParseInt<int32_t>(level_str));
80+
return level;
81+
}
82+
5583
// Abstract base class for Avro write backends.
5684
class AvroWriteBackend {
5785
public:
5886
virtual ~AvroWriteBackend() = default;
5987
virtual Status Init(std::unique_ptr<AvroOutputStream> output_stream,
6088
const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
89+
::avro::Codec codec, std::optional<int32_t> compression_level,
6190
const std::map<std::string, std::vector<uint8_t>>& metadata) = 0;
6291
virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
6392
int64_t row_index) = 0;
@@ -70,10 +99,11 @@ class DirectEncoderBackend : public AvroWriteBackend {
7099
public:
71100
Status Init(std::unique_ptr<AvroOutputStream> output_stream,
72101
const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
102+
::avro::Codec codec, std::optional<int32_t> compression_level,
73103
const std::map<std::string, std::vector<uint8_t>>& metadata) override {
74-
writer_ = std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream),
75-
avro_schema, sync_interval,
76-
::avro::NULL_CODEC, metadata);
104+
writer_ = std::make_unique<::avro::DataFileWriterBase>(
105+
std::move(output_stream), avro_schema, sync_interval, codec, metadata,
106+
compression_level);
77107
avro_root_node_ = avro_schema.root();
78108
return {};
79109
}
@@ -111,10 +141,11 @@ class GenericDatumBackend : public AvroWriteBackend {
111141
public:
112142
Status Init(std::unique_ptr<AvroOutputStream> output_stream,
113143
const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
144+
::avro::Codec codec, std::optional<int32_t> compression_level,
114145
const std::map<std::string, std::vector<uint8_t>>& metadata) override {
115146
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
116-
std::move(output_stream), avro_schema, sync_interval, ::avro::NULL_CODEC,
117-
metadata);
147+
std::move(output_stream), avro_schema, sync_interval, codec, metadata,
148+
compression_level);
118149
datum_ = std::make_unique<::avro::GenericDatum>(avro_schema);
119150
return {};
120151
}
@@ -158,7 +189,7 @@ class AvroWriter::Impl {
158189
::avro::NodePtr root;
159190
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
160191
if (const auto& schema_name =
161-
options.properties->Get(WriterProperties::kAvroSchemaName);
192+
options.properties.Get(WriterProperties::kAvroSchemaName);
162193
!schema_name.empty()) {
163194
root->setName(::avro::Name(schema_name));
164195
}
@@ -169,7 +200,7 @@ class AvroWriter::Impl {
169200
ICEBERG_ASSIGN_OR_RAISE(
170201
auto output_stream,
171202
CreateOutputStream(options,
172-
options.properties->Get(WriterProperties::kAvroBufferSize)));
203+
options.properties.Get(WriterProperties::kAvroBufferSize)));
173204
arrow_output_stream_ = output_stream->arrow_output_stream();
174205

175206
std::map<std::string, std::vector<uint8_t>> metadata;
@@ -181,15 +212,19 @@ class AvroWriter::Impl {
181212
}
182213

183214
// Create the appropriate backend based on configuration
184-
if (options.properties->Get(WriterProperties::kAvroSkipDatum)) {
215+
if (options.properties.Get(WriterProperties::kAvroSkipDatum)) {
185216
backend_ = std::make_unique<DirectEncoderBackend>();
186217
} else {
187218
backend_ = std::make_unique<GenericDatumBackend>();
188219
}
189220

190-
ICEBERG_RETURN_UNEXPECTED(backend_->Init(
191-
std::move(output_stream), *avro_schema_,
192-
options.properties->Get(WriterProperties::kAvroSyncInterval), metadata));
221+
ICEBERG_ASSIGN_OR_RAISE(auto codec, ParseCodec(options.properties));
222+
ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties));
223+
224+
ICEBERG_RETURN_UNEXPECTED(
225+
backend_->Init(std::move(output_stream), *avro_schema_,
226+
options.properties.Get(WriterProperties::kAvroSyncInterval), codec,
227+
compression_level, metadata));
193228

194229
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
195230
return {};

src/iceberg/file_reader.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
5959
return reader;
6060
}
6161

62-
std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
63-
return std::unique_ptr<ReaderProperties>(new ReaderProperties());
64-
}
65-
66-
std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
62+
ReaderProperties ReaderProperties::FromMap(
6763
const std::unordered_map<std::string, std::string>& properties) {
68-
auto reader_properties = std::unique_ptr<ReaderProperties>(new ReaderProperties());
69-
reader_properties->configs_ = properties;
64+
ReaderProperties reader_properties;
65+
reader_properties.configs_ = properties;
7066
return reader_properties;
7167
}
7268

src/iceberg/file_reader.h

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,24 +75,16 @@ class ReaderProperties : public ConfigBase<ReaderProperties> {
7575

7676
/// \brief The batch size to read.
7777
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
78-
7978
/// \brief Skip GenericDatum in Avro reader for better performance.
8079
/// When true, decode directly from Avro to Arrow without GenericDatum intermediate.
8180
/// Default: true (skip GenericDatum for better performance).
8281
inline static Entry<bool> kAvroSkipDatum{"read.avro.skip-datum", true};
83-
8482
/// \brief The buffer size used by Avro input stream.
8583
inline static Entry<int64_t> kAvroBufferSize{"read.avro.buffer-size", 1024 * 1024};
8684

87-
/// \brief Create a default ReaderProperties instance.
88-
static std::unique_ptr<ReaderProperties> default_properties();
89-
9085
/// \brief Create a ReaderProperties instance from a map of key-value pairs.
91-
static std::unique_ptr<ReaderProperties> FromMap(
86+
static ReaderProperties FromMap(
9287
const std::unordered_map<std::string, std::string>& properties);
93-
94-
private:
95-
ReaderProperties() = default;
9688
};
9789

9890
/// \brief Options for creating a reader.
@@ -116,7 +108,7 @@ struct ICEBERG_EXPORT ReaderOptions {
116108
/// that may have different field names than the current schema.
117109
std::shared_ptr<class NameMapping> name_mapping;
118110
/// \brief Format-specific or implementation-specific properties.
119-
std::shared_ptr<ReaderProperties> properties = ReaderProperties::default_properties();
111+
ReaderProperties properties;
120112
};
121113

122114
/// \brief Factory function to create a reader of a specific file format.

src/iceberg/file_writer.cc

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,10 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
5959
return writer;
6060
}
6161

62-
std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
63-
return std::unique_ptr<WriterProperties>(new WriterProperties());
64-
}
65-
66-
std::unique_ptr<WriterProperties> WriterProperties::FromMap(
62+
WriterProperties WriterProperties::FromMap(
6763
const std::unordered_map<std::string, std::string>& properties) {
68-
auto writer_properties = std::unique_ptr<WriterProperties>(new WriterProperties());
69-
writer_properties->configs_ = properties;
64+
WriterProperties writer_properties;
65+
writer_properties.configs_ = properties;
7066
return writer_properties;
7167
}
7268

src/iceberg/file_writer.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,28 +42,28 @@ class WriterProperties : public ConfigBase<WriterProperties> {
4242

4343
/// \brief The name of the Avro root node schema to write.
4444
inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};
45-
4645
/// \brief The buffer size used by Avro output stream.
4746
inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};
48-
4947
/// \brief The sync interval used by Avro writer.
5048
inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};
51-
5249
/// \brief Whether to skip GenericDatum and use direct encoder for Avro writing.
5350
/// When true, uses direct encoder (faster). When false, uses GenericDatum.
5451
inline static Entry<bool> kAvroSkipDatum{"write.avro.skip-datum", true};
52+
inline static Entry<std::string> kAvroCompression{"write.avro.compression-codec",
53+
"gzip"};
54+
inline static Entry<std::string> kAvroCompressionLevel{"write.avro.compression-level",
55+
""};
5556

56-
/// TODO(gangwu): add more properties, like compression codec, compression level, etc.
57+
inline static Entry<std::string> kParquetCompression{"write.parquet.compression-codec",
58+
"zstd"};
59+
inline static Entry<std::string> kParquetCompressionLevel{
60+
"write.parquet.compression-level", ""};
5761

58-
/// \brief Create a default WriterProperties instance.
59-
static std::unique_ptr<WriterProperties> default_properties();
62+
/// TODO(gangwu): add table properties with write.avro|parquet|orc.*
6063

6164
/// \brief Create a WriterProperties instance from a map of key-value pairs.
62-
static std::unique_ptr<WriterProperties> FromMap(
65+
static WriterProperties FromMap(
6366
const std::unordered_map<std::string, std::string>& properties);
64-
65-
private:
66-
WriterProperties() = default;
6767
};
6868

6969
/// \brief Options for creating a writer.
@@ -79,7 +79,7 @@ struct ICEBERG_EXPORT WriterOptions {
7979
/// \brief Metadata to write to the file.
8080
std::unordered_map<std::string, std::string> metadata;
8181
/// \brief Format-specific or implementation-specific properties.
82-
std::shared_ptr<WriterProperties> properties = WriterProperties::default_properties();
82+
WriterProperties properties;
8383
};
8484

8585
/// \brief Base writer class to write data from different file formats.

src/iceberg/json_internal.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,8 +1119,6 @@ Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::jso
11191119
if (json.contains(kProperties)) {
11201120
ICEBERG_ASSIGN_OR_RAISE(auto properties, FromJsonMap(json, kProperties));
11211121
table_metadata->properties = TableProperties::FromMap(std::move(properties));
1122-
} else {
1123-
table_metadata->properties = TableProperties::default_properties();
11241122
}
11251123

11261124
// This field is optional, but internally we set this to -1 when not set

src/iceberg/manifest/manifest_writer.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,9 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(
256256
std::string_view location, std::shared_ptr<Schema> schema,
257257
std::shared_ptr<FileIO> file_io,
258258
std::unordered_map<std::string, std::string> metadata, std::string_view schema_name) {
259-
auto writer_properties = WriterProperties::default_properties();
259+
WriterProperties writer_properties;
260260
if (!schema_name.empty()) {
261-
writer_properties->Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
261+
writer_properties.Set(WriterProperties::kAvroSchemaName, std::string(schema_name));
262262
}
263263
ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
264264
FileFormatType::kAvro,

src/iceberg/parquet/parquet_reader.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class ParquetReader::Impl {
124124
::parquet::ReaderProperties reader_properties(pool_);
125125
::parquet::ArrowReaderProperties arrow_reader_properties;
126126
arrow_reader_properties.set_batch_size(
127-
options.properties->Get(ReaderProperties::kBatchSize));
127+
options.properties.Get(ReaderProperties::kBatchSize));
128128
arrow_reader_properties.set_arrow_extensions_enabled(true);
129129

130130
// Open the Parquet file reader

0 commit comments

Comments (0)