Skip to content

Commit 268e58d

Browse files
committed
feat: iceberg v3 variant non-shredding
1 parent 9dbad31 commit 268e58d

28 files changed

Lines changed: 785 additions & 42 deletions

src/iceberg/avro/avro_constants.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,11 +25,13 @@ namespace iceberg::avro {
2525

2626
// Avro logical type constants
2727
constexpr std::string_view kMapLogicalType = "map";
28+
constexpr std::string_view kVariantLogicalType = "variant";
2829

2930
// Name mapping field constants
3031
constexpr std::string_view kElement = "element";
3132
constexpr std::string_view kKey = "key";
3233
constexpr std::string_view kValue = "value";
34+
constexpr std::string_view kMetadata = "metadata";
3335

3436
// Avro custom attributes constants
3537
constexpr std::string_view kIcebergFieldNameProp = "iceberg-field-name";

src/iceberg/avro/avro_register.cc

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@ void RegisterLogicalTypes() {
2828
// See https://github.com/apache/avro/pull/3326 for details.
2929
::avro::CustomLogicalTypeRegistry::instance().registerType(
3030
"map", [](const std::string&) { return std::make_shared<MapLogicalType>(); });
31+
::avro::CustomLogicalTypeRegistry::instance().registerType(
32+
"variant",
33+
[](const std::string&) { return std::make_shared<VariantLogicalType>(); });
3134
}
3235

3336
void RegisterAll() {

src/iceberg/avro/avro_schema_util.cc

Lines changed: 47 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,10 @@ ::avro::LogicalType GetMapLogicalType() {
4848
return ::avro::LogicalType(std::make_shared<MapLogicalType>());
4949
}
5050

51+
/// \brief Build the Avro logical type object used to tag variant records.
/// \return A `::avro::LogicalType` wrapping a newly created `VariantLogicalType`.
::avro::LogicalType GetVariantLogicalType() {
52+
return ::avro::LogicalType(std::make_shared<VariantLogicalType>());
53+
}
54+
5155
::avro::CustomAttributes GetAttributesWithFieldId(int32_t field_id) {
5256
::avro::CustomAttributes attributes;
5357
attributes.addAttribute(std::string(kFieldIdProp), std::to_string(field_id),
@@ -242,6 +246,22 @@ Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
242246
return {};
243247
}
244248

249+
/// \brief Convert an Iceberg variant type into an Avro record node.
///
/// The produced record carries the variant logical type and two bytes fields, added
/// in the order `kMetadata` then `kValue`. No custom attributes (such as field ids)
/// are attached to these two fields inside this function.
///
/// \param type The variant type being visited; it is not read by this function.
/// \param node Output pointer; `*node` is overwritten with the new record node.
/// \return `{}` (a default-constructed Status) on every path.
Status ToAvroNodeVisitor::Visit(const VariantType& type, ::avro::NodePtr* node) {
250+
*node = std::make_shared<::avro::NodeRecord>();
251+
// Record name: the logical type name when no field id is on the stack, otherwise
// "r<id>" built from the field id at the top of `field_ids_`.
if (field_ids_.empty()) {
252+
(*node)->setName(::avro::Name(std::string(kVariantLogicalType)));
253+
} else {
254+
(*node)->setName(::avro::Name(std::format("r{}", field_ids_.top())));
255+
}
256+
// Tag the record so readers can tell it apart from an ordinary struct record.
(*node)->setLogicalType(GetVariantLogicalType());
257+
258+
// First field: `kMetadata`, typed as Avro bytes.
(*node)->addName(std::string(kMetadata));
259+
(*node)->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES));
260+
// Second field: `kValue`, typed as Avro bytes.
(*node)->addName(std::string(kValue));
261+
(*node)->addLeaf(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_BYTES));
262+
return {};
263+
}
264+
245265
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
246266
*node = std::make_shared<::avro::NodeRecord>();
247267

@@ -397,6 +417,10 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
397417
}
398418

