Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 182 additions & 5 deletions src/iceberg/data/position_delete_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,203 @@

#include "iceberg/data/position_delete_writer.h"

#include <map>
#include <set>
#include <vector>

#include <nanoarrow/nanoarrow.h>

#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/schema.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

/// \brief Pimpl for PositionDeleteWriter.
///
/// Buffers (file_path, pos) delete records and periodically flushes them as
/// Arrow arrays to an underlying format-specific file writer.
class PositionDeleteWriter::Impl {
 public:
  /// \brief Create an Impl for the given options.
  ///
  /// Builds the two-column position delete schema (file_path, pos) and opens
  /// a file writer for the requested format through the factory registry.
  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
    // Build the position delete schema with file_path and pos columns.
    std::vector<SchemaField> fields;
    fields.push_back(MetadataColumns::kDeleteFilePath);
    fields.push_back(MetadataColumns::kDeleteFilePos);

    auto delete_schema = std::make_shared<Schema>(std::move(fields));

    WriterOptions writer_options{
        .path = options.path,
        .schema = delete_schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };

    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));

    return std::unique_ptr<Impl>(
        new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
  }

  /// \brief Write a caller-provided Arrow array straight to the file writer.
  ///
  /// NOTE(review): this bypasses the WriteDelete() buffer, so rows written
  /// here are not tracked in referenced_paths_ and may be reordered relative
  /// to still-buffered deletes — confirm callers never interleave the two.
  Status Write(ArrowArray* data) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->Write(data);
  }

  /// \brief Buffer one (file_path, pos) delete record.
  ///
  /// The buffer is flushed to the underlying writer once it reaches
  /// kFlushThreshold entries; Close() flushes any remainder.
  Status WriteDelete(std::string_view file_path, int64_t pos) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    buffered_paths_.emplace_back(file_path);
    buffered_positions_.push_back(pos);
    referenced_paths_.emplace(file_path);

    if (static_cast<int64_t>(buffered_paths_.size()) >= kFlushThreshold) {
      return FlushBuffer();
    }
    return {};
  }

  /// \brief Current length in bytes of the file being written.
  Result<int64_t> Length() const {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->length();
  }

  /// \brief Flush remaining buffered deletes and close the underlying writer.
  ///
  /// Idempotent after a successful close; a failed close may be retried.
  Status Close() {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    if (closed_) {
      return {};
    }
    if (!buffered_paths_.empty()) {
      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    // Only mark closed after a successful close so a failed attempt can be
    // retried by the caller.
    closed_ = true;
    return {};
  }

  /// \brief Build the WriteResult describing the written delete file.
  ///
  /// Must be called after Close(); fails the check otherwise.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    // Serialize literal bounds to binary format.
    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
    for (const auto& [col_id, literal] : metrics.lower_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      lower_bounds_map[col_id] = std::move(serialized);
    }
    std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
    for (const auto& [col_id, literal] : metrics.upper_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      upper_bounds_map[col_id] = std::move(serialized);
    }

    // Set referenced_data_file only when every buffered delete targeted the
    // same data file. Rows written through the raw Write() path are not
    // tracked here, so this stays unset for batch-written files.
    std::optional<std::string> referenced_data_file;
    if (referenced_paths_.size() == 1) {
      referenced_data_file = *referenced_paths_.begin();
    }

    auto data_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kPositionDeletes,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        // -1 signals that the underlying writer did not report a row count.
        .record_count = metrics.row_count.value_or(-1),
        .file_size_in_bytes = length,
        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
        .null_value_counts = {metrics.null_value_counts.begin(),
                              metrics.null_value_counts.end()},
        .nan_value_counts = {metrics.nan_value_counts.begin(),
                             metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds_map),
        .upper_bounds = std::move(upper_bounds_map),
        .split_offsets = std::move(split_offsets),
        .sort_order_id = std::nullopt,
        .referenced_data_file = std::move(referenced_data_file),
    });

    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

 private:
  // Number of buffered delete records that triggers a flush.
  static constexpr int64_t kFlushThreshold = 1000;

  Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
       std::unique_ptr<Writer> writer)
      : options_(std::move(options)),
        delete_schema_(std::move(delete_schema)),
        writer_(std::move(writer)) {}

  // Append all buffered (path, pos) pairs into `array`, which must already be
  // initialized from the delete schema. May return early on a nanoarrow error;
  // the caller owns `array` and is responsible for releasing it on failure.
  Status AppendBuffered(ArrowArray* array) {
    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(array));
    for (size_t i = 0; i < buffered_paths_.size(); ++i) {
      ArrowStringView path_view(buffered_paths_[i].data(),
                                static_cast<int64_t>(buffered_paths_[i].size()));
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
          ArrowArrayAppendString(array->children[0], path_view));
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
          ArrowArrayAppendInt(array->children[1], buffered_positions_[i]));
      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
    }
    return {};
  }

  // Convert the buffered deletes into an Arrow array, hand it to the
  // underlying writer, and reset the buffers.
  Status FlushBuffer() {
    ArrowSchema arrow_schema;
    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));

    ArrowArray array;
    ArrowError error;
    const ArrowErrorCode init_code =
        ArrowArrayInitFromSchema(&array, &arrow_schema, &error);
    // The schema is only needed to initialize the array; release it before
    // any early return below so no error path can leak it.
    arrow_schema.release(&arrow_schema);
    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(init_code, error);

    if (Status append_status = AppendBuffered(&array); !append_status) {
      array.release(&array);  // don't leak the half-built array on failure
      return append_status;
    }

    const ArrowErrorCode finish_code = ArrowArrayFinishBuildingDefault(&array, &error);
    if (finish_code != NANOARROW_OK) {
      array.release(&array);  // release before propagating the error
    }
    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(finish_code, error);

    // NOTE(review): assumes Writer::Write releases or takes ownership of
    // `array` — the original code never released it after a successful write.
    // Confirm against the Writer contract.
    ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array));

    buffered_paths_.clear();
    buffered_positions_.clear();
    return {};
  }

  PositionDeleteWriterOptions options_;      // creation options (path, format, ...)
  std::shared_ptr<Schema> delete_schema_;    // (file_path, pos) schema
  std::unique_ptr<Writer> writer_;           // format-specific file writer
  bool closed_ = false;                      // set after a successful Close()
  std::vector<std::string> buffered_paths_;  // pending delete file paths
  std::vector<int64_t> buffered_positions_;  // pending delete row positions
  std::set<std::string> referenced_paths_;   // distinct paths seen by WriteDelete
};

