Skip to content

Commit 5d6eb36

Browse files
manuzhang and codex committed
feat: support Iceberg v3 unknown type
Add an Iceberg unknown primitive type and JSON, Arrow, Avro, Parquet, projection, and data path support for null-only unknown fields. Enforce optionality invariants so required projections cannot be materialized from unknown/null-only fields. Co-authored-by: Codex <codex@openai.com>
1 parent d690aaa commit 5d6eb36

30 files changed

Lines changed: 1251 additions & 99 deletions

src/iceberg/avro/avro_data_util.cc

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -444,6 +444,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
444444
const SchemaField& projected_field,
445445
const arrow::MetadataColumnContext& metadata_context,
446446
::arrow::ArrayBuilder* array_builder) {
447+
if (projection.kind == FieldProjection::Kind::kNull) {
448+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
449+
return {};
450+
}
451+
447452
if (avro_node->type() == ::avro::AVRO_UNION) {
448453
size_t branch = avro_datum.unionBranch();
449454
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
@@ -494,6 +499,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
494499
}
495500

496501
if (array.IsNull(index)) {
502+
if (datum->type() == ::avro::AVRO_NULL) {
503+
return {};
504+
}
497505
if (!datum->isUnion()) [[unlikely]] {
498506
return InvalidSchema("Cannot extract null to non-union type: {}",
499507
::avro::toString(datum->type()));

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -574,6 +574,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
574574
const SchemaField& projected_field,
575575
const arrow::MetadataColumnContext& metadata_context,
576576
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
577+
if (projection.kind == FieldProjection::Kind::kNull) {
578+
ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder));
579+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
580+
return {};
581+
}
582+
577583
if (avro_node->type() == ::avro::AVRO_UNION) {
578584
const size_t branch_index = decoder.decodeUnionIndex();
579585

src/iceberg/avro/avro_direct_encoder.cc

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco
8080
return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx);
8181
}
8282

83-
if (is_null) {
83+
if (is_null && avro_node->type() != ::avro::AVRO_NULL) {
8484
return InvalidArgument("Null value in non-nullable field");
8585
}
8686

src/iceberg/avro/avro_schema_util.cc

Lines changed: 52 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -219,6 +219,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
219219
return {};
220220
}
221221

222+
// Converts the Iceberg unknown type into an Avro schema node.
//
// The result written to `*node` is a bare Avro `null` primitive; the type
// argument is unnamed because nothing about it is consulted. Returns a
// default-constructed Status, which is the value the surrounding hunks return
// on their non-error paths.
Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
223+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL);
224+
return {};
225+
}
226+
222227
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
223228
*node = std::make_shared<::avro::NodeRecord>();
224229

@@ -320,7 +325,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node)
320325
field_ids_.push(field.field_id());
321326
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node));
322327

