Skip to content

Commit f8c15ee

Browse files
author
xuan.zhao
committed
feat(puffin): add puffin file reader and writer
- PuffinWriter: in-memory writer that builds complete Puffin files
  - Add() writes blobs with optional compression
  - Finish() serializes the footer with JSON metadata
  - Tracks BlobMetadata for all written blobs
- PuffinReader: in-memory reader that parses Puffin files
  - ReadFileMetadata() parses the footer and validates magic bytes
  - ReadBlob() reads and decompresses individual blobs
  - ReadAll() reads all blobs from metadata
- Expose Compress/Decompress as public API in puffin_format.h
- Register new sources in CMake and Meson build systems
- Add comprehensive tests including Java binary compatibility
1 parent a9114ce commit f8c15ee

12 files changed

Lines changed: 896 additions & 15 deletions

src/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ set(ICEBERG_SOURCES
6868
partition_summary.cc
6969
puffin/file_metadata.cc
7070
puffin/puffin_format.cc
71+
puffin/puffin_reader.cc
72+
puffin/puffin_writer.cc
7173
puffin/json_serde.cc
7274
row/arrow_array_wrapper.cc
7375
row/manifest_wrapper.cc

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ iceberg_sources = files(
9191
'puffin/file_metadata.cc',
9292
'puffin/json_serde.cc',
9393
'puffin/puffin_format.cc',
94+
'puffin/puffin_reader.cc',
95+
'puffin/puffin_writer.cc',
9496
'row/arrow_array_wrapper.cc',
9597
'row/manifest_wrapper.cc',
9698
'row/partition_values.cc',

src/iceberg/puffin/meson.build

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
# under the License.
1717

1818
install_headers(
19-
['file_metadata.h', 'puffin_format.h', 'type_fwd.h'],
19+
[
20+
'file_metadata.h',
21+
'puffin_format.h',
22+
'puffin_reader.h',
23+
'puffin_writer.h',
24+
'type_fwd.h',
25+
],
2026
subdir: 'iceberg/puffin',
2127
)

src/iceberg/puffin/puffin_format.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
3636
std::unreachable();
3737
}
3838

39+
} // namespace
40+
41+
/// Reports whether the bit that GetFlagPosition() assigns to `flag` is set in the
/// four flag bytes.
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
  const auto [byte_index, bit_index] = GetFlagPosition(flag);
  const auto mask = static_cast<uint8_t>(1u << bit_index);
  return (flags[byte_index] & mask) != 0;
}
45+
46+
/// Turns on the bit that GetFlagPosition() assigns to `flag` in the four flag
/// bytes; all other bits are left untouched.
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
  const auto [byte_index, bit_index] = GetFlagPosition(flag);
  const auto mask = static_cast<uint8_t>(1u << bit_index);
  flags[byte_index] |= mask;
}
50+
3951
// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
4052
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
4153
std::span<const std::byte> input) {
@@ -63,16 +75,4 @@ Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
6375
std::unreachable();
6476
}
6577

66-
} // namespace
67-
68-
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
69-
auto [byte_num, bit_num] = GetFlagPosition(flag);
70-
return (flags[byte_num] & (1 << bit_num)) != 0;
71-
}
72-
73-
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
74-
auto [byte_num, bit_num] = GetFlagPosition(flag);
75-
flags[byte_num] |= (1 << bit_num);
76-
}
77-
7878
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_format.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
/// Puffin file format constants and utilities.
2424

2525
#include <array>
26+
#include <cstddef>
2627
#include <cstdint>
2728
#include <span>
29+
#include <vector>
2830

2931
#include "iceberg/iceberg_export.h"
3032
#include "iceberg/puffin/file_metadata.h"
@@ -66,4 +68,12 @@ ICEBERG_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag
6668
/// \brief Set a flag in the flags bytes.
6769
ICEBERG_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);
6870