// Takes ownership of the pimpl built by Make().
PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
    : impl_(std::move(impl)) {}

// Defined out of line so ~unique_ptr<Impl> runs where Impl is complete.
PositionDeleteWriter::~PositionDeleteWriter() = default;

Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
    const PositionDeleteWriterOptions& options) {
  ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
  return std::unique_ptr<PositionDeleteWriter>(new PositionDeleteWriter(std::move(impl)));
}

// The public API is a thin facade: every call forwards to the pimpl.
// (The obsolete NotImplemented("") stubs duplicated from the old revision
// have been removed — they would be redefinitions of the functions below.)

Status PositionDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); }

Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
  return impl_->WriteDelete(file_path, pos);
}

Result<int64_t> PositionDeleteWriter::Length() const { return impl_->Length(); }

Status PositionDeleteWriter::Close() { return impl_->Close(); }

Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
  return impl_->Metadata();
}

} // namespace iceberg
6 changes: 6 additions & 0 deletions src/iceberg/data/position_delete_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
public:
~PositionDeleteWriter() override;

/// \brief Create a new PositionDeleteWriter instance.
static Result<std::unique_ptr<PositionDeleteWriter>> Make(
const PositionDeleteWriterOptions& options);

Status Write(ArrowArray* data) override;
Status WriteDelete(std::string_view file_path, int64_t pos);
Result<int64_t> Length() const override;
Expand All @@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
private:
class Impl;
std::unique_ptr<Impl> impl_;

explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);
};

} // namespace iceberg
132 changes: 132 additions & 0 deletions src/iceberg/test/data_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@

#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/data/position_delete_writer.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/metadata_columns.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/partition_values.h"
Expand Down Expand Up @@ -264,4 +266,134 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
EXPECT_GT(data_file->file_size_in_bytes, 0);
}

// Fixture for PositionDeleteWriter tests; reuses the schema, partition spec,
// and file IO members set up by the DataWriterTest base fixture.
class PositionDeleteWriterTest : public DataWriterTest {
 protected:
  // Options for a position delete writer targeting an uncompressed Parquet
  // file in the fixture's FileIO.
  PositionDeleteWriterOptions MakeDeleteOptions() {
    return PositionDeleteWriterOptions{
        .path = "test_deletes.parquet",
        .schema = schema_,
        .spec = partition_spec_,
        .partition = PartitionValues{},
        .format = FileFormatType::kParquet,
        .io = file_io_,
        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
    };
  }

  // Builds an Arrow struct array matching the position delete schema
  // (file_path, pos) with three delete rows, by round-tripping the Iceberg
  // schema through the Arrow C data interface and parsing JSON row data.
  std::shared_ptr<::arrow::Array> CreatePositionDeleteData() {
    auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
        MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos});

    ArrowSchema arrow_c_schema;
    ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema));
    auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();

    return ::arrow::json::ArrayFromJSONString(
        ::arrow::struct_(arrow_type->fields()),
        R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], ["data_file_1.parquet", 10]])")
        .ValueOrDie();
  }
};

TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  // Record a few deletes against a single data file, then close the writer.
  for (int64_t position : {0, 5, 10}) {
    ASSERT_THAT(writer->WriteDelete("data_file.parquet", position), IsOk());
  }

  ASSERT_THAT(writer->Close(), IsOk());

  // A closed writer reports a positive on-disk length.
  auto length = writer->Length();
  ASSERT_THAT(length, IsOk());
  EXPECT_GT(length.value(), 0);
}

TEST_F(PositionDeleteWriterTest, MetadataAfterClose) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  // Exactly one position delete file should be reported, with attributes
  // matching the options the writer was created with.
  const auto& files = metadata.value().data_files;
  ASSERT_EQ(files.size(), 1);

  const auto& delete_file = files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_EQ(delete_file->file_path, "test_deletes.parquet");
  EXPECT_EQ(delete_file->file_format, FileFormatType::kParquet);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);
  EXPECT_FALSE(delete_file->sort_order_id.has_value());
}

TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  // Metadata() is only valid once the writer has been closed.
  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(metadata,
              HasErrorMessage("Cannot get metadata before closing the writer"));
}

TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());

  // Repeated Close() calls must all succeed without double-closing the file.
  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_THAT(writer->Close(), IsOk());
  }
}

TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  // 100 deletes against the same data file at consecutive positions.
  for (int64_t position = 0; position < 100; ++position) {
    ASSERT_THAT(writer->WriteDelete("data_file.parquet", position), IsOk());
  }

  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& delete_file = metadata.value().data_files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);
}

TEST_F(PositionDeleteWriterTest, WriteBatchData) {
  auto created = PositionDeleteWriter::Make(MakeDeleteOptions());
  ASSERT_THAT(created, IsOk());
  auto writer = std::move(created.value());

  // Export an Arrow struct array of (file_path, pos) rows through the C data
  // interface and hand it to the writer's batch Write() path.
  auto batch = CreatePositionDeleteData();
  ArrowArray c_array;
  ASSERT_TRUE(::arrow::ExportArray(*batch, &c_array).ok());
  ASSERT_THAT(writer->Write(&c_array), IsOk());

  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& delete_file = metadata.value().data_files[0];
  EXPECT_EQ(delete_file->content, DataFile::Content::kPositionDeletes);
  EXPECT_GT(delete_file->file_size_in_bytes, 0);
}
Comment thread
wgtmac marked this conversation as resolved.

} // namespace iceberg
Loading