323-
if (field.optional()) {
328+
if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) {
324329
::avro::MultiLeaves union_types;
325330
union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
326331
union_types.add(std::move(*node));
@@ -365,8 +370,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
365370
case ::avro::AVRO_STRING:
366371
case ::avro::AVRO_BYTES:
367372
case ::avro::AVRO_FIXED:
368-
return {};
369373
case ::avro::AVRO_NULL:
374+
return {};
370375
case ::avro::AVRO_ENUM:
371376
default:
372377
return InvalidSchema("Unsupported Avro type: {}", static_cast<int>(node->type()));
@@ -494,6 +499,10 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
494499

495500
Status ValidateAvroSchemaEvolution(const Type& expected_type,
496501
const ::avro::NodePtr& avro_node) {
502+
if (avro_node->type() == ::avro::AVRO_NULL) {
503+
return {};
504+
}
505+
497506
switch (expected_type.type_id()) {
498507
case TypeId::kBoolean:
499508
if (avro_node->type() == ::avro::AVRO_BOOL) {
@@ -583,6 +592,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
583592
return {};
584593
}
585594
break;
595+
case TypeId::kUnknown:
596+
return {};
586597
default:
587598
break;
588599
}
@@ -618,6 +629,35 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
618629
const ::avro::NodePtr& avro_node,
619630
bool prune_source);
620631

632+
// Builds the projection for a single expected field against its Avro node.
//
// Shared by the struct, list and map projection code so that null-only
// handling lives in one place.
//
// Parameters:
//   expected_field - the field requested by the reader schema.
//   avro_node      - the Avro node found for that field in the file schema.
//   source_index   - stored in `projection.from` when the field is projected.
//   prune_source   - forwarded unchanged to ProjectNested.
//
// Returns:
//   - a kNull projection when the expected type is unknown or the unwrapped
//     Avro node is AVRO_NULL, provided the expected field is optional;
//   - an InvalidSchema error in that same situation if the field is required;
//   - otherwise a kProjected projection with `from` set to `source_index`.
//   Errors from UnwrapUnion, ProjectNested and ValidateAvroSchemaEvolution are
//   propagated by the ICEBERG_* macros.
Result<FieldProjection> ProjectField(const SchemaField& expected_field,
633+
const ::avro::NodePtr& avro_node,
634+
size_t source_index, bool prune_source) {
635+
const Type& expected_type = *expected_field.type();
636+
::avro::NodePtr field_node;
637+
// Presumably strips an optional [null, T] union down to T; the helper is
// defined outside this hunk -- TODO confirm.
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node));
638+
639+
FieldProjection projection;
640+
// Null-only case: either the reader asks for an unknown-typed field or the
// file stores nothing but null for it. No type validation is performed here.
if (expected_type.type_id() == TypeId::kUnknown ||
641+
field_node->type() == ::avro::AVRO_NULL) {
642+
// A required field cannot be materialized from a source that is always null.
if (!expected_field.optional()) {
643+
return InvalidSchema("Cannot project required field with ID: {} as null",
644+
expected_field.field_id());
645+
}
646+
// NOTE(review): `projection.from` is left unset on this path even though the
// source field exists -- confirm that consumers of kNull projections do not
// need the source index.
projection.kind = FieldProjection::Kind::kNull;
647+
return projection;
648+
}
649+
650+
// Regular case: nested types recurse, everything else is only validated.
if (expected_type.is_nested()) {
651+
ICEBERG_ASSIGN_OR_RAISE(projection,
652+
ProjectNested(expected_type, field_node, prune_source));
653+
} else {
654+
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node));
655+
}
656+
// Assigned after the nested call because ICEBERG_ASSIGN_OR_RAISE overwrites
// `projection`; this keeps any children it produced while stamping the
// field-level metadata.
projection.from = source_index;
657+
projection.kind = FieldProjection::Kind::kProjected;
658+
return projection;
659+
}
660+
621661
Result<FieldProjection> ProjectStruct(const StructType& struct_type,
622662
const ::avro::NodePtr& avro_node,
623663
bool prune_source) {
@@ -653,18 +693,9 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
653693
FieldProjection child_projection;
654694

655695
if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) {
656-
::avro::NodePtr field_node;
657-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node));
658-
if (expected_field.type()->is_nested()) {
659-
ICEBERG_ASSIGN_OR_RAISE(
660-
child_projection,
661-
ProjectNested(*expected_field.type(), field_node, prune_source));
662-
} else {
663-
ICEBERG_RETURN_UNEXPECTED(
664-
ValidateAvroSchemaEvolution(*expected_field.type(), field_node));
665-
}
666-
child_projection.from = iter->second.local_index;
667-
child_projection.kind = FieldProjection::Kind::kProjected;
696+
ICEBERG_ASSIGN_OR_RAISE(child_projection,
697+
ProjectField(expected_field, iter->second.field_node,
698+
iter->second.local_index, prune_source));
668699
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
669700
child_projection.kind = FieldProjection::Kind::kMetadata;
670701
} else if (expected_field.optional()) {
@@ -701,20 +732,9 @@ Result<FieldProjection> ProjectList(const ListType& list_type,
701732
}
702733

703734
FieldProjection element_projection;
704-
::avro::NodePtr element_node;
705-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
706-
if (expected_element_field.type()->is_nested()) {
707-
ICEBERG_ASSIGN_OR_RAISE(
708-
element_projection,
709-
ProjectNested(*expected_element_field.type(), element_node, prune_source));
710-
} else {
711-
ICEBERG_RETURN_UNEXPECTED(
712-
ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
713-
}
714-
715-
// Set the element projection metadata but preserve its children
716-
element_projection.kind = FieldProjection::Kind::kProjected;
717-
element_projection.from = size_t{0};
735+
ICEBERG_ASSIGN_OR_RAISE(element_projection,
736+
ProjectField(expected_element_field, avro_node->leafAt(0),
737+
size_t{0}, prune_source));
718738

719739
FieldProjection result;
720740
result.children.emplace_back(std::move(element_projection));
@@ -770,18 +790,10 @@ Result<FieldProjection> ProjectMap(const MapType& map_type,
770790

771791
for (size_t i = 0; i < map_node->leaves(); ++i) {
772792
FieldProjection sub_projection;
773-
::avro::NodePtr sub_node;
774-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
775793
const auto& expected_sub_field = map_type.fields()[i];
776-
if (expected_sub_field.type()->is_nested()) {
777-
ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(),
778-
sub_node, prune_source));
779-
} else {
780-
ICEBERG_RETURN_UNEXPECTED(
781-
ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
782-
}
783-
sub_projection.kind = FieldProjection::Kind::kProjected;
784-
sub_projection.from = i;
794+
ICEBERG_ASSIGN_OR_RAISE(
795+
sub_projection,
796+
ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source));
785797
result.children.emplace_back(std::move(sub_projection));
786798
}
787799

@@ -1017,9 +1029,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
10171029
case ::avro::AVRO_STRING:
10181030
case ::avro::AVRO_BYTES:
10191031
case ::avro::AVRO_FIXED:
1032+
case ::avro::AVRO_NULL:
10201033
// For primitive types, just return a copy
10211034
return original_node;
1022-
case ::avro::AVRO_NULL:
10231035
case ::avro::AVRO_ENUM:
10241036
default:
10251037
return InvalidSchema("Unsupported Avro type for field ID application: {}",

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,7 @@ class ToAvroNodeVisitor {
5656
Status Visit(const UuidType& type, ::avro::NodePtr* node);
5757
Status Visit(const FixedType& type, ::avro::NodePtr* node);
5858
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
59+
Status Visit(const UnknownType&, ::avro::NodePtr*);
5960
Status Visit(const StructType& type, ::avro::NodePtr* node);
6061
Status Visit(const ListType& type, ::avro::NodePtr* node);
6162
Status Visit(const MapType& type, ::avro::NodePtr* node);

src/iceberg/json_serde.cc

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -373,6 +373,8 @@ nlohmann::json ToJson(const Type& type) {
373373
}
374374
case TypeId::kUuid:
375375
return "uuid";
376+
case TypeId::kUnknown:
377+
return "unknown";
376378
}
377379
std::unreachable();
378380
}
@@ -437,12 +439,22 @@ Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
437439
return std::make_unique<StructType>(std::move(fields));
438440
}
439441

