Skip to content

Commit 39d4a91

Browse files
author
xuan.zhao
committed
feat(puffin): add format constants, utilities, and JSON serialization
1 parent 743c318 commit 39d4a91

File tree

10 files changed

+588
-1
lines changed

10 files changed

+588
-1
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ set(ICEBERG_SOURCES
6363
partition_spec.cc
6464
partition_summary.cc
6565
puffin/file_metadata.cc
66+
puffin/puffin_format.cc
67+
puffin/puffin_json_internal.cc
6668
row/arrow_array_wrapper.cc
6769
row/manifest_wrapper.cc
6870
row/partition_values.cc

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ iceberg_sources = files(
8181
'partition_spec.cc',
8282
'partition_summary.cc',
8383
'puffin/file_metadata.cc',
84+
'puffin/puffin_format.cc',
85+
'puffin/puffin_json_internal.cc',
8486
'row/arrow_array_wrapper.cc',
8587
'row/manifest_wrapper.cc',
8688
'row/partition_values.cc',

src/iceberg/puffin/meson.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
install_headers(['file_metadata.h'], subdir: 'iceberg/puffin')
18+
install_headers(['file_metadata.h', 'puffin_format.h'], subdir: 'iceberg/puffin')
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_format.h"
21+
22+
#include <cstring>
23+
#include <utility>
24+
25+
#include "iceberg/util/endian.h"
26+
#include "iceberg/util/macros.h"
27+
28+
namespace iceberg::puffin {
29+
30+
namespace {
31+
32+
constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
33+
switch (flag) {
34+
case PuffinFlag::kFooterPayloadCompressed:
35+
return {0, 0};
36+
}
37+
std::unreachable();
38+
}
39+
40+
} // namespace
41+
42+
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
43+
auto [byte_num, bit_num] = GetFlagPosition(flag);
44+
return (flags[byte_num] & (1 << bit_num)) != 0;
45+
}
46+
47+
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
48+
auto [byte_num, bit_num] = GetFlagPosition(flag);
49+
flags[byte_num] |= (1 << bit_num);
50+
}
51+
52+
void WriteInt32LittleEndian(int32_t value, std::span<uint8_t, 4> output) {
53+
auto le_value = ToLittleEndian(value);
54+
std::memcpy(output.data(), &le_value, sizeof(le_value));
55+
}
56+
57+
int32_t ReadInt32LittleEndian(std::span<const uint8_t, 4> input) {
58+
int32_t value;
59+
std::memcpy(&value, input.data(), sizeof(value));
60+
return FromLittleEndian(value);
61+
}
62+
63+
int32_t ReadInt32LittleEndian(std::span<const uint8_t> data, int32_t offset) {
64+
ICEBERG_DCHECK(offset >= 0, "Offset must be non-negative");
65+
ICEBERG_DCHECK(static_cast<size_t>(offset) + 4 <= data.size(), "Offset out of bounds");
66+
return ReadInt32LittleEndian(std::span<const uint8_t, 4>(data.data() + offset, 4));
67+
}
68+
69+
Result<std::vector<uint8_t>> Compress(PuffinCompressionCodec codec,
70+
std::span<const uint8_t> input) {
71+
switch (codec) {
72+
case PuffinCompressionCodec::kNone:
73+
return std::vector<uint8_t>(input.begin(), input.end());
74+
case PuffinCompressionCodec::kLz4:
75+
return NotSupported("LZ4 compression is not yet supported");
76+
case PuffinCompressionCodec::kZstd:
77+
return NotSupported("Zstd compression is not yet supported");
78+
}
79+
std::unreachable();
80+
}
81+
82+
Result<std::vector<uint8_t>> Decompress(PuffinCompressionCodec codec,
83+
std::span<const uint8_t> input) {
84+
switch (codec) {
85+
case PuffinCompressionCodec::kNone:
86+
return std::vector<uint8_t>(input.begin(), input.end());
87+
case PuffinCompressionCodec::kLz4:
88+
return NotSupported("LZ4 decompression is not yet supported");
89+
case PuffinCompressionCodec::kZstd:
90+
return NotSupported("Zstd decompression is not yet supported");
91+
}
92+
std::unreachable();
93+
}
94+
95+
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_format.h

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/puffin/puffin_format.h
23+
/// Puffin file format constants and utilities.
24+
25+
#include <array>
26+
#include <cstdint>
27+
#include <span>
28+
#include <vector>
29+
30+
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/puffin/file_metadata.h"
32+
#include "iceberg/result.h"
33+
34+
namespace iceberg::puffin {
35+
36+
/// \brief Puffin file format constants.
37+
struct ICEBERG_EXPORT PuffinFormat {
38+
/// Magic bytes: "PFA1" (Puffin Fratercula arctica, version 1)
39+
static constexpr std::array<uint8_t, 4> kMagic = {0x50, 0x46, 0x41, 0x31};
40+
41+
static constexpr int32_t kMagicLength = 4;
42+
static constexpr int32_t kFooterStartMagicOffset = 0;
43+
static constexpr int32_t kFooterStartMagicLength = kMagicLength;
44+
static constexpr int32_t kFooterStructPayloadSizeOffset = 0;
45+
static constexpr int32_t kFooterStructFlagsOffset = kFooterStructPayloadSizeOffset + 4;
46+
static constexpr int32_t kFooterStructFlagsLength = 4;
47+
static constexpr int32_t kFooterStructMagicOffset =
48+
kFooterStructFlagsOffset + kFooterStructFlagsLength;
49+
50+
/// Total length of the footer struct: payload_size(4) + flags(4) + magic(4)
51+
static constexpr int32_t kFooterStructLength = kFooterStructMagicOffset + kMagicLength;
52+
53+
/// Default compression codec for footer payload.
54+
static constexpr PuffinCompressionCodec kFooterCompressionCodec =
55+
PuffinCompressionCodec::kLz4;
56+
};
57+
58+
/// \brief Footer flags for Puffin files.
59+
enum class PuffinFlag : uint8_t {
60+
/// Whether the footer payload is compressed.
61+
kFooterPayloadCompressed = 0,
62+
};
63+
64+
/// \brief Check if a flag is set in the flags bytes.
65+
ICEBERG_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag);
66+
67+
/// \brief Set a flag in the flags bytes.
68+
ICEBERG_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);
69+
70+
/// \brief Write a 32-bit integer in little-endian format.
71+
ICEBERG_EXPORT void WriteInt32LittleEndian(int32_t value, std::span<uint8_t, 4> output);
72+
73+
/// \brief Read a 32-bit integer from a fixed-size span in little-endian format.
74+
ICEBERG_EXPORT int32_t ReadInt32LittleEndian(std::span<const uint8_t, 4> input);
75+
76+
/// \brief Read a 32-bit integer from a buffer at the given offset in little-endian
77+
/// format.
78+
ICEBERG_EXPORT int32_t ReadInt32LittleEndian(std::span<const uint8_t> data,
79+
int32_t offset);
80+
81+
/// \brief Compress data using the specified codec.
82+
ICEBERG_EXPORT Result<std::vector<uint8_t>> Compress(PuffinCompressionCodec codec,
83+
std::span<const uint8_t> input);
84+
85+
/// \brief Decompress data using the specified codec.
86+
ICEBERG_EXPORT Result<std::vector<uint8_t>> Decompress(PuffinCompressionCodec codec,
87+
std::span<const uint8_t> input);
88+
89+
} // namespace iceberg::puffin
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_json_internal.h"
21+
22+
#include <nlohmann/json.hpp>
23+
24+
#include "iceberg/util/json_util_internal.h"
25+
#include "iceberg/util/macros.h"
26+
27+
namespace iceberg::puffin {
28+
29+
namespace {
30+
constexpr std::string_view kBlobs = "blobs";
31+
constexpr std::string_view kProperties = "properties";
32+
constexpr std::string_view kType = "type";
33+
constexpr std::string_view kFields = "fields";
34+
constexpr std::string_view kSnapshotId = "snapshot-id";
35+
constexpr std::string_view kSequenceNumber = "sequence-number";
36+
constexpr std::string_view kOffset = "offset";
37+
constexpr std::string_view kLength = "length";
38+
constexpr std::string_view kCompressionCodec = "compression-codec";
39+
} // namespace
40+
41+
nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
42+
nlohmann::json json;
43+
json[kType] = blob_metadata.type;
44+
json[kFields] = blob_metadata.input_fields;
45+
json[kSnapshotId] = blob_metadata.snapshot_id;
46+
json[kSequenceNumber] = blob_metadata.sequence_number;
47+
json[kOffset] = blob_metadata.offset;
48+
json[kLength] = blob_metadata.length;
49+
50+
SetOptionalStringField(json, kCompressionCodec, blob_metadata.compression_codec);
51+
SetContainerField(json, kProperties, blob_metadata.properties);
52+
53+
return json;
54+
}
55+
56+
Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) {
57+
BlobMetadata blob_metadata;
58+
59+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.type, GetJsonValue<std::string>(json, kType));
60+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.input_fields,
61+
GetJsonValue<std::vector<int32_t>>(json, kFields));
62+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.snapshot_id,
63+
GetJsonValue<int64_t>(json, kSnapshotId));
64+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.sequence_number,
65+
GetJsonValue<int64_t>(json, kSequenceNumber));
66+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.offset, GetJsonValue<int64_t>(json, kOffset));
67+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.length, GetJsonValue<int64_t>(json, kLength));
68+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.compression_codec,
69+
GetJsonValueOrDefault<std::string>(json, kCompressionCodec));
70+
ICEBERG_ASSIGN_OR_RAISE(blob_metadata.properties,
71+
FromJsonMap<std::string>(json, kProperties));
72+
73+
return blob_metadata;
74+
}
75+
76+
nlohmann::json ToJson(const FileMetadata& file_metadata) {
77+
nlohmann::json json;
78+
79+
nlohmann::json blobs_json = nlohmann::json::array();
80+
for (const auto& blob : file_metadata.blobs) {
81+
blobs_json.push_back(ToJson(blob));
82+
}
83+
json[kBlobs] = std::move(blobs_json);
84+
85+
SetContainerField(json, kProperties, file_metadata.properties);
86+
87+
return json;
88+
}
89+
90+
Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json) {
91+
FileMetadata file_metadata;
92+
93+
ICEBERG_ASSIGN_OR_RAISE(auto blobs_json, GetJsonValue<nlohmann::json>(json, kBlobs));
94+
if (!blobs_json.is_array()) {
95+
return JsonParseError("Cannot parse blobs from non-array: {}",
96+
SafeDumpJson(blobs_json));
97+
}
98+
99+
for (const auto& blob_json : blobs_json) {
100+
ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(blob_json));
101+
file_metadata.blobs.push_back(std::move(blob));
102+
}
103+
104+
ICEBERG_ASSIGN_OR_RAISE(file_metadata.properties,
105+
FromJsonMap<std::string>(json, kProperties));
106+
107+
return file_metadata;
108+
}
109+
110+
std::string ToJsonString(const FileMetadata& file_metadata, bool pretty) {
111+
auto json = ToJson(file_metadata);
112+
return pretty ? json.dump(2) : json.dump();
113+
}
114+
115+
Result<FileMetadata> FileMetadataFromJsonString(const std::string& json_string) {
116+
if (json_string.empty()) {
117+
return JsonParseError("Cannot parse empty JSON string");
118+
}
119+
try {
120+
auto json = nlohmann::json::parse(json_string);
121+
return FileMetadataFromJson(json);
122+
} catch (const nlohmann::json::parse_error& e) {
123+
return JsonParseError("Failed to parse JSON: {}", e.what());
124+
}
125+
}
126+
127+
} // namespace iceberg::puffin
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/puffin/puffin_json_internal.h
23+
/// JSON serialization/deserialization for Puffin file metadata.
24+
25+
#include <string>
26+
27+
#include <nlohmann/json_fwd.hpp>
28+
29+
#include "iceberg/iceberg_export.h"
30+
#include "iceberg/puffin/file_metadata.h"
31+
#include "iceberg/result.h"
32+
33+
namespace iceberg::puffin {
34+
35+
/// \brief Serialize a BlobMetadata to JSON.
36+
ICEBERG_EXPORT nlohmann::json ToJson(const BlobMetadata& blob_metadata);
37+
38+
/// \brief Deserialize a BlobMetadata from JSON.
39+
ICEBERG_EXPORT Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json);
40+
41+
/// \brief Serialize a FileMetadata to JSON.
42+
ICEBERG_EXPORT nlohmann::json ToJson(const FileMetadata& file_metadata);
43+
44+
/// \brief Deserialize a FileMetadata from JSON.
45+
ICEBERG_EXPORT Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json);
46+
47+
/// \brief Serialize a FileMetadata to a JSON string.
48+
ICEBERG_EXPORT std::string ToJsonString(const FileMetadata& file_metadata,
49+
bool pretty = false);
50+
51+
/// \brief Deserialize a FileMetadata from a JSON string.
52+
ICEBERG_EXPORT Result<FileMetadata> FileMetadataFromJsonString(
53+
const std::string& json_string);
54+
55+
} // namespace iceberg::puffin

0 commit comments

Comments
 (0)