Skip to content

Commit 8de07c9

Browse files
WZhuowgtmac
authored and committed
fix comments
1 parent 56056b8 commit 8de07c9

3 files changed

Lines changed: 126 additions & 60 deletions

File tree

src/iceberg/table_scan.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,10 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
301301
int64_t ChangelogScanTask::size_bytes() const {
302302
int64_t total_size = data_file_->file_size_in_bytes;
303303
for (const auto& delete_file : delete_files_) {
304+
ICEBERG_DCHECK(delete_file->content_size_in_bytes.has_value(),
305+
"Delete file content size must be available");
304306
total_size +=
305-
(delete_file->IsDeletionVector() ? delete_file->content_size_in_bytes.value_or(0)
307+
(delete_file->IsDeletionVector() ? delete_file->content_size_in_bytes.value()
306308
: delete_file->file_size_in_bytes);
307309
}
308310
return total_size;
@@ -811,10 +813,9 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
811813

812814
std::unordered_set<int64_t> snapshot_ids;
813815
std::unordered_map<int64_t, int32_t> snapshot_ordinals;
814-
int32_t ordinal = 0;
815816
for (const auto& snapshot : changelog_snapshots) {
816817
snapshot_ids.insert(snapshot.first->snapshot_id);
817-
snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
818+
snapshot_ordinals.try_emplace(snapshot.first->snapshot_id, snapshot_ordinals.size());
818819
}
819820

820821
std::vector<ManifestFile> data_manifests;
@@ -837,7 +838,8 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
837838

838839
ICEBERG_ASSIGN_OR_RAISE(
839840
auto manifest_group,
840-
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));
841+
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests),
842+
/*delete_manifests=*/{}));
841843

842844
manifest_group->CaseSensitive(context_.case_sensitive)
843845
.Select(ScanColumns())
@@ -861,15 +863,14 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
861863
tasks.reserve(entries.size());
862864

863865
for (auto& entry : entries) {
864-
if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
865-
continue;
866-
}
866+
ICEBERG_PRECHECK(entry.snapshot_id.has_value() && entry.data_file,
867+
"Invalid manifest entry with missing snapshot id or data file");
867868

868869
int64_t commit_snapshot_id = entry.snapshot_id.value();
869870
auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
870-
if (ordinal_it == snapshot_ordinals.end()) {
871-
continue;
872-
}
871+
ICEBERG_PRECHECK(ordinal_it != snapshot_ordinals.end(),
872+
"Invalid manifest entry with missing snapshot ordinal");
873+
873874
int32_t change_ordinal = ordinal_it->second;
874875

875876
if (ctx.drop_stats) {

src/iceberg/table_scan.h

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,10 @@ class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
138138
virtual ChangelogOperation operation() const = 0;
139139

140140
/// \brief The position of this change in the changelog order (0-based).
141-
virtual int32_t change_ordinal() const { return change_ordinal_; }
141+
int32_t change_ordinal() const { return change_ordinal_; }
142142

143143
/// \brief The snapshot ID that committed this change.
144-
virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
145-
146-
/// \brief The data file containing the added rows.
147-
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
148-
149-
/// \brief Delete files that apply to this data file.
150-
const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
151-
return delete_files_;
152-
}
144+
int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
153145

154146
/// \brief Residual filter to apply after reading.
155147
const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
@@ -162,27 +154,70 @@ class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
162154
std::shared_ptr<Expression> residual_filter_;
163155
};
164156

165-
/// \brief A scan task for reading rows that were added between snapshots.
157+
/// \brief A scan task for inserts generated by adding a data file to the table.
166158
///
167159
/// This task represents data files that were added to the table, along with any
168160
/// delete files that should be applied when reading the data.
161+
///
162+
/// Added data files may have matching delete files. This may happen if a
163+
/// matching position delete file is committed in the same snapshot or if changes
164+
/// for multiple snapshots are squashed together.
165+
///
166+
/// Suppose snapshot S1 adds data files F1, F2, F3 and a position delete file,
167+
/// D1, that marks particular records in F1 as deleted. A scan for changes
168+
/// generated by S1 should include the following tasks:
169+
/// - AddedRowsScanTask(file=F1, deletes=[D1], snapshot=S1)
170+
/// - AddedRowsScanTask(file=F2, deletes=[], snapshot=S1)
171+
/// - AddedRowsScanTask(file=F3, deletes=[], snapshot=S1)
172+
///
173+
/// Readers consuming these tasks should produce added records with metadata
174+
/// like change ordinal and commit snapshot ID.
169175
class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {
170176
public:
171177
using ChangelogScanTask::ChangelogScanTask;
172178

173179
ChangelogOperation operation() const override { return ChangelogOperation::kInsert; }
180+
181+
/// \brief The data file containing the added rows.
182+
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
183+
184+
/// \brief A list of delete files to apply when reading the data file in this task.
185+
///
186+
/// \return A list of delete files to apply
187+
const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
188+
return delete_files_;
189+
}
174190
};
175191

