Skip to content

Commit 51aa748

Browse files
authored
feat: write manifest avro metadata (#261)
1 parent 730cd5c commit 51aa748

9 files changed

Lines changed: 92 additions & 43 deletions

File tree

src/iceberg/avro/avro_writer.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,16 @@ class AvroWriter::Impl {
6868
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
6969
CreateOutputStream(options, kDefaultBufferSize));
7070
arrow_output_stream_ = output_stream->arrow_output_stream();
71+
std::map<std::string, std::vector<uint8_t>> metadata;
72+
for (const auto& [key, value] : options.properties) {
73+
std::vector<uint8_t> vec;
74+
vec.reserve(value.size());
75+
vec.assign(value.begin(), value.end());
76+
metadata.emplace(key, std::move(vec));
77+
}
7178
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
72-
std::move(output_stream), *avro_schema_);
79+
std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
80+
::avro::NULL_CODEC /*codec*/, metadata);
7381
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
7482
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
7583
return {};

src/iceberg/json_internal.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,10 @@ nlohmann::json ToJson(const Schema& schema) {
309309
return json;
310310
}
311311

312+
/// \brief Serialize an Iceberg Schema to a JSON string.
///
/// Builds the JSON object with `ToJson(schema)` and hands it to the
/// `ToJsonString` overload that accepts a JSON value.
Result<std::string> ToJsonString(const Schema& schema) {
  auto schema_json = ToJson(schema);
  return ToJsonString(schema_json);
}
315+
312316
nlohmann::json ToJson(const SnapshotRef& ref) {
313317
nlohmann::json json;
314318
json[kSnapshotId] = ref.snapshot_id;
@@ -490,6 +494,10 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
490494
return json;
491495
}
492496

497+
/// \brief Serialize a `PartitionSpec` to a JSON string.
///
/// Builds the JSON object with `ToJson(partition_spec)` and hands it to the
/// `ToJsonString` overload that accepts a JSON value.
Result<std::string> ToJsonString(const PartitionSpec& partition_spec) {
  auto spec_json = ToJson(partition_spec);
  return ToJsonString(spec_json);
}
500+
493501
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
494502
const nlohmann::json& json, bool allow_field_id_missing) {
495503
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, kSourceId));
@@ -785,6 +793,10 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
785793
return json;
786794
}
787795

796+
/// \brief Serialize a `TableMetadata` to a JSON string.
///
/// Builds the JSON object with `ToJson(table_metadata)` and hands it to the
/// `ToJsonString` overload that accepts a JSON value.
Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
  auto metadata_json = ToJson(table_metadata);
  return ToJsonString(metadata_json);
}
799+
788800
namespace {
789801

790802
/// \brief Parse the schemas from the JSON object.

src/iceberg/json_internal.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ ICEBERG_EXPORT Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
8080
/// \return The JSON representation of the schema.
8181
ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema);
8282

83+
/// \brief Convert an Iceberg Schema to a JSON string.
84+
///
85+
/// \param[in] schema The Iceberg schema to convert.
86+
/// \return The JSON string of the schema.
87+
ICEBERG_EXPORT Result<std::string> ToJsonString(const Schema& schema);
88+
8389
/// \brief Convert JSON to an Iceberg Schema.
8490
///
8591
/// \param[in] json The JSON representation of the schema.
@@ -148,6 +154,18 @@ ICEBERG_EXPORT Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
148154
/// array.
149155
ICEBERG_EXPORT nlohmann::json ToJson(const PartitionSpec& partition_spec);
150156

