Skip to content

Commit 7cfc0e9

Browse files
committed
fix: review comments
1 parent 141eadc commit 7cfc0e9

File tree

13 files changed

+225
-182
lines changed

13 files changed

+225
-182
lines changed

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
4949
manifest/manifest_group.cc
5050
manifest/manifest_list.cc
5151
manifest/manifest_reader.cc
52+
manifest/manifest_util_internal.cc
5253
manifest/manifest_writer.cc
5354
manifest/rolling_manifest_writer.cc
5455
manifest/v1_metadata.cc

src/iceberg/constants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace iceberg {
3232

3333
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
3434
constexpr int64_t kInvalidSnapshotId = -1;
35+
constexpr int64_t kInvalidSequenceNumber = -1;
3536
/// \brief Stand-in for the current sequence number that will be assigned when the commit
3637
/// is successful. This is replaced when writing a manifest list by the ManifestFile
3738
/// adapter.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/manifest/manifest_util_internal.h"
21+
22+
#include <memory>
23+
#include <optional>
24+
25+
#include "iceberg/inheritable_metadata.h"
26+
#include "iceberg/manifest/manifest_entry.h"
27+
#include "iceberg/manifest/manifest_reader.h"
28+
#include "iceberg/manifest/manifest_writer.h"
29+
#include "iceberg/result.h"
30+
#include "iceberg/schema.h"
31+
#include "iceberg/snapshot.h"
32+
#include "iceberg/util/macros.h"
33+
34+
namespace iceberg {
35+
36+
Result<ManifestFile> CopyAppendManifest(
37+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
38+
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
39+
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
40+
SnapshotSummaryBuilder* summary_builder) {
41+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
42+
ManifestReader::Make(manifest, file_io, schema, spec));
43+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
44+
45+
// use metadata that will add the current snapshot's ID for the rewrite
46+
ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
47+
InheritableMetadataFactory::ForCopy(snapshot_id));
48+
49+
// do not produce row IDs for the copy
50+
ICEBERG_ASSIGN_OR_RAISE(
51+
auto writer,
52+
ManifestWriter::MakeWriter(format_version, snapshot_id, output_path, file_io, spec,
53+
schema, ManifestContent::kData));
54+
55+
// Write all entries as added entries with the new snapshot ID
56+
for (auto& entry : entries) {
57+
ICEBERG_PRECHECK(entry.status == ManifestStatus::kAdded,
58+
"Manifest to copy must only contain added entries");
59+
60+
ICEBERG_RETURN_UNEXPECTED(inheritable_metadata->Apply(entry));
61+
62+
if (summary_builder != nullptr && entry.data_file != nullptr) {
63+
summary_builder->AddedFile(*spec, *entry.data_file);
64+
}
65+
66+
ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
67+
}
68+
69+
ICEBERG_RETURN_UNEXPECTED(writer->Close());
70+
ICEBERG_ASSIGN_OR_RAISE(auto new_manifest, writer->ToManifestFile());
71+
72+
return new_manifest;
73+
}
74+
75+
} // namespace iceberg
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#pragma once
21+
22+
/// \file iceberg/manifest/manifest_util_internal.h
23+
/// Internal utility functions for manifest operations.
24+
25+
#include <cstdint>
26+
#include <memory>
27+
#include <optional>
28+
#include <string>
29+
30+
#include "iceberg/iceberg_export.h"
31+
#include "iceberg/result.h"
32+
#include "iceberg/type_fwd.h"
33+
34+
namespace iceberg {
35+
36+
/// \brief Copy an append manifest with a new snapshot ID.
37+
///
38+
/// This function copies a manifest file that contains only ADDED entries,
39+
/// rewriting it with a new snapshot ID. This is similar to Java's
40+
/// ManifestFiles.copyAppendManifest.
41+
///
42+
/// \param manifest The manifest file to copy
43+
/// \param file_io File IO implementation to use
44+
/// \param schema Table schema
45+
/// \param spec Partition spec for the manifest
46+
/// \param snapshot_id The new snapshot ID to assign to entries
47+
/// \param output_path Path where the new manifest will be written
48+
/// \param format_version Table format version
49+
/// \param summary_builder Optional summary builder to update with file metrics
50+
/// \return The copied manifest file, or an error
51+
ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
52+
const ManifestFile& manifest, std::shared_ptr<FileIO> file_io,
53+
std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec,
54+
int64_t snapshot_id, const std::string& output_path, int8_t format_version,
55+
SnapshotSummaryBuilder* summary_builder = nullptr);
56+
57+
} // namespace iceberg