176-
/// \brief A scan task for reading data files that were deleted between snapshots.
192+
/// \brief A scan task for deletes generated by removing a data file from the table.
193+
///
194+
/// All historical delete files added earlier must be applied while reading the data file.
195+
/// This is required to output only those data records that were live when the data file
196+
/// was removed.
177197
///
178-
/// This task represents data files that were removed from the table. Unlike
179-
/// AddedRowsScanTask, delete files are not applicable here since the entire
180-
/// data file was deleted.
198+
/// Suppose snapshot S1 contains data files F1, F2, F3. Then snapshot S2 adds a position
199+
/// delete file, D1, that deletes records from F2 and snapshot S3 removes F2 entirely. A
200+
/// scan for changes generated by S3 should include the following task:
201+
/// - DeletedDataFileScanTask(file=F2, existing-deletes=[D1], snapshot=S3)
202+
///
203+
/// Readers consuming these tasks should produce deleted records with metadata like
204+
/// change ordinal and commit snapshot ID.
181205
class ICEBERG_EXPORT DeletedDataFileScanTask : public ChangelogScanTask {
182206
public:
183207
using ChangelogScanTask::ChangelogScanTask;
184208

185209
ChangelogOperation operation() const override { return ChangelogOperation::kDelete; }
210+
211+
/// \brief The data file that was deleted.
212+
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
213+
214+
/// \brief A list of previously added delete files to apply when reading the
215+
/// data file in this task.
216+
///
217+
/// \return A list of delete files to apply
218+
const std::vector<std::shared_ptr<DataFile>>& existing_deletes() const {
219+
return delete_files_;
220+
}
186221
};
187222

