Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
set(ICEBERG_SOURCES
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
data/writer.cc
expression/aggregate.cc
expression/binder.cc
expression/evaluator.cc
Expand Down Expand Up @@ -142,6 +143,7 @@ add_iceberg_lib(iceberg
iceberg_install_all_headers(iceberg)

add_subdirectory(catalog)
add_subdirectory(data)
add_subdirectory(expression)
add_subdirectory(manifest)
add_subdirectory(row)
Expand Down
18 changes: 18 additions & 0 deletions src/iceberg/data/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

iceberg_install_all_headers(iceberg/data)
27 changes: 27 additions & 0 deletions src/iceberg/data/writer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/data/writer.h"

namespace iceberg {

// FileWriter is a pure virtual interface class.
// Implementations will be provided in subsequent tasks.

} // namespace iceberg
91 changes: 91 additions & 0 deletions src/iceberg/data/writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

/// \file iceberg/data/writer.h
/// Base interface for Iceberg data file writers.

#include <cstdint>
#include <memory>
#include <vector>
Comment thread
wgtmac marked this conversation as resolved.

#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"

namespace iceberg {

/// \brief Base interface for data file writers.
///
/// This interface defines the common operations for writing Iceberg data files,
/// including data files, equality delete files, and position delete files.
///
/// Typical usage:
/// 1. Create a writer instance (via concrete implementation)
/// 2. Call Write() one or more times to write data
/// 3. Call Close() to finalize the file
/// 4. Call Metadata() to get file metadata (only valid after Close())
///
/// \note This interface is not thread-safe. Concurrent calls to Write()
/// from multiple threads on the same instance are not supported.
///
/// \note This interface uses PascalCase method naming (Write, Length, Close, Metadata)
/// to distinguish it from the lower-level iceberg/file_writer.h::Writer interface which
/// uses lowercase naming. FileWriter is the Iceberg-specific data file writer
/// abstraction, while Writer is the file format-level abstraction.
Comment thread
shangxinli marked this conversation as resolved.
Outdated
class ICEBERG_EXPORT FileWriter {
public:
virtual ~FileWriter() = default;

/// \brief Write a batch of records.
///
/// \param data Arrow array containing the records to write.
/// \return Status indicating success or failure.
virtual Status Write(ArrowArray* data) = 0;

/// \brief Get the current number of bytes written.
///
/// \return Result containing the number of bytes written or an error.
virtual Result<int64_t> Length() const = 0;

/// \brief Close the writer and finalize the file.
///
/// \return Status indicating success or failure.
virtual Status Close() = 0;

/// \brief File metadata for all files produced by the writer.
struct ICEBERG_EXPORT WriteResult {
/// Usually a writer produces a single data or delete file.
/// Position delete writer may produce multiple file-scoped delete files.
/// In the future, multiple files can be produced if file rolling is supported.
std::vector<std::shared_ptr<DataFile>> data_files;
};

/// \brief Get file metadata for all files produced by this writer.
///
/// This method should be called after Close() to retrieve the metadata
/// for all files written by this writer.
///
/// \return Result containing the write result or an error.
virtual Result<WriteResult> Metadata() = 0;
};

} // namespace iceberg
2 changes: 2 additions & 0 deletions src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ add_iceberg_test(util_test

add_iceberg_test(roaring_test SOURCES roaring_test.cc)

add_iceberg_test(data_writer_test SOURCES data_writer_test.cc)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to move it to the if(ICEBERG_BUILD_BUNDLE) block below since it depends on Avro and Parquet libraries.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(avro_test
USE_BUNDLE
Expand Down
218 changes: 218 additions & 0 deletions src/iceberg/test/data_writer_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <memory>
#include <vector>

#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include "iceberg/arrow_c_data.h"
#include "iceberg/data/writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/result.h"
#include "iceberg/test/matchers.h"

namespace iceberg {

// Mock implementation of FileWriter for testing
class MockFileWriter : public FileWriter {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, we don't need to add test cases for a pure interface classes. We can keep it for now and then remove all these cases once we have implemented DataWriter.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

public:
MockFileWriter() = default;

Status Write(ArrowArray* data) override {
if (is_closed_) {
return Invalid("Writer is closed");
}
if (data == nullptr) {
return Invalid("Null data provided");
}
write_count_++;
// Simulate writing some bytes
bytes_written_ += 1024;
return {};
}

Result<int64_t> Length() const override { return bytes_written_; }

Status Close() override {
if (is_closed_) {
return Invalid("Writer already closed");
}
is_closed_ = true;
return {};
}

Result<WriteResult> Metadata() override {
if (!is_closed_) {
return Invalid("Writer must be closed before getting metadata");
}

WriteResult result;
auto data_file = std::make_shared<DataFile>();
data_file->file_path = "/test/data/file.parquet";
data_file->file_format = FileFormatType::kParquet;
data_file->record_count = write_count_ * 100;
data_file->file_size_in_bytes = bytes_written_;
result.data_files.push_back(data_file);

return result;
}

bool is_closed() const { return is_closed_; }
int32_t write_count() const { return write_count_; }

private:
int64_t bytes_written_ = 0;
bool is_closed_ = false;
int32_t write_count_ = 0;
};

TEST(FileWriterTest, BasicWriteOperation) {
MockFileWriter writer;

// Create a dummy ArrowArray (normally this would contain actual data)
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_EQ(writer.write_count(), 1);

auto length_result = writer.Length();
ASSERT_THAT(length_result, IsOk());
ASSERT_EQ(*length_result, 1024);
}

TEST(FileWriterTest, MultipleWrites) {
MockFileWriter writer;
ArrowArray dummy_array = {};

// Write multiple times
for (int i = 0; i < 5; i++) {
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
}

ASSERT_EQ(writer.write_count(), 5);

auto length_result = writer.Length();
ASSERT_THAT(length_result, IsOk());
ASSERT_EQ(*length_result, 5120); // 5 * 1024
}

TEST(FileWriterTest, WriteNullData) {
MockFileWriter writer;

auto status = writer.Write(nullptr);
ASSERT_THAT(status, HasErrorMessage("Null data provided"));
}

TEST(FileWriterTest, CloseWriter) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_FALSE(writer.is_closed());

ASSERT_THAT(writer.Close(), IsOk());
ASSERT_TRUE(writer.is_closed());
}

TEST(FileWriterTest, DoubleClose) {
MockFileWriter writer;

ASSERT_THAT(writer.Close(), IsOk());
auto status = writer.Close();
ASSERT_THAT(status, HasErrorMessage("Writer already closed"));
}

TEST(FileWriterTest, WriteAfterClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Close(), IsOk());

auto status = writer.Write(&dummy_array);
ASSERT_THAT(status, HasErrorMessage("Writer is closed"));
}

TEST(FileWriterTest, MetadataBeforeClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

ASSERT_THAT(writer.Write(&dummy_array), IsOk());

auto metadata_result = writer.Metadata();
ASSERT_THAT(metadata_result,
HasErrorMessage("Writer must be closed before getting metadata"));
}

TEST(FileWriterTest, MetadataAfterClose) {
MockFileWriter writer;
ArrowArray dummy_array = {};

// Write some data
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_THAT(writer.Write(&dummy_array), IsOk());
ASSERT_THAT(writer.Write(&dummy_array), IsOk());

// Close the writer
ASSERT_THAT(writer.Close(), IsOk());

// Get metadata
auto metadata_result = writer.Metadata();
ASSERT_THAT(metadata_result, IsOk());

const auto& result = *metadata_result;
ASSERT_EQ(result.data_files.size(), 1);

const auto& data_file = result.data_files[0];
ASSERT_EQ(data_file->file_path, "/test/data/file.parquet");
ASSERT_EQ(data_file->file_format, FileFormatType::kParquet);
ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records
ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024
}

TEST(FileWriterTest, WriteResultStructure) {
FileWriter::WriteResult result;

// Test that WriteResult can hold multiple data files
auto data_file1 = std::make_shared<DataFile>();
data_file1->file_path = "/test/data/file1.parquet";
data_file1->record_count = 100;

auto data_file2 = std::make_shared<DataFile>();
data_file2->file_path = "/test/data/file2.parquet";
data_file2->record_count = 200;

result.data_files.push_back(data_file1);
result.data_files.push_back(data_file2);

ASSERT_EQ(result.data_files.size(), 2);
ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet");
ASSERT_EQ(result.data_files[0]->record_count, 100);
ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet");
ASSERT_EQ(result.data_files[1]->record_count, 200);
}

TEST(FileWriterTest, EmptyWriteResult) {
FileWriter::WriteResult result;
ASSERT_EQ(result.data_files.size(), 0);
ASSERT_TRUE(result.data_files.empty());
}

} // namespace iceberg
Loading