Skip to content

Commit cca3ff5

Browse files
committed
simplify impl and align with Java impl
1 parent 35e82d4 commit cca3ff5

14 files changed

Lines changed: 86 additions & 701 deletions

src/iceberg/avro/avro_direct_encoder.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,16 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco
8080
return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx);
8181
}
8282

83-
if (is_null && avro_node->type() != ::avro::AVRO_NULL) {
83+
if (avro_node->type() == ::avro::AVRO_NULL) {
84+
encoder.encodeNull();
85+
return {};
86+
}
87+
88+
if (is_null) {
8489
return InvalidArgument("Null value in non-nullable field");
8590
}
8691

8792
switch (avro_node->type()) {
88-
case ::avro::AVRO_NULL:
89-
encoder.encodeNull();
90-
return {};
91-
9293
case ::avro::AVRO_BOOL: {
9394
const auto& bool_array =
9495
internal::checked_cast<const ::arrow::BooleanArray&>(array);

src/iceberg/avro/avro_writer.cc

Lines changed: 19 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
#include "iceberg/avro/avro_writer.h"
2121

2222
#include <memory>
23-
#include <optional>
24-
#include <utility>
25-
#include <vector>
2623

27-
#include <arrow/array.h>
2824
#include <arrow/array/builder_base.h>
2925
#include <arrow/c/bridge.h>
3026
#include <arrow/record_batch.h>
@@ -42,12 +38,8 @@
4238
#include "iceberg/avro/avro_schema_util_internal.h"
4339
#include "iceberg/avro/avro_stream_internal.h"
4440
#include "iceberg/metrics_config.h"
45-
#include "iceberg/parquet/parquet_data_util_internal.h"
4641
#include "iceberg/schema.h"
4742
#include "iceberg/schema_internal.h"
48-
#include "iceberg/schema_util.h"
49-
#include "iceberg/type.h"
50-
#include "iceberg/util/checked_cast.h"
5143
#include "iceberg/util/macros.h"
5244

5345
namespace iceberg::avro {
@@ -89,126 +81,6 @@ Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& propertie
8981
return level;
9082
}
9183

92-
enum class FieldContext {
93-
kTopLevel,
94-
kStruct,
95-
kListElement,
96-
kMapKey,
97-
kMapValue,
98-
};
99-
100-
Result<std::optional<SchemaField>> PruneUnknownField(const SchemaField& field,
101-
FieldContext context) {
102-
if (field.type()->type_id() == TypeId::kUnknown) {
103-
ICEBERG_PRECHECK(context != FieldContext::kMapKey,
104-
"Cannot write map key '{}' of unknown type because it has no "
105-
"physical Avro representation",
106-
field.name());
107-
ICEBERG_PRECHECK(field.optional(), "Unknown type field '{}' must be optional",
108-
field.name());
109-
if (context == FieldContext::kListElement || context == FieldContext::kMapValue) {
110-
return field;
111-
}
112-
return std::nullopt;
113-
}
114-
115-
switch (field.type()->type_id()) {
116-
case TypeId::kStruct: {
117-
const auto& struct_type = internal::checked_cast<const StructType&>(*field.type());
118-
std::vector<SchemaField> pruned_fields;
119-
pruned_fields.reserve(struct_type.fields().size());
120-
bool changed = false;
121-
for (const auto& child : struct_type.fields()) {
122-
ICEBERG_ASSIGN_OR_RAISE(auto pruned_child,
123-
PruneUnknownField(child, FieldContext::kStruct));
124-
if (pruned_child.has_value()) {
125-
if (!(pruned_child.value() == child)) {
126-
changed = true;
127-
}
128-
pruned_fields.push_back(std::move(pruned_child.value()));
129-
} else {
130-
changed = true;
131-
}
132-
}
133-
134-
if (!changed) {
135-
return field;
136-
}
137-
138-
ICEBERG_PRECHECK(
139-
!pruned_fields.empty(),
140-
"Cannot write struct field '{}' because all child fields are unknown and "
141-
"would be omitted from Avro",
142-
field.name());
143-
144-
return SchemaField(field.field_id(), field.name(),
145-
std::make_shared<StructType>(std::move(pruned_fields)),
146-
field.optional(), field.doc());
147-
}
148-
case TypeId::kList: {
149-
const auto& list_type = internal::checked_cast<const ListType&>(*field.type());
150-
const auto& element = list_type.element();
151-
ICEBERG_ASSIGN_OR_RAISE(auto pruned_element,
152-
PruneUnknownField(element, FieldContext::kListElement));
153-
ICEBERG_PRECHECK(pruned_element.has_value(),
154-
"Cannot write list field '{}' because its element has no "
155-
"physical Avro representation",
156-
field.name());
157-
if (pruned_element.value() == element) {
158-
return field;
159-
}
160-
return SchemaField(field.field_id(), field.name(),
161-
std::make_shared<ListType>(std::move(pruned_element.value())),
162-
field.optional(), field.doc());
163-
}
164-
case TypeId::kMap: {
165-
const auto& map_type = internal::checked_cast<const MapType&>(*field.type());
166-
ICEBERG_ASSIGN_OR_RAISE(auto pruned_key,
167-
PruneUnknownField(map_type.key(), FieldContext::kMapKey));
168-
ICEBERG_ASSIGN_OR_RAISE(
169-
auto pruned_value,
170-
PruneUnknownField(map_type.value(), FieldContext::kMapValue));
171-
ICEBERG_PRECHECK(pruned_key.has_value(),
172-
"Cannot write map field '{}' because its key has no physical "
173-
"Avro representation",
174-
field.name());
175-
ICEBERG_PRECHECK(pruned_value.has_value(),
176-
"Cannot write map field '{}' because its value has no physical "
177-
"Avro representation",
178-
field.name());
179-
if (pruned_key.value() == map_type.key() &&
180-
pruned_value.value() == map_type.value()) {
181-
return field;
182-
}
183-
return SchemaField(field.field_id(), field.name(),
184-
std::make_shared<MapType>(std::move(pruned_key.value()),
185-
std::move(pruned_value.value())),
186-
field.optional(), field.doc());
187-
}
188-
default:
189-
return field;
190-
}
191-
}
192-
193-
Result<std::shared_ptr<Schema>> PhysicalWriteSchema(const Schema& schema) {
194-
std::vector<SchemaField> pruned_fields;
195-
pruned_fields.reserve(schema.fields().size());
196-
for (const auto& field : schema.fields()) {
197-
ICEBERG_ASSIGN_OR_RAISE(auto pruned_field,
198-
PruneUnknownField(field, FieldContext::kTopLevel));
199-
if (pruned_field.has_value()) {
200-
pruned_fields.push_back(std::move(pruned_field.value()));
201-
}
202-
}
203-
204-
ICEBERG_PRECHECK(
205-
!pruned_fields.empty(),
206-
"Cannot write schema because all fields are unknown and would be omitted from "
207-
"Avro");
208-
209-
return std::make_shared<Schema>(std::move(pruned_fields), schema.schema_id());
210-
}
211-
21284
// Abstract base class for Avro write backends.
21385
class AvroWriteBackend {
21486
public:
@@ -306,14 +178,17 @@ class GenericDatumBackend : public AvroWriteBackend {
306178

307179
class AvroWriter::Impl {
308180
public:
181+
~Impl() {
182+
if (arrow_schema_.release != nullptr) {
183+
ArrowSchemaRelease(&arrow_schema_);
184+
}
185+
}
186+
309187
Status Open(const WriterOptions& options) {
310-
schema_ = options.schema;
311-
ICEBERG_ASSIGN_OR_RAISE(physical_schema_, PhysicalWriteSchema(*schema_));
312-
ICEBERG_ASSIGN_OR_RAISE(projection_, iceberg::Project(*physical_schema_, *schema_,
313-
/*prune_source=*/false));
188+
write_schema_ = options.schema;
314189

315190
::avro::NodePtr root;
316-
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*physical_schema_, &root));
191+
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
317192
if (const auto& schema_name =
318193
options.properties.Get(WriterProperties::kAvroSchemaName);
319194
!schema_name.empty()) {
@@ -352,43 +227,19 @@ class AvroWriter::Impl {
352227
options.properties.Get(WriterProperties::kAvroSyncInterval), codec,
353228
compression_level, metadata));
354229

355-
ArrowSchema input_arrow_c_schema;
356-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &input_arrow_c_schema));
357-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input_type,
358-
::arrow::ImportType(&input_arrow_c_schema));
359-
input_arrow_type_ = internal::checked_pointer_cast<::arrow::StructType>(input_type);
360-
input_arrow_schema_ = ::arrow::schema(input_arrow_type_->fields());
361-
362-
ArrowSchema physical_arrow_c_schema;
363-
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*physical_schema_, &physical_arrow_c_schema));
364-
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto physical_type,
365-
::arrow::ImportType(&physical_arrow_c_schema));
366-
write_arrow_type_ =
367-
internal::checked_pointer_cast<::arrow::StructType>(physical_type);
368-
write_arrow_schema_ = ::arrow::schema(write_arrow_type_->fields());
230+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
369231
return {};
370232
}
371233

