Skip to content

Commit b0352a5

Browse files
committed
feat: iceberg v3 variant non-shredding
1 parent 35dde9a commit b0352a5

27 files changed

Lines changed: 795 additions & 50 deletions

src/iceberg/avro/avro_constants.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ namespace iceberg::avro {
2525

2626
// Avro logical type constants
2727
constexpr std::string_view kMapLogicalType = "map";
28+
constexpr std::string_view kVariantLogicalType = "variant";
2829

2930
// Name mapping field constants
3031
constexpr std::string_view kElement = "element";
3132
constexpr std::string_view kKey = "key";
3233
constexpr std::string_view kValue = "value";
34+
constexpr std::string_view kMetadata = "metadata";
3335

3436
// Avro custom attributes constants
3537
constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name";

src/iceberg/avro/avro_register.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ void RegisterLogicalTypes() {
2828
// See https://github.com/apache/avro/pull/3326 for details.
2929
::avro::CustomLogicalTypeRegistry::instance().registerType(
3030
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
31+
::avro::CustomLogicalTypeRegistry::instance().registerType(
32+
"variant",
33+
[](const std::string&) { return std::make_shared<VariantLogicalType>(); });
3134
}
3235

3336
void RegisterAll() {

src/iceberg/avro/avro_schema_util.cc

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ ::avro::LogicalType GetMapLogicalType() {
4848
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
4949
}
5050

51+
/// \brief Create the Avro logical type used to tag variant records.
/// \return A `::avro::LogicalType` wrapping a newly allocated `VariantLogicalType`.
::avro::LogicalType GetVariantLogicalType() {
  const auto variant_logical_type = std::make_shared<VariantLogicalType>();
  return ::avro::LogicalType(variant_logical_type);
}
54+
5155
::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
5256
::avro::CustomAttributes attributes;
5357
attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id),
@@ -237,6 +241,22 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
237241
return {};
238242
}
239243

244+
/// \brief Convert an Iceberg variant type into an Avro record node.
///
/// The resulting record is tagged with the custom variant logical type and has two
/// bytes fields, in this order: `metadata`, then `value`.
///
/// \param type The Iceberg variant type (not inspected by this overload).
/// \param node Output location that receives the newly created record node.
/// \return An empty (successful) status.
Status ToAvroNodeVisitor::Visit(const VariantType& type, ::avro::NodePtr* node) {
  ::avro::NodePtr& record = *node;
  record = std::make_shared<::avro::NodeRecord>();

  // Use "r<field id>" when a field id is available on `field_ids_`; otherwise fall
  // back to the logical type name.
  const std::string record_name = field_ids_.empty()
                                      ? std::string(kVariantLogicalType)
                                      : std::format("r{}", field_ids_.top());
  record->setName(::avro::Name(record_name));
  record->setLogicalType(GetVariantLogicalType());

  // Each field is a name followed by its bytes leaf: metadata first, then value.
  for (const std::string_view field_name : {kMetadata, kValue}) {
    record->addName(std::string(field_name));
    record->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES));
  }
  return {};
}
259+
240260
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
241261
*node = std::make_shared<::avro::NodeRecord>();
242262

@@ -392,6 +412,10 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
392412
}
393413