src/iceberg/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ iceberg_sources = files(
6767
'manifest/manifest_group.cc',
6868
'manifest/manifest_list.cc',
6969
'manifest/manifest_reader.cc',
70+
'manifest/manifest_util_internal.cc',
7071
'manifest/manifest_writer.cc',
7172
'manifest/rolling_manifest_writer.cc',
7273
'manifest/v1_metadata.cc',

src/iceberg/table_metadata.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata {
7575
static constexpr int8_t kMinFormatVersionRowLineage = 3;
7676
static constexpr int8_t kMinFormatVersionDefaultValues = 3;
7777
static constexpr int64_t kInitialSequenceNumber = 0;
78-
static constexpr int64_t kInvalidSequenceNumber = -1;
7978
static constexpr int64_t kInitialRowId = 0;
8079

8180
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions = {};

src/iceberg/test/manifest_writer_versions_test.cc

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
#include "iceberg/arrow/arrow_file_io.h"
2929
#include "iceberg/avro/avro_register.h"
30+
#include "iceberg/constants.h"
3031
#include "iceberg/file_format.h"
3132
#include "iceberg/manifest/manifest_entry.h"
3233
#include "iceberg/manifest/manifest_list.h"
@@ -411,12 +412,11 @@ class ManifestWriterVersionsTest : public ::testing::Test {
411412

412413
TEST_F(ManifestWriterVersionsTest, TestV1Write) {
413414
auto manifest = WriteManifest(/*format_version=*/1, {data_file_});
414-
CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
415-
TableMetadata::kInvalidSequenceNumber);
415+
CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
416416
auto entries = ReadManifest(manifest);
417417
ASSERT_EQ(entries.size(), 1);
418-
CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
419-
TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
418+
CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
419+
DataFile::Content::kData);
420420
}
421421

422422
TEST_F(ManifestWriterVersionsTest, TestV1WriteDelete) {
@@ -449,13 +449,12 @@ TEST_F(ManifestWriterVersionsTest, TestV1WriteWithInheritance) {
449449

450450
TEST_F(ManifestWriterVersionsTest, TestV2Write) {
451451
auto manifest = WriteManifest(/*format_version=*/2, {data_file_});
452-
CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
453-
TableMetadata::kInvalidSequenceNumber);
452+
CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
454453
auto entries = ReadManifest(manifest);
455454
ASSERT_EQ(entries.size(), 1);
456455
ASSERT_EQ(manifest.content, ManifestContent::kData);
457-
CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
458-
TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
456+
CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
457+
DataFile::Content::kData);
459458
}
460459

461460
TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) {
@@ -470,8 +469,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) {
470469

471470
TEST_F(ManifestWriterVersionsTest, TestV2PlusWriteDeleteV2) {
472471
auto manifest = WriteDeleteManifest(/*format_version=*/2, delete_file_);
473-
CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
474-
TableMetadata::kInvalidSequenceNumber);
472+
CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
475473
auto entries = ReadManifest(manifest);
476474
ASSERT_EQ(entries.size(), 1);
477475
ASSERT_EQ(manifest.content, ManifestContent::kDeletes);
@@ -507,7 +505,7 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) {
507505

508506
// rewrite the manifest file using a v2 manifest
509507
auto rewritten_manifest = RewriteManifest(manifests[0], 2);
510-
CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber,
508+
CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber,
511509
TableMetadata::kInitialSequenceNumber);
512510

513511
// add the v2 manifest to a v2 manifest list, with a sequence number
@@ -525,14 +523,12 @@ TEST_F(ManifestWriterVersionsTest, TestV2ManifestRewriteWithInheritance) {
525523

526524
TEST_F(ManifestWriterVersionsTest, TestV3Write) {
527525
auto manifest = WriteManifest(/*format_version=*/3, {data_file_});
528-
CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
529-
TableMetadata::kInvalidSequenceNumber);
526+
CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
530527
auto entries = ReadManifest(manifest);
531528
ASSERT_EQ(entries.size(), 1);
532529
ASSERT_EQ(manifest.content, ManifestContent::kData);
533-
CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
534-
TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData,
535-
ManifestStatus::kAdded, kFirstRowId);
530+
CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
531+
DataFile::Content::kData, ManifestStatus::kAdded, kFirstRowId);
536532
}
537533

538534
TEST_F(ManifestWriterVersionsTest, TestV3WriteWithInheritance) {
@@ -598,7 +594,7 @@ TEST_F(ManifestWriterVersionsTest, TestV3ManifestRewriteWithInheritance) {
598594

599595
// rewrite the manifest file using a v3 manifest
600596
auto rewritten_manifest = RewriteManifest(manifests[0], 3);
601-
CheckRewrittenManifest(rewritten_manifest, TableMetadata::kInvalidSequenceNumber,
597+
CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber,
602598
TableMetadata::kInitialSequenceNumber);
603599

604600
// add the v3 manifest to a v3 manifest list, with a sequence number

src/iceberg/type_fwd.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ struct SnapshotLogEntry;
120120
struct SnapshotRef;
121121
struct StatisticsFile;
122122
struct TableMetadata;
123+
class SnapshotSummaryBuilder;
123124

124125
/// \brief Expression.
125126
class BoundPredicate;
@@ -187,7 +188,6 @@ class TableUpdateContext;
187188
class Transaction;
188189

189190
/// \brief Update family.
190-
class AppendFiles;
191191
class ExpireSnapshots;
192192
class FastAppend;
193193
class PendingUpdate;

src/iceberg/update/append_files.h

Lines changed: 0 additions & 70 deletions
This file was deleted.

0 commit comments

Comments
 (0)