Skip to content

Commit 87bfae0

Browse files
manuzhangcodex
andcommitted
feat: support Iceberg v3 unknown type
Add an Iceberg unknown primitive type and JSON, Arrow, Avro, Parquet, projection, and data path support for null-only unknown fields. Enforce optionality invariants so required projections cannot be materialized from unknown/null-only fields. Co-authored-by: Codex <codex@openai.com> test: cover forbidden nested type promotions Assert that promotion helpers reject nested type targets for unknown and regular primitive source types. Co-authored-by: Codex <codex@openai.com>
1 parent 100bbe3 commit 87bfae0

36 files changed

Lines changed: 1860 additions & 108 deletions

src/iceberg/avro/avro_data_util.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node,
457457
const SchemaField& projected_field,
458458
const arrow::MetadataColumnContext& metadata_context,
459459
::arrow::ArrayBuilder* array_builder) {
460+
if (projection.kind == FieldProjection::Kind::kNull) {
461+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
462+
return {};
463+
}
464+
460465
if (avro_node->type() == ::avro::AVRO_UNION) {
461466
size_t branch = avro_datum.unionBranch();
462467
if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) {
@@ -507,6 +512,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index,
507512
}
508513

509514
if (array.IsNull(index)) {
515+
if (datum->type() == ::avro::AVRO_NULL) {
516+
return {};
517+
}
510518
if (!datum->isUnion()) [[unlikely]] {
511519
return InvalidSchema("Cannot extract null to non-union type: {}",
512520
::avro::toString(datum->type()));

src/iceberg/avro/avro_direct_decoder.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d
588588
const SchemaField& projected_field,
589589
const arrow::MetadataColumnContext& metadata_context,
590590
::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) {
591+
if (projection.kind == FieldProjection::Kind::kNull) {
592+
ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder));
593+
ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
594+
return {};
595+
}
596+
591597
if (avro_node->type() == ::avro::AVRO_UNION) {
592598
const size_t branch_index = decoder.decodeUnionIndex();
593599

src/iceberg/avro/avro_direct_encoder.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco
8080
return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx);
8181
}
8282

83-
if (is_null) {
83+
if (is_null && avro_node->type() != ::avro::AVRO_NULL) {
8484
return InvalidArgument("Null value in non-nullable field");
8585
}
8686

src/iceberg/avro/avro_schema_util.cc

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) {
237237
return {};
238238
}
239239

240+
Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) {
241+
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL);
242+
return {};
243+
}
244+
240245
Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) {
241246
*node = std::make_shared<::avro::NodeRecord>();
242247

@@ -338,7 +343,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node)
338343
field_ids_.push(field.field_id());
339344
ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node));
340345