71+
/// \brief Compress data using the specified codec.
72+
ICEBERG_EXPORT Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
73+
std::span<const std::byte> input);
74+
75+
/// \brief Decompress data using the specified codec.
76+
ICEBERG_EXPORT Result<std::vector<std::byte>> Decompress(
77+
PuffinCompressionCodec codec, std::span<const std::byte> input);
78+
6979
} // namespace iceberg::puffin
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_reader.h"
21+
22+
#include <algorithm>
23+
#include <array>
24+
#include <cstring>
25+
#include <string_view>
26+
27+
#include "iceberg/puffin/json_serde_internal.h"
28+
#include "iceberg/puffin/puffin_format.h"
29+
#include "iceberg/util/endian.h"
30+
#include "iceberg/util/macros.h"
31+
32+
namespace iceberg::puffin {
33+
34+
namespace {
35+
36+
// Validate magic bytes at the given offset.
37+
Status CheckMagic(std::span<const std::byte> data, int64_t offset) {
38+
if (offset < 0 ||
39+
offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) {
40+
return Invalid("Invalid file: cannot read magic at offset {}", offset);
41+
}
42+
auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset);
43+
if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) {
44+
return Invalid("Invalid file: expected magic at offset {}", offset);
45+
}
46+
return {};
47+
}
48+
49+
} // namespace
50+
51+
// Stores the given span in data_; the bytes it refers to are not copied here.
PuffinReader::PuffinReader(std::span<const std::byte> data) : data_(data) {}
52+
53+
Result<FileMetadata> PuffinReader::ReadFileMetadata() {
54+
auto file_size = static_cast<int64_t>(data_.size());
55+
56+
if (file_size < PuffinFormat::kFooterStructLength) {
57+
return Invalid("Invalid file: file length {} is less than minimal footer size {}",
58+
file_size, PuffinFormat::kFooterStructLength);
59+
}
60+
61+
// Read footer struct from end of file
62+
auto footer_struct_offset = file_size - PuffinFormat::kFooterStructLength;
63+
64+
// Validate footer end magic
65+
ICEBERG_RETURN_UNEXPECTED(
66+
CheckMagic(data_, footer_struct_offset + PuffinFormat::kFooterStructMagicOffset));
67+
68+
// Read payload size from footer struct
69+
auto payload_size = ReadLittleEndian<int32_t>(
70+
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructPayloadSizeOffset);
71+
72+
if (payload_size < 0) {
73+
return Invalid("Invalid file: negative payload size {}", payload_size);
74+
}
75+
76+
// Calculate total footer size and validate
77+
int64_t footer_size = PuffinFormat::kFooterStartMagicLength +
78+
static_cast<int64_t>(payload_size) +
79+
PuffinFormat::kFooterStructLength;
80+
auto footer_offset = file_size - footer_size;
81+
if (footer_offset < 0) {
82+
return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size,
83+
file_size);
84+
}
85+
86+
// Validate footer start magic
87+
ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, footer_offset));
88+
89+
// Check flags for footer compression
90+
std::array<uint8_t, 4> flags{};
91+
std::memcpy(
92+
flags.data(),
93+
data_.data() + footer_struct_offset + PuffinFormat::kFooterStructFlagsOffset, 4);
94+
95+
PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone;
96+
if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) {
97+
footer_compression = PuffinFormat::kDefaultFooterCompressionCodec;
98+
}
99+
100+
// Extract footer payload
101+
auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength;
102+
std::span<const std::byte> payload_span(data_.data() + payload_offset, payload_size);
103+
ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes,
104+
Decompress(footer_compression, payload_span));
105+
106+
// Parse JSON
107+
std::string_view json_str(reinterpret_cast<const char*>(payload_bytes.data()),
108+
payload_bytes.size());
109+
ICEBERG_ASSIGN_OR_RAISE(auto file_metadata, FileMetadataFromJsonString(json_str));
110+
111+
// Validate header magic
112+
ICEBERG_RETURN_UNEXPECTED(CheckMagic(data_, 0));
113+
114+
return file_metadata;
115+
}
116+
117+
Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
118+
const BlobMetadata& blob_metadata) {
119+
auto file_size = static_cast<int64_t>(data_.size());
120+
121+
if (blob_metadata.offset < 0 || blob_metadata.length < 0 ||
122+
blob_metadata.offset > file_size ||
123+
blob_metadata.length > file_size - blob_metadata.offset) {
124+
return Invalid("Invalid blob: offset {} + length {} exceeds file size {}",
125+
blob_metadata.offset, blob_metadata.length, file_size);
126+
}
127+
128+
std::span<const std::byte> raw_data(data_.data() + blob_metadata.offset,
129+
blob_metadata.length);
130+
131+
// Determine compression codec
132+
ICEBERG_ASSIGN_OR_RAISE(
133+
auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));
134+
ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));
135+
136+
return std::pair{blob_metadata, std::move(decompressed)};
137+
}
138+
139+
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
140+
PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
141+
std::vector<std::pair<BlobMetadata, std::vector<std::byte>>> results;
142+
results.reserve(blobs.size());
143+
for (const auto& blob : blobs) {
144+
ICEBERG_ASSIGN_OR_RAISE(auto blob_pair, ReadBlob(blob));
145+
results.push_back(std::move(blob_pair));
146+
}
147+
return results;
148+
}
149+
150+
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_reader.h

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/puffin/puffin_reader.h
23+
/// Puffin file reader.
24+
25+
#include <cstddef>
26+
#include <cstdint>
27+
#include <span>
28+
#include <utility>
29+
#include <vector>
30+
31+
#include "iceberg/iceberg_export.h"
32+
#include "iceberg/puffin/file_metadata.h"
33+
#include "iceberg/result.h"
34+
35+
namespace iceberg::puffin {
36+
37+
/// \brief Reader for Puffin files.
38+
///
39+
/// Parses a Puffin file from an in-memory buffer. Usage:
40+
/// PuffinReader reader(file_data);
41+
/// auto metadata = reader.ReadFileMetadata();
42+
/// auto blob = reader.ReadBlob(metadata.value().blobs[0]);
///
/// The reader holds only a std::span over the buffer and does not copy the
/// bytes, so the buffer has to stay alive for as long as the reader is used.
43+
class ICEBERG_EXPORT PuffinReader {
44+
public:
45+
/// \brief Construct a reader from file data.
/// \param data View of the bytes of the Puffin file to read.
46+
explicit PuffinReader(std::span<const std::byte> data);
47+
48+
/// \brief Read and return the file metadata from the footer.
/// \return The FileMetadata parsed from the footer, or an error.
49+
Result<FileMetadata> ReadFileMetadata();
50+
51+
/// \brief Read a specific blob's data by its metadata.
52+
/// \param blob_metadata The metadata describing the blob to read.
53+
/// \return A pair of (BlobMetadata, decompressed data), or an error.
54+
Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob(
55+
const BlobMetadata& blob_metadata);
56+
57+
/// \brief Read every blob described by the given metadata entries.
/// \param blobs The metadata entries of the blobs to read, in the order in
/// which the results are returned.
58+
/// \return A vector of (BlobMetadata, decompressed data) pairs, or an error.
59+
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll(
60+
const std::vector<BlobMetadata>& blobs);
61+
62+
private:
// View of the file bytes handed to the constructor; the reader does not own them.
63+
std::span<const std::byte> data_;
64+
};
65+
66+
} // namespace iceberg::puffin

0 commit comments

Comments
 (0)