372234
Status Write(ArrowArray* data) {
373235
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
374-
::arrow::ImportArray(data, input_arrow_type_));
375-
auto input_struct_array =
376-
internal::checked_pointer_cast<::arrow::StructArray>(result);
377-
auto batch = ::arrow::RecordBatch::Make(input_arrow_schema_, result->length(),
378-
input_struct_array->fields());
379-
ICEBERG_ASSIGN_OR_RAISE(
380-
batch, iceberg::parquet::ProjectRecordBatch(
381-
std::move(batch), write_arrow_schema_, *physical_schema_, projection_,
382-
arrow::MetadataColumnContext{}, ::arrow::default_memory_pool()));
383-
384-
auto write_array = std::make_shared<::arrow::StructArray>(
385-
write_arrow_type_, batch->num_rows(), batch->columns());
236+
::arrow::ImportArray(data, &arrow_schema_));
386237

387-
for (int64_t i = 0; i < write_array->length(); i++) {
388-
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*physical_schema_, *write_array, i));
238+
for (int64_t i = 0; i < result->length(); i++) {
239+
ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i));
389240
}
390241

391-
num_records_ += write_array->length();
242+
num_records_ += result->length();
392243
return {};
393244
}
394245

@@ -416,28 +267,19 @@ class AvroWriter::Impl {
416267
if (!Closed()) {
417268
return Invalid("AvroWriter is not closed");
418269
}
419-
return AvroMetrics::GetMetrics(*schema_, num_records_, *MetricsConfig::Default());
270+
return AvroMetrics::GetMetrics(*write_schema_, num_records_,
271+
*MetricsConfig::Default());
420272
}
421273