341-
if (field.optional()) {
346+
if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) {
342347
::avro::MultiLeaves union_types;
343348
union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL));
344349
union_types.add(std::move(*node));
@@ -383,8 +388,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) {
383388
case ::avro::AVRO_STRING:
384389
case ::avro::AVRO_BYTES:
385390
case ::avro::AVRO_FIXED:
386-
return {};
387391
case ::avro::AVRO_NULL:
392+
return {};
388393
case ::avro::AVRO_ENUM:
389394
default:
390395
return InvalidSchema("Unsupported Avro type: {}", static_cast<int>(node->type()));
@@ -512,6 +517,10 @@ Result<int32_t> GetFieldId(const ::avro::NodePtr& node, size_t field_idx) {
512517

513518
Status ValidateAvroSchemaEvolution(const Type& expected_type,
514519
const ::avro::NodePtr& avro_node) {
520+
if (avro_node->type() == ::avro::AVRO_NULL) {
521+
return {};
522+
}
523+
515524
switch (expected_type.type_id()) {
516525
case TypeId::kBoolean:
517526
if (avro_node->type() == ::avro::AVRO_BOOL) {
@@ -615,6 +624,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
615624
return {};
616625
}
617626
break;
627+
case TypeId::kUnknown:
628+
return {};
618629
default:
619630
break;
620631
}
@@ -650,6 +661,35 @@ Result<FieldProjection> ProjectNested(const Type& expected_type,
650661
const ::avro::NodePtr& avro_node,
651662
bool prune_source);
652663

664+
Result<FieldProjection> ProjectField(const SchemaField& expected_field,
665+
const ::avro::NodePtr& avro_node,
666+
size_t source_index, bool prune_source) {
667+
const Type& expected_type = *expected_field.type();
668+
::avro::NodePtr field_node;
669+
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node));
670+
671+
FieldProjection projection;
672+
if (expected_type.type_id() == TypeId::kUnknown ||
673+
field_node->type() == ::avro::AVRO_NULL) {
674+
if (!expected_field.optional()) {
675+
return InvalidSchema("Cannot project required field with ID: {} as null",
676+
expected_field.field_id());
677+
}
678+
projection.kind = FieldProjection::Kind::kNull;
679+
return projection;
680+
}
681+
682+
if (expected_type.is_nested()) {
683+
ICEBERG_ASSIGN_OR_RAISE(projection,
684+
ProjectNested(expected_type, field_node, prune_source));
685+
} else {
686+
ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node));
687+
}
688+
projection.from = source_index;
689+
projection.kind = FieldProjection::Kind::kProjected;
690+
return projection;
691+
}
692+
653693
Result<FieldProjection> ProjectStruct(const StructType& struct_type,
654694
const ::avro::NodePtr& avro_node,
655695
bool prune_source) {
@@ -685,18 +725,9 @@ Result<FieldProjection> ProjectStruct(const StructType& struct_type,
685725
FieldProjection child_projection;
686726

687727
if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) {
688-
::avro::NodePtr field_node;
689-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node));
690-
if (expected_field.type()->is_nested()) {
691-
ICEBERG_ASSIGN_OR_RAISE(
692-
child_projection,
693-
ProjectNested(*expected_field.type(), field_node, prune_source));
694-
} else {
695-
ICEBERG_RETURN_UNEXPECTED(
696-
ValidateAvroSchemaEvolution(*expected_field.type(), field_node));
697-
}
698-
child_projection.from = iter->second.local_index;
699-
child_projection.kind = FieldProjection::Kind::kProjected;
728+
ICEBERG_ASSIGN_OR_RAISE(child_projection,
729+
ProjectField(expected_field, iter->second.field_node,
730+
iter->second.local_index, prune_source));
700731
} else if (MetadataColumns::IsMetadataColumn(field_id)) {
701732
child_projection.kind = FieldProjection::Kind::kMetadata;
702733
} else if (expected_field.optional()) {
@@ -733,20 +764,9 @@ Result<FieldProjection> ProjectList(const ListType& list_type,
733764
}
734765

735766
FieldProjection element_projection;
736-
::avro::NodePtr element_node;
737-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node));
738-
if (expected_element_field.type()->is_nested()) {
739-
ICEBERG_ASSIGN_OR_RAISE(
740-
element_projection,
741-
ProjectNested(*expected_element_field.type(), element_node, prune_source));
742-
} else {
743-
ICEBERG_RETURN_UNEXPECTED(
744-
ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node));
745-
}
746-
747-
// Set the element projection metadata but preserve its children
748-
element_projection.kind = FieldProjection::Kind::kProjected;
749-
element_projection.from = size_t{0};
767+
ICEBERG_ASSIGN_OR_RAISE(element_projection,
768+
ProjectField(expected_element_field, avro_node->leafAt(0),
769+
size_t{0}, prune_source));
750770

751771
FieldProjection result;
752772
result.children.emplace_back(std::move(element_projection));
@@ -802,18 +822,10 @@ Result<FieldProjection> ProjectMap(const MapType& map_type,
802822

803823
for (size_t i = 0; i < map_node->leaves(); ++i) {
804824
FieldProjection sub_projection;
805-
::avro::NodePtr sub_node;
806-
ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node));
807825
const auto& expected_sub_field = map_type.fields()[i];
808-
if (expected_sub_field.type()->is_nested()) {
809-
ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(),
810-
sub_node, prune_source));
811-
} else {
812-
ICEBERG_RETURN_UNEXPECTED(
813-
ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node));
814-
}
815-
sub_projection.kind = FieldProjection::Kind::kProjected;
816-
sub_projection.from = i;
826+
ICEBERG_ASSIGN_OR_RAISE(
827+
sub_projection,
828+
ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source));
817829
result.children.emplace_back(std::move(sub_projection));
818830
}
819831

@@ -1049,9 +1061,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original
10491061
case ::avro::AVRO_STRING:
10501062
case ::avro::AVRO_BYTES:
10511063
case ::avro::AVRO_FIXED:
1064+
case ::avro::AVRO_NULL:
10521065
// For primitive types, just return a copy
10531066
return original_node;
1054-
case ::avro::AVRO_NULL:
10551067
case ::avro::AVRO_ENUM:
10561068
default:
10571069
return InvalidSchema("Unsupported Avro type for field ID application: {}",

src/iceberg/avro/avro_schema_util_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class ToAvroNodeVisitor {
5858
Status Visit(const UuidType& type, ::avro::NodePtr* node);
5959
Status Visit(const FixedType& type, ::avro::NodePtr* node);
6060
Status Visit(const BinaryType& type, ::avro::NodePtr* node);
61+
Status Visit(const UnknownType&, ::avro::NodePtr*);
6162
Status Visit(const StructType& type, ::avro::NodePtr* node);
6263
Status Visit(const ListType& type, ::avro::NodePtr* node);
6364
Status Visit(const MapType& type, ::avro::NodePtr* node);

src/iceberg/json_serde.cc

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,8 @@ nlohmann::json ToJson(const Type& type) {
377377
}
378378
case TypeId::kUuid:
379379
return "uuid";
380+
case TypeId::kUnknown:
381+
return "unknown";
380382
}
381383
std::unreachable();
382384
}
@@ -441,12 +443,22 @@ Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
441443
return std::make_unique<StructType>(std::move(fields));
442444
}
443445

