Skip to content

Commit 26ccf9b

Browse files
manuzhang and codex committed
fix: address unknown type review feedback
Enforce unknown as a v3-only optional type in schema validation and update paths, disallow schema-update promotion from unknown, and skip unknown fields when writing Parquet physical schemas. Co-authored-by: Codex <codex@openai.com>
1 parent 5d6eb36 commit 26ccf9b

9 files changed

Lines changed: 241 additions & 15 deletions

File tree

src/iceberg/parquet/parquet_writer.cc

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
#include "iceberg/parquet/parquet_writer.h"
2121

2222
#include <memory>
23+
#include <optional>
24+
#include <utility>
25+
#include <vector>
2326

2427
#include <arrow/c/bridge.h>
2528
#include <arrow/record_batch.h>
@@ -31,7 +34,11 @@
3134

3235
#include "iceberg/arrow/arrow_io_internal.h"
3336
#include "iceberg/arrow/arrow_status_internal.h"
37+
#include "iceberg/parquet/parquet_data_util_internal.h"
38+
#include "iceberg/schema.h"
3439
#include "iceberg/schema_internal.h"
40+
#include "iceberg/schema_util.h"
41+
#include "iceberg/type.h"
3542
#include "iceberg/util/macros.h"
3643

3744
namespace iceberg::parquet {
@@ -71,6 +78,64 @@ Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& propertie
7178
return level;
7279
}
7380

81+
std::optional<std::shared_ptr<Type>> PruneUnknownType(const std::shared_ptr<Type>& type);
82+
83+
// Returns a copy of `field` whose type has been passed through PruneUnknownType,
// or std::nullopt when PruneUnknownType yields no type for it (the field is then
// dropped by the caller). Field id, name, optionality and doc are carried over
// unchanged.
std::optional<SchemaField> PruneUnknownField(const SchemaField& field) {
84+
auto pruned_type = PruneUnknownType(field.type());
85+
if (!pruned_type.has_value()) {
86+
return std::nullopt;
87+
}
88+
return SchemaField(field.field_id(), field.name(), std::move(pruned_type.value()),
89+
field.optional(), field.doc());
90+
}
91+
92+
// Recursively removes unknown-typed fields from `type`.
//   - unknown: std::nullopt, so the enclosing field is dropped.
//   - struct:  a new StructType holding only the children that survive pruning.
//   - list:    std::nullopt if the element is pruned, otherwise a new ListType.
//   - map:     std::nullopt if the key or the value is pruned, otherwise a new MapType.
//   - other:   the input pointer, returned as-is.
std::optional<std::shared_ptr<Type>> PruneUnknownType(const std::shared_ptr<Type>& type) {
93+
switch (type->type_id()) {
94+
case TypeId::kUnknown:
95+
return std::nullopt;
96+
case TypeId::kStruct: {
97+
const auto& struct_type = static_cast<const StructType&>(*type);
98+
std::vector<SchemaField> fields;
99+
for (const auto& field : struct_type.fields()) {
100+
if (auto pruned_field = PruneUnknownField(field)) {
101+
fields.emplace_back(std::move(pruned_field.value()));
102+
}
103+
}
104+
// NOTE(review): when every child was pruned this still returns an empty
// StructType rather than std::nullopt -- confirm the Parquet write path
// accepts a struct with no children.
return std::make_shared<StructType>(std::move(fields));
105+
}
106+
case TypeId::kList: {
107+
const auto& list_type = static_cast<const ListType&>(*type);
108+
auto pruned_element = PruneUnknownField(list_type.element());
109+
// A list whose element is pruned has nothing left to describe, so the whole
// list is dropped.
if (!pruned_element.has_value()) {
110+
return std::nullopt;
111+
}
112+
return std::make_shared<ListType>(std::move(pruned_element.value()));
113+
}
114+
case TypeId::kMap: {
115+
const auto& map_type = static_cast<const MapType&>(*type);
116+
auto pruned_key = PruneUnknownField(map_type.key());
117+
auto pruned_value = PruneUnknownField(map_type.value());
118+
// Losing either side drops the whole map.
if (!pruned_key.has_value() || !pruned_value.has_value()) {
119+
return std::nullopt;
120+
}
121+
return std::make_shared<MapType>(std::move(pruned_key.value()),
122+
std::move(pruned_value.value()));
123+
}
124+
default:
125+
return type;
126+
}
127+
}
128+
129+
// Builds a new Schema from the top-level fields of `schema`, keeping only the
// fields for which PruneUnknownField returns a value, and reusing the original
// schema id.
// NOTE(review): only the pruned fields and schema_id are passed to the new
// Schema; confirm no other schema state needs to be carried over for writing.
std::shared_ptr<Schema> PruneUnknownFields(const Schema& schema) {
130+
std::vector<SchemaField> fields;
131+
for (const auto& field : schema.fields()) {
132+
if (auto pruned_field = PruneUnknownField(field)) {
133+
fields.emplace_back(std::move(pruned_field.value()));
134+
}
135+
}
136+
return std::make_shared<Schema>(std::move(fields), schema.schema_id());
137+
}
138+
74139
} // namespace
75140

