Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iceberg/expression/literal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ std::strong_ordering CompareFloat(T lhs, T rhs) {
return lhs_is_negative <=> rhs_is_negative;
}

bool Literal::operator==(const iceberg::Literal& other) const {
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
return (*this <=> other) == 0;
}

// Three-way comparison operator
std::partial_ordering Literal::operator<=>(const Literal& other) const {
// If types are different, comparison is unordered
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/expression/literal.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class ICEBERG_EXPORT Literal {
/// was not valid
Result<Literal> CastTo(const std::shared_ptr<PrimitiveType>& target_type) const;

bool operator==(const Literal& other) const;

/// \brief Compare two PrimitiveLiterals. Both literals must have the same type
/// and should not be AboveMax or BelowMin.
std::partial_ordering operator<=>(const Literal& other) const;
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/file_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class ICEBERG_EXPORT FileFormatType {
kAvro,
kOrc,
kPuffin,
kUnknown = 99
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
};

/// \brief Convert a FileFormatType to a string
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/manifest_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@

namespace iceberg {

bool ManifestEntry::operator==(const iceberg::ManifestEntry& other) const {
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
return status == other.status && snapshot_id == other.snapshot_id &&
sequence_number == other.sequence_number &&
file_sequence_number == other.file_sequence_number &&
(data_file && other.data_file && *data_file == *other.data_file) ||
(!data_file && !other.data_file);
}

std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition_type) {
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
Expand Down
12 changes: 8 additions & 4 deletions src/iceberg/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ struct ICEBERG_EXPORT DataFile {
/// Field id: 134
/// Type of content stored by the data file: data, equality deletes, or position
/// deletes (all v1 files are data files)
Content content;
Content content = Content::kData;
/// Field id: 100
/// Full URI for the file with FS scheme
std::string file_path;
/// Field id: 101
/// File format type, avro, orc, parquet, or puffin
FileFormatType file_format;
FileFormatType file_format = FileFormatType::kUnknown;
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
/// Field id: 102
/// Partition data tuple, schema based on the partition spec output using partition
/// field ids
Expand Down Expand Up @@ -146,7 +146,7 @@ struct ICEBERG_EXPORT DataFile {
std::optional<int32_t> sort_order_id;
/// This field is not included in spec, so it is not serialized into the manifest file.
/// It is just store in memory representation used in process.
int32_t partition_spec_id;
int32_t partition_spec_id = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int32_t partition_spec_id = 0;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;

/// Field id: 142
/// The _row_id for the first row in the data file.
///
Expand Down Expand Up @@ -261,6 +261,8 @@ struct ICEBERG_EXPORT DataFile {
SchemaField::MakeOptional(145, "content_size_in_bytes", iceberg::int64(),
"The length of referenced content stored in the file");

bool operator==(const DataFile& other) const = default;

static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);
};

Expand All @@ -272,7 +274,7 @@ struct ICEBERG_EXPORT ManifestEntry {
/// Field id: 0
/// Used to track additions and deletions. Deletes are informational only and not used
/// in scans.
ManifestStatus status;
ManifestStatus status = ManifestStatus::kAdded;
/// Field id: 1
/// Snapshot id where the file was added, or deleted if status is 2. Inherited when
/// null.
Expand All @@ -297,6 +299,8 @@ struct ICEBERG_EXPORT ManifestEntry {
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());

bool operator==(const ManifestEntry& other) const;

static std::shared_ptr<StructType> TypeFromPartitionType(
std::shared_ptr<StructType> partition_type);
static std::shared_ptr<StructType> TypeFromDataFileType(
Expand Down
20 changes: 12 additions & 8 deletions src/iceberg/manifest_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace iceberg {
struct ICEBERG_EXPORT PartitionFieldSummary {
/// Field id: 509
/// Whether the manifest contains at least one partition with a null value for the field
bool contains_null;
bool contains_null = false;
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
/// Field id: 518
/// Whether the manifest contains at least one partition with a NaN value for the field
std::optional<bool> contains_nan;
Expand All @@ -64,6 +64,8 @@ struct ICEBERG_EXPORT PartitionFieldSummary {
inline static const SchemaField kUpperBound = SchemaField::MakeOptional(
511, "upper_bound", iceberg::binary(), "Partition upper bound for all files");

bool operator==(const PartitionFieldSummary& other) const = default;

static const StructType& Type();
};

Expand All @@ -83,26 +85,26 @@ struct ICEBERG_EXPORT ManifestFile {
std::string manifest_path;
/// Field id: 501
/// Length of the manifest file in bytes
int64_t manifest_length;
int64_t manifest_length = 0;
/// Field id: 502
/// ID of a partition spec used to write the manifest; must be listed in table metadata
/// partition-specs
int32_t partition_spec_id;
int32_t partition_spec_id = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int32_t partition_spec_id = 0;
int32_t partition_spec_id = PartitionSpec::kInitialSpecId;

/// Field id: 517
/// The type of files tracked by the manifest, either data or delete files; 0 for all v1
/// manifests
Content content;
Content content = Content::kData;
/// Field id: 515
/// The sequence number when the manifest was added to the table; use 0 when reading v1
/// manifest lists
int64_t sequence_number;
int64_t sequence_number = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int64_t sequence_number = 0;
int64_t sequence_number = TableMetadata::kInitialSequenceNumber;

/// Field id: 516
/// The minimum data sequence number of all live data or delete files in the manifest;
/// use 0 when reading v1 manifest lists
int64_t min_sequence_number;
int64_t min_sequence_number = 0;
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
/// Field id: 503
/// ID of the snapshot where the manifest file was added
int64_t added_snapshot_id;
int64_t added_snapshot_id = 0;
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
/// Field id: 504
/// Number of entries in the manifest that have status ADDED (1), when null this is
/// assumed to be non-zero
Expand Down Expand Up @@ -137,7 +139,7 @@ struct ICEBERG_EXPORT ManifestFile {
std::vector<uint8_t> key_metadata;
/// Field id: 520
/// The starting _row_id to assign to rows added by ADDED data files
int64_t first_row_id;
int64_t first_row_id = 0;
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated

/// \brief Checks if this manifest file contains entries with ADDED status.
bool has_added_files() const { return added_files_count.value_or(1) > 0; }
Expand Down Expand Up @@ -188,6 +190,8 @@ struct ICEBERG_EXPORT ManifestFile {
520, "first_row_id", iceberg::int64(),
"Starting row ID to assign to new rows in ADDED data files");

bool operator==(const ManifestFile& other) const = default;

static const StructType& Type();
};

Expand Down
111 changes: 43 additions & 68 deletions test/manifest_list_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,46 @@ class ManifestListReaderTest : public TempFileTestBase {
file_io_ = std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs_);
}

std::vector<ManifestFile> prepare_test_manifest_list() {
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
std::vector<ManifestFile> manifest_files;
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
std::vector<std::string> paths = {"2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro",
"9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro",
"2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro",
"3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro"};
std::vector<int64_t> file_size = {7433, 7431, 7433, 7431};
std::vector<int64_t> snapshot_id = {7412193043800610213, 5485972788975780755,
1679468743751242972, 1579605567338877265};
std::vector<std::vector<uint8_t>> bounds = {{'x', ';', 0x07, 0x00},
{'(', 0x19, 0x07, 0x00},
{0xd0, 0xd4, 0x06, 0x00},
{0xb8, 0xd4, 0x06, 0x00}};
for (int i = 0; i < 4; ++i) {
ManifestFile manifest_file;
manifest_file.manifest_path = test_dir_prefix + paths[i];
manifest_file.manifest_length = file_size[i];
manifest_file.partition_spec_id = 0;
manifest_file.content = ManifestFile::Content::kData;
manifest_file.sequence_number = 4 - i;
manifest_file.min_sequence_number = 4 - i;
manifest_file.added_snapshot_id = snapshot_id[i];
manifest_file.added_files_count = 1;
manifest_file.existing_files_count = 0;
manifest_file.deleted_files_count = 0;
manifest_file.added_rows_count = 1;
manifest_file.existing_rows_count = 0;
manifest_file.deleted_rows_count = 0;
PartitionFieldSummary partition;
partition.contains_null = false;
partition.contains_nan = false;
partition.lower_bound = bounds[i];
partition.upper_bound = bounds[i];
manifest_file.partitions.emplace_back(partition);
manifest_files.emplace_back(manifest_file);
}
return manifest_files;
}

std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
std::shared_ptr<FileIO> file_io_;
};
Expand All @@ -55,74 +95,9 @@ TEST_F(ManifestListReaderTest, BasicTest) {
auto read_result = manifest_reader->Files();
ASSERT_EQ(read_result.has_value(), true);
ASSERT_EQ(read_result.value().size(), 4);
std::string test_dir_prefix = "/tmp/db/db/iceberg_test/metadata/";
for (const auto& file : read_result.value()) {
auto manifest_path = file.manifest_path.substr(test_dir_prefix.size());
if (manifest_path == "2bccd69e-d642-4816-bba0-261cd9bd0d93-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 7412193043800610213);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 4);
ASSERT_EQ(file.min_sequence_number, 4);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'x', ';', 0x07, 0x00}));
} else if (manifest_path == "9b6ffacd-ef10-4abf-a89c-01c733696796-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 5485972788975780755);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 3);
ASSERT_EQ(file.min_sequence_number, 3);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({'(', 0x19, 0x07, 0x00}));
} else if (manifest_path == "2541e6b5-4923-4bd5-886d-72c6f7228400-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1679468743751242972);
ASSERT_EQ(file.manifest_length, 7433);
ASSERT_EQ(file.sequence_number, 2);
ASSERT_EQ(file.min_sequence_number, 2);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xd0, 0xd4, 0x06, 0x00}));
} else if (manifest_path == "3118c801-d2e0-4df6-8c7a-7d4eaade32f8-m0.avro") {
ASSERT_EQ(file.added_snapshot_id, 1579605567338877265);
ASSERT_EQ(file.manifest_length, 7431);
ASSERT_EQ(file.sequence_number, 1);
ASSERT_EQ(file.min_sequence_number, 1);
ASSERT_EQ(file.partitions.size(), 1);
const auto& partition = file.partitions[0];
ASSERT_EQ(partition.contains_null, false);
ASSERT_EQ(partition.contains_nan.value(), false);
ASSERT_EQ(partition.lower_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
ASSERT_EQ(partition.upper_bound.value(),
std::vector<uint8_t>({0xb8, 0xd4, 0x06, 0x00}));
} else {
ASSERT_TRUE(false) << "Unexpected manifest file: " << manifest_path;
}
ASSERT_EQ(file.partition_spec_id, 0);
ASSERT_EQ(file.content, ManifestFile::Content::kData);
ASSERT_EQ(file.added_files_count, 1);
ASSERT_EQ(file.existing_files_count, 0);
ASSERT_EQ(file.deleted_files_count, 0);
ASSERT_EQ(file.added_rows_count, 1);
ASSERT_EQ(file.existing_rows_count, 0);
ASSERT_EQ(file.deleted_rows_count, 0);
ASSERT_EQ(file.key_metadata.empty(), true);
}

auto expected_manifest_list = prepare_test_manifest_list();
Comment thread
dongxiao1198 marked this conversation as resolved.
Outdated
ASSERT_EQ(read_result.value(), expected_manifest_list);
}

} // namespace iceberg
Loading