157+
/// \brief Serializes a `PartitionSpec` object to a JSON string.
158+
///
159+
/// This function converts a `PartitionSpec` object into a JSON representation.
160+
/// The resulting JSON includes the spec ID and a list of `PartitionField` objects.
161+
/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)`
162+
/// function.
163+
///
164+
/// \param partition_spec The `PartitionSpec` object to be serialized.
165+
/// \return A JSON string of the `PartitionSpec` with its spec ID and fields
166+
/// array.
167+
ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& partition_spec);
168+
151169
/// \brief Deserializes a JSON object into a `PartitionSpec` object.
152170
///
153171
/// This function parses the provided JSON and creates a `PartitionSpec` object.
@@ -246,6 +264,12 @@ ICEBERG_EXPORT Result<MetadataLogEntry> MetadataLogEntryFromJson(
246264
/// \return A JSON object representing the `TableMetadata`.
247265
ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata);
248266

267+
/// \brief Serializes a `TableMetadata` object to a JSON string.
268+
///
269+
/// \param table_metadata The `TableMetadata` object to be serialized.
270+
/// \return A JSON string of the `TableMetadata`.
271+
ICEBERG_EXPORT Result<std::string> ToJsonString(const TableMetadata& table_metadata);
272+
249273
/// \brief Deserializes a JSON object into a `TableMetadata` object.
250274
///
251275
/// \param json The JSON object representing a `TableMetadata`.

src/iceberg/manifest_adapter.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter {
4444
Status StartAppending();
4545
Result<ArrowArray*> FinishAppending();
4646
int64_t size() const { return size_; }
47+
const std::unordered_map<std::string, std::string>& metadata() const {
48+
return metadata_;
49+
}
4750

4851
protected:
4952
ArrowArray array_;

src/iceberg/manifest_writer.cc

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@ Status ManifestWriter::Close() {
5353
return writer_->Close();
5454
}
5555

56-
Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
57-
std::shared_ptr<Schema> schema,
58-
std::shared_ptr<FileIO> file_io) {
56+
Result<std::unique_ptr<Writer>> OpenFileWriter(
57+
std::string_view location, std::shared_ptr<Schema> schema,
58+
std::shared_ptr<FileIO> file_io,
59+
std::unordered_map<std::string, std::string> properties) {
5960
ICEBERG_ASSIGN_OR_RAISE(
60-
auto writer,
61-
WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = std::string(location),
62-
.schema = std::move(schema),
63-
.io = std::move(file_io)}));
61+
auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
62+
{.path = std::string(location),
63+
.schema = std::move(schema),
64+
.io = std::move(file_io),
65+
.properties = std::move(properties)}));
6466
return writer;
6567
}
6668

@@ -73,9 +75,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
7375
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
7476

7577
auto schema = adapter->schema();
76-
ICEBERG_ASSIGN_OR_RAISE(
77-
auto writer,
78-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
78+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
79+
OpenFileWriter(manifest_location, std::move(schema),
80+
std::move(file_io), adapter->metadata()));
7981
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
8082
}
8183

@@ -88,9 +90,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
8890
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
8991

9092
auto schema = adapter->schema();
91-
ICEBERG_ASSIGN_OR_RAISE(
92-
auto writer,
93-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
93+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
94+
OpenFileWriter(manifest_location, std::move(schema),
95+
std::move(file_io), adapter->metadata()));
9496
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
9597
}
9698

@@ -104,9 +106,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
104106
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
105107

106108
auto schema = adapter->schema();
107-
ICEBERG_ASSIGN_OR_RAISE(
108-
auto writer,
109-
OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
109+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
110+
OpenFileWriter(manifest_location, std::move(schema),
111+
std::move(file_io), adapter->metadata()));
110112
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
111113
}
112114

@@ -142,9 +144,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
142144
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
143145

144146
auto schema = adapter->schema();
145-
ICEBERG_ASSIGN_OR_RAISE(
146-
auto writer,
147-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
147+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
148+
OpenFileWriter(manifest_list_location, std::move(schema),
149+
std::move(file_io), adapter->metadata()));
148150
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
149151
}
150152

@@ -158,9 +160,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
158160
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
159161

160162
auto schema = adapter->schema();
161-
ICEBERG_ASSIGN_OR_RAISE(
162-
auto writer,
163-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
163+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
164+
OpenFileWriter(manifest_list_location, std::move(schema),
165+
std::move(file_io), adapter->metadata()));
164166

165167
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
166168
}
@@ -175,9 +177,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
175177
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
176178

177179
auto schema = adapter->schema();
178-
ICEBERG_ASSIGN_OR_RAISE(
179-
auto writer,
180-
OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
180+
ICEBERG_ASSIGN_OR_RAISE(auto writer,
181+
OpenFileWriter(manifest_list_location, std::move(schema),
182+
std::move(file_io), adapter->metadata()));
181183
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
182184
}
183185

src/iceberg/test/metadata_io_test.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
9090
TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
9191
TableMetadata metadata = PrepareMetadata();
9292

93-
auto json = ToJson(metadata);
94-
auto ret = ToJsonString(json);
93+
auto ret = ToJsonString(metadata);
9594
ASSERT_TRUE(ret.has_value());
9695
auto json_string = ret.value();
9796

src/iceberg/v1_metadata.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/v1_metadata.h"
2121

22+
#include "iceberg/json_internal.h"
2223
#include "iceberg/manifest_entry.h"
2324
#include "iceberg/manifest_list.h"
2425
#include "iceberg/schema.h"
@@ -47,15 +48,14 @@ Status ManifestEntryAdapterV1::Init() {
4748
DataFile::kSplitOffsets.field_id(),
4849
DataFile::kSortOrderId.field_id(),
4950
};
50-
// TODO(xiao.dong) schema to json
51-
metadata_["schema"] = "{}";
52-
// TODO(xiao.dong) partition spec to json
53-
metadata_["partition-spec"] = "{}";
51+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
52+
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
5453
if (partition_spec_ != nullptr) {
54+
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
5555
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
5656
}
5757
metadata_["format-version"] = "1";
58-
return InitSchema(kManifestEntryFieldIds);
58+
return {};
5959
}
6060

6161
Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {

src/iceberg/v2_metadata.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
#include "iceberg/v2_metadata.h"
2121

22+
#include "iceberg/json_internal.h"
2223
#include "iceberg/manifest_entry.h"
2324
#include "iceberg/manifest_list.h"
2425
#include "iceberg/schema.h"
26+
#include "iceberg/util/macros.h"
2527

2628
namespace iceberg {
2729

@@ -50,16 +52,15 @@ Status ManifestEntryAdapterV2::Init() {
5052
DataFile::kSortOrderId.field_id(),
5153
DataFile::kReferencedDataFile.field_id(),
5254
};
53-
// TODO(xiao.dong) schema to json
54-
metadata_["schema"] = "{}";
55-
// TODO(xiao.dong) partition spec to json
56-
metadata_["partition-spec"] = "{}";
55+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
56+
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
5757
if (partition_spec_ != nullptr) {
58+
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
5859
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
5960
}
6061
metadata_["format-version"] = "2";
6162
metadata_["content"] = "data";
62-
return InitSchema(kManifestEntryFieldIds);
63+
return {};
6364
}
6465

6566
Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {

src/iceberg/v3_metadata.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/v3_metadata.h"
2121

22+
#include "iceberg/json_internal.h"
2223
#include "iceberg/manifest_entry.h"
2324
#include "iceberg/manifest_list.h"
2425
#include "iceberg/schema.h"
@@ -54,16 +55,15 @@ Status ManifestEntryAdapterV3::Init() {
5455
DataFile::kContentOffset.field_id(),
5556
DataFile::kContentSize.field_id(),
5657
};
57-
// TODO(xiao.dong) schema to json
58-
metadata_["schema"] = "{}";
59-
// TODO(xiao.dong) partition spec to json
60-
metadata_["partition-spec"] = "{}";
58+
ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
59+
ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
6160
if (partition_spec_ != nullptr) {
61+
ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
6262
metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
6363
}
6464
metadata_["format-version"] = "3";
6565
metadata_["content"] = "data";
66-
return InitSchema(kManifestEntryFieldIds);
66+
return {};
6767
}
6868

6969
Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {

0 commit comments

Comments
 (0)