Skip to content

Commit 8944a75

Browse files
committed
feat: implement DataWriter for Iceberg data files
Implements the DataWriter class for writing Iceberg data files, as part of issue #441 (task 2).

Implementation:
- Factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation, including partition info, column statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Initialize/Write/Close/Metadata
- PIMPL idiom for ABI stability

Tests:
- 12 comprehensive unit tests covering creation, write/close lifecycle, metadata generation, error handling, and feature validation
- All tests passing (12/12)

Related to #441
1 parent 43b83c5 commit 8944a75

File tree

3 files changed

+521
-5
lines changed

3 files changed

+521
-5
lines changed

src/iceberg/data/data_writer.cc

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,124 @@
1919

2020
#include "iceberg/data/data_writer.h"
2121

22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/manifest/manifest_entry.h"
24+
#include "iceberg/util/macros.h"
25+
2226
namespace iceberg {
2327

2428
/// \brief Private implementation of DataWriter.
///
/// Holds the options the writer was created with, the format-specific Writer
/// obtained from WriterFactoryRegistry, and a flag recording whether that
/// writer has been closed. Expected call order: Initialize(), Write()*,
/// Close(), Metadata().
class DataWriter::Impl {
 public:
  /// \brief Store the options; no file is opened until Initialize().
  explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}

  /// \brief Open the underlying writer for `options_.format`.
  ///
  /// \return OK on success, otherwise the error reported by
  ///         WriterFactoryRegistry::Open.
  Status Initialize() {
    WriterOptions writer_options;
    writer_options.path = options_.path;
    writer_options.schema = options_.schema;
    writer_options.io = options_.io;
    writer_options.properties = WriterProperties::FromMap(options_.properties);

    ICEBERG_ASSIGN_OR_RAISE(writer_,
                            WriterFactoryRegistry::Open(options_.format, writer_options));
    return {};
  }

  /// \brief Forward one batch to the underlying writer.
  ///
  /// \param data The Arrow array to write; passed through unchanged.
  /// \return An InvalidArgument error if the writer was never initialized or
  ///         has already been closed, otherwise the underlying writer's status.
  Status Write(ArrowArray* data) {
    if (!writer_) {
      return InvalidArgument("Writer not initialized");
    }
    // Reject writes after Close() instead of handing them to a finalized writer.
    if (closed_) {
      return InvalidArgument("Writer already closed");
    }
    return writer_->Write(data);
  }

  /// \brief Report the length value exposed by the underlying writer.
  ///
  /// \return An InvalidArgument error if the writer was never initialized.
  Result<int64_t> Length() const {
    if (!writer_) {
      return InvalidArgument("Writer not initialized");
    }
    return writer_->length();
  }

  /// \brief Close the underlying writer.
  ///
  /// \return An InvalidArgument error if the writer was never initialized or
  ///         Close() already succeeded once; otherwise the underlying writer's
  ///         status. The closed flag is only set when the close succeeds.
  Status Close() {
    if (!writer_) {
      return InvalidArgument("Writer not initialized");
    }
    if (closed_) {
      return InvalidArgument("Writer already closed");
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

  /// \brief Build the DataFile entry describing the file that was written.
  ///
  /// \return A WriteResult holding exactly one DataFile, or an InvalidArgument
  ///         error if Close() has not succeeded yet. Errors from the writer's
  ///         metrics()/length() or from serializing a bound are propagated.
  Result<FileWriter::WriteResult> Metadata() {
    if (!closed_) {
      return InvalidArgument("Cannot get metadata before closing the writer");
    }

    // closed_ can only be true after a successful Close(), which requires writer_.
    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    auto data_file = std::make_shared<DataFile>();
    data_file->content = DataFile::Content::kData;
    data_file->file_path = options_.path;
    data_file->file_format = options_.format;
    data_file->partition = options_.partition;
    // A missing row count is recorded as zero records.
    data_file->record_count = metrics.row_count.value_or(0);
    data_file->file_size_in_bytes = length;
    data_file->sort_order_id = options_.sort_order_id;
    data_file->split_offsets = std::move(split_offsets);

    // Copy the per-column metric maps into the DataFile's maps. The DataFile is
    // freshly created, so a range insert yields the same content as assigning
    // entry by entry.
    data_file->column_sizes.insert(metrics.column_sizes.begin(),
                                   metrics.column_sizes.end());
    data_file->value_counts.insert(metrics.value_counts.begin(),
                                   metrics.value_counts.end());
    data_file->null_value_counts.insert(metrics.null_value_counts.begin(),
                                        metrics.null_value_counts.end());
    data_file->nan_value_counts.insert(metrics.nan_value_counts.begin(),
                                       metrics.nan_value_counts.end());

    // Bounds are stored in serialized form; any serialization failure aborts.
    for (const auto& [col_id, literal] : metrics.lower_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      data_file->lower_bounds[col_id] = std::move(serialized);
    }
    for (const auto& [col_id, literal] : metrics.upper_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      data_file->upper_bounds[col_id] = std::move(serialized);
    }

    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

 private:
  DataWriterOptions options_;       // Configuration captured at construction.
  std::unique_ptr<Writer> writer_;  // Null until Initialize() succeeds.
  bool closed_ = false;             // Set once Close() has succeeded.
};
27123

124+
// Takes ownership of the implementation object handed over by the caller.
DataWriter::DataWriter(std::unique_ptr<Impl> impl)
    : impl_(std::move(impl)) {}
125+
28126
// Defaulted here, in the translation unit where Impl is fully defined, so that
// the std::unique_ptr<Impl> member can destroy a complete type.
DataWriter::~DataWriter() = default;
29127

30-
Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
128+
// Factory for DataWriter: copies `options` into a new implementation object,
// initializes it, and wraps it in a DataWriter. An initialization error is
// returned to the caller and no writer is created.
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
  auto state = std::make_unique<Impl>(options);
  ICEBERG_RETURN_UNEXPECTED(state->Initialize());

  // The constructor is private, so the object is created with `new` directly
  // and immediately placed under unique_ptr ownership.
  return std::unique_ptr<DataWriter>(new DataWriter(std::move(state)));
}
133+
134+
// Forwards the batch to the implementation object and returns its status.
Status DataWriter::Write(ArrowArray* data) {
  return impl_->Write(data);
}
31135

32-
Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
136+
// Returns the length reported by the implementation object.
Result<int64_t> DataWriter::Length() const {
  return impl_->Length();
}
33137

34-
Status DataWriter::Close() { return NotImplemented(""); }
138+
// Closes the writer by delegating to the implementation object.
Status DataWriter::Close() {
  return impl_->Close();
}
35139

36-
Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
140+
// Returns the write result (data file metadata) produced by the implementation
// object.
Result<FileWriter::WriteResult> DataWriter::Metadata() {
  return impl_->Metadata();
}
37141

38142
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
5555
public:
5656
~DataWriter() override;
5757

58+
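/// \brief Create a new DataWriter instance.
///
/// \param options Configuration for the writer (output path, schema, file
///        format, and related settings).
/// \return The new writer, or an error if the underlying file writer could
///         not be opened.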
59+
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
60+
5861
Status Write(ArrowArray* data) override;
5962
Result<int64_t> Length() const override;
6063
Status Close() override;
@@ -63,6 +66,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
6366
private:
6467
class Impl;
6568
std::unique_ptr<Impl> impl_;
69+
70+
explicit DataWriter(std::unique_ptr<Impl> impl);
6671
};
6772

6873
} // namespace iceberg

0 commit comments

Comments
 (0)