399419
Status HasIdVisitor::VisitRecord(const ::avro::NodePtr& node) {
420+
if (HasVariantLogicalType(node)) {
421+
return {};
422+
}
423+
400424
static const std::string kFieldIdKey{kFieldIdProp};
401425
total_fields_ += node->leaves();
402426
for (size_t i = 0; i < node->leaves(); ++i) {
@@ -515,6 +539,13 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
515539
return GetId(node, kFieldIdKey, field_idx);
516540
}
517541

542+
/// \brief Check whether an Avro node matches the two-field variant record layout.
///
/// All of the following must hold: the node has the variant logical type, it is an
/// Avro record, it has exactly two leaves and two names, the names are `kMetadata`
/// followed by `kValue`, and both leaves are Avro bytes.
///
/// \param node The Avro node to inspect.
/// \return True only if every condition above is satisfied.
bool IsVariantAvroSchema(const ::avro::NodePtr& node) {
543+
// The `&&` chain short-circuits, so the leaf/name count checks run before any
// `nameAt` / `leafAt` access at index 0 or 1.
return HasVariantLogicalType(node) && node->type() == ::avro::AVRO_RECORD &&
544+
node->leaves() == 2 && node->names() == 2 && node->nameAt(0) == kMetadata &&
545+
node->nameAt(1) == kValue && node->leafAt(0)->type() == ::avro::AVRO_BYTES &&
546+
node->leafAt(1)->type() == ::avro::AVRO_BYTES;
547+
}
548+
518549
Status ValidateAvroSchemaEvolution(const Type& expected_type,
519550
const ::avro::NodePtr& avro_node) {
520551
if (avro_node->type() == ::avro::AVRO_NULL) {
@@ -626,6 +657,11 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
626657
break;
627658
case TypeId::kUnknown:
628659
return {};
660+
case TypeId::kVariant:
661+
if (IsVariantAvroSchema(avro_node)) {
662+
return {};
663+
}
664+
break;
629665
default:
630666
break;
631667
}
@@ -859,7 +895,13 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
859895
/// \brief Check if an Avro node carries the custom map logical type.
///
/// The node must have a CUSTOM logical type with a non-null custom logical type
/// object whose name matches the map logical type name.
///
/// This diff rendering shows both sides of the change to the final comparison: the
/// line under gutter `862-` is the removed form (string literal) and the line under
/// gutter `898+` is its replacement (`kMapLogicalType`).
bool HasMapLogicalType(const ::avro::NodePtr& node) {
860896
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
861897
node->logicalType().customLogicalType() != nullptr &&
862-
node->logicalType().customLogicalType()->name() == "map";
898+
node->logicalType().customLogicalType()->name() == kMapLogicalType;
899+
}
900+
901+
/// \brief Check if an Avro node carries the custom variant logical type.
///
/// Only the logical type is examined: it must be CUSTOM, have a non-null custom
/// logical type object, and that object's name must equal `kVariantLogicalType`.
/// The node's own type and fields are not inspected here.
///
/// \param node The Avro node to check.
/// \return True if all three conditions hold, false otherwise.
bool HasVariantLogicalType(const ::avro::NodePtr& node) {
902+
return node->logicalType().type() == ::avro::LogicalType::CUSTOM &&
903+
// Null check precedes the dereference in the next clause.
node->logicalType().customLogicalType() != nullptr &&
904+
node->logicalType().customLogicalType()->name() == kVariantLogicalType;
863905
}
864906

865907
Result<SchemaProjection> Project(const Schema& expected_schema,
@@ -1044,6 +1086,10 @@ Result<::avro::NodePtr> MakeUnionNodeWithFieldIds(const ::avro::NodePtr& origina
10441086
Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original_node,
10451087
const NameMapping& mapping,
10461088
std::vector<std::string>& names) {
1089+
if (HasVariantLogicalType(original_node)) {
1090+
return original_node;
1091+
}
1092+
10471093
switch (original_node->type()) {
10481094
case ::avro::AVRO_RECORD:
10491095
return MakeRecordNodeWithFieldIds(original_node, mapping, names);

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,10 @@ struct MapLogicalType : public ::avro::CustomLogicalType {
3939
MapLogicalType() : ::avro::CustomLogicalType("map") {}
4040
};
4141

42+
/// \brief Custom Avro logical type whose name is "variant".
struct VariantLogicalType : public ::avro::CustomLogicalType {
43+
// Passes the logical type name to the `::avro::CustomLogicalType` base.
VariantLogicalType() : ::avro::CustomLogicalType("variant") {}
44+
};
45+
4246
/// \brief A visitor that converts an Iceberg type to an Avro node.
4347
class ToAvroNodeVisitor {
4448
public:
@@ -59,6 +63,7 @@ class ToAvroNodeVisitor {
5963
Status Visit(const FixedType& type, ::avro::NodePtr* node);
6064
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
6165
Status Visit(const UnknownType&, ::avro::NodePtr*);
66+
Status Visit(const VariantType& type, ::avro::NodePtr* node);
6267
Status Visit(const StructType& type, ::avro::NodePtr* node);
6368
Status Visit(const ListType& type, ::avro::NodePtr* node);
6469
Status Visit(const MapType& type, ::avro::NodePtr* node);
@@ -152,6 +157,11 @@ std::string ToString(const ::avro::LogicalType::Type& logical_type);
152157
/// \return True if the node has a map logical type, false otherwise.
153158
bool HasMapLogicalType(const ::avro::NodePtr& node);
154159

160+
/// \brief Check if an Avro node has a variant logical type.
161+
/// \param node The Avro node to check.
162+
/// \return True if the node has a variant logical type, false otherwise.
163+
bool HasVariantLogicalType(const ::avro::NodePtr& node);
164+
155165
/// \brief Check if a string is a valid Avro name.
156166
///
157167
/// Valid Avro names must:

src/iceberg/json_serde.cc

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,7 @@ constexpr std::string_view kType = "type";
7373
constexpr std::string_view kStruct = "struct";
7474
constexpr std::string_view kList = "list";
7575
constexpr std::string_view kMap = "map";
76+
constexpr std::string_view kVariant = "variant";
7677
constexpr std::string_view kElement = "element";
7778
constexpr std::string_view kKey = "key";
7879
constexpr std::string_view kValue = "value";
@@ -379,6 +380,8 @@ nlohmann::json ToJson(const Type& type) {
379380
return "uuid";
380381
case TypeId::kUnknown:
381382
return "unknown";
383+
case TypeId::kVariant:
384+
return "variant";
382385
}
383386
std::unreachable();
384387
}
@@ -506,6 +509,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
506509
return std::make_unique<UuidType>();
507510
} else if (type_str == "unknown") {
508511
return std::make_unique<UnknownType>();
512+
} else if (type_str == kVariant) {
513+
return std::make_unique<VariantType>();
509514
} else if (type_str.starts_with("fixed")) {
510515
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
511516
std::smatch match;

src/iceberg/metrics_config.cc

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -197,6 +197,8 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
197197
Status Visit(const Type& type) {
198198
if (type.is_nested()) {
199199
return VisitNested(internal::checked_cast<const NestedType&>(type));
200+
} else if (type.is_variant()) {
201+
return VisitVariant(internal::checked_cast<const VariantType&>(type));
200202
} else {
201203
return VisitPrimitive(internal::checked_cast<const PrimitiveType&>(type));
202204
}
@@ -207,8 +209,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
207209
if (!ShouldContinue()) {
208210
break;
209211
}
210-
// TODO(zhuo.wang): variant type should also be handled here
211-
if (field.type()->is_primitive()) {
212+
if (field.type()->is_primitive() || field.type()->is_variant()) {
212213
ids_.insert(field.field_id());
213214
}
214215
}
@@ -222,6 +223,7 @@ Result<std::unordered_set<int32_t>> MetricsConfig::LimitFieldIds(const Schema& s
222223
}
223224

224225
// Primitive leaf: returns `{}` without reading `type`.
Status VisitPrimitive(const PrimitiveType& type) { return {}; }
226+
// Variant leaf: returns `{}` without reading `type`, same as VisitPrimitive.
Status VisitVariant(const VariantType& type) { return {}; }
225227

226228
// Returns the collected field ids (`ids_`) by value, i.e. as a copy.
std::unordered_set<int32_t> Finish() const { return ids_; }
227229

src/iceberg/parquet/parquet_metrics.cc

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -423,6 +423,10 @@ class CollectMetricsVisitor {
423423

424424
// Map hook: both parameters are unnamed and unused; always returns `{}`.
Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) { return {}; }
425425

426+
// Variant hook: both parameters are unnamed and unused; always returns `{}`.
Status VisitVariant(const VariantType& /*type*/, const std::string& /*prefix*/) {
427+
return {};
428+
}
429+
426430
// Primitive hook: both parameters are unnamed and unused; always returns `{}`.
Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& /*prefix*/) {
427431
return {};
428432
}

src/iceberg/parquet/parquet_reader.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class ParquetReader::Impl {
113113
read_schema_ = options.projection;
114114

115115
// Prepare reader properties
116+
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
116117
::parquet::ReaderProperties reader_properties(pool_);
117118
::parquet::ArrowReaderProperties arrow_reader_properties;
118119
arrow_reader_properties.set_batch_size(
@@ -212,6 +213,7 @@ class ParquetReader::Impl {
212213
// Build the output Arrow schema
213214
ArrowSchema arrow_schema;
214215
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
216+
ICEBERG_RETURN_UNEXPECTED(EnsureParquetVariantExtensionRegistered());
215217
ICEBERG_ARROW_ASSIGN_OR_RETURN(context_->output_arrow_schema_,
216218
::arrow::ImportSchema(&arrow_schema));
217219

src/iceberg/parquet/parquet_register.cc

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,9 +19,14 @@
1919

2020
#include "iceberg/parquet/parquet_register.h"
2121

22+
#include <tuple>
23+
24+
#include "iceberg/parquet/parquet_schema_util_internal.h"
25+
2226
namespace iceberg::parquet {
2327

2428
/// \brief Run the Parquet registration steps in order.
///
/// Calls `EnsureParquetVariantExtensionRegistered()` first, then `RegisterReader()`
/// and `RegisterWriter()`.
void RegisterAll() {
29+
// NOTE(review): the returned value is assigned to `std::ignore`, so a failure from
// this call is not reported to the caller of RegisterAll() — confirm that this
// best-effort behavior is intended.
std::ignore = EnsureParquetVariantExtensionRegistered();
2530
RegisterReader();
2631
RegisterWriter();
2732
}

0 commit comments

Comments
 (0)