446+
Status ValidateUnknownFieldOptional(const Type& type, bool optional,
447+
std::string_view field_name) {
448+
if (type.type_id() == TypeId::kUnknown && !optional) {
449+
return JsonParseError("Unknown type field '{}' must be optional", field_name);
450+
}
451+
return {};
452+
}
453+
444454
Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
445455
ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
446456
ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue<int32_t>(json, kElementId));
447457
ICEBERG_ASSIGN_OR_RAISE(auto element_required,
448458
GetJsonValue<bool>(json, kElementRequired));
449459

460+
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required,
461+
ListType::kElementName));
450462
return std::make_unique<ListType>(
451463
SchemaField(element_id, std::string(ListType::kElementName),
452464
std::move(element_type), !element_required));
@@ -462,6 +474,11 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
462474
ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue<int32_t>(json, kValueId));
463475
ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue<bool>(json, kValueRequired));
464476

477+
if (key_type->type_id() == TypeId::kUnknown) {
478+
return JsonParseError("Map 'key' cannot be unknown type");
479+
}
480+
ICEBERG_RETURN_UNEXPECTED(
481+
ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName));
465482
SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
466483
/*optional=*/false);
467484
SchemaField value_field(value_id, std::string(MapType::kValueName),
@@ -502,6 +519,8 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
502519
return std::make_unique<BinaryType>();
503520
} else if (type_str == "uuid") {
504521
return std::make_unique<UuidType>();
522+
} else if (type_str == "unknown") {
523+
return std::make_unique<UnknownType>();
505524
} else if (type_str.starts_with("fixed")) {
506525
std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])");
507526
std::smatch match;
@@ -548,6 +567,7 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
548567
ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
549568
ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));
550569

570+
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name));
551571
return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
552572
!required, doc);
553573
}
@@ -949,6 +969,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
949969
for (const auto& schema_json : schema_array) {
950970
ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> schema,
951971
SchemaFromJson(schema_json));
972+
ICEBERG_RETURN_UNEXPECTED(schema->Validate(format_version));
952973
if (schema->schema_id() == current_schema_id) {
953974
current_schema = schema;
954975
}
@@ -965,6 +986,7 @@ Result<std::shared_ptr<Schema>> ParseSchemas(
965986
ICEBERG_ASSIGN_OR_RAISE(auto schema_json,
966987
GetJsonValue<nlohmann::json>(json, kSchema));
967988
ICEBERG_ASSIGN_OR_RAISE(current_schema, SchemaFromJson(schema_json));
989+
ICEBERG_RETURN_UNEXPECTED(current_schema->Validate(format_version));
968990
current_schema_id = current_schema->schema_id();
969991
schemas.push_back(current_schema);
970992
}

src/iceberg/parquet/parquet_data_util.cc

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
166166
const auto& output_element_type = output_list_type->value_type();
167167

168168
std::shared_ptr<::arrow::Array> projected_values;
169-
if (element_field.type()->is_nested()) {
169+
if (element_projection.kind == FieldProjection::Kind::kNull) {
170+
ICEBERG_ASSIGN_OR_RAISE(
171+
projected_values,
172+
MakeNullArray(output_element_type, list_array->values()->length(), pool));
173+
} else if (element_projection.kind != FieldProjection::Kind::kProjected) {
174+
return NotImplemented("Unsupported list element projection kind: {}",
175+
ToString(element_projection.kind));
176+
} else if (element_field.type()->is_nested()) {
170177
const auto& nested_type =
171178
internal::checked_cast<const NestedType&>(*element_field.type());
172179
ICEBERG_ASSIGN_OR_RAISE(
@@ -219,7 +226,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
219226

220227
// Project keys
221228
std::shared_ptr<::arrow::Array> projected_keys;
222-
if (key_type->is_nested()) {
229+
if (key_projection.kind == FieldProjection::Kind::kNull) {
230+
ICEBERG_ASSIGN_OR_RAISE(
231+
projected_keys,
232+
MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool));
233+
} else if (key_projection.kind != FieldProjection::Kind::kProjected) {
234+
return NotImplemented("Unsupported map key projection kind: {}",
235+
ToString(key_projection.kind));
236+
} else if (key_type->is_nested()) {
223237
const auto& nested_type = internal::checked_cast<const NestedType&>(*key_type);
224238
ICEBERG_ASSIGN_OR_RAISE(
225239
projected_keys,
@@ -233,7 +247,14 @@ Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
233247

234248
// Project values
235249
std::shared_ptr<::arrow::Array> projected_items;
236-
if (value_type->is_nested()) {
250+
if (value_projection.kind == FieldProjection::Kind::kNull) {
251+
ICEBERG_ASSIGN_OR_RAISE(
252+
projected_items,
253+
MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool));
254+
} else if (value_projection.kind != FieldProjection::Kind::kProjected) {
255+
return NotImplemented("Unsupported map value projection kind: {}",
256+
ToString(value_projection.kind));
257+
} else if (value_type->is_nested()) {
237258
const auto& nested_type = internal::checked_cast<const NestedType&>(*value_type);
238259
ICEBERG_ASSIGN_OR_RAISE(
239260
projected_items,

0 commit comments

Comments
 (0)