Skip to content

Commit 63e4ec0

Browse files
authored
feat: implement DataWriter for Iceberg data files (#552)
Implements the DataWriter class for writing Iceberg data files, as part of issue #441 (task 2).

Implementation:
- Factory method DataWriter::Make() for creating writer instances
- Support for the Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation, including partition info, column statistics, serialized bounds, and sort order ID
- Lifecycle management through Make/Write/Close/Metadata
- PIMPL idiom for ABI stability
1 parent 7e6a7e3 commit 63e4ec0

File tree

3 files changed

+352
-5
lines changed

3 files changed

+352
-5
lines changed

src/iceberg/data/data_writer.cc

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,118 @@
1919

2020
#include "iceberg/data/data_writer.h"
2121

22+
#include <map>
23+
24+
#include "iceberg/file_writer.h"
25+
#include "iceberg/manifest/manifest_entry.h"
26+
#include "iceberg/util/macros.h"
27+
2228
namespace iceberg {
2329

2430
/// \brief Private implementation behind DataWriter (PIMPL).
///
/// Holds the format-specific Writer opened through WriterFactoryRegistry together
/// with the DataWriterOptions it was created from, and builds the DataFile
/// description once the writer has been closed.
class DataWriter::Impl {
 public:
  /// \brief Open the underlying file writer and wrap it in an Impl.
  ///
  /// \param options Writer configuration. The path, schema, io and properties are
  ///        forwarded to the underlying writer; the whole struct is kept so that
  ///        Metadata() can read path, format, partition and sort order id later.
  /// \return The new Impl, or the error produced by WriterFactoryRegistry::Open.
  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
    WriterOptions writer_options{
        .path = options.path,
        .schema = options.schema,
        .io = options.io,
        .properties = WriterProperties::FromMap(options.properties),
    };

    ICEBERG_ASSIGN_OR_RAISE(auto writer,
                            WriterFactoryRegistry::Open(options.format, writer_options));

    // The constructor is private, so std::make_unique cannot reach it.
    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
  }

  /// \brief Forward one batch to the underlying writer.
  ///
  /// \param data Arrow C data interface array handed to Writer::Write unchanged.
  /// \return The status reported by the underlying writer.
  Status Write(ArrowArray* data) {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->Write(data);
  }

  /// \brief Report the length value returned by the underlying writer.
  Result<int64_t> Length() const {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    return writer_->length();
  }

  /// \brief Close the underlying writer.
  ///
  /// Idempotent: once a close has succeeded, further calls return success without
  /// touching the underlying writer. A failed close leaves the writer marked as
  /// open, so the call can be retried.
  Status Close() {
    ICEBERG_DCHECK(writer_, "Writer not initialized");
    if (closed_) {
      return {};
    }
    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
    closed_ = true;
    return {};
  }

  /// \brief Build the DataFile describing the file that was written.
  ///
  /// Must be called after a successful Close(); otherwise the ICEBERG_CHECK below
  /// returns an error instead of a result.
  ///
  /// \return A WriteResult holding exactly one DataFile, or the first error hit
  ///         while reading metrics/length or serializing a bound.
  Result<FileWriter::WriteResult> Metadata() {
    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
    auto split_offsets = writer_->split_offsets();

    // DataFile stores bounds in serialized binary form, not as literals.
    ICEBERG_ASSIGN_OR_RAISE(auto lower_bounds, SerializeBounds(metrics.lower_bounds));
    ICEBERG_ASSIGN_OR_RAISE(auto upper_bounds, SerializeBounds(metrics.upper_bounds));

    auto data_file = std::make_shared<DataFile>(DataFile{
        .content = DataFile::Content::kData,
        .file_path = options_.path,
        .file_format = options_.format,
        .partition = options_.partition,
        // -1 marks a row count the underlying writer did not report.
        .record_count = metrics.row_count.value_or(-1),
        .file_size_in_bytes = length,
        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
        .null_value_counts = {metrics.null_value_counts.begin(),
                              metrics.null_value_counts.end()},
        .nan_value_counts = {metrics.nan_value_counts.begin(),
                             metrics.nan_value_counts.end()},
        .lower_bounds = std::move(lower_bounds),
        .upper_bounds = std::move(upper_bounds),
        .split_offsets = std::move(split_offsets),
        .sort_order_id = options_.sort_order_id,
    });

    FileWriter::WriteResult result;
    result.data_files.push_back(std::move(data_file));
    return result;
  }

 private:
  /// Column id -> serialized bound, the representation stored in DataFile.
  using SerializedBounds = std::map<int32_t, std::vector<uint8_t>>;

  /// \brief Serialize every literal in `bounds`, keyed by its column id.
  ///
  /// Shared by the lower- and upper-bound conversion in Metadata(). Stops at the
  /// first literal whose Serialize() fails and returns that error.
  template <typename BoundsByColumn>
  static Result<SerializedBounds> SerializeBounds(const BoundsByColumn& bounds) {
    SerializedBounds serialized_bounds;
    for (const auto& [col_id, literal] : bounds) {
      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
      serialized_bounds.insert_or_assign(col_id, std::move(serialized));
    }
    return serialized_bounds;
  }

  Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
      : options_(std::move(options)), writer_(std::move(writer)) {}

  DataWriterOptions options_;       // Kept for Metadata(): path, format, partition, sort order.
  std::unique_ptr<Writer> writer_;  // Format-specific writer opened in Make().
  bool closed_ = false;             // Set only after writer_->Close() succeeded.
};
27118

119+
// Takes ownership of an already-constructed Impl; instances are created through
// DataWriter::Make().
DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
120+
28121
// Defaulted here, in the translation unit where Impl is defined, so that the
// std::unique_ptr<Impl> member can destroy a complete type.
DataWriter::~DataWriter() = default;
29122

30-
Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
123+
// Creates a DataWriter from `options`. Any error returned by Impl::Make is
// propagated to the caller; on success the new Impl is wrapped in a DataWriter.
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
124+
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
125+
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
126+
}
127+
128+
// Forwards the batch to Impl::Write, which passes it to the underlying writer.
Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }
31129

