Skip to content

Commit e683ce2

Browse files
committed
address review: make flush threshold configurable and add test
Make kFlushThreshold configurable via PositionDeleteWriterOptions with a default of 1000. Add AutoFlushOnThreshold test that uses a small threshold to verify the automatic flush logic.
1 parent 9a7f53a commit e683ce2

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

src/iceberg/data/position_delete_writer.cc

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class PositionDeleteWriter::Impl {
7070
buffered_positions_.push_back(pos);
7171
referenced_paths_.emplace(file_path);
7272

73-
if (static_cast<int64_t>(buffered_paths_.size()) >= kFlushThreshold) {
73+
if (static_cast<int64_t>(buffered_paths_.size()) >= options_.flush_threshold) {
7474
return FlushBuffer();
7575
}
7676
return {};
@@ -145,8 +145,6 @@ class PositionDeleteWriter::Impl {
145145
}
146146

147147
private:
148-
static constexpr int64_t kFlushThreshold = 1000;
149-
150148
Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
151149
std::unique_ptr<Writer> writer)
152150
: options_(std::move(options)),

src/iceberg/data/position_delete_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions {
4747
FileFormatType format = FileFormatType::kParquet;
4848
std::shared_ptr<FileIO> io;
4949
std::shared_ptr<Schema> row_schema; // Optional row data schema
50+
int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush
5051
std::unordered_map<std::string, std::string> properties;
5152
};
5253

src/iceberg/test/data_writer_test.cc

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,14 +268,15 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
268268

269269
class PositionDeleteWriterTest : public DataWriterTest {
270270
protected:
271-
PositionDeleteWriterOptions MakeDeleteOptions() {
271+
PositionDeleteWriterOptions MakeDeleteOptions(int64_t flush_threshold = 1000) {
272272
return PositionDeleteWriterOptions{
273273
.path = "test_deletes.parquet",
274274
.schema = schema_,
275275
.spec = partition_spec_,
276276
.partition = PartitionValues{},
277277
.format = FileFormatType::kParquet,
278278
.io = file_io_,
279+
.flush_threshold = flush_threshold,
279280
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
280281
};
281282
}
@@ -396,4 +397,30 @@ TEST_F(PositionDeleteWriterTest, WriteBatchData) {
396397
EXPECT_GT(data_file->file_size_in_bytes, 0);
397398
}
398399

400+
TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
401+
// Use a small flush threshold to trigger automatic flush
402+
const int64_t flush_threshold = 5;
403+
auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions(flush_threshold));
404+
ASSERT_THAT(writer_result, IsOk());
405+
auto writer = std::move(writer_result.value());
406+
407+
// Write more deletes than the threshold to trigger auto-flush
408+
for (int64_t i = 0; i < 12; ++i) {
409+
ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
410+
}
411+
412+
// Length should be > 0 since auto-flush should have written data
413+
auto length_result = writer->Length();
414+
ASSERT_THAT(length_result, IsOk());
415+
EXPECT_GT(length_result.value(), 0);
416+
417+
ASSERT_THAT(writer->Close(), IsOk());
418+
419+
auto metadata_result = writer->Metadata();
420+
ASSERT_THAT(metadata_result, IsOk());
421+
const auto& data_file = metadata_result.value().data_files[0];
422+
EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
423+
EXPECT_GT(data_file->file_size_in_bytes, 0);
424+
}
425+
399426
} // namespace iceberg

0 commit comments

Comments
 (0)