394414
Status HasIdVisitor::VisitRecord(const ::avro::NodePtr& node) {
415+
if (HasVariantLogicalType(node)) {
416+
return {};
417+
}
418+
395419
static const std::string kFieldIdKey{kFieldIdProp};
396420
total_fields_ += node->leaves();
397421
for (size_t i = 0; i < node->leaves(); ++i) {
@@ -510,6 +534,13 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
510534
return GetId(node, kFieldIdKey, field_idx);
511535
}
512536

537+
/// \brief Check whether an Avro node has the exact shape of an unshredded variant.
///
/// The node must carry the variant logical type, be a record, and contain exactly
/// two fields named `metadata` and `value` (in that order), both of Avro type bytes.
///
/// \param node The Avro node to inspect.
/// \return True if every condition above holds, false otherwise.
bool IsVariantAvroSchema(const ::avro::NodePtr& node) {
  if (!HasVariantLogicalType(node) || node->type() != ::avro::AVRO_RECORD) {
    return false;
  }
  // Field count is verified before any indexed access below.
  if (node->leaves() != 2 || node->names() != 2) {
    return false;
  }
  if (node->nameAt(0) != kMetadata || node->nameAt(1) != kValue) {
    return false;
  }
  return node->leafAt(0)->type() == ::avro::AVRO_BYTES &&
         node->leafAt(1)->type() == ::avro::AVRO_BYTES;
}
543+
513544
Status ValidateAvroSchemaEvolution(const Type& expected_type,
514545
const ::avro::NodePtr& avro_node) {
515546
switch (expected_type.type_id()) {
@@ -615,6 +646,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
615646
return {};
616647
}
617648
break;
649+
case TypeId::kVariant:
650+
if (IsVariantAvroSchema(avro_node)) {
651+
return {};
652+
}
653+
break;
618654
default:
619655
break;
620656
}
@@ -847,7 +883,13 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
847883
/// Returns true when `node` carries a custom logical type whose name equals
/// `kMapLogicalType`.
bool HasMapLogicalType(const ::avro::NodePtr& node) {
  const auto logical_type = node->logicalType();
  if (logical_type.type() != ::avro::LogicalType::CUSTOM) {
    return false;
  }
  // The custom logical type pointer is null-checked before it is dereferenced.
  const auto custom_type = logical_type.customLogicalType();
  return custom_type != nullptr && custom_type->name() == kMapLogicalType;
}
888+
889+
/// Returns true when `node` carries a custom logical type whose name equals
/// `kVariantLogicalType`.
bool HasVariantLogicalType(const ::avro::NodePtr& node) {
  const auto logical_type = node->logicalType();
  if (logical_type.type() != ::avro::LogicalType::CUSTOM) {
    return false;
  }
  // The custom logical type pointer is null-checked before it is dereferenced.
  const auto custom_type = logical_type.customLogicalType();
  if (custom_type == nullptr) {
    return false;
  }
  return custom_type->name() == kVariantLogicalType;
}
852894

853895
Result<SchemaProjection> Project(const Schema& expected_schema,
@@ -1032,6 +1074,10 @@ Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& origina
10321074
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
10331075
const NameMapping& mapping,
10341076
std::vector<std::string>& names) {
1077+
if (HasVariantLogicalType(original_node)) {
1078+
return original_node;
1079+
}
1080+
10351081
switch (original_node->type()) {
10361082
case ::avro::AVRO_RECORD:
10371083
return MakeRecordNodeWithFieldIds(original_node, mapping, names);

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ struct MapLogicalType : public ::avro::CustomLogicalType {
3939
MapLogicalType() : ::avro::CustomLogicalType("map") {}
4040
};
4141

42+
/// \brief Custom Avro logical type constructed with the name "variant".
struct VariantLogicalType : ::avro::CustomLogicalType {
  VariantLogicalType() : ::avro::CustomLogicalType("variant") {}
};
45+
4246
/// \brief A visitor that converts an Iceberg type to an Avro node.
4347
class ToAvroNodeVisitor {
4448
public:
@@ -58,6 +62,7 @@ class ToAvroNodeVisitor {
5862
Status Visit(const UuidType& type, ::avro::NodePtr* node);
5963
Status Visit(const FixedType& type, ::avro::NodePtr* node);
6064
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
65+
Status Visit(const VariantType& type, ::avro::NodePtr* node);
6166
Status Visit(const StructType& type, ::avro::NodePtr* node);
6267
Status Visit(const ListType& type, ::avro::NodePtr* node);
6368
Status Visit(const MapType& type, ::avro::NodePtr* node);
@@ -151,6 +156,11 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
151156
/// \return True if the node has a map logical type, false otherwise.
152157
bool HasMapLogicalType(const ::avro::NodePtr& node);
153158

159+
/// \brief Check if an Avro node has a variant logical type.
160+
/// \param node The Avro node to check.
161+
/// \return True if the node has a variant logical type, false otherwise.
162+
bool HasVariantLogicalType(const ::avro::NodePtr& node);
163+
154164
/// \brief Check if a string is a valid Avro name.
155165
///
156166
/// Valid Avro names must:

src/iceberg/json_serde.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ constexpr std::string_view kType = "type";
7373
constexpr std::string_view kStruct = "struct";
7474
constexpr std::string_view kList = "list";
7575
constexpr std::string_view kMap = "map";
76+
constexpr std::string_view kVariant = "variant";
7677
constexpr std::string_view kElement = "element";
7778
constexpr std::string_view kKey = "key";
7879
constexpr std::string_view kValue = "value";
@@ -377,6 +378,8 @@ nlohmann::json ToJson(const Type& type) {
377378
}
378379
case TypeId::kUuid:
379380
return "uuid";
381+
case TypeId::kVariant:
382+
return "variant";
380383
}
381384
std::unreachable();
382385
}
@@ -502,6 +505,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
502505
return std::make_unique<BinaryType>();
503506
} else if (type_str == "uuid") {
504507
return std::make_unique<UuidType>();
508+
} else if (type_str == kVariant) {
509+
return std::make_unique<VariantType>();
505510
} else if (type_str.starts_with("fixed")) {
506511
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
507512
std::smatch match;

src/iceberg/metrics_config.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
174174
Status Visit(const Type& type) {
175175
if (type.is_nested()) {
176176
return VisitNested(internal::checked_cast<const NestedType&>(type));
177+
} else if (type.is_variant()) {
178+
return VisitVariant(internal::checked_cast<const VariantType&>(type));
177179
} else {
178180
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
179181
}
@@ -184,8 +186,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
184186
if (!ShouldContinue()) {
185187
break;
186188
}
187-
// TODO(zhuo.wang): variant type should also be handled here
188-
if (field.type()->is_primitive()) {
189+
if (field.type()->is_primitive() || field.type()->is_variant()) {
189190
ids_.insert(field.field_id());
190191
}
191192
}
@@ -199,6 +200,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
199200
}
200201

201202
Status VisitPrimitive(const PrimitiveType& type) { return {}; }
203+
Status VisitVariant(const VariantType& type) { return {}; }
202204

203205
std::unordered_set<int32_t> Finish() const { return ids_; }
204206

src/iceberg/parquet/parquet_reader.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class ParquetReader::Impl {
113113
read_schema_ = options.projection;
114114

115115
// Prepare reader properties
116+
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
116117
::parquet::ReaderProperties reader_properties(pool_);
117118
::parquet::ArrowReaderProperties arrow_reader_properties;
118119
arrow_reader_properties.set_batch_size(
@@ -212,6 +213,7 @@ class ParquetReader::Impl {
212213
// Build the output Arrow schema
213214
ArrowSchema arrow_schema;
214215
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
216+
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
215217
ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_,
216218
::arrow::ImportSchema(&arrow_schema));
217219

src/iceberg/parquet/parquet_register.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@
1919

2020
#include "iceberg/parquet/parquet_register.h"
2121

22+
#include <tuple>
23+
24+
#include "iceberg/parquet/parquet_schema_util_internal.h"
25+
2226
namespace iceberg::parquet {
2327

2428
void RegisterAll() {
29+
std::ignore = EnsureParquetVariantExtensionRegistered();
2530
RegisterReader();
2631
RegisterWriter();
2732
}

0 commit comments

Comments
 (0)