Skip to content

Commit efd5cbf

Browse files
author
xuan.zhao
committed
feat(puffin): add puffin file reader and writer
- PuffinWriter: in-memory writer that builds complete Puffin files
  - Add() writes blobs with optional compression
  - Finish() serializes footer with JSON metadata
  - Tracks BlobMetadata for all written blobs
- PuffinReader: in-memory reader that parses Puffin files
  - ReadFileMetadata() parses footer and validates magic bytes
  - ReadBlob() reads and decompresses individual blobs
  - ReadAll() reads all blobs from metadata
- Expose Compress/Decompress as public API in puffin_format.h
- Register new sources in CMake and Meson build systems
- Add comprehensive tests including Java binary compatibility
1 parent d690aaa commit efd5cbf

12 files changed

Lines changed: 1026 additions & 16 deletions

src/iceberg/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ set(ICEBERG_DATA_SOURCES
170170
deletes/roaring_position_bitmap.cc
171171
puffin/file_metadata.cc
172172
puffin/json_serde.cc
173-
puffin/puffin_format.cc)
173+
puffin/puffin_format.cc
174+
puffin/puffin_reader.cc
175+
puffin/puffin_writer.cc)
174176

175177
set(ICEBERG_DATA_STATIC_BUILD_INTERFACE_LIBS)
176178
set(ICEBERG_DATA_SHARED_BUILD_INTERFACE_LIBS)

src/iceberg/meson.build

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ iceberg_data_sources = files(
152152
'puffin/file_metadata.cc',
153153
'puffin/json_serde.cc',
154154
'puffin/puffin_format.cc',
155+
'puffin/puffin_reader.cc',
156+
'puffin/puffin_writer.cc',
155157
)
156158

157159
# CRoaring does not export symbols, so on Windows it must

src/iceberg/puffin/meson.build

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@
1616
# under the License.
1717

1818
install_headers(
19-
['file_metadata.h', 'puffin_format.h', 'type_fwd.h'],
19+
[
20+
'file_metadata.h',
21+
'puffin_format.h',
22+
'puffin_reader.h',
23+
'puffin_writer.h',
24+
'type_fwd.h',
25+
],
2026
subdir: 'iceberg/puffin',
2127
)

src/iceberg/puffin/puffin_format.cc

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ constexpr std::pair<int, int> GetFlagPosition(PuffinFlag flag) {
3636
std::unreachable();
3737
}
3838

39+
} // namespace
40+
41+
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
  // GetFlagPosition yields (byte index, bit index within that byte).
  const auto position = GetFlagPosition(flag);
  const int mask = 1 << position.second;
  return (flags[position.first] & mask) != 0;
}
45+
46+
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
  // GetFlagPosition yields (byte index, bit index within that byte).
  const auto position = GetFlagPosition(flag);
  const int mask = 1 << position.second;
  flags[position.first] |= mask;
}
50+
3951
// TODO(zhaoxuan1994): Move compression logic to a unified codec interface.
4052
Result<std::vector<std::byte>> Compress(PuffinCompressionCodec codec,
4153
std::span<const std::byte> input) {
@@ -63,16 +75,4 @@ Result<std::vector<std::byte>> Decompress(PuffinCompressionCodec codec,
6375
std::unreachable();
6476
}
6577

66-
} // namespace
67-
68-
bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag flag) {
69-
auto [byte_num, bit_num] = GetFlagPosition(flag);
70-
return (flags[byte_num] & (1 << bit_num)) != 0;
71-
}
72-
73-
void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag) {
74-
auto [byte_num, bit_num] = GetFlagPosition(flag);
75-
flags[byte_num] |= (1 << bit_num);
76-
}
77-
7878
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_format.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
/// Puffin file format constants and utilities.
2424

2525
#include <array>
26+
#include <cstddef>
2627
#include <cstdint>
2728
#include <span>
29+
#include <vector>
2830