76141
class ParquetWriter::Impl {
@@ -87,8 +152,17 @@ class ParquetWriter::Impl {
87152
auto writer_properties = properties_builder.memory_pool(pool_)->build();
88153
auto arrow_writer_properties = ::parquet::default_arrow_writer_properties();
89154

155+
ArrowSchema input_c_schema;
156+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &input_c_schema));
157+
ICEBERG_ARROW_ASSIGN_OR_RETURN(input_arrow_schema_,
158+
::arrow::ImportSchema(&input_c_schema));
159+
160+
write_schema_ = PruneUnknownFields(*options.schema);
161+
ICEBERG_ASSIGN_OR_RAISE(write_projection_, Project(*write_schema_, *options.schema,
162+
/*prune_source=*/false));
163+
90164
ArrowSchema c_schema;
91-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema));
165+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &c_schema));
92166
ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema));
93167

94168
std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor;
@@ -110,8 +184,12 @@ class ParquetWriter::Impl {
110184
}
111185

112186
Status Write(ArrowArray* array) {
113-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
114-
::arrow::ImportRecordBatch(array, arrow_schema_));
187+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
188+
auto input_batch, ::arrow::ImportRecordBatch(array, input_arrow_schema_));
189+
ICEBERG_ASSIGN_OR_RAISE(
190+
auto batch,
191+
ProjectRecordBatch(std::move(input_batch), arrow_schema_, *write_schema_,
192+
write_projection_, arrow::MetadataColumnContext{}, pool_));
115193

116194
ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
117195