442+
// Rejects a field of unknown type that is not optional.
//
// Parameters:
//   type       - the already-parsed field type.
//   optional   - whether the field is optional (callers in these hunks pass
//                the negation of the JSON "required" flag, or a literal false
//                for map keys).
//   field_name - used only in the error message.
//
// Returns a JsonParseError when `type` is unknown and `optional` is false;
// otherwise a default-constructed Status. Fields of any other type are never
// rejected here.
Status ValidateUnknownFieldOptional(const Type& type, bool optional,
443+
std::string_view field_name) {
444+
if (type.type_id() == TypeId::kUnknown && !optional) {
445+
return JsonParseError("Unknown type field '{}' must be optional", field_name);
446+
}
447+
return {};
448+
}
449+
440450
Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
441451
ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
442452
ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue<int32_t>(json, kElementId));
443453
ICEBERG_ASSIGN_OR_RAISE(auto element_required,
444454
GetJsonValue<bool>(json, kElementRequired));
445455

456+
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required,
457+
ListType::kElementName));
446458
return std::make_unique<ListType>(
447459
SchemaField(element_id, std::string(ListType::kElementName),
448460
std::move(element_type), !element_required));
@@ -458,6 +470,10 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
458470
ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue<int32_t>(json, kValueId));
459471
ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue<bool>(json, kValueRequired));
460472

