Skip to content

Commit 6f3eaf3

Browse files
shangxinliwgtmac
authored andcommitted
feat: Implement EqualityDeleteWriter for equality delete files
Implement the EqualityDeleteWriter following the same PIMPL pattern as DataWriter. The writer accepts Arrow data matching the equality delete schema (columns for the equality field values) and produces metadata with content=kEqualityDeletes, equality_ids set from options, and sort_order_id propagated from options.
1 parent 69cf2d3 commit 6f3eaf3

File tree

3 files changed

+257
-5
lines changed

3 files changed

+257
-5
lines changed

src/iceberg/data/equality_delete_writer.cc

Lines changed: 112 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,131 @@
1919

2020
#include "iceberg/data/equality_delete_writer.h"
2121

22+
#include <map>
23+
24+
#include "iceberg/file_writer.h"
25+
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/util/macros.h"
27+
2228
namespace iceberg {
2329

2430
class EqualityDeleteWriter::Impl {
2531
public:
32+
static Result<std::unique_ptr<Impl>> Make(EqualityDeleteWriterOptions options) {
33+
WriterOptions writer_options{
34+
.path = options.path,
35+
.schema = options.schema,
36+
.io = options.io,
37+
.properties = WriterProperties::FromMap(options.properties),
38+
};
39+
40+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
41+
WriterFactoryRegistry::Open(options.format, writer_options));
42+
43+
return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
44+
}
45+
46+
Status Write(ArrowArray* data) {
47+
ICEBERG_DCHECK(writer_, "Writer not initialized");
48+
return writer_->Write(data);
49+
}
50+
51+
Result<int64_t> Length() const {
52+
ICEBERG_DCHECK(writer_, "Writer not initialized");
53+
return writer_->length();
54+
}
55+
56+
Status Close() {
57+
ICEBERG_DCHECK(writer_, "Writer not initialized");
58+
if (closed_) {
59+
// Idempotent: no-op if already closed
60+
return {};
61+
}
62+
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
63+
closed_ = true;
64+
return {};
65+
}
66+
67+
Result<FileWriter::WriteResult> Metadata() {
68+
ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
69+
70+
ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
71+
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
72+
auto split_offsets = writer_->split_offsets();
73+
74+
// Serialize literal bounds to binary format
75+
std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
76+
for (const auto& [col_id, literal] : metrics.lower_bounds) {
77+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
78+
lower_bounds_map[col_id] = std::move(serialized);
79+
}
80+
std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
81+
for (const auto& [col_id, literal] : metrics.upper_bounds) {
82+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
83+
upper_bounds_map[col_id] = std::move(serialized);
84+
}
85+
86+
auto data_file = std::make_shared<DataFile>(DataFile{
87+
.content = DataFile::Content::kEqualityDeletes,
88+
.file_path = options_.path,
89+
.file_format = options_.format,
90+
.partition = options_.partition,
91+
.record_count = metrics.row_count.value_or(-1),
92+
.file_size_in_bytes = length,
93+
.column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
94+
.value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
95+
.null_value_counts = {metrics.null_value_counts.begin(),
96+
metrics.null_value_counts.end()},
97+
.nan_value_counts = {metrics.nan_value_counts.begin(),
98+
metrics.nan_value_counts.end()},
99+
.lower_bounds = std::move(lower_bounds_map),
100+
.upper_bounds = std::move(upper_bounds_map),
101+
.split_offsets = std::move(split_offsets),
102+
.equality_ids = options_.equality_field_ids,
103+
.sort_order_id = options_.sort_order_id,
104+
});
105+
106+
FileWriter::WriteResult result;
107+
result.data_files.push_back(std::move(data_file));
108+
return result;
109+
}
110+
111+
std::span<const int32_t> equality_field_ids() const {
112+
return options_.equality_field_ids;
113+
}
114+
115+
private:
116+
Impl(EqualityDeleteWriterOptions options, std::unique_ptr<Writer> writer)
117+
: options_(std::move(options)), writer_(std::move(writer)) {}
118+
119+
EqualityDeleteWriterOptions options_;
120+
std::unique_ptr<Writer> writer_;
121+
bool closed_ = false;
26122
};
27123

