Skip to content

Commit 9a7f53a

Browse files
shangxinli and claude committed
feat: Implement PositionDeleteWriter for position delete files
Implement the PositionDeleteWriter following the same PIMPL pattern as DataWriter. The writer supports both buffered WriteDelete(file_path, pos) calls and direct Write(ArrowArray*) for pre-formed batches. Metadata reports content=kPositionDeletes with sort_order_id=nullopt per spec, and tracks referenced_data_file when all deletes target a single file. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent cd93b99 commit 9a7f53a

File tree

3 files changed

+320
-5
lines changed

3 files changed

+320
-5
lines changed

src/iceberg/data/position_delete_writer.cc

Lines changed: 182 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,203 @@
1919

2020
#include "iceberg/data/position_delete_writer.h"
2121

22+
#include <map>
23+
#include <set>
24+
#include <vector>
25+
26+
#include <nanoarrow/nanoarrow.h>
27+
28+
#include "iceberg/arrow/nanoarrow_status_internal.h"
29+
#include "iceberg/file_writer.h"
30+
#include "iceberg/manifest/manifest_entry.h"
31+
#include "iceberg/metadata_columns.h"
32+
#include "iceberg/schema.h"
33+
#include "iceberg/schema_internal.h"
34+
#include "iceberg/util/macros.h"
35+
2236
namespace iceberg {
2337

2438
/// \brief Private implementation of PositionDeleteWriter.
///
/// Owns the underlying format writer, buffers the rows added through
/// WriteDelete() and assembles the DataFile metadata after Close().
class PositionDeleteWriter::Impl {
 public:
  /// \brief Open a format writer for the (file_path, pos) delete schema.
  ///
  /// \param options Destination path, file format, FileIO, partition and writer
  ///        properties for the delete file.
  /// \return The implementation object, or the error reported while opening the
  ///         underlying writer.
  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
    // Build the position delete schema with file_path and pos columns
    std::vector<SchemaField> fields;
    fields.push_back(MetadataColumns::kDeleteFilePath);
    fields.push_back(MetadataColumns::kDeleteFilePos);

    auto delete_schema = std::make_shared<Schema>(std::move(fields));

