Skip to content

Commit dde8a92

Browse files
committed
feat(avro): support writing multiple blocks
1 parent 39a9c8b commit dde8a92

2 files changed

Lines changed: 54 additions & 1 deletion

File tree

src/iceberg/avro/avro_writer.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class DirectEncoderBackend : public AvroWriteBackend {
8080

8181
Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
8282
int64_t row_index) override {
83+
writer_->syncIfNeeded();
8384
ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, writer_->encoder(),
8485
write_schema, array, row_index,
8586
encode_ctx_));

src/iceberg/test/avro_test.cc

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,25 @@
1818
*/
1919

2020
#include <sstream>
21+
#include <unordered_map>
2122

2223
#include <arrow/array/array_base.h>
2324
#include <arrow/c/bridge.h>
2425
#include <arrow/json/from_string.h>
26+
#include <avro/DataFile.hh>
27+
#include <avro/Generic.hh>
2528
#include <gtest/gtest.h>
2629

2730
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
2831
#include "iceberg/avro/avro_register.h"
32+
#include "iceberg/avro/avro_stream_internal.h"
2933
#include "iceberg/avro/avro_writer.h"
3034
#include "iceberg/file_reader.h"
3135
#include "iceberg/schema.h"
3236
#include "iceberg/schema_internal.h"
3337
#include "iceberg/test/matchers.h"
3438
#include "iceberg/type.h"
39+
#include "iceberg/util/checked_cast.h"
3540

3641
namespace iceberg::avro {
3742

@@ -527,7 +532,9 @@ class AvroWriterTest : public ::testing::Test,
527532
skip_datum_ = GetParam();
528533
}
529534

530-
void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string& json_data) {
535+
void WriteAvroFile(
536+
std::shared_ptr<Schema> schema, const std::string& json_data,
537+
const std::unordered_map<std::string, std::string>& extra_properties = {}) {
531538
ArrowSchema arrow_c_schema;
532539
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
533540

@@ -548,6 +555,9 @@ class AvroWriterTest : public ::testing::Test,
548555

549556
auto writer_properties = WriterProperties::default_properties();
550557
writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
558+
for (const auto& [key, value] : extra_properties) {
559+
writer_properties->mutable_configs().emplace(key, value);
560+
}
551561

552562
auto writer_result = WriterFactoryRegistry::Open(
553563
FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -772,6 +782,48 @@ TEST_P(AvroWriterTest, WriteLargeDataset) {
772782
VerifyWrittenData(json.str());
773783
}
774784

785+
TEST_P(AvroWriterTest, MultipleAvroBlocks) {
786+
auto schema = std::make_shared<Schema>(
787+
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
788+
SchemaField::MakeRequired(2, "name", string())});
789+
790+
const std::string json_data = R"([
791+
[1, "Alice_with_a_very_long_name_to_exceed_sync_interval"],
792+
[2, "Bob_with_another_very_long_name_to_exceed_sync_interval"],
793+
[3, "Charlie_with_yet_another_very_long_name_to_exceed_sync"],
794+
[4, "David_with_a_super_long_name_that_will_exceed_interval"],
795+
[5, "Eve_with_an_extremely_long_name_to_force_new_block_here"]
796+
])";
797+
798+
const std::vector<std::pair</*sync_interval*/ std::string, /*num_blocks*/ size_t>>
799+
test_cases = {{"32", 5}, {"65536", 1}};
800+
801+
for (const auto& [interval, num_blocks] : test_cases) {
802+
WriteAvroFile(schema, json_data,
803+
{{WriterProperties::kAvroSyncInterval.key(), interval}});
804+
VerifyWrittenData(json_data);
805+
806+
// Use raw avro-cpp reader to count blocks by tracking previousSync() changes
807+
auto mock_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(file_io_);
808+
auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie();
809+
auto input_stream = std::make_unique<AvroInputStream>(std::move(input), 1024 * 1024);
810+
::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream));
811+
::avro::GenericDatum datum(avro_reader.dataSchema());
812+
813+
size_t block_count = 0;
814+
int64_t last_sync = -1;
815+
816+
while (avro_reader.read(datum)) {
817+
if (int64_t current_sync = avro_reader.previousSync(); current_sync != last_sync) {
818+
block_count++;
819+
last_sync = current_sync;
820+
}
821+
}
822+
823+
ASSERT_EQ(block_count, num_blocks);
824+
}
825+
}
826+
775827
// Instantiate parameterized tests for both direct encoder and GenericDatum paths
776828
INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
777829
::testing::Values(true, false),

0 commit comments

Comments
 (0)