@@ -155,6 +233,11 @@ class ParquetWriter::Impl {
155233
private:
156234
// TODO(gangwu): make memory pool configurable
157235
::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool();
236+
// The schema accepted from callers.
237+
std::shared_ptr<::arrow::Schema> input_arrow_schema_;
238+
// The Iceberg schema that has v3 unknown fields removed for physical writes.
239+
std::shared_ptr<Schema> write_schema_;
240+
SchemaProjection write_projection_;
158241
// Arrow schema used when writing to the Parquet file.
159242
std::shared_ptr<::arrow::Schema> arrow_schema_;
160243
// The output stream to write Parquet file.

src/iceberg/schema.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
4444
int32_t schema_id,
4545
std::vector<int32_t> identifier_field_ids) {
4646
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
47+
for (const auto& field : schema->fields()) {
48+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
49+
}
4750

4851
if (!identifier_field_ids.empty()) {
4952
auto id_to_parent = IndexParents(*schema);
@@ -61,6 +64,9 @@ Result<std::unique_ptr<Schema>> Schema::Make(
6164
std::vector<SchemaField> fields, int32_t schema_id,
6265
const std::vector<std::string>& identifier_field_names) {
6366
auto schema = std::make_unique<Schema>(std::move(fields), schema_id);
67+
for (const auto& field : schema->fields()) {
68+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
69+
}
6470

6571
std::vector<int32_t> fresh_identifier_ids;
6672
for (const auto& name : identifier_field_names) {
@@ -286,6 +292,7 @@ Status Schema::Validate(int32_t format_version) const {
286292
// Check each field's type and defaults
287293
for (const auto& [field_id, field_ref] : id_to_field.get()) {
288294
const auto& field = field_ref.get();
295+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
289296

290297
// Check if the field's type requires a minimum format version
291298
if (auto it = TableMetadata::kMinFormatVersions.find(field.type()->type_id());

src/iceberg/schema_field.cc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,26 @@
2424

2525
#include "iceberg/type.h"
2626
#include "iceberg/util/formatter.h" // IWYU pragma: keep
27+
#include "iceberg/util/macros.h"
2728

2829
namespace iceberg {
2930

31+
namespace {
32+
33+
// Validates the child fields of `type` by calling SchemaField::Validate() on each
// one, returning the first error encountered. Types that are not nested have no
// child fields and yield success immediately.
Status ValidateNestedFields(const Type& type) {
34+
if (!type.is_nested()) {
35+
return {};
36+
}
37+
38+
// The is_nested() check above is what justifies this downcast.
const auto& nested_type = static_cast<const NestedType&>(type);
39+
for (const auto& field : nested_type.fields()) {
40+
ICEBERG_RETURN_UNEXPECTED(field.Validate());
41+
}
42+
return {};
43+
}
44+
45+
} // namespace
46+
3047
SchemaField::SchemaField(int32_t field_id, std::string_view name,
3148
std::shared_ptr<Type> type, bool optional, std::string_view doc)
3249
: field_id_(field_id),
@@ -62,6 +79,10 @@ Status SchemaField::Validate() const {
6279
if (type_ == nullptr) [[unlikely]] {
6380
return InvalidSchema("SchemaField cannot have null type");
6481
}
82+
if (type_->type_id() == TypeId::kUnknown && !optional_) [[unlikely]] {
83+
return InvalidSchema("Unknown type field '{}' must be optional", name_);
84+
}
85+
ICEBERG_RETURN_UNEXPECTED(ValidateNestedFields(*type_));
6586
return {};
6687
}
6788

src/iceberg/table_metadata.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ struct ICEBERG_EXPORT TableMetadata {
7777
static constexpr int64_t kInitialSequenceNumber = 0;
7878
static constexpr int64_t kInitialRowId = 0;
7979

80-
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};
80+
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {
81+
{TypeId::kUnknown, 3},
82+
};
8183

8284
/// An integer version number for the format
8385
int8_t format_version;

src/iceberg/test/parquet_test.cc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,59 @@ TEST_F(ParquetReaderTest, ReadNestedUnknownProjection) {
508508
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
509509
}
510510

511+
// Writes rows whose schema has a top-level and a nested unknown field, then checks
// that (a) the Parquet file's physical schema has exactly two columns and does not
// mention "mystery", and (b) reading back with the full schema as the projection
// yields the original rows with nulls in the unknown fields.
TEST_F(ParquetReaderTest, WriteSkipsUnknownFields) {
512+
temp_parquet_file_ = "write_skips_unknown.parquet";
513+
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
514+
SchemaField::MakeRequired(1, "id", int32()),
515+
SchemaField::MakeOptional(2, "mystery", unknown()),
516+
SchemaField::MakeOptional(3, "profile",
517+
std::make_shared<StructType>(std::vector<SchemaField>{
518+
SchemaField::MakeOptional(4, "name", string()),
519+
SchemaField::MakeOptional(5, "mystery", unknown()),
520+
})),
521+
});
522+
523+
// Build the input array from the full schema, unknown fields included.
ArrowSchema arrow_c_schema;
524+
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
525+
auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
526+
auto array = ::arrow::json::ArrayFromJSONString(arrow_type,
527+
R"([
528+
{"id": 1, "mystery": null, "profile": {"name": "Alice", "mystery": null}},
529+
{"id": 2, "mystery": null, "profile": {"name": "Bob", "mystery": null}}
530+
])")
531+
.ValueOrDie();
532+
533+
WriterProperties writer_properties;
534+
writer_properties.Set(WriterProperties::kParquetCompression,
535+
std::string("uncompressed"));
536+
ASSERT_THAT(WriteArray(array, {.path = temp_parquet_file_,
537+
.schema = schema,
538+
.io = file_io_,
539+
.properties = std::move(writer_properties)}),
540+
IsOk());
541+
542+
// Inspect the file's own metadata to see which columns were physically written.
// NOTE(review): `auto` drops the reference from checked_cast<...&>, so `io` is a
// copy of the FileIO object -- confirm `auto&` was not intended.
auto io = internal::checked_cast<arrow::ArrowFileSystemFileIO&>(*file_io_);
543+
auto input_stream = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie();
544+
auto metadata = ::parquet::ReadMetaData(input_stream);
545+
ASSERT_EQ(metadata->schema()->num_columns(), 2);
546+
EXPECT_THAT(metadata->schema()->ToString(), ::testing::HasSubstr("id"));
547+
EXPECT_THAT(metadata->schema()->ToString(), ::testing::HasSubstr("name"));
548+
EXPECT_THAT(metadata->schema()->ToString(),
549+
::testing::Not(::testing::HasSubstr("mystery")));
550+
551+
// Read back using the original schema (with unknown fields) as the projection.
ICEBERG_UNWRAP_OR_FAIL(
552+
auto reader,
553+
ReaderFactoryRegistry::Open(
554+
FileFormatType::kParquet,
555+
{.path = temp_parquet_file_, .io = file_io_, .projection = schema}));
556+
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader,
557+
R"([
558+
{"id": 1, "mystery": null, "profile": {"name": "Alice", "mystery": null}},
559+
{"id": 2, "mystery": null, "profile": {"name": "Bob", "mystery": null}}
560+
])"));
561+
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
562+
}
563+
511564
class ParquetReadWrite : public ::testing::Test {
512565
protected:
513566
static void SetUpTestSuite() { parquet::RegisterAll(); }

src/iceberg/test/schema_test.cc

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,53 @@ TEST(SchemaTest, Equality) {
9696
ASSERT_EQ(schema5, schema1);
9797
}
9898

99+
// A schema with optional unknown fields (one top-level, one nested in a struct)
// must fail Schema::Validate for format version 2 with an InvalidSchema error and
// pass for format version 3.
TEST(SchemaTest, UnknownTypeRequiresFormatV3) {
100+
iceberg::Schema schema({
101+
iceberg::SchemaField::MakeOptional(1, "mystery", iceberg::unknown()),
102+
iceberg::SchemaField::MakeOptional(
103+
2, "profile",
104+
iceberg::struct_({
105+
iceberg::SchemaField::MakeOptional(3, "nested", iceberg::unknown()),
106+
})),
107+
});
108+
109+
EXPECT_THAT(schema.Validate(2), iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
110+
EXPECT_THAT(schema.Validate(2),
111+
iceberg::HasErrorMessage("unknown is not supported until v3"));
112+
EXPECT_THAT(schema.Validate(3), iceberg::IsOk());
113+
}
114+
115+
// A required top-level unknown field must be rejected with InvalidSchema on both
// entry points exercised here: Schema::Validate on a directly constructed schema,
// and the Schema::Make factory.
TEST(SchemaTest, RequiredUnknownFieldIsInvalid) {
116+
iceberg::Schema direct_schema(
117+
{iceberg::SchemaField::MakeRequired(1, "mystery", iceberg::unknown())});
118+
EXPECT_THAT(direct_schema.Validate(3),
119+
iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
120+
EXPECT_THAT(direct_schema.Validate(3),
121+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
122+
123+
// Same field through the factory; the empty vector is the identifier field id list.
auto schema = iceberg::Schema::Make(
124+
{iceberg::SchemaField::MakeRequired(1, "mystery", iceberg::unknown())}, 100,
125+
std::vector<int32_t>{});
126+
127+
EXPECT_THAT(schema, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
128+
EXPECT_THAT(schema,
129+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
130+
}
131+
132+
// Schema::Make must also reject a required unknown field when it is nested inside
// an (optional) struct, not only at the top level.
TEST(SchemaTest, RequiredNestedUnknownFieldIsInvalid) {
133+
auto schema = iceberg::Schema::Make(
134+
{iceberg::SchemaField::MakeOptional(
135+
1, "profile",
136+
iceberg::struct_({
137+
iceberg::SchemaField::MakeRequired(2, "mystery", iceberg::unknown()),
138+
}))},
139+
100, std::vector<int32_t>{});
140+
141+
EXPECT_THAT(schema, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema));
142+
EXPECT_THAT(schema,
143+
iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional"));
144+
}
145+
99146
TEST(SchemaTest, IdentifierFields) {
100147
using iceberg::ErrorKind;
101148
using iceberg::Schema;

src/iceberg/test/update_schema_test.cc

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,18 +1054,34 @@ TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) {
10541054
EXPECT_EQ(*field_opt->get().type(), *float64());
10551055
}
10561056

1057-
TEST_F(UpdateSchemaTest, UpdateColumnUnknownToPrimitive) {
1057+
TEST_F(UpdateSchemaTest, UpdateColumnUnknownToPrimitiveFails) {
10581058
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
10591059
update->AddColumn("mystery", unknown(), "A null-only placeholder");
10601060
update->UpdateColumn("mystery", string());
10611061

1062-
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
1062+
auto result = update->Apply();
1063+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
1064+
EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
1065+
}
10631066

1064-
ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("mystery"));
1065-
ASSERT_TRUE(field_opt.has_value());
1066-
EXPECT_EQ(*field_opt->get().type(), *string());
1067-
EXPECT_TRUE(field_opt->get().optional());
1068-
EXPECT_EQ(field_opt->get().doc(), "A null-only placeholder");
1067+
// Adding a required column of unknown type must make Apply() fail with
// kValidationFailed, even after AllowIncompatibleChanges() has been called.
TEST_F(UpdateSchemaTest, AddRequiredUnknownColumnFails) {
1068+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
1069+
update->AllowIncompatibleChanges().AddRequiredColumn("mystery", unknown());
1070+
1071+
auto result = update->Apply();
1072+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
1073+
EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional"));
1074+
}
1075+
1076+
// Adding a struct column that contains a required unknown child must make Apply()
// fail with kValidationFailed and the "must be optional" message.
TEST_F(UpdateSchemaTest, AddColumnWithRequiredNestedUnknownFails) {
1077+
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
1078+
update->AddColumn("profile", struct_({
1079+
SchemaField::MakeRequired(3, "mystery", unknown()),
1080+
}));
1081+
1082+
auto result = update->Apply();
1083+
EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
1084+
EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional"));
10691085
}
10701086

10711087
TEST_F(UpdateSchemaTest, UpdateColumnSameType) {

src/iceberg/update/update_schema.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,7 @@ UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> pa
693693
auto new_field = std::make_shared<SchemaField>(new_id, std::string(name),
694694
std::move(type_with_fresh_ids),
695695
is_optional, std::string(doc));
696+
ICEBERG_BUILDER_RETURN_IF_ERROR(new_field->Validate());
696697

697698
updates_[new_id] = std::move(new_field);
698699
parent_to_added_ids_[parent_id].push_back(new_id);

src/iceberg/util/type_util.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,6 @@ bool IsPromotionAllowed(const std::shared_ptr<Type>& from_type,
426426
TypeId from_id = from_type->type_id();
427427
TypeId to_id = to_type->type_id();
428428

429-
if (from_id == TypeId::kUnknown) {
430-
return true;
431-
}
432-
433429
// int -> long
434430
if (from_id == TypeId::kInt && to_id == TypeId::kLong) {
435431
return true;

0 commit comments

Comments
 (0)