124+
EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr<Impl> impl)
125+
: impl_(std::move(impl)) {}
126+
28127
EqualityDeleteWriter::~EqualityDeleteWriter() = default;
29128

30-
Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
129+
Result<std::unique_ptr<EqualityDeleteWriter>> EqualityDeleteWriter::Make(
130+
const EqualityDeleteWriterOptions& options) {
131+
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
132+
return std::unique_ptr<EqualityDeleteWriter>(new EqualityDeleteWriter(std::move(impl)));
133+
}
134+
135+
Status EqualityDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); }
31136

32-
Result<int64_t> EqualityDeleteWriter::Length() const { return NotImplemented(""); }
137+
Result<int64_t> EqualityDeleteWriter::Length() const { return impl_->Length(); }
33138

34-
Status EqualityDeleteWriter::Close() { return NotImplemented(""); }
139+
Status EqualityDeleteWriter::Close() { return impl_->Close(); }
35140

36141
Result<FileWriter::WriteResult> EqualityDeleteWriter::Metadata() {
37-
return NotImplemented("");
142+
return impl_->Metadata();
38143
}
39144

40-
std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const { return {}; }
145+
std::span<const int32_t> EqualityDeleteWriter::equality_field_ids() const {
146+
return impl_->equality_field_ids();
147+
}
41148

42149
} // namespace iceberg

src/iceberg/data/equality_delete_writer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
5757
public:
5858
~EqualityDeleteWriter() override;
5959

60+
/// \brief Create a new EqualityDeleteWriter instance.
61+
static Result<std::unique_ptr<EqualityDeleteWriter>> Make(
62+
const EqualityDeleteWriterOptions& options);
63+
6064
Status Write(ArrowArray* data) override;
6165
Result<int64_t> Length() const override;
6266
Status Close() override;
@@ -67,6 +71,8 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter {
6771
private:
6872
class Impl;
6973
std::unique_ptr<Impl> impl_;
74+
75+
explicit EqualityDeleteWriter(std::unique_ptr<Impl> impl);
7076
};
7177

7278
} // namespace iceberg

src/iceberg/test/data_writer_test.cc

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2929
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/data/equality_delete_writer.h"
3031
#include "iceberg/data/position_delete_writer.h"
3132
#include "iceberg/file_format.h"
3233
#include "iceberg/manifest/manifest_entry.h"
@@ -423,4 +424,142 @@ TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
423424
EXPECT_GT(data_file->file_size_in_bytes, 0);
424425
}
425426