188223
namespace internal {
@@ -206,24 +241,8 @@ struct TableScanContext {
206241

207242
// Validate the context parameters to see if they have conflicts.
208243
[[nodiscard]] Status Validate() const;
209-
210-
/// \brief Returns true if this scan is a current lineage scan, which means it does not
211-
/// specify from/to snapshot IDs.
212-
bool IsScanCurrentLineage() const;
213-
214-
/// \brief Get the snapshot ID to scan up to (inclusive) based on the context.
215-
Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;
216-
217-
/// \brief Get the snapshot ID to scan from (exclusive) based on the context.
218-
Result<std::optional<int64_t>> FromSnapshotIdExclusive(
219-
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
220244
};
221245

222-
// Internal validation functions for IncrementalScanBuilder
223-
Status CheckSnapshotValid(const TableMetadata& metadata, int64_t snapshot_id);
224-
Result<int64_t> CheckRefValid(const TableMetadata& metadata, const std::string& ref);
225-
Status CheckBranchValid(const TableMetadata& metadata, const std::string& branch);
226-
227246
} // namespace internal
228247

229248
// Concept to check if a type is an incremental scan

src/iceberg/test/incremental_changelog_scan_test.cc

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,22 @@ namespace iceberg {
3434

3535
namespace {
3636

37+
const std::string& TaskFilePath(const std::shared_ptr<ChangelogScanTask>& task) {
38+
if (auto added = std::dynamic_pointer_cast<AddedRowsScanTask>(task)) {
39+
return added->data_file()->file_path;
40+
}
41+
if (auto deleted = std::dynamic_pointer_cast<DeletedDataFileScanTask>(task)) {
42+
return deleted->data_file()->file_path;
43+
}
44+
45+
static const std::string empty_path;
46+
return empty_path;
47+
}
48+
3749
/// \brief Sort changelog scan tasks for deterministic ordering.
3850
/// Sorts by change_ordinal, then by operation type (numeric enum value), then by file path.
39-
void SortTasks(std::vector<std::shared_ptr<ChangelogScanTask>>& tasks) {
51+
template <typename TaskType>
52+
void SortTasks(std::vector<std::shared_ptr<TaskType>>& tasks) {
4053
std::ranges::sort(tasks, [](const auto& t1, const auto& t2) {
4154
if (t1->change_ordinal() != t2->change_ordinal()) {
4255
return t1->change_ordinal() < t2->change_ordinal();
@@ -45,7 +58,8 @@ void SortTasks(std::vector<std::shared_ptr<ChangelogScanTask>>& tasks) {
4558
return static_cast<uint8_t>(t1->operation()) <
4659
static_cast<uint8_t>(t2->operation());
4760
}
48-
return t1->data_file()->file_path < t2->data_file()->file_path;
61+
return TaskFilePath(std::static_pointer_cast<ChangelogScanTask>(t1)) <
62+
TaskFilePath(std::static_pointer_cast<ChangelogScanTask>(t2));
4963
});
5064
}
5165

@@ -68,6 +82,10 @@ TEST_P(IncrementalChangelogScanTest, DataFilters) {
6882
auto snapshot_a = MakeAppendSnapshotWithPartitionValues(
6983
version, 1000L, std::nullopt, 1L, {{"/path/to/file_a.parquet", partition_a}},
7084
partitioned_spec_);
85+
SnapshotCache cache_a(snapshot_a.get());
86+
ICEBERG_UNWRAP_OR_FAIL(auto manifests_a, cache_a.DataManifests(file_io_));
87+
ASSERT_EQ(manifests_a.size(), 1);
88+
const auto& manifest_a = manifests_a[0];
7189

7290
// Create snapshot 2 with file_b (separate manifest list, not inheriting from snap1)
7391
auto snapshot_b = MakeAppendSnapshotWithPartitionValues(
@@ -81,6 +99,10 @@ TEST_P(IncrementalChangelogScanTest, DataFilters) {
8199
.snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}},
82100
partitioned_spec_);
83101

102+
// Make the first manifest unavailable. Planning should still succeed because the
103+
// partition filter can skip reading file_a's manifest entirely.
104+
EXPECT_THAT(file_io_->DeleteFile(manifest_a.manifest_path), IsOk());
105+
84106
// Filter by data="k" which should match only file_b (bucket("k", 16) = 1)
85107
ICEBERG_UNWRAP_OR_FAIL(auto builder, IncrementalChangelogScanBuilder::Make(
86108
partitioned_metadata, file_io_));
@@ -94,8 +116,10 @@ TEST_P(IncrementalChangelogScanTest, DataFilters) {
94116
EXPECT_EQ(t1->change_ordinal(), 1);
95117
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
96118
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
97-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_b.parquet");
98-
EXPECT_TRUE(t1->delete_files().empty());
119+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
120+
ASSERT_NE(insert_t1, nullptr);
121+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_b.parquet");
122+
EXPECT_TRUE(insert_t1->delete_files().empty());
99123
}
100124

101125
TEST_P(IncrementalChangelogScanTest, Overwrites) {
@@ -130,16 +154,20 @@ TEST_P(IncrementalChangelogScanTest, Overwrites) {
130154
EXPECT_EQ(t1->change_ordinal(), 0);
131155
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
132156
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
133-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a2.parquet");
134-
EXPECT_TRUE(t1->delete_files().empty());
157+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
158+
ASSERT_NE(insert_t1, nullptr);
159+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a2.parquet");
160+
EXPECT_TRUE(insert_t1->delete_files().empty());
135161

136162
// Second task: deleted file (DELETE operation)
137163
auto t2 = tasks[1];
138164
EXPECT_EQ(t2->change_ordinal(), 0);
139165
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
140166
EXPECT_EQ(t2->operation(), ChangelogOperation::kDelete);
141-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_a.parquet");
142-
EXPECT_TRUE(t2->delete_files().empty());
167+
auto delete_t2 = std::dynamic_pointer_cast<DeletedDataFileScanTask>(t2);
168+
ASSERT_NE(delete_t2, nullptr);
169+
EXPECT_EQ(delete_t2->data_file()->file_path, "/path/to/file_a.parquet");
170+
EXPECT_TRUE(delete_t2->existing_deletes().empty());
143171
}
144172

145173
TEST_P(IncrementalChangelogScanTest, DuplicatedManifests) {
@@ -190,10 +218,14 @@ TEST_P(IncrementalChangelogScanTest, DuplicatedManifests) {
190218
ASSERT_EQ(tasks.size(), 2);
191219
SortTasks(tasks);
192220

193-
EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/file_a.parquet");
221+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(tasks[0]);
222+
ASSERT_NE(insert_t1, nullptr);
223+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
194224
EXPECT_EQ(tasks[0]->commit_snapshot_id(), 1000L);
195225

196-
EXPECT_EQ(tasks[1]->data_file()->file_path, "/path/to/file_b.parquet");
226+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(tasks[1]);
227+
ASSERT_NE(insert_t2, nullptr);
228+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
197229
EXPECT_EQ(tasks[1]->commit_snapshot_id(), 2000L);
198230
}
199231

@@ -225,8 +257,10 @@ TEST_P(IncrementalChangelogScanTest, FileDeletes) {
225257
EXPECT_EQ(t1->change_ordinal(), 0);
226258
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
227259
EXPECT_EQ(t1->operation(), ChangelogOperation::kDelete);
228-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
229-
EXPECT_TRUE(t1->delete_files().empty());
260+
auto delete_t1 = std::dynamic_pointer_cast<DeletedDataFileScanTask>(t1);
261+
ASSERT_NE(delete_t1, nullptr);
262+
EXPECT_EQ(delete_t1->data_file()->file_path, "/path/to/file_a.parquet");
263+
EXPECT_TRUE(delete_t1->existing_deletes().empty());
230264
}
231265

232266
TEST_P(IncrementalChangelogScanTest, ExistingEntriesInNewDataManifestsAreIgnored) {
@@ -278,8 +312,10 @@ TEST_P(IncrementalChangelogScanTest, ExistingEntriesInNewDataManifestsAreIgnored
278312
EXPECT_EQ(t1->change_ordinal(), 0);
279313
EXPECT_EQ(t1->commit_snapshot_id(), 3000L);
280314
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
281-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_c.parquet");
282-
EXPECT_TRUE(t1->delete_files().empty());
315+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
316+
ASSERT_NE(insert_t1, nullptr);
317+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_c.parquet");
318+
EXPECT_TRUE(insert_t1->delete_files().empty());
283319
}
284320

285321
TEST_P(IncrementalChangelogScanTest, DataFileRewrites) {
@@ -330,13 +366,17 @@ TEST_P(IncrementalChangelogScanTest, DataFileRewrites) {
330366
EXPECT_EQ(t1->change_ordinal(), 0);
331367
EXPECT_EQ(t1->commit_snapshot_id(), 1000L);
332368
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
333-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
369+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
370+
ASSERT_NE(insert_t1, nullptr);
371+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
334372

335373
auto t2 = tasks[1];
336374
EXPECT_EQ(t2->change_ordinal(), 1);
337375
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
338376
EXPECT_EQ(t2->operation(), ChangelogOperation::kInsert);
339-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_b.parquet");
377+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(t2);
378+
ASSERT_NE(insert_t2, nullptr);
379+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
340380
}
341381

342382
TEST_P(IncrementalChangelogScanTest, ManifestRewritesAreIgnored) {
@@ -393,19 +433,25 @@ TEST_P(IncrementalChangelogScanTest, ManifestRewritesAreIgnored) {
393433
EXPECT_EQ(t1->change_ordinal(), 0);
394434
EXPECT_EQ(t1->commit_snapshot_id(), 1000L);
395435
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
396-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
436+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
437+
ASSERT_NE(insert_t1, nullptr);
438+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
397439

398440
auto t2 = tasks[1];
399441
EXPECT_EQ(t2->change_ordinal(), 1);
400442
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
401443
EXPECT_EQ(t2->operation(), ChangelogOperation::kInsert);
402-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_b.parquet");
444+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(t2);
445+
ASSERT_NE(insert_t2, nullptr);
446+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
403447

404448
auto t3 = tasks[2];
405449
EXPECT_EQ(t3->change_ordinal(), 2);
406450
EXPECT_EQ(t3->commit_snapshot_id(), 4000L);
407451
EXPECT_EQ(t3->operation(), ChangelogOperation::kInsert);
408-
EXPECT_EQ(t3->data_file()->file_path, "/path/to/file_c.parquet");
452+
auto insert_t3 = std::dynamic_pointer_cast<AddedRowsScanTask>(t3);
453+
ASSERT_NE(insert_t3, nullptr);
454+
EXPECT_EQ(insert_t3->data_file()->file_path, "/path/to/file_c.parquet");
409455
}
410456

411457
TEST_P(IncrementalChangelogScanTest, DeleteFilesAreNotSupported) {

0 commit comments

Comments
 (0)