32-
Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
130+
// Forwards to Impl::Length, which returns the underlying writer's length().
Result<int64_t> DataWriter::Length() const { return impl_->Length(); }
33131

34-
Status DataWriter::Close() { return NotImplemented(""); }
132+
// Forwards to Impl::Close, which is a no-op once a previous close has succeeded.
Status DataWriter::Close() { return impl_->Close(); }
35133

36-
Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
134+
// Forwards to Impl::Metadata, which returns an error until Close() has succeeded.
Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }
37135

38136
} // namespace iceberg

src/iceberg/data/data_writer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
5555
public:
5656
~DataWriter() override;
5757

58+
/// \brief Create a new DataWriter instance.
59+
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
60+
5861
Status Write(ArrowArray* data) override;
5962
Result<int64_t> Length() const override;
6063
Status Close() override;
@@ -63,6 +66,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
6366
private:
6467
class Impl;
6568
std::unique_ptr<Impl> impl_;
69+
70+
explicit DataWriter(std::unique_ptr<Impl> impl);
6671
};
6772

6873
} // namespace iceberg

src/iceberg/test/data_writer_test.cc

Lines changed: 245 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,251 @@
1717
* under the License.
1818
*/
1919

20+
#include "iceberg/data/data_writer.h"
21+
22+
#include <arrow/array.h>
23+
#include <arrow/c/bridge.h>
24+
#include <arrow/json/from_string.h>
2025
#include <gmock/gmock.h>
2126
#include <gtest/gtest.h>
2227