422274
private:
423-
// Schema supplied by the caller.
424-
std::shared_ptr<::iceberg::Schema> schema_;
425-
// Schema used to write physical Avro fields after pruning unknown fields.
426-
std::shared_ptr<::iceberg::Schema> physical_schema_;
427-
// Arrow type used to import caller-provided ArrowArray data.
428-
std::shared_ptr<::arrow::StructType> input_arrow_type_;
429-
// Arrow schema used to project caller-provided data.
430-
std::shared_ptr<::arrow::Schema> input_arrow_schema_;
431-
// Arrow type used by the Avro writer backends.
432-
std::shared_ptr<::arrow::StructType> write_arrow_type_;
433-
// Arrow schema used to write physical Avro fields.
434-
std::shared_ptr<::arrow::Schema> write_arrow_schema_;
435-
// Projection from the logical Iceberg schema to the physical write schema.
436-
SchemaProjection projection_;
275+
// The schema to write.
276+
std::shared_ptr<::iceberg::Schema> write_schema_;
437277
// The avro schema to write.
438278
std::shared_ptr<::avro::ValidSchema> avro_schema_;
439279
// Arrow output stream of the Avro file to write
440280
std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_;
281+
// Arrow schema to write data.
282+
ArrowSchema arrow_schema_;
441283
// Total length of the written Avro file.
442284
int64_t total_bytes_ = 0;
443285
// Number of records written.