2931
#include "iceberg/iceberg_data_export.h"
3032
#include "iceberg/puffin/file_metadata.h"
@@ -66,4 +68,12 @@ ICEBERG_DATA_EXPORT bool IsFlagSet(std::span<const uint8_t, 4> flags, PuffinFlag
6668
/// \brief Set a flag in the flags bytes.
6769
ICEBERG_DATA_EXPORT void SetFlag(std::span<uint8_t, 4> flags, PuffinFlag flag);
6870

71+
/// \brief Compress data using the specified codec.
72+
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Compress(
73+
PuffinCompressionCodec codec, std::span<const std::byte> input);
74+
75+
/// \brief Decompress data using the specified codec.
76+
ICEBERG_DATA_EXPORT Result<std::vector<std::byte>> Decompress(
77+
PuffinCompressionCodec codec, std::span<const std::byte> input);
78+
6979
} // namespace iceberg::puffin
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/puffin/puffin_reader.h"
21+
22+
#include <algorithm>
23+
#include <array>
24+
#include <cstdint>
25+
#include <cstring>
26+
#include <string_view>
27+
28+
#include "iceberg/file_io.h"
29+
#include "iceberg/puffin/json_serde_internal.h"
30+
#include "iceberg/puffin/puffin_format.h"
31+
#include "iceberg/util/endian.h"
32+
#include "iceberg/util/macros.h"
33+
34+
namespace iceberg::puffin {
35+
36+
namespace {
37+
38+
// Validate magic bytes in a buffer at the given offset.
39+
Status CheckMagic(std::span<const std::byte> data, int64_t offset) {
40+
if (offset < 0 ||
41+
offset + PuffinFormat::kMagicLength > static_cast<int64_t>(data.size())) {
42+
return Invalid("Invalid file: cannot read magic at offset {}", offset);
43+
}
44+
auto* begin = reinterpret_cast<const uint8_t*>(data.data() + offset);
45+
if (!std::equal(PuffinFormat::kMagicV1.begin(), PuffinFormat::kMagicV1.end(), begin)) {
46+
return Invalid(
47+
"Invalid file: expected magic at offset {}, got [{:#04x}, {:#04x}, "
48+
"{:#04x}, {:#04x}]",
49+
offset, begin[0], begin[1], begin[2], begin[3]);
50+
}
51+
return {};
52+
}
53+
54+
// Validate that no unknown flag bits are set.
55+
Status CheckUnknownFlags(std::span<const uint8_t, 4> flags) {
56+
constexpr uint8_t kKnownBitsMask = 0x01;
57+
if ((flags[0] & ~kKnownBitsMask) != 0 || flags[1] != 0 || flags[2] != 0 ||
58+
flags[3] != 0) {
59+
return Invalid(
60+
"Invalid file: unknown footer flags set [{:#04x}, {:#04x}, {:#04x}, {:#04x}]",
61+
flags[0], flags[1], flags[2], flags[3]);
62+
}
63+
return {};
64+
}
65+
66+
} // namespace
67+
68+
PuffinReader::PuffinReader(std::span<const std::byte> data)
69+
: data_(data), file_size_(static_cast<int64_t>(data.size())) {}
70+
71+
PuffinReader::PuffinReader(std::unique_ptr<InputFile> input_file)
72+
: input_file_(std::move(input_file)) {}
73+
74+
PuffinReader::~PuffinReader() = default;
75+
76+
// Read `length` bytes starting at `offset` and return them as a new vector.
//
// File mode: opens the stream on first use and reads through it. Negative
// offsets or lengths are rejected before the buffer is allocated.
// Memory mode: copies the requested range out of the in-memory buffer after
// checking it against the buffer size.
//
// Returns an Invalid status for out-of-range requests, or propagates the error
// from opening or reading the stream.
Result<std::vector<std::byte>> PuffinReader::ReadBytes(int64_t offset, int64_t length) {
  if (IsFileMode()) {
    // A negative length would be converted to a huge unsigned size when sizing
    // the buffer below, so reject bad arguments before allocating.
    if (offset < 0 || length < 0) {
      return Invalid("Invalid read: negative offset {} or length {}", offset, length);
    }
    if (!stream_) {
      ICEBERG_ASSIGN_OR_RAISE(stream_, input_file_->Open());
    }
    std::vector<std::byte> buf(static_cast<std::size_t>(length));
    ICEBERG_RETURN_UNEXPECTED(stream_->ReadFully(offset, buf));
    return buf;
  }
  // Memory mode. `length > file_size_ - offset` is the overflow-safe form of
  // `offset + length > file_size_`.
  if (offset < 0 || length < 0 || offset > file_size_ || length > file_size_ - offset) {
    return Invalid("Read out of bounds: offset {} + length {} exceeds file size {}",
                   offset, length, file_size_);
  }
  return std::vector<std::byte>(data_.data() + offset, data_.data() + offset + length);
}
92+
93+
// Parse the Puffin footer and return the file metadata it describes.
//
// Steps, in order:
//   1. In file mode, query the file size from the input file.
//   2. Check the magic bytes at the start of the file.
//   3. Read the fixed-size footer struct from the end of the file, check its
//      trailing magic and read the footer payload size from it.
//   4. Locate the footer start, make sure it lies inside the file and after the
//      header magic, and check the magic at the footer start.
//   5. Reject unknown flag bits; if the payload-compressed flag is set, use the
//      default footer compression codec.
//   6. Read, decompress and JSON-parse the footer payload.
//
// Returns an Invalid status describing the first failed validation, or
// propagates errors from reading, decompressing or parsing.
Result<FileMetadata> PuffinReader::ReadFileMetadata() {
  // Get file size
  if (IsFileMode()) {
    ICEBERG_ASSIGN_OR_RAISE(file_size_, input_file_->Size());
  }

  if (file_size_ < PuffinFormat::kFooterStructLength) {
    return Invalid("Invalid file: file length {} is less than minimal footer size {}",
                   file_size_, PuffinFormat::kFooterStructLength);
  }

  // Validate header magic
  ICEBERG_ASSIGN_OR_RAISE(auto header_bytes, ReadBytes(0, PuffinFormat::kMagicLength));
  ICEBERG_RETURN_UNEXPECTED(CheckMagic(header_bytes, 0));

  // Read footer struct from end of file
  auto footer_struct_offset = file_size_ - PuffinFormat::kFooterStructLength;
  ICEBERG_ASSIGN_OR_RAISE(
      auto footer_struct,
      ReadBytes(footer_struct_offset, PuffinFormat::kFooterStructLength));

  // Validate footer end magic
  ICEBERG_RETURN_UNEXPECTED(
      CheckMagic(footer_struct, PuffinFormat::kFooterStructMagicOffset));

  // Read payload size
  auto payload_size = ReadLittleEndian<int32_t>(
      footer_struct.data() + PuffinFormat::kFooterStructPayloadSizeOffset);

  if (payload_size < 0) {
    return Invalid("Invalid file: negative payload size {}", payload_size);
  }

  // Calculate total footer size and validate. The sum is done in 64 bits so a
  // large payload size cannot overflow.
  int64_t footer_size = PuffinFormat::kFooterStartMagicLength +
                        static_cast<int64_t>(payload_size) +
                        PuffinFormat::kFooterStructLength;
  auto footer_offset = file_size_ - footer_size;
  if (footer_offset < 0) {
    return Invalid("Invalid file: footer size {} exceeds file size {}", footer_size,
                   file_size_);
  }
  // The footer must start after the header magic; otherwise the header magic
  // would be (partly) reused as the footer start magic.
  if (footer_offset < PuffinFormat::kMagicLength) {
    return Invalid("Invalid file: footer at offset {} overlaps the header magic",
                   footer_offset);
  }

  // Validate footer start magic
  ICEBERG_ASSIGN_OR_RAISE(auto footer_start_magic,
                          ReadBytes(footer_offset, PuffinFormat::kMagicLength));
  ICEBERG_RETURN_UNEXPECTED(CheckMagic(footer_start_magic, 0));

  // Check flags
  std::array<uint8_t, 4> flags{};
  std::memcpy(flags.data(), footer_struct.data() + PuffinFormat::kFooterStructFlagsOffset,
              4);
  ICEBERG_RETURN_UNEXPECTED(CheckUnknownFlags(flags));

  PuffinCompressionCodec footer_compression = PuffinCompressionCodec::kNone;
  if (IsFlagSet(flags, PuffinFlag::kFooterPayloadCompressed)) {
    footer_compression = PuffinFormat::kDefaultFooterCompressionCodec;
  }

  // Read and decompress footer payload
  auto payload_offset = footer_offset + PuffinFormat::kFooterStartMagicLength;
  ICEBERG_ASSIGN_OR_RAISE(auto payload_bytes, ReadBytes(payload_offset, payload_size));
  ICEBERG_ASSIGN_OR_RAISE(auto decompressed,
                          Decompress(footer_compression, payload_bytes));

  // Parse JSON. The view borrows from `decompressed`, which stays alive until
  // this function returns.
  std::string_view json_str(reinterpret_cast<const char*>(decompressed.data()),
                            decompressed.size());
  return FileMetadataFromJsonString(json_str);
}
163+
164+
// Read one blob and return it, decompressed, together with a copy of its
// metadata.
//
// The blob's offset/length are validated against the file size, the raw bytes
// are read, and they are decompressed with the codec named by
// `blob_metadata.compression_codec`.
//
// Returns an Invalid status when the blob range does not fit in the file, or
// propagates errors from reading, codec lookup or decompression.
Result<std::pair<BlobMetadata, std::vector<std::byte>>> PuffinReader::ReadBlob(
    const BlobMetadata& blob_metadata) {
  // In file mode the size is otherwise only populated by ReadFileMetadata().
  // Query it here when it is still unset so a blob can be read directly from a
  // known offset/length without parsing the footer first; without this, every
  // non-empty blob would be rejected against a file size of 0.
  if (IsFileMode() && file_size_ == 0) {
    ICEBERG_ASSIGN_OR_RAISE(file_size_, input_file_->Size());
  }

  // `length > file_size_ - offset` is the overflow-safe form of
  // `offset + length > file_size_`.
  if (blob_metadata.offset < 0 || blob_metadata.length < 0 ||
      blob_metadata.offset > file_size_ ||
      blob_metadata.length > file_size_ - blob_metadata.offset) {
    return Invalid("Invalid blob: offset {} + length {} exceeds file size {}",
                   blob_metadata.offset, blob_metadata.length, file_size_);
  }

  ICEBERG_ASSIGN_OR_RAISE(auto raw_data,
                          ReadBytes(blob_metadata.offset, blob_metadata.length));

  // Determine compression codec
  ICEBERG_ASSIGN_OR_RAISE(
      auto codec, PuffinCompressionCodecFromName(blob_metadata.compression_codec));
  ICEBERG_ASSIGN_OR_RAISE(auto decompressed, Decompress(codec, raw_data));

  return std::pair{blob_metadata, std::move(decompressed)};
}
183+
184+
// Read every blob listed in `blobs`, in order, via ReadBlob().
// Returns the (metadata, decompressed data) pairs in the same order as the
// input, or the error produced by ReadBlob().
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>>
PuffinReader::ReadAll(const std::vector<BlobMetadata>& blobs) {
  using BlobEntry = std::pair<BlobMetadata, std::vector<std::byte>>;

  std::vector<BlobEntry> entries;
  entries.reserve(blobs.size());
  for (const BlobMetadata& metadata : blobs) {
    ICEBERG_ASSIGN_OR_RAISE(auto entry, ReadBlob(metadata));
    entries.push_back(std::move(entry));
  }
  return entries;
}
194+
195+
} // namespace iceberg::puffin