23-
namespace iceberg {} // namespace iceberg
28+
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
29+
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/file_format.h"
31+
#include "iceberg/manifest/manifest_entry.h"
32+
#include "iceberg/parquet/parquet_register.h"
33+
#include "iceberg/partition_spec.h"
34+
#include "iceberg/row/partition_values.h"
35+
#include "iceberg/schema.h"
36+
#include "iceberg/schema_field.h"
37+
#include "iceberg/schema_internal.h"
38+
#include "iceberg/test/matchers.h"
39+
#include "iceberg/type.h"
40+
#include "iceberg/util/macros.h"
41+
42+
namespace iceberg {
43+
44+
using ::testing::HasSubstr;
45+
46+
class DataWriterTest : public ::testing::Test {
47+
protected:
48+
static void SetUpTestSuite() {
49+
parquet::RegisterAll();
50+
avro::RegisterAll();
51+
}
52+
53+
void SetUp() override {
54+
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
55+
schema_ = std::make_shared<Schema>(
56+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
57+
SchemaField::MakeOptional(2, "name", string())});
58+
partition_spec_ = PartitionSpec::Unpartitioned();
59+
}
60+
61+
DataWriterOptions MakeDefaultOptions(
62+
std::optional<int32_t> sort_order_id = std::nullopt,
63+
PartitionValues partition = PartitionValues{}) {
64+
return DataWriterOptions{
65+
.path = "test_data.parquet",
66+
.schema = schema_,
67+
.spec = partition_spec_,
68+
.partition = std::move(partition),
69+
.format = FileFormatType::kParquet,
70+
.io = file_io_,
71+
.sort_order_id = sort_order_id,
72+
.properties = {{"write.parquet.compression-codec", "uncompressed"}},
73+
};
74+
}
75+
76+
std::shared_ptr<::arrow::Array> CreateTestData() {
77+
ArrowSchema arrow_c_schema;
78+
ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
79+
auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
80+
81+
return ::arrow::json::ArrayFromJSONString(
82+
::arrow::struct_(arrow_schema->fields()),
83+
R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
84+
.ValueOrDie();
85+
}
86+
87+
void WriteTestDataToWriter(DataWriter* writer) {
88+
auto test_data = CreateTestData();
89+
ArrowArray arrow_array;
90+
ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
91+
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
92+
}
93+
94+
std::shared_ptr<FileIO> file_io_;
95+
std::shared_ptr<Schema> schema_;
96+
std::shared_ptr<PartitionSpec> partition_spec_;
97+
};
98+
99+
// Parameterized variant of DataWriterTest; each parameter is a
// (file format, output path) pair.
class DataWriterFormatTest
100+
: public DataWriterTest,
101+
public ::testing::WithParamInterface<std::pair<FileFormatType, std::string>> {};
102+
103+
// A DataWriter can be created for every parameterized file format.
TEST_P(DataWriterFormatTest, CreateWithFormat) {
  auto [format, path] = GetParam();

  // Only the Parquet case sets a compression codec; other formats get no
  // properties at all.
  std::unordered_map<std::string, std::string> properties;
  if (format == FileFormatType::kParquet) {
    properties = std::unordered_map<std::string, std::string>{
        {"write.parquet.compression-codec", "uncompressed"}};
  }

  DataWriterOptions options{
      .path = path,
      .schema = schema_,
      .spec = partition_spec_,
      .partition = PartitionValues{},
      .format = format,
      .io = file_io_,
      .properties = std::move(properties),
  };

  auto maybe_writer = DataWriter::Make(options);
  ASSERT_THAT(maybe_writer, IsOk());
  auto writer = std::move(maybe_writer.value());
  ASSERT_NE(writer, nullptr);
}
125+
126+
// Runs DataWriterFormatTest once for Parquet and once for Avro, each with its
// own output path.
INSTANTIATE_TEST_SUITE_P(
127+
FormatTypes, DataWriterFormatTest,
128+
::testing::Values(std::make_pair(FileFormatType::kParquet, "test_data.parquet"),
129+
std::make_pair(FileFormatType::kAvro, "test_data.avro")));
130+
131+
// Happy path: write one batch, observe a positive length, then close.
TEST_F(DataWriterTest, WriteAndClose) {
  auto maybe_writer = DataWriter::Make(MakeDefaultOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto writer = std::move(maybe_writer.value());

  WriteTestDataToWriter(writer.get());

  // Something must have been accounted for once a batch was written.
  auto written_length = writer->Length();
  ASSERT_THAT(written_length, IsOk());
  EXPECT_GT(written_length.value(), 0);

  ASSERT_THAT(writer->Close(), IsOk());
}
147+
148+
// After Close(), Metadata() yields exactly one DataFile describing the Parquet
// file that was written.
TEST_F(DataWriterTest, MetadataAfterClose) {
  auto maybe_writer = DataWriter::Make(MakeDefaultOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto writer = std::move(maybe_writer.value());

  WriteTestDataToWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata = writer->Metadata();
  ASSERT_THAT(metadata, IsOk());

  const auto& data_files = metadata.value().data_files;
  ASSERT_EQ(data_files.size(), 1);

  const auto& written_file = data_files[0];
  EXPECT_EQ(written_file->content, DataFile::Content::kData);
  EXPECT_EQ(written_file->file_path, "test_data.parquet");
  EXPECT_EQ(written_file->file_format, FileFormatType::kParquet);
  EXPECT_GT(written_file->file_size_in_bytes, 0);

  // Metrics availability depends on the underlying writer implementation.
  // NOTE(review): if size() is unsigned (as for standard containers), these
  // three checks hold trivially; confirm whether a stronger check is wanted.
  EXPECT_GE(written_file->column_sizes.size(), 0);
  EXPECT_GE(written_file->value_counts.size(), 0);
  EXPECT_GE(written_file->null_value_counts.size(), 0);
}
174+
175+
// Metadata() on a writer that was never closed fails with a validation error.
TEST_F(DataWriterTest, MetadataBeforeCloseReturnsError) {
  auto maybe_writer = DataWriter::Make(MakeDefaultOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto writer = std::move(maybe_writer.value());

  // Deliberately no Close() before asking for metadata.
  auto premature_metadata = writer->Metadata();
  ASSERT_THAT(premature_metadata, IsError(ErrorKind::kValidationFailed));
  EXPECT_THAT(premature_metadata,
              HasErrorMessage("Cannot get metadata before closing the writer"));
}
186+
187+
// Close() may be called repeatedly; every call reports success.
TEST_F(DataWriterTest, CloseIsIdempotent) {
  auto maybe_writer = DataWriter::Make(MakeDefaultOptions());
  ASSERT_THAT(maybe_writer, IsOk());
  auto writer = std::move(maybe_writer.value());

  WriteTestDataToWriter(writer.get());

  // First call really closes; the next two must be harmless repeats.
  for (int attempt = 0; attempt < 3; ++attempt) {
    ASSERT_THAT(writer->Close(), IsOk());
  }
}
198+
199+
// The sort order id given in DataWriterOptions is carried into the DataFile
// returned by Metadata(), and stays unset when none was given.
TEST_F(DataWriterTest, SortOrderIdInMetadata) {
  // Test with explicit sort order id
  {
    const int32_t sort_order_id = 42;
    auto writer_result = DataWriter::Make(MakeDefaultOptions(sort_order_id));
    ASSERT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    WriteTestDataToWriter(writer.get());
    ASSERT_THAT(writer->Close(), IsOk());

    auto metadata_result = writer->Metadata();
    ASSERT_THAT(metadata_result, IsOk());
    const auto& data_files = metadata_result.value().data_files;
    // Check the size before indexing so an empty result fails the test cleanly
    // instead of reading out of bounds.
    ASSERT_EQ(data_files.size(), 1);
    const auto& data_file = data_files[0];
    ASSERT_TRUE(data_file->sort_order_id.has_value());
    EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
  }

  // Test without sort order id (should be nullopt)
  {
    auto writer_result = DataWriter::Make(MakeDefaultOptions());
    ASSERT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    WriteTestDataToWriter(writer.get());
    ASSERT_THAT(writer->Close(), IsOk());

    auto metadata_result = writer->Metadata();
    ASSERT_THAT(metadata_result, IsOk());
    const auto& data_files = metadata_result.value().data_files;
    ASSERT_EQ(data_files.size(), 1);
    const auto& data_file = data_files[0];
    EXPECT_FALSE(data_file->sort_order_id.has_value());
  }
}
232+
233+
// The partition values given in DataWriterOptions show up, with the same number
// of fields, in the DataFile returned by Metadata().
TEST_F(DataWriterTest, PartitionValuesPreserved) {
  PartitionValues partition_values({Literal::Int(42), Literal::String("test")});

  auto writer_result =
      DataWriter::Make(MakeDefaultOptions(std::nullopt, partition_values));
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());
  const auto& data_files = metadata_result.value().data_files;
  // Check the size before indexing so an empty result fails the test cleanly
  // instead of reading out of bounds.
  ASSERT_EQ(data_files.size(), 1);
  const auto& data_file = data_files[0];

  EXPECT_EQ(data_file->partition.num_fields(), partition_values.num_fields());
  EXPECT_EQ(data_file->partition.num_fields(), 2);
}
251+
252+
// Two batches written through the same writer still produce one data file with
// a positive size.
TEST_F(DataWriterTest, WriteMultipleBatches) {
  auto writer_result = DataWriter::Make(MakeDefaultOptions());
  ASSERT_THAT(writer_result, IsOk());
  auto writer = std::move(writer_result.value());

  WriteTestDataToWriter(writer.get());
  WriteTestDataToWriter(writer.get());
  ASSERT_THAT(writer->Close(), IsOk());

  auto metadata_result = writer->Metadata();
  ASSERT_THAT(metadata_result, IsOk());
  const auto& data_files = metadata_result.value().data_files;
  // Check the size before indexing so an empty result fails the test cleanly
  // instead of reading out of bounds.
  ASSERT_EQ(data_files.size(), 1);
  const auto& data_file = data_files[0];
  EXPECT_GT(data_file->file_size_in_bytes, 0);
}
266+
267+
} // namespace iceberg

0 commit comments

Comments
 (0)