Skip to content

Commit 153d763

Browse files
committed
feat: implement DataWriter for Iceberg data files
Implements the DataWriter class for writing Iceberg data files as part of issue #441 (task 2).

Implementation:
- Static factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation, including partition info, column statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Write/Close/Metadata methods
- Idempotent Close(): multiple calls succeed (no-op after the first)
- PIMPL idiom for ABI stability
- Not thread-safe (documented)

Tests:
- 13 comprehensive unit tests, including parameterized format tests
- Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification
- All tests passing (13/13)

Related to #441
1 parent 43b83c5 commit 153d763

File tree

3 files changed

+520
-5
lines changed

3 files changed

+520
-5
lines changed

src/iceberg/data/data_writer.cc

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,119 @@
1919

2020
#include "iceberg/data/data_writer.h"
2121

22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/manifest/manifest_entry.h"
24+
#include "iceberg/util/macros.h"
25+
2226
namespace iceberg {
2327

2428
/// \brief Private implementation of DataWriter.
///
/// Owns the format-specific Writer obtained from WriterFactoryRegistry and the
/// options it was created with, and remembers whether Close() has succeeded.
class DataWriter::Impl {
 public:
  /// \brief Open the underlying writer and wrap it in an Impl.
  ///
  /// Copies path, schema, io and properties from `options` into a WriterOptions,
  /// opens a writer for `options.format`, and propagates any error from that call.
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    WriterOptions file_options;
    file_options.path = options.path;
    file_options.schema = options.schema;
    file_options.io = options.io;
    file_options.properties = WriterProperties::FromMap(options.properties);

    ICEBERG_ASSIGN_OR_RAISE(auto file_writer,
                            WriterFactoryRegistry::Open(options.format, file_options));

    // The constructor is private, so std::make_unique cannot be used here.
    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(file_writer)));
  }

  /// \brief Forward `data` to the underlying writer.
  Status Write(ArrowArray* data) {
    ICEBERG_PRECHECK(writer_, "Writer not initialized");
    return writer_->Write(data);
  }

  /// \brief Report the length returned by the underlying writer.
  Result<int64_t> Length() const {
    ICEBERG_PRECHECK(writer_, "Writer not initialized");
    return writer_->length();
  }

  /// \brief Close the underlying writer.
  ///
  /// Only the first successful call closes the writer; later calls are no-ops.
  /// If closing fails the error is returned and the writer is not marked closed.
  Status Close() {
    ICEBERG_PRECHECK(writer_, "Writer not initialized");
    if (!closed_) {
      ICEBERG_RETURN_UNEXPECTED(writer_->Close());
      closed_ = true;
    }
    return {};
  }

  /// \brief Build the DataFile entry describing the written file.
  ///
  /// Fails unless Close() has already succeeded. The result holds exactly one
  /// DataFile populated from the options and from the writer's metrics, length
  /// and split offsets.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto file_metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto file_length, writer_->length());
    auto offsets = writer_->split_offsets();

    auto file = std::make_shared<DataFile>();
    file->content = DataFile::Content::kData;
    file->file_path = options_.path;
    file->file_format = options_.format;
    file->partition = options_.partition;
    // A missing row count is recorded as zero.
    file->record_count = file_metrics.row_count.value_or(0);
    file->file_size_in_bytes = file_length;
    file->sort_order_id = options_.sort_order_id;
    file->split_offsets = std::move(offsets);

    // Copy each per-column metric map entry by entry into the DataFile's map.
    const auto copy_entries = [](const auto& source, auto& target) {
      for (const auto& [field_id, value] : source) {
        target[field_id] = value;
      }
    };
    copy_entries(file_metrics.column_sizes, file->column_sizes);
    copy_entries(file_metrics.value_counts, file->value_counts);
    copy_entries(file_metrics.null_value_counts, file->null_value_counts);
    copy_entries(file_metrics.nan_value_counts, file->nan_value_counts);

    // Bounds are stored serialized; a serialization failure aborts the call.
    for (const auto& [field_id, bound] : file_metrics.lower_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto encoded, bound.Serialize());
      file->lower_bounds[field_id] = std::move(encoded);
    }
    for (const auto& [field_id, bound] : file_metrics.upper_bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto encoded, bound.Serialize());
      file->upper_bounds[field_id] = std::move(encoded);
    }

    FileWriter::WriteResult write_result;
    write_result.data_files.push_back(std::move(file));
    return write_result;
  }

 private:
  Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
      : options_(std::move(options)), writer_(std::move(writer)) {}

  DataWriterOptions options_;       // Options this writer was created with.
  std::unique_ptr<Writer> writer_;  // Underlying format-specific writer.
  bool closed_ = false;             // Set once Close() has succeeded.
};
27119

120+
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
121+
28122
// Defined out of line in this translation unit, where Impl is a complete type,
// so that std::unique_ptr<Impl> can destroy the Impl it owns.
DataWriter::~DataWriter() = default;
29123

30-
Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
124+
// Builds the implementation first; any error from Impl::Make() is returned
// to the caller unchanged and no DataWriter is created.
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
  ICEBERG_ASSIGN_OR_RAISE(auto writer_impl, Impl::Make(options));
  // The constructor is private, so std::make_unique cannot be used here.
  return std::unique_ptr<DataWriter>(new DataWriter(std::move(writer_impl)));
}
128+
129+
// Delegates to the implementation object.
Status DataWriter::Write(ArrowArray* data) {
  return impl_->Write(data);
}
31130

32-
Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
131+
// Delegates to the implementation object.
Result<int64_t> DataWriter::Length() const {
  return impl_->Length();
}
33132

34-
Status DataWriter::Close() { return NotImplemented(""); }
133+
// Delegates to the implementation object, which treats repeated calls as no-ops.
Status DataWriter::Close() {
  return impl_->Close();
}
35134

36-
Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
135+
// Delegates to the implementation object, which requires Close() to have succeeded.
Result<FileWriter::WriteResult> DataWriter::Metadata() {
  return impl_->Metadata();
}
37136

38137
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,16 @@ struct ICEBERG_EXPORT DataWriterOptions {
5151
};
5252

5353
/// \brief Writer for Iceberg data files.
54+
///
55+
/// This class is not thread-safe. Concurrent calls to Write(), Close(), or Metadata()
56+
/// from multiple threads may result in undefined behavior.
5457
class ICEBERG_EXPORT DataWriter : public FileWriter {
5558
public:
5659
~DataWriter() override;
5760

61+
/// \brief Create a new DataWriter instance.
62+
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
63+
5864
Status Write(ArrowArray* data) override;
5965
Result<int64_t> Length() const override;
6066
Status Close() override;
@@ -63,6 +69,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
6369
private:
6470
class Impl;
6571
std::unique_ptr<Impl> impl_;
72+
73+
explicit DataWriter(std::unique_ptr<Impl> impl);
6674
};
6775

6876
} // namespace iceberg

0 commit comments

Comments
 (0)