473+
ICEBERG_RETURN_UNEXPECTED(
474+
ValidateUnknownFieldOptional(*key_type, /*optional=*/false, MapType::kKeyName));
475+
ICEBERG_RETURN_UNEXPECTED(
476+
ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName));
461477
SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
462478
/*optional=*/false);
463479
SchemaField value_field(value_id, std::string(MapType::kValueName),
@@ -494,6 +510,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
494510
return std::make_unique<BinaryType>();
495511
} else if (type_str == "uuid") {
496512
return std::make_unique<UuidType>();
513+
} else if (type_str == "unknown") {
514+
return std::make_unique<UnknownType>();
497515
} else if (type_str.starts_with("fixed")) {
498516
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
499517
std::smatch match;
@@ -540,6 +558,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
540558
ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
541559
ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));
542560

561+
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name));
543562
return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
544563
!required, doc);
545564
}

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 24 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
166166
const auto& output_element_type = output_list_type->value_type();
167167

168168
std::shared_ptr<::arrow::Array> projected_values;
169-
if (element_field.type()->is_nested()) {
169+
if (element_projection.kind == FieldProjection::Kind::kNull) {
170+
ICEBERG_ASSIGN_OR_RAISE(
171+
projected_values,
172+
MakeNullArray(output_element_type, list_array->values()->length(), pool));
173+
} else if (element_projection.kind != FieldProjection::Kind::kProjected) {
174+
return NotImplemented("Unsupported list element projection kind: {}",
175+
ToString(element_projection.kind));
176+
} else if (element_field.type()->is_nested()) {
170177
const auto& nested_type =
171178
internal::checked_cast<const NestedType&>(*element_field.type());
172179
ICEBERG_ASSIGN_OR_RAISE(
@@ -219,7 +226,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
219226

220227
// Project keys
221228
std::shared_ptr<::arrow::Array> projected_keys;
222-
if (key_type->is_nested()) {
229+
if (key_projection.kind == FieldProjection::Kind::kNull) {
230+
ICEBERG_ASSIGN_OR_RAISE(
231+
projected_keys,
232+
MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool));
233+
} else if (key_projection.kind != FieldProjection::Kind::kProjected) {
234+
return NotImplemented("Unsupported map key projection kind: {}",
235+
ToString(key_projection.kind));
236+
} else if (key_type->is_nested()) {
223237
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
224238
ICEBERG_ASSIGN_OR_RAISE(
225239
projected_keys,
@@ -233,7 +247,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
233247

234248
// Project values
235249
std::shared_ptr<::arrow::Array> projected_items;
236-
if (value_type->is_nested()) {
250+
if (value_projection.kind == FieldProjection::Kind::kNull) {
251+
ICEBERG_ASSIGN_OR_RAISE(
252+
projected_items,
253+
MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool));
254+
} else if (value_projection.kind != FieldProjection::Kind::kProjected) {
255+
return NotImplemented("Unsupported map value projection kind: {}",
256+
ToString(value_projection.kind));
257+
} else if (value_type->is_nested()) {
237258
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
238259
ICEBERG_ASSIGN_OR_RAISE(
239260
projected_items,

0 commit comments

Comments
 (0)