Skip to content

Commit a60eb24

Browse files
author
xuan.zhao
committed
feat(puffin): add format constants, utilities, and JSON serialization
1 parent 133742d commit a60eb24

File tree

14 files changed

+626
-27
lines changed

14 files changed

+626
-27
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ set(ICEBERG_SOURCES
6464
partition_spec.cc
6565
partition_summary.cc
6666
puffin/file_metadata.cc
67+
puffin/puffin_format.cc
68+
puffin/json_serde.cc
6769
row/arrow_array_wrapper.cc
6870
row/manifest_wrapper.cc
6971
row/partition_values.cc

src/iceberg/deletes/roaring_position_bitmap.cc

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -49,28 +49,6 @@ int64_t ToPosition(int32_t key, uint32_t pos32) {
4949
return (int64_t{key} << 32) | int64_t{pos32};
5050
}
5151

52-
void WriteLE64(char* buf, int64_t value) {
53-
auto le = ToLittleEndian(static_cast<uint64_t>(value));
54-
std::memcpy(buf, &le, sizeof(le));
55-
}
56-
57-
void WriteLE32(char* buf, int32_t value) {
58-
auto le = ToLittleEndian(static_cast<uint32_t>(value));
59-
std::memcpy(buf, &le, sizeof(le));
60-
}
61-
62-
int64_t ReadLE64(const char* buf) {
63-
uint64_t v;
64-
std::memcpy(&v, buf, sizeof(v));
65-
return static_cast<int64_t>(FromLittleEndian(v));
66-
}
67-
68-
int32_t ReadLE32(const char* buf) {
69-
uint32_t v;
70-
std::memcpy(&v, buf, sizeof(v));
71-
return static_cast<int32_t>(FromLittleEndian(v));
72-
}
73-
7452
Status ValidatePosition(int64_t pos) {
7553
if (pos < 0 || pos > RoaringPositionBitmap::kMaxPosition) {
7654
return InvalidArgument("Bitmap supports positions that are >= 0 and <= {}: {}",
@@ -189,12 +167,12 @@ Result<std::string> RoaringPositionBitmap::Serialize() const {
189167
char* buf = result.data();
190168

191169
// Write bitmap count (array length including empties)
192-
WriteLE64(buf, static_cast<int64_t>(impl_->bitmaps.size()));
170+
WriteLittleEndian(static_cast<int64_t>(impl_->bitmaps.size()), buf);
193171
buf += kBitmapCountSizeBytes;
194172

195173
// Write each bitmap with its key
196174
for (int32_t key = 0; std::cmp_less(key, impl_->bitmaps.size()); ++key) {
197-
WriteLE32(buf, key);
175+
WriteLittleEndian(key, buf);
198176
buf += kBitmapKeySizeBytes;
199177
size_t written = impl_->bitmaps[key].write(buf, /*portable=*/true);
200178
buf += written;
@@ -210,7 +188,7 @@ Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_vie
210188
ICEBERG_PRECHECK(remaining >= kBitmapCountSizeBytes,
211189
"Buffer too small for bitmap count: {} bytes", remaining);
212190

213-
int64_t bitmap_count = ReadLE64(buf);
191+
auto bitmap_count = ReadLittleEndian<int64_t>(buf);
214192
buf += kBitmapCountSizeBytes;
215193
remaining -= kBitmapCountSizeBytes;
216194

@@ -226,7 +204,7 @@ Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(std::string_vie
226204
ICEBERG_PRECHECK(remaining >= kBitmapKeySizeBytes,
227205
"Buffer too small for bitmap key: {} bytes", remaining);
228206

229-
int32_t key = ReadLE32(buf);
207+
auto key = ReadLittleEndian<int32_t>(buf);
230208
buf += kBitmapKeySizeBytes;
231209
remaining -= kBitmapKeySizeBytes;
232210

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ iceberg_sources = files(
8282
'partition_spec.cc',
8383
'partition_summary.cc',
8484
'puffin/file_metadata.cc',
85+
'puffin/json_serde.cc',
86+
'puffin/puffin_format.cc',
8587
'row/arrow_array_wrapper.cc',
8688
'row/manifest_wrapper.cc',
8789
'row/partition_values.cc',

src/iceberg/puffin/file_metadata.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include <cstdint>
2626
#include <optional>
27+
#include <ostream>
2728
#include <string>
2829
#include <string_view>
2930
#include <unordered_map>
@@ -107,6 +108,10 @@ struct ICEBERG_EXPORT BlobMetadata {
107108

108109
ICEBERG_EXPORT std::string ToString(const BlobMetadata& blob_metadata);
109110

111+
inline std::ostream& operator<<(std::ostream& os, const BlobMetadata& b) {
112+
return os << ToString(b);
113+
}
114+
110115
/// \brief Metadata about a Puffin file.
111116
struct ICEBERG_EXPORT FileMetadata {
112117
std::vector<BlobMetadata> blobs;
@@ -117,4 +122,8 @@ struct ICEBERG_EXPORT FileMetadata {
117122

118123
ICEBERG_EXPORT std::string ToString(const FileMetadata& file_metadata);
119124

125+
inline std::ostream& operator<<(std::ostream& os, const FileMetadata& m) {
126+
return os << ToString(m);
127+
}
128+
120129
} // namespace iceberg::puffin

src/iceberg/puffin/json_serde.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <nlohmann/json.hpp>
21+
22+
#include "iceberg/puffin/json_serde_internal.h"
23+
#include "iceberg/util/json_util_internal.h"
24+
#include "iceberg/util/macros.h"
25+
26+
namespace iceberg::puffin {
27+
28+
namespace {

// JSON field names used by the Puffin metadata (de)serializers in this file.

// File-level fields ("properties" is also used for each blob).
constexpr std::string_view kBlobs{"blobs"};
constexpr std::string_view kProperties{"properties"};

// Per-blob fields.
constexpr std::string_view kType{"type"};
constexpr std::string_view kFields{"fields"};
constexpr std::string_view kSnapshotId{"snapshot-id"};
constexpr std::string_view kSequenceNumber{"sequence-number"};
constexpr std::string_view kOffset{"offset"};
constexpr std::string_view kLength{"length"};
constexpr std::string_view kCompressionCodec{"compression-codec"};

}  // namespace
39+
40+
/// \brief Convert a BlobMetadata into its JSON representation.
///
/// The "type", "fields", "snapshot-id", "sequence-number", "offset" and "length"
/// entries are always assigned. The "compression-codec" and "properties" entries
/// are handed to SetOptionalStringField / SetContainerField, which decide how they
/// are written.
nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
  const BlobMetadata& blob = blob_metadata;
  nlohmann::json result;

  // Entries that are assigned unconditionally.
  result[kType] = blob.type;
  result[kFields] = blob.input_fields;
  result[kSnapshotId] = blob.snapshot_id;
  result[kSequenceNumber] = blob.sequence_number;
  result[kOffset] = blob.offset;
  result[kLength] = blob.length;

  // Entries whose emission is delegated to the shared JSON helpers.
  SetOptionalStringField(result, kCompressionCodec, blob.compression_codec);
  SetContainerField(result, kProperties, blob.properties);

  return result;
}
54+
55+
/// \brief Build a BlobMetadata from a JSON value.
///
/// Fields are read in a fixed order: type, fields, snapshot-id, sequence-number,
/// offset, length, compression-codec, properties. Each read goes through
/// ICEBERG_ASSIGN_OR_RAISE, so a failing lookup ends parsing at that field
/// (presumably by returning its error -- see the macro definition).
///
/// \param json The JSON value holding one blob entry.
/// \return The populated BlobMetadata, or the error from the failing field.
Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) {
  BlobMetadata parsed;

  // Fields read with GetJsonValue.
  ICEBERG_ASSIGN_OR_RAISE(parsed.type, GetJsonValue<std::string>(json, kType));
  ICEBERG_ASSIGN_OR_RAISE(parsed.input_fields,
                          GetJsonValue<std::vector<int32_t>>(json, kFields));
  ICEBERG_ASSIGN_OR_RAISE(parsed.snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
  ICEBERG_ASSIGN_OR_RAISE(parsed.sequence_number,
                          GetJsonValue<int64_t>(json, kSequenceNumber));
  ICEBERG_ASSIGN_OR_RAISE(parsed.offset, GetJsonValue<int64_t>(json, kOffset));
  ICEBERG_ASSIGN_OR_RAISE(parsed.length, GetJsonValue<int64_t>(json, kLength));

  // Fields read with the defaulting / map helpers.
  ICEBERG_ASSIGN_OR_RAISE(parsed.compression_codec,
                          GetJsonValueOrDefault<std::string>(json, kCompressionCodec));
  ICEBERG_ASSIGN_OR_RAISE(parsed.properties, FromJsonMap<std::string>(json, kProperties));

  return parsed;
}
74+
75+
/// \brief Convert a FileMetadata into its JSON representation.
///
/// Every blob is serialized with ToJson(const BlobMetadata&) and collected, in
/// order, under the "blobs" key. The file-level properties are handed to
/// SetContainerField under the "properties" key.
nlohmann::json ToJson(const FileMetadata& file_metadata) {
  // Start from an explicit array value so "blobs" is an array even when empty.
  auto blob_array = nlohmann::json::array();
  for (const BlobMetadata& entry : file_metadata.blobs) {
    blob_array.push_back(ToJson(entry));
  }

  nlohmann::json result;
  result[kBlobs] = std::move(blob_array);
  SetContainerField(result, kProperties, file_metadata.properties);
  return result;
}
88+
89+
/// \brief Build a FileMetadata from a JSON value.
///
/// The "blobs" entry is fetched with GetJsonValue and must be a JSON array; each
/// element is parsed with BlobMetadataFromJson and appended in order. The
/// "properties" entry is read with FromJsonMap.
///
/// \param json The JSON value holding the Puffin file metadata.
/// \return The populated FileMetadata; a JSON parse error when "blobs" is not an
///         array; or the error raised while reading "blobs", a blob, or
///         "properties".
Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json) {
  FileMetadata file_metadata;

  ICEBERG_ASSIGN_OR_RAISE(auto blobs_json, GetJsonValue<nlohmann::json>(json, kBlobs));
  if (!blobs_json.is_array()) {
    return JsonParseError("Cannot parse blobs from non-array: {}",
                          SafeDumpJson(blobs_json));
  }

  // The number of blobs is known up front: allocate once instead of regrowing the
  // vector on every push_back.
  file_metadata.blobs.reserve(blobs_json.size());
  for (const auto& blob_json : blobs_json) {
    ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(blob_json));
    file_metadata.blobs.push_back(std::move(blob));
  }

  ICEBERG_ASSIGN_OR_RAISE(file_metadata.properties,
                          FromJsonMap<std::string>(json, kProperties));

  return file_metadata;
}
108+
109+
std::string ToJsonString(const FileMetadata& file_metadata, bool pretty) {
110+
auto json = ToJson(file_metadata);
111+
return pretty ? json.dump(2) : json.dump();
112+
}
113+
114+
/// \brief Parse JSON text into a FileMetadata.
///
/// \param json_string The JSON text to parse.
/// \return A JSON parse error when the text is empty or nlohmann::json::parse
///         throws a parse_error; otherwise the result of FileMetadataFromJson.
Result<FileMetadata> FileMetadataFromJsonString(std::string_view json_string) {
  if (json_string.size() == 0) {
    return JsonParseError("Cannot parse empty JSON string");
  }

  try {
    // Both the parse and the conversion stay inside the try block.
    return FileMetadataFromJson(nlohmann::json::parse(json_string));
  } catch (const nlohmann::json::parse_error& parse_failure) {
    return JsonParseError("Failed to parse JSON: {}", parse_failure.what());
  }
}
125+
126+
} // namespace iceberg::puffin
src/iceberg/puffin/json_serde_internal.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/puffin/json_serde_internal.h
23+
/// JSON serialization/deserialization for Puffin file metadata.
24+
25+
#include <string>
26+
#include <string_view>
27+
28+
#include <nlohmann/json_fwd.hpp>
29+
30+
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/puffin/file_metadata.h"
32+
#include "iceberg/result.h"
33+
34+
namespace iceberg::puffin {
35+
36+
/// \brief Serialize a BlobMetadata to JSON.
37+
ICEBERG_EXPORT nlohmann::json ToJson(const BlobMetadata& blob_metadata);
38+
39+
/// \brief Deserialize a BlobMetadata from JSON.
40+
ICEBERG_EXPORT Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json);
41+
42+
/// \brief Serialize a FileMetadata to JSON.
43+
ICEBERG_EXPORT nlohmann::json ToJson(const FileMetadata& file_metadata);
44+
45+
/// \brief Deserialize a FileMetadata from JSON.
46+
ICEBERG_EXPORT Result<FileMetadata> FileMetadataFromJson(const nlohmann::json& json);
47+
48+
/// \brief Serialize a FileMetadata to a JSON string.
49+
ICEBERG_EXPORT std::string ToJsonString(const FileMetadata& file_metadata,
50+
bool pretty = false);
51+
52+
/// \brief Deserialize a FileMetadata from a JSON string.
53+
ICEBERG_EXPORT Result<FileMetadata> FileMetadataFromJsonString(
54+
std::string_view json_string);
55+
56+
} // namespace iceberg::puffin

src/iceberg/puffin/meson.build

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
install_headers(['file_metadata.h'], subdir: 'iceberg/puffin')
18+
install_headers(['file_metadata.h', 'puffin_format.h'], subdir: 'iceberg/puffin')
src/iceberg/puffin/puffin_format.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_format.h"
21+
22+
#include <cstddef>
23+
#include <utility>
24+
#include <vector>
25+
26+
#include "iceberg/util/endian.h"
27+
#include "iceberg/util/macros.h"
28+
29+
namespace iceberg::puffin {
30+
31+
namespace {
32+
33+
// Returns (byte_index, bit_index) for a given flag within the 4-byte flags field.
34+
constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
35+
switch (flag) {
36+
case PuffinFlag::kFooterPayloadCompressed:
37+
return {0, 0};
38+
}
39+
std::unreachable();
40+
}
41+
42+
// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
43+
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
44+
std::span<const std::byte> input) {
45+
switch (codec) {
46+
case PuffinCompressionCodec::kNone:
47+
return std::vector<std::byte>(input.begin(), input.end());
48+
case PuffinCompressionCodec::kLz4:
49+
return NotSupported("LZ4 compression is not yet supported");
50+
case PuffinCompressionCodec::kZstd:
51+
return NotSupported("Zstd compression is not yet supported");
52+
}
53+
std::unreachable();
54+
}
55+
56+
// Decompresses `input` with `codec`. kNone returns a copy of the input bytes; kLz4
// and kZstd currently return a NotSupported error.
Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
                                          std::span<const std::byte> input) {
  switch (codec) {
    case PuffinCompressionCodec::kNone: {
      // No codec: the "decompressed" form is just a copy of the input.
      const std::byte* first = input.data();
      return std::vector<std::byte>(first, first + input.size());
    }
    case PuffinCompressionCodec::kLz4: {
      return NotSupported("LZ4 decompression is not yet supported");
    }
    case PuffinCompressionCodec::kZstd: {
      return NotSupported("Zstd decompression is not yet supported");
    }
  }
  std::unreachable();
}
68+
69+
} // namespace
70+
71+
/// Reports whether the bit that GetFlagPosition assigns to `flag` is set in the
/// 4-byte `flags` field.
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
  const auto [byte_index, bit_index] = GetFlagPosition(flag);
  const unsigned mask = 1u << bit_index;
  return (flags[byte_index] & mask) != 0u;
}
75+
76+
/// Sets, in place, the bit that GetFlagPosition assigns to `flag` in the 4-byte
/// `flags` field. All other bits are left unchanged.
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
  const auto [byte_index, bit_index] = GetFlagPosition(flag);
  uint8_t& target = flags[byte_index];
  target = static_cast<uint8_t>(target | (1u << bit_index));
}
80+
81+
/// Reads a 32-bit signed integer with ReadLittleEndian from `data`, starting at
/// byte `offset`.
///
/// The precondition check fails unless `offset` is non-negative and at least
/// 4 bytes are available in `data` from `offset` onwards.
Result<int32_t> ReadInt32LittleEndian(std::span<const uint8_t> data, int32_t offset) {
  constexpr size_t kWidth = sizeof(int32_t);
  const size_t available = data.size();
  // Written as a subtraction on the buffer size; equivalent to offset + 4 <= size.
  const bool in_bounds = offset >= 0 && available >= kWidth &&
                         static_cast<size_t>(offset) <= available - kWidth;
  ICEBERG_PRECHECK(in_bounds, "Offset {} out of bounds for buffer of size {}", offset,
                   data.size());

  const uint8_t* start = data.data() + offset;
  return ReadLittleEndian<int32_t>(start);
}
86+
87+
} // namespace iceberg::puffin

0 commit comments

Comments
 (0)