src/iceberg/json_serde.cc

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -443,22 +443,12 @@ Result<std::unique_ptr<Type>> StructTypeFromJson(const nlohmann::json& json) {
443443
return std::make_unique<StructType>(std::move(fields));
444444
}
445445

446-
Status ValidateUnknownFieldOptional(const Type& type, bool optional,
447-
std::string_view field_name) {
448-
if (type.type_id() == TypeId::kUnknown && !optional) {
449-
return JsonParseError("Unknown type field '{}' must be optional", field_name);
450-
}
451-
return {};
452-
}
453-
454446
Result<std::unique_ptr<Type>> ListTypeFromJson(const nlohmann::json& json) {
455447
ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement]));
456448
ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue<int32_t>(json, kElementId));
457449
ICEBERG_ASSIGN_OR_RAISE(auto element_required,
458450
GetJsonValue<bool>(json, kElementRequired));
459451

460-
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required,
461-
ListType::kElementName));
462452
return std::make_unique<ListType>(
463453
SchemaField(element_id, std::string(ListType::kElementName),
464454
std::move(element_type), !element_required));
@@ -474,11 +464,6 @@ Result<std::unique_ptr<Type>> MapTypeFromJson(const nlohmann::json& json) {
474464
ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue<int32_t>(json, kValueId));
475465
ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue<bool>(json, kValueRequired));
476466

477-
if (key_type->type_id() == TypeId::kUnknown) {
478-
return JsonParseError("Map 'key' cannot be unknown type");
479-
}
480-
ICEBERG_RETURN_UNEXPECTED(
481-
ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName));
482467
SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type),
483468
/*optional=*/false);
484469
SchemaField value_field(value_id, std::string(MapType::kValueName),
@@ -567,7 +552,6 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const nlohmann::json& json) {
567552
ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue<bool>(json, kRequired));
568553
ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault<std::string>(json, kDoc));
569554

570-
ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name));
571555
return std::make_unique<SchemaField>(field_id, std::move(name), std::move(type),
572556
!required, doc);
573557
}

src/iceberg/parquet/parquet_schema_util.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ std::optional<int32_t> FirstColumnIndex(
8787
return std::nullopt;
8888
}
8989

90-
void SelectShapeColumnIfNeeded(
90+
// Pick a physical column as an anchor when all children are null-projected,
91+
// so that Parquet readers can still locate row boundaries.
92+
void SelectAnchorColumnIfEmpty(
9193
FieldProjection* projection,
9294
const std::vector<::parquet::arrow::SchemaField>& parquet_fields) {
9395
if (HasSelectedColumn(*projection)) {
@@ -343,7 +345,7 @@ Result<FieldProjection> ProjectStruct(
343345
result.children.emplace_back(std::move(child_projection));
344346
}
345347

346-
SelectShapeColumnIfNeeded(&result, parquet_fields);
348+
SelectAnchorColumnIfEmpty(&result, parquet_fields);
347349
PruneFieldProjection(result);
348350
return result;
349351
}
@@ -373,7 +375,7 @@ Result<FieldProjection> ProjectList(
373375

374376
FieldProjection result;
375377
result.children.emplace_back(std::move(element_projection));
376-
SelectShapeColumnIfNeeded(&result, parquet_fields);
378+
SelectAnchorColumnIfEmpty(&result, parquet_fields);
377379
return result;
378380
}
379381

@@ -422,7 +424,7 @@ Result<FieldProjection> ProjectMap(
422424
result.children.emplace_back(std::move(sub_projection));
423425
}
424426

425-
SelectShapeColumnIfNeeded(&result, parquet_fields);
427+
SelectAnchorColumnIfEmpty(&result, parquet_fields);
426428
return result;
427429
}
428430

0 commit comments

Comments
 (0)