    WriterOptions writer_options{
        .path = options.path,
        .schema = delete_schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };

    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));

    return std::unique_ptr<Impl>(
        new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
  }

  /// \brief Write a pre-formed batch of position deletes.
  ///
  /// Rows still buffered by WriteDelete() are flushed first so that the file
  /// keeps the order in which the caller issued its writes. `data` is not
  /// touched when that flush (or the closed check) fails.
  ///
  /// \param data Batch handed straight to the underlying writer.
  /// \return Error if the writer is closed, the flush fails, or the underlying
  ///         writer rejects the batch.
  Status Write(ArrowArray* data) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    ICEBERG_CHECK(!closed_, "Cannot write to a closed writer");
    if (!buffered_paths_.empty()) {
      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
    }
    // The file paths inside the batch are not inspected, so the delete file can
    // no longer be proven to reference a single data file.
    referenced_file_ambiguous_ = true;
    return writer_->Write(data);
  }

  /// \brief Buffer one (file_path, pos) delete row.
  ///
  /// The buffer is flushed to the underlying writer once it holds
  /// kFlushThreshold rows; the remainder is flushed by Close().
  ///
  /// \param file_path Data file the deleted row belongs to.
  /// \param pos Position of the deleted row inside that data file.
  /// \return Error if the writer is closed or a triggered flush fails.
  Status WriteDelete(std::string_view file_path, int64_t pos) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    // Rows buffered after Close() would never reach the file, so reject them.
    ICEBERG_CHECK(!closed_, "Cannot write to a closed writer");
    buffered_paths_.emplace_back(file_path);
    buffered_positions_.push_back(pos);
    TrackReferencedPath(file_path);

    if (static_cast<int64_t>(buffered_paths_.size()) >= kFlushThreshold) {
      return FlushBuffer();
    }
    return {};
  }

  /// \brief Length reported by the underlying writer.
  Result<int64_t> Length() const {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->length();
  }

  /// \brief Flush any buffered rows and close the underlying writer.
  ///
  /// Calling Close() again after a successful close is a no-op. If the flush or
  /// the close fails, the writer is left un-closed and the error is returned.
  Status Close() {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    if (closed_) {
      return {};
    }
    if (!buffered_paths_.empty()) {
      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

  /// \brief Build the DataFile entry describing the written delete file.
  ///
  /// \return A WriteResult holding exactly one DataFile with
  ///         content = kPositionDeletes and no sort order id. Fails when called
  ///         before Close(), or when metrics, length or bound serialization fail.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    // Serialize literal bounds to binary format
    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
    for (const auto& [col_id, literal] : metrics.lower_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      lower_bounds_map[col_id] = std::move(serialized);
    }
    std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
    for (const auto& [col_id, literal] : metrics.upper_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      upper_bounds_map[col_id] = std::move(serialized);
    }

    // Set referenced_data_file only if every delete is known to reference the
    // same data file. Copy (not move) so Metadata() can be called repeatedly.
    std::optional<std::string> referenced_data_file;
    if (!referenced_file_ambiguous_) {
      referenced_data_file = referenced_path_;
    }

    auto data_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kPositionDeletes,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        .record_count = metrics.row_count.value_or(-1),
        .file_size_in_bytes = length,
        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
        .null_value_counts = {metrics.null_value_counts.begin(),
                              metrics.null_value_counts.end()},
        .nan_value_counts = {metrics.nan_value_counts.begin(),
                             metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds_map),
        .upper_bounds = std::move(upper_bounds_map),
        .split_offsets = std::move(split_offsets),
        .sort_order_id = std::nullopt,
        .referenced_data_file = std::move(referenced_data_file),
    });

    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

 private:
  /// Number of buffered rows that triggers a flush from WriteDelete().
  static constexpr int64_t kFlushThreshold = 1000;

  /// Releases an ArrowSchema on scope exit unless it was already released/moved
  /// (release == nullptr), so early error returns do not leak it.
  struct ArrowSchemaReleaser {
    explicit ArrowSchemaReleaser(ArrowSchema* schema) : schema_(schema) {}
    ~ArrowSchemaReleaser() {
      if (schema_->release != nullptr) {
        schema_->release(schema_);
      }
    }
    ArrowSchemaReleaser(const ArrowSchemaReleaser&) = delete;
    ArrowSchemaReleaser& operator=(const ArrowSchemaReleaser&) = delete;

    ArrowSchema* schema_;
  };

  /// Same as ArrowSchemaReleaser, for an ArrowArray.
  struct ArrowArrayReleaser {
    explicit ArrowArrayReleaser(ArrowArray* array) : array_(array) {}
    ~ArrowArrayReleaser() {
      if (array_->release != nullptr) {
        array_->release(array_);
      }
    }
    ArrowArrayReleaser(const ArrowArrayReleaser&) = delete;
    ArrowArrayReleaser& operator=(const ArrowArrayReleaser&) = delete;

    ArrowArray* array_;
  };

  Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
       std::unique_ptr<Writer> writer)
      : options_(std::move(options)),
        delete_schema_(std::move(delete_schema)),
        writer_(std::move(writer)) {}

  /// Remember the first data file path seen and note when a different one
  /// shows up. Only "exactly one distinct path" matters for the metadata, so
  /// the full set of paths is not kept.
  void TrackReferencedPath(std::string_view file_path) {
    if (referenced_file_ambiguous_) {
      return;
    }
    if (!referenced_path_.has_value()) {
      referenced_path_.emplace(file_path);
    } else if (*referenced_path_ != file_path) {
      referenced_file_ambiguous_ = true;
    }
  }

  /// \brief Turn the buffered rows into one struct array and write it.
  ///
  /// The buffers are cleared only after the underlying writer accepted the
  /// batch; on any failure they are left intact and the Arrow structures built
  /// so far are released by the scope guards.
  Status FlushBuffer() {
    ArrowSchema arrow_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
    ArrowSchemaReleaser schema_guard(&arrow_schema);

    ArrowArray array;
    ArrowError error;
    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
        ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
    // Guard the array only once it has been initialized successfully.
    ArrowArrayReleaser array_guard(&array);
    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array));

    for (size_t i = 0; i < buffered_paths_.size(); ++i) {
      const std::string& path = buffered_paths_[i];
      ArrowStringView path_view{path.data(), static_cast<int64_t>(path.size())};
      // children[0] is file_path, children[1] is pos (schema order from Make()).
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
          ArrowArrayAppendString(array.children[0], path_view));
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
          ArrowArrayAppendInt(array.children[1], buffered_positions_[i]));
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array));
    }

    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
        ArrowArrayFinishBuildingDefault(&array, &error), error);

    // If the writer takes ownership it marks the array released, which turns
    // array_guard into a no-op; otherwise the guard frees it.
    ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array));

    buffered_paths_.clear();
    buffered_positions_.clear();
    return {};
  }

  PositionDeleteWriterOptions options_;
  std::shared_ptr<Schema> delete_schema_;
  std::unique_ptr<Writer> writer_;
  bool closed_ = false;
  // Rows added via WriteDelete() that have not been flushed yet (parallel vectors).
  std::vector<std::string> buffered_paths_;
  std::vector<int64_t> buffered_positions_;
  // First data file path passed to WriteDelete(), if any.
  std::optional<std::string> referenced_path_;
  // True once a second distinct path was seen or an uninspected batch was written.
  bool referenced_file_ambiguous_ = false;
};
27195