427+
class EqualityDeleteWriterTest : public DataWriterTest {
428+
protected:
429+
EqualityDeleteWriterOptions MakeDeleteOptions(
430+
std::vector<int32_t> equality_field_ids = {1, 2},
431+
std::optional<int32_t> sort_order_id = std::nullopt) {
432+
return EqualityDeleteWriterOptions{
433+
.path = "test_eq_deletes.parquet",
434+
.schema = schema_,
435+
.spec = partition_spec_,
436+
.partition = PartitionValues{},
437+
.format = FileFormatType::kParquet,
438+
.io = file_io_,
439+
.equality_field_ids = std::move(equality_field_ids),
440+
.sort_order_id = sort_order_id,
441+
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
442+
};
443+
}
444+
445+
void WriteTestDataToEqualityWriter(EqualityDeleteWriter* writer) {
446+
auto test_data = CreateTestData();
447+
ArrowArray arrow_array;
448+
ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
449+
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
450+
}
451+
};
452+
453+
TEST_F(EqualityDeleteWriterTest, WriteAndClose) {
454+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
455+
ASSERT_THAT(writer_result, IsOk());
456+
auto writer = std::move(writer_result.value());
457+
458+
WriteTestDataToEqualityWriter(writer.get());
459+
460+
auto length_result = writer->Length();
461+
ASSERT_THAT(length_result, IsOk());
462+
EXPECT_GT(length_result.value(), 0);
463+
464+
ASSERT_THAT(writer->Close(), IsOk());
465+
}
466+
467+
TEST_F(EqualityDeleteWriterTest, MetadataAfterClose) {
468+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
469+
ASSERT_THAT(writer_result, IsOk());
470+
auto writer = std::move(writer_result.value());
471+
472+
WriteTestDataToEqualityWriter(writer.get());
473+
ASSERT_THAT(writer->Close(), IsOk());
474+
475+
auto metadata_result = writer->Metadata();
476+
ASSERT_THAT(metadata_result, IsOk());
477+
478+
const auto& write_result = metadata_result.value();
479+
ASSERT_EQ(write_result.data_files.size(), 1);
480+
481+
const auto& data_file = write_result.data_files[0];
482+
EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
483+
EXPECT_EQ(data_file->file_path, "test_eq_deletes.parquet");
484+
EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
485+
EXPECT_GT(data_file->file_size_in_bytes, 0);
486+
487+
// Partition spec id must be set
488+
ASSERT_TRUE(data_file->partition_spec_id.has_value());
489+
EXPECT_EQ(data_file->partition_spec_id.value(), PartitionSpec::kInitialSpecId);
490+
491+
// Equality field ids must be set
492+
ASSERT_EQ(data_file->equality_ids.size(), 2);
493+
EXPECT_EQ(data_file->equality_ids[0], 1);
494+
EXPECT_EQ(data_file->equality_ids[1], 2);
495+
}
496+
497+
TEST_F(EqualityDeleteWriterTest, MetadataBeforeCloseReturnsError) {
498+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
499+
ASSERT_THAT(writer_result, IsOk());
500+
auto writer = std::move(writer_result.value());
501+
502+
auto metadata_result = writer->Metadata();
503+
ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
504+
EXPECT_THAT(metadata_result,
505+
HasErrorMessage("Cannot get metadata before closing the writer"));
506+
}
507+
508+
TEST_F(EqualityDeleteWriterTest, CloseIsIdempotent) {
509+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
510+
ASSERT_THAT(writer_result, IsOk());
511+
auto writer = std::move(writer_result.value());
512+
513+
WriteTestDataToEqualityWriter(writer.get());
514+
515+
ASSERT_THAT(writer->Close(), IsOk());
516+
ASSERT_THAT(writer->Close(), IsOk());
517+
ASSERT_THAT(writer->Close(), IsOk());
518+
}
519+
520+
TEST_F(EqualityDeleteWriterTest, SortOrderIdInMetadata) {
521+
const int32_t sort_order_id = 7;
522+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions({1}, sort_order_id));
523+
ASSERT_THAT(writer_result, IsOk());
524+
auto writer = std::move(writer_result.value());
525+
526+
WriteTestDataToEqualityWriter(writer.get());
527+
ASSERT_THAT(writer->Close(), IsOk());
528+
529+
auto metadata_result = writer->Metadata();
530+
ASSERT_THAT(metadata_result, IsOk());
531+
const auto& data_file = metadata_result.value().data_files[0];
532+
ASSERT_TRUE(data_file->sort_order_id.has_value());
533+
EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
534+
}
535+
536+
TEST_F(EqualityDeleteWriterTest, EqualityFieldIdsAccessor) {
537+
std::vector<int32_t> field_ids = {1, 2, 3};
538+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions(field_ids));
539+
ASSERT_THAT(writer_result, IsOk());
540+
auto writer = std::move(writer_result.value());
541+
542+
auto ids = writer->equality_field_ids();
543+
ASSERT_EQ(ids.size(), 3);
544+
EXPECT_EQ(ids[0], 1);
545+
EXPECT_EQ(ids[1], 2);
546+
EXPECT_EQ(ids[2], 3);
547+
}
548+
549+
TEST_F(EqualityDeleteWriterTest, WriteMultipleBatches) {
550+
auto writer_result = EqualityDeleteWriter::Make(MakeDeleteOptions());
551+
ASSERT_THAT(writer_result, IsOk());
552+
auto writer = std::move(writer_result.value());
553+
554+
WriteTestDataToEqualityWriter(writer.get());
555+
WriteTestDataToEqualityWriter(writer.get());
556+
ASSERT_THAT(writer->Close(), IsOk());
557+
558+
auto metadata_result = writer->Metadata();
559+
ASSERT_THAT(metadata_result, IsOk());
560+
const auto& data_file = metadata_result.value().data_files[0];
561+
EXPECT_EQ(data_file->content, DataFile::Content::kEqualityDeletes);
562+
EXPECT_GT(data_file->file_size_in_bytes, 0);
563+
}
564+
426565
} // namespace iceberg

0 commit comments

Comments
 (0)