src/iceberg/puffin/puffin_reader.h

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/puffin/puffin_reader.h
23+
/// Puffin file reader.
24+
25+
#include <cstddef>
26+
#include <cstdint>
27+
#include <memory>
28+
#include <span>
29+
#include <utility>
30+
#include <vector>
31+
32+
#include "iceberg/iceberg_data_export.h"
33+
#include "iceberg/puffin/file_metadata.h"
34+
#include "iceberg/result.h"
35+
36+
namespace iceberg {
37+
class InputFile;
38+
class SeekableInputStream;
39+
} // namespace iceberg
40+
41+
namespace iceberg::puffin {
42+
43+
/// \brief Reader for Puffin files.
44+
///
45+
/// Supports two modes:
46+
/// - Memory mode: parses from an in-memory buffer.
47+
/// - File mode: reads from an InputFile with seek support (for DV use case).
///
/// In memory mode the reader stores only a std::span over the caller's buffer;
/// the bytes are not copied, so the buffer must remain valid for as long as the
/// reader is used. In file mode the reader owns the InputFile it was given.
48+
class ICEBERG_DATA_EXPORT PuffinReader {
49+
public:
50+
/// \brief Construct a memory-mode reader from file data.
/// \param data View over the complete Puffin file contents. Not copied; its
/// size is used as the file size.
51+
explicit PuffinReader(std::span<const std::byte> data);
52+
53+
/// \brief Construct a file-mode reader from an InputFile.
/// \param input_file The file to read from; ownership is transferred to the
/// reader. The stream is opened on the first read, not here.
54+
explicit PuffinReader(std::unique_ptr<InputFile> input_file);
55+
/// Declared here and defined in the .cc file because InputFile and
/// SeekableInputStream are only forward-declared in this header.
56+
~PuffinReader();
57+
58+
/// \brief Read and return the file metadata from the footer.
///
/// Validates the header magic, the footer magics and the footer flags, then
/// decompresses (if flagged) and JSON-parses the footer payload.
/// \return The parsed FileMetadata, or an error if validation or parsing fails.
59+
Result<FileMetadata> ReadFileMetadata();
60+
61+
/// \brief Read a specific blob's data by its metadata.
///
/// The blob's offset and length are checked against the file size before
/// reading; the data is decompressed with the codec named in the metadata.
62+
/// \param blob_metadata The metadata describing the blob to read.
63+
/// \return A pair of (BlobMetadata, decompressed data), or an error.
64+
Result<std::pair<BlobMetadata, std::vector<std::byte>>> ReadBlob(
65+
const BlobMetadata& blob_metadata);
66+
67+
/// \brief Read all blobs described in the file metadata.
/// \param blobs The blob metadata entries to read, each via ReadBlob().
68+
/// \return A vector of (BlobMetadata, decompressed data) pairs, or an error.
69+
Result<std::vector<std::pair<BlobMetadata, std::vector<std::byte>>>> ReadAll(
70+
const std::vector<BlobMetadata>& blobs);
71+
72+
private:
73+
/// In-memory data for memory mode (non-owning view; empty in file mode).
74+
std::span<const std::byte> data_;
75+
/// Input file for file mode (null in memory mode).
76+
std::unique_ptr<InputFile> input_file_;
77+
/// Opened stream (lazily opened in file mode).
78+
std::unique_ptr<SeekableInputStream> stream_;
79+
/// Cached file size: the buffer size in memory mode, or the size queried from
/// the input file in file mode.
80+
int64_t file_size_ = 0;
81+
/// True when the reader was constructed from an InputFile.
82+
bool IsFileMode() const { return input_file_ != nullptr; }
/// Read `length` bytes at `offset` from the stream (file mode) or from the
/// in-memory buffer (memory mode).
83+
Result<std::vector<std::byte>> ReadBytes(int64_t offset, int64_t length);
84+
};
85+
86+
} // namespace iceberg::puffin

0 commit comments

Comments
 (0)