196+
PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
197+
: impl_(std::move(impl)) {}
198+
28199
// Defaulted here, after Impl has been defined in this file, so that the
// std::unique_ptr<Impl> member is destroyed with a complete type.
PositionDeleteWriter::~PositionDeleteWriter() = default;
29200

30-
Status PositionDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
201+
/// Builds the implementation from `options` and wraps it in a writer; any error
/// from Impl::Make is returned unchanged.
Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
    const PositionDeleteWriterOptions& options) {
  ICEBERG_ASSIGN_OR_RAISE(auto pimpl, Impl::Make(options));
  // The constructor is private, so the object is created with a plain `new`
  // and handed to the unique_ptr immediately.
  return std::unique_ptr<PositionDeleteWriter>{
      new PositionDeleteWriter{std::move(pimpl)}};
}
206+
207+
/// Forwards a pre-formed batch to the implementation.
Status PositionDeleteWriter::Write(ArrowArray* data) {
  return impl_->Write(data);
}
31208

32209
/// \brief Record that row `pos` of data file `file_path` is deleted.
///
/// Delegates to the implementation. The leftover `NotImplemented` stub return
/// is dropped because it made the delegation unreachable.
Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
  return impl_->WriteDelete(file_path, pos);
}
35212

36-
Result<int64_t> PositionDeleteWriter::Length() const { return NotImplemented(""); }
213+
/// Returns the length reported by the implementation.
Result<int64_t> PositionDeleteWriter::Length() const {
  return impl_->Length();
}
37214

38-
Status PositionDeleteWriter::Close() { return NotImplemented(""); }
215+
/// Closes the writer by delegating to the implementation.
Status PositionDeleteWriter::Close() {
  return impl_->Close();
}
39216

40217
/// \brief Return the metadata of the written delete file.
///
/// Delegates to the implementation. The leftover `NotImplemented` stub return
/// is dropped because it made the delegation unreachable.
Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
  return impl_->Metadata();
}
43220

44221
} // namespace iceberg

src/iceberg/data/position_delete_writer.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
5555
public:
5656
~PositionDeleteWriter() override;
5757

58+
/// \brief Create a new PositionDeleteWriter instance.
59+
static Result<std::unique_ptr<PositionDeleteWriter>> Make(
60+
const PositionDeleteWriterOptions& options);
61+
5862
Status Write(ArrowArray* data) override;
5963
Status WriteDelete(std::string_view file_path, int64_t pos);
6064
Result<int64_t> Length() const override;
@@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
6468
private:
6569
class Impl;
6670
std::unique_ptr<Impl> impl_;
71+
72+
explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);
6773
};
6874

6975
} // namespace iceberg

src/iceberg/test/data_writer_test.cc

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@
2727

2828
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2929
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/data/position_delete_writer.h"
3031
#include "iceberg/file_format.h"
3132
#include "iceberg/manifest/manifest_entry.h"
33+
#include "iceberg/metadata_columns.h"
3234
#include "iceberg/parquet/parquet_register.h"
3335
#include "iceberg/partition_spec.h"
3436
#include "iceberg/row/partition_values.h"
@@ -264,4 +266,134 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
264266
EXPECT_GT(data_file->file_size_in_bytes, 0);
265267
}
266268

269+
class PositionDeleteWriterTest : public DataWriterTest {
270+
protected:
271+
PositionDeleteWriterOptions MakeDeleteOptions() {
272+
return PositionDeleteWriterOptions{
273+
.path = "test_deletes.parquet",
274+
.schema = schema_,
275+
.spec = partition_spec_,
276+
.partition = PartitionValues{},
277+
.format = FileFormatType::kParquet,
278+
.io = file_io_,
279+
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
280+
};
281+
}
282+
283+
std::shared_ptr<::arrow::Array> CreatePositionDeleteData() {
284+
auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
285+
MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos});
286+
287+
ArrowSchema arrow_c_schema;
288+
ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema));
289+
auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
290+
291+
return ::arrow::json::ArrayFromJSONString(
292+
::arrow::struct_(arrow_type->fields()),
293+
R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], ["data_file_1.parquet", 10]])")
294+
.ValueOrDie();
295+
}
296+
};
297+
298+
// Buffered deletes are written out by Close(), after which the writer reports a
// positive length.
TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) {
  auto made = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  for (int64_t pos : {int64_t{0}, int64_t{5}, int64_t{10}}) {
    ASSERT_THAT(writer->WriteDelete("data_file.parquet", pos), IsOk());
  }

  ASSERT_THAT(writer->Close(), IsOk());

  auto length = writer->Length();
  ASSERT_THAT(length, IsOk());
  EXPECT_GT(length.value(), 0);
}
313+
314+
// After Close(), Metadata() describes a single position delete file. Since all
// deletes target one data file, referenced_data_file must name that file.
TEST_F(PositionDeleteWriterTest, MetadataAfterClose) {
  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());

  const auto& write_result = metadata_result.value();
  ASSERT_EQ(write_result.data_files.size(), 1);

  const auto& data_file = write_result.data_files[0];
  ASSERT_NE(data_file, nullptr);
  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_EQ(data_file->file_path, "test_deletes.parquet");
  EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
  EXPECT_GT(data_file->file_size_in_bytes, 0);
  EXPECT_FALSE(data_file->sort_order_id.has_value());

  // Every WriteDelete() above used the same path, so it must be reported.
  ASSERT_TRUE(data_file->referenced_data_file.has_value());
  EXPECT_EQ(*data_file->referenced_data_file, "data_file.parquet");
}
336+
337+
// Metadata() is rejected with a validation error while the writer is open.
TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) {
  auto made = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  auto premature = writer->Metadata();
  ASSERT_THAT(premature, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(premature,
              HasErrorMessage("Cannot get metadata before closing the writer"));
}
347+
348+
// Close() may be called repeatedly; every call succeeds.
TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) {
  auto made = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());

  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_THAT(writer->Close(), IsOk());
  }
}
359+
360+
// Writes enough rows to cross the writer's internal flush threshold (1000 rows)
// more than once, so both the mid-stream flush in WriteDelete() and the final
// flush in Close() are exercised.
TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) {
  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  // 2500 rows: two threshold-triggered flushes plus a 500-row remainder.
  constexpr int64_t kNumDeletes = 2500;
  for (int64_t i = 0; i < kNumDeletes; ++i) {
    ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
  }

  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());

  // Guard the index below instead of reading past an empty vector.
  ASSERT_EQ(metadata_result.value().data_files.size(), 1);
  const auto& data_file = metadata_result.value().data_files[0];
  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_GT(data_file->file_size_in_bytes, 0);
}
378+
379+
// A pre-formed Arrow batch can be written directly through Write().
TEST_F(PositionDeleteWriterTest, WriteBatchData) {
  auto made = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(made, IsOk());
  auto writer = std::move(made.value());

  // Export the Arrow array through the C data interface and hand it over.
  auto batch = CreatePositionDeleteData();
  ArrowArray c_batch;
  ASSERT_TRUE(::arrow::ExportArray(*batch, &c_batch).ok());
  ASSERT_THAT(writer->Write(&c_batch), IsOk());

  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& delete_file = metadata.value().data_files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);
}
398+
267399
} // namespace iceberg

0 commit comments

Comments
 (0)