Skip to content

Commit 147f25b

Browse files
committed
feat: implement DataWriter for Iceberg data files
Implements DataWriter class for writing Iceberg data files as part of issue #441 (task 2). Implementation: - Static factory method DataWriter::Make() for creating writer instances - Support for Parquet and Avro file formats via WriterFactoryRegistry - Complete DataFile metadata generation including partition info, column statistics, serialized bounds, and sort order ID - Proper lifecycle management with Write/Close/Metadata methods - Idempotent Close() - multiple calls succeed (no-op after first) - PIMPL idiom for ABI stability - Not thread-safe (documented) Tests: - 13 comprehensive unit tests including parameterized format tests - Coverage: creation, write/close lifecycle, metadata generation, error handling, feature validation, and data size verification - All tests passing (13/13) Related to #441
1 parent 43b83c5 commit 147f25b

File tree

3 files changed

+520
-5
lines changed

3 files changed

+520
-5
lines changed

src/iceberg/data/data_writer.cc

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,118 @@
1919

2020
#include "iceberg/data/data_writer.h"
2121

22+
#include "iceberg/file_writer.h"
23+
#include "iceberg/manifest/manifest_entry.h"
24+
#include "iceberg/util/macros.h"
25+
2226
namespace iceberg {
2327

2428
class DataWriter::Impl {
2529
public:
30+
static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
31+
WriterOptions writer_options;
32+
writer_options.path = options.path;
33+
writer_options.schema = options.schema;
34+
writer_options.io = options.io;
35+
writer_options.properties = WriterProperties::FromMap(options.properties);
36+
37+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
38+
WriterFactoryRegistry::Open(options.format, writer_options));
39+
40+
return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
41+
}
42+
43+
Status Write(ArrowArray* data) {
44+
ICEBERG_PRECHECK(writer_, "Writer not initialized");
45+
return writer_->Write(data);
46+
}
47+
48+
Result<int64_t> Length() const {
49+
ICEBERG_PRECHECK(writer_, "Writer not initialized");
50+
return writer_->length();
51+
}
52+
53+
Status Close() {
54+
ICEBERG_PRECHECK(writer_, "Writer not initialized");
55+
if (closed_) {
56+
// Idempotent: no-op if already closed
57+
return {};
58+
}
59+
ICEBERG_RETURN_UNEXPECTED(writer_->Close());
60+
closed_ = true;
61+
return {};
62+
}
63+
64+
Result<FileWriter::WriteResult> Metadata() {
65+
ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
66+
67+
ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
68+
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
69+
auto split_offsets = writer_->split_offsets();
70+
71+
auto data_file = std::make_shared<DataFile>();
72+
data_file->content = DataFile::Content::kData;
73+
data_file->file_path = options_.path;
74+
data_file->file_format = options_.format;
75+
data_file->partition = options_.partition;
76+
data_file->record_count = metrics.row_count.value_or(0);
77+
data_file->file_size_in_bytes = length;
78+
data_file->sort_order_id = options_.sort_order_id;
79+
data_file->split_offsets = std::move(split_offsets);
80+
81+
// Convert metrics maps from unordered_map to map
82+
for (const auto& [col_id, size] : metrics.column_sizes) {
83+
data_file->column_sizes[col_id] = size;
84+
}
85+
for (const auto& [col_id, count] : metrics.value_counts) {
86+
data_file->value_counts[col_id] = count;
87+
}
88+
for (const auto& [col_id, count] : metrics.null_value_counts) {
89+
data_file->null_value_counts[col_id] = count;
90+
}
91+
for (const auto& [col_id, count] : metrics.nan_value_counts) {
92+
data_file->nan_value_counts[col_id] = count;
93+
}
94+
95+
// Serialize literal bounds to binary format
96+
for (const auto& [col_id, literal] : metrics.lower_bounds) {
97+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
98+
data_file->lower_bounds[col_id] = std::move(serialized);
99+
}
100+
for (const auto& [col_id, literal] : metrics.upper_bounds) {
101+
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
102+
data_file->upper_bounds[col_id] = std::move(serialized);
103+
}
104+
105+
FileWriter::WriteResult result;
106+
result.data_files.push_back(std::move(data_file));
107+
return result;
108+
}
109+
110+
private:
111+
Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
112+
: options_(std::move(options)), writer_(std::move(writer)) {}
113+
114+
DataWriterOptions options_;
115+
std::unique_ptr<Writer> writer_;
116+
bool closed_ = false;
26117
};
27118

119+
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
120+
28121
DataWriter::~DataWriter() = default;
29122

30-
Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
123+
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
124+
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
125+
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
126+
}
127+
128+
Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }
31129

32-
Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
130+
Result<int64_t> DataWriter::Length() const { return impl_->Length(); }
33131

34-
Status DataWriter::Close() { return NotImplemented(""); }
132+
Status DataWriter::Close() { return impl_->Close(); }
35133

36-
Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
134+
Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }
37135

38136
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,16 @@ struct ICEBERG_EXPORT DataWriterOptions {
5151
};
5252

5353
/// \brief Writer for Iceberg data files.
54+
///
55+
/// This class is not thread-safe. Concurrent calls to Write(), Close(), or Metadata()
56+
/// from multiple threads may result in undefined behavior.
5457
class ICEBERG_EXPORT DataWriter : public FileWriter {
5558
public:
5659
~DataWriter() override;
5760

61+
/// \brief Create a new DataWriter instance.
62+
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
63+
5864
Status Write(ArrowArray* data) override;
5965
Result<int64_t> Length() const override;
6066
Status Close() override;
@@ -63,6 +69,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
6369
private:
6470
class Impl;
6571
std::unique_ptr<Impl> impl_;
72+
73+
explicit DataWriter(std::unique_ptr<Impl> impl);
6674
};
6775

6876
} // namespace iceberg

0 commit comments

Comments
 (0)