Skip to content

Commit 82abb17

Browse files
committed
fix comments
1 parent 22fc519 commit 82abb17

File tree

3 files changed

+118
-49
lines changed

3 files changed

+118
-49
lines changed

src/iceberg/table_scan.cc

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,10 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
301301
int64_t ChangelogScanTask::size_bytes() const {
302302
int64_t total_size = data_file_->file_size_in_bytes;
303303
for (const auto& delete_file : delete_files_) {
304+
ICEBERG_DCHECK(delete_file->content_size_in_bytes.has_value(),
305+
"Delete file content size must be available");
304306
total_size +=
305-
(delete_file->IsDeletionVector() ? delete_file->content_size_in_bytes.value_or(0)
307+
(delete_file->IsDeletionVector() ? delete_file->content_size_in_bytes.value()
306308
: delete_file->file_size_in_bytes);
307309
}
308310
return total_size;
@@ -811,10 +813,9 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
811813

812814
std::unordered_set<int64_t> snapshot_ids;
813815
std::unordered_map<int64_t, int32_t> snapshot_ordinals;
814-
int32_t ordinal = 0;
815816
for (const auto& snapshot : changelog_snapshots) {
816817
snapshot_ids.insert(snapshot.first->snapshot_id);
817-
snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
818+
snapshot_ordinals[snapshot.first->snapshot_id] = snapshot_ordinals.size();
818819
}
819820

820821
std::vector<ManifestFile> data_manifests;
@@ -837,7 +838,8 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
837838

838839
ICEBERG_ASSIGN_OR_RAISE(
839840
auto manifest_group,
840-
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));
841+
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests),
842+
/*delete_manifests=*/{}));
841843

842844
manifest_group->CaseSensitive(context_.case_sensitive)
843845
.Select(ScanColumns())
@@ -861,15 +863,14 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
861863
tasks.reserve(entries.size());
862864

863865
for (auto& entry : entries) {
864-
if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
865-
continue;
866-
}
866+
ICEBERG_PRECHECK(entry.snapshot_id.has_value() && entry.data_file,
867+
"Invalid manifest entry with missing snapshot id or data file");
867868

868869
int64_t commit_snapshot_id = entry.snapshot_id.value();
869870
auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
870-
if (ordinal_it == snapshot_ordinals.end()) {
871-
continue;
872-
}
871+
ICEBERG_PRECHECK(ordinal_it != snapshot_ordinals.end(),
872+
"Invalid manifest entry with missing snapshot ordinal");
873+
873874
int32_t change_ordinal = ordinal_it->second;
874875

875876
if (ctx.drop_stats) {

src/iceberg/table_scan.h

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,10 @@ class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
138138
virtual ChangelogOperation operation() const = 0;
139139

140140
/// \brief The position of this change in the changelog order (0-based).
141-
virtual int32_t change_ordinal() const { return change_ordinal_; }
141+
int32_t change_ordinal() const { return change_ordinal_; }
142142

143143
/// \brief The snapshot ID that committed this change.
144-
virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
145-
146-
/// \brief The data file containing the added rows.
147-
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
148-
149-
/// \brief Delete files that apply to this data file.
150-
const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
151-
return delete_files_;
152-
}
144+
int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
153145

154146
/// \brief Residual filter to apply after reading.
155147
const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
@@ -162,27 +154,70 @@ class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
162154
std::shared_ptr<Expression> residual_filter_;
163155
};
164156

165-
/// \brief A scan task for reading rows that were added between snapshots.
157+
/// \brief A scan task for inserts generated by adding a data file to the table.
166158
///
167159
/// This task represents data files that were added to the table, along with any
168160
/// delete files that should be applied when reading the data.
161+
///
162+
/// Added data files may have matching delete files. This may happen if a
163+
/// matching position delete file is committed in the same snapshot or if changes
164+
/// for multiple snapshots are squashed together.
165+
///
166+
/// Suppose snapshot S1 adds data files F1, F2, F3 and a position delete file,
167+
/// D1, that marks particular records in F1 as deleted. A scan for changes
168+
/// generated by S1 should include the following tasks:
169+
/// - AddedRowsScanTask(file=F1, deletes=[D1], snapshot=S1)
170+
/// - AddedRowsScanTask(file=F2, deletes=[], snapshot=S1)
171+
/// - AddedRowsScanTask(file=F3, deletes=[], snapshot=S1)
172+
///
173+
/// Readers consuming these tasks should produce added records with metadata
174+
/// like change ordinal and commit snapshot ID.
169175
class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {
170176
public:
171177
using ChangelogScanTask::ChangelogScanTask;
172178

173179
ChangelogOperation operation() const override { return ChangelogOperation::kInsert; }
180+
181+
/// \brief The data file containing the added rows.
182+
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
183+
184+
/// \brief A list of delete files to apply when reading the data file in this task.
185+
///
186+
/// @return A list of delete files to apply
187+
const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
188+
return delete_files_;
189+
}
174190
};
175191

176-
/// \brief A scan task for reading data files that were deleted between snapshots.
192+
/// \brief A scan task for deletes generated by removing a data file from the table.
193+
///
194+
/// All historical delete files added earlier must be applied while reading the data file.
195+
/// This is required to output only those data records that were live when the data file
196+
/// was removed.
197+
///
198+
/// Suppose snapshot S1 contains data files F1, F2, F3. Then snapshot S2 adds a position
199+
/// delete file, D1, that deletes records from F2 and snapshot S3 removes F2 entirely. A
200+
/// scan for changes generated by S3 should include the following task:
201+
/// - DeletedDataFileScanTask(file=F2, existing-deletes=[D1], snapshot=S3)
177202
///
178-
/// This task represents data files that were removed from the table. Unlike
179-
/// AddedRowsScanTask, delete files are not applicable here since the entire
180-
/// data file was deleted.
203+
/// Readers consuming these tasks should produce deleted records with metadata like
204+
/// change ordinal and commit snapshot ID.
181205
class ICEBERG_EXPORT DeletedDataFileScanTask : public ChangelogScanTask {
182206
public:
183207
using ChangelogScanTask::ChangelogScanTask;
184208

185209
ChangelogOperation operation() const override { return ChangelogOperation::kDelete; }
210+
211+
/// \brief The data file that was deleted.
212+
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
213+
214+
/// \brief A list of previously added delete files to apply when reading the
215+
/// data file in this task.
216+
///
217+
/// \return A list of delete files to apply
218+
const std::vector<std::shared_ptr<DataFile>>& existing_deletes() const {
219+
return delete_files_;
220+
}
186221
};
187222

188223
namespace internal {
@@ -219,11 +254,6 @@ struct TableScanContext {
219254
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
220255
};
221256

222-
// Internal validation functions for IncrementalScanBuilder
223-
Status CheckSnapshotValid(const TableMetadata& metadata, int64_t snapshot_id);
224-
Result<int64_t> CheckRefValid(const TableMetadata& metadata, const std::string& ref);
225-
Status CheckBranchValid(const TableMetadata& metadata, const std::string& branch);
226-
227257
} // namespace internal
228258

229259
// Concept to check if a type is an incremental scan

src/iceberg/test/incremental_changelog_scan_test.cc

Lines changed: 57 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,22 @@ namespace iceberg {
3434

3535
namespace {
3636

37+
const std::string& TaskFilePath(const std::shared_ptr<ChangelogScanTask>& task) {
38+
if (auto added = std::dynamic_pointer_cast<AddedRowsScanTask>(task)) {
39+
return added->data_file()->file_path;
40+
}
41+
if (auto deleted = std::dynamic_pointer_cast<DeletedDataFileScanTask>(task)) {
42+
return deleted->data_file()->file_path;
43+
}
44+
45+
static const std::string empty_path;
46+
return empty_path;
47+
}
48+
3749
/// \brief Sort changelog scan tasks for deterministic ordering.
3850
/// Sorts by change_ordinal, then by operation type name, then by file path.
39-
void SortTasks(std::vector<std::shared_ptr<ChangelogScanTask>>& tasks) {
51+
template <typename TaskType>
52+
void SortTasks(std::vector<std::shared_ptr<TaskType>>& tasks) {
4053
std::ranges::sort(tasks, [](const auto& t1, const auto& t2) {
4154
if (t1->change_ordinal() != t2->change_ordinal()) {
4255
return t1->change_ordinal() < t2->change_ordinal();
@@ -45,7 +58,8 @@ void SortTasks(std::vector<std::shared_ptr<ChangelogScanTask>>& tasks) {
4558
return static_cast<uint8_t>(t1->operation()) <
4659
static_cast<uint8_t>(t2->operation());
4760
}
48-
return t1->data_file()->file_path < t2->data_file()->file_path;
61+
return TaskFilePath(std::static_pointer_cast<ChangelogScanTask>(t1)) <
62+
TaskFilePath(std::static_pointer_cast<ChangelogScanTask>(t2));
4963
});
5064
}
5165

@@ -94,8 +108,10 @@ TEST_P(IncrementalChangelogScanTest, DataFilters) {
94108
EXPECT_EQ(t1->change_ordinal(), 1);
95109
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
96110
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
97-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_b.parquet");
98-
EXPECT_TRUE(t1->delete_files().empty());
111+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
112+
ASSERT_NE(insert_t1, nullptr);
113+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_b.parquet");
114+
EXPECT_TRUE(insert_t1->delete_files().empty());
99115
}
100116

101117
TEST_P(IncrementalChangelogScanTest, Overwrites) {
@@ -130,16 +146,20 @@ TEST_P(IncrementalChangelogScanTest, Overwrites) {
130146
EXPECT_EQ(t1->change_ordinal(), 0);
131147
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
132148
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
133-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a2.parquet");
134-
EXPECT_TRUE(t1->delete_files().empty());
149+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
150+
ASSERT_NE(insert_t1, nullptr);
151+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a2.parquet");
152+
EXPECT_TRUE(insert_t1->delete_files().empty());
135153

136154
// Second task: deleted file (DELETE operation)
137155
auto t2 = tasks[1];
138156
EXPECT_EQ(t2->change_ordinal(), 0);
139157
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
140158
EXPECT_EQ(t2->operation(), ChangelogOperation::kDelete);
141-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_a.parquet");
142-
EXPECT_TRUE(t2->delete_files().empty());
159+
auto delete_t2 = std::dynamic_pointer_cast<DeletedDataFileScanTask>(t2);
160+
ASSERT_NE(delete_t2, nullptr);
161+
EXPECT_EQ(delete_t2->data_file()->file_path, "/path/to/file_a.parquet");
162+
EXPECT_TRUE(delete_t2->existing_deletes().empty());
143163
}
144164

145165
TEST_P(IncrementalChangelogScanTest, DuplicatedManifests) {
@@ -190,10 +210,14 @@ TEST_P(IncrementalChangelogScanTest, DuplicatedManifests) {
190210
ASSERT_EQ(tasks.size(), 2);
191211
SortTasks(tasks);
192212

193-
EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/file_a.parquet");
213+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(tasks[0]);
214+
ASSERT_NE(insert_t1, nullptr);
215+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
194216
EXPECT_EQ(tasks[0]->commit_snapshot_id(), 1000L);
195217

196-
EXPECT_EQ(tasks[1]->data_file()->file_path, "/path/to/file_b.parquet");
218+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(tasks[1]);
219+
ASSERT_NE(insert_t2, nullptr);
220+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
197221
EXPECT_EQ(tasks[1]->commit_snapshot_id(), 2000L);
198222
}
199223

@@ -225,8 +249,10 @@ TEST_P(IncrementalChangelogScanTest, FileDeletes) {
225249
EXPECT_EQ(t1->change_ordinal(), 0);
226250
EXPECT_EQ(t1->commit_snapshot_id(), 2000L);
227251
EXPECT_EQ(t1->operation(), ChangelogOperation::kDelete);
228-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
229-
EXPECT_TRUE(t1->delete_files().empty());
252+
auto delete_t1 = std::dynamic_pointer_cast<DeletedDataFileScanTask>(t1);
253+
ASSERT_NE(delete_t1, nullptr);
254+
EXPECT_EQ(delete_t1->data_file()->file_path, "/path/to/file_a.parquet");
255+
EXPECT_TRUE(delete_t1->existing_deletes().empty());
230256
}
231257

232258
TEST_P(IncrementalChangelogScanTest, ExistingEntriesInNewDataManifestsAreIgnored) {
@@ -278,8 +304,10 @@ TEST_P(IncrementalChangelogScanTest, ExistingEntriesInNewDataManifestsAreIgnored
278304
EXPECT_EQ(t1->change_ordinal(), 0);
279305
EXPECT_EQ(t1->commit_snapshot_id(), 3000L);
280306
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
281-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_c.parquet");
282-
EXPECT_TRUE(t1->delete_files().empty());
307+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
308+
ASSERT_NE(insert_t1, nullptr);
309+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_c.parquet");
310+
EXPECT_TRUE(insert_t1->delete_files().empty());
283311
}
284312

285313
TEST_P(IncrementalChangelogScanTest, DataFileRewrites) {
@@ -330,13 +358,17 @@ TEST_P(IncrementalChangelogScanTest, DataFileRewrites) {
330358
EXPECT_EQ(t1->change_ordinal(), 0);
331359
EXPECT_EQ(t1->commit_snapshot_id(), 1000L);
332360
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
333-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
361+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
362+
ASSERT_NE(insert_t1, nullptr);
363+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
334364

335365
auto t2 = tasks[1];
336366
EXPECT_EQ(t2->change_ordinal(), 1);
337367
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
338368
EXPECT_EQ(t2->operation(), ChangelogOperation::kInsert);
339-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_b.parquet");
369+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(t2);
370+
ASSERT_NE(insert_t2, nullptr);
371+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
340372
}
341373

342374
TEST_P(IncrementalChangelogScanTest, ManifestRewritesAreIgnored) {
@@ -393,19 +425,25 @@ TEST_P(IncrementalChangelogScanTest, ManifestRewritesAreIgnored) {
393425
EXPECT_EQ(t1->change_ordinal(), 0);
394426
EXPECT_EQ(t1->commit_snapshot_id(), 1000L);
395427
EXPECT_EQ(t1->operation(), ChangelogOperation::kInsert);
396-
EXPECT_EQ(t1->data_file()->file_path, "/path/to/file_a.parquet");
428+
auto insert_t1 = std::dynamic_pointer_cast<AddedRowsScanTask>(t1);
429+
ASSERT_NE(insert_t1, nullptr);
430+
EXPECT_EQ(insert_t1->data_file()->file_path, "/path/to/file_a.parquet");
397431

398432
auto t2 = tasks[1];
399433
EXPECT_EQ(t2->change_ordinal(), 1);
400434
EXPECT_EQ(t2->commit_snapshot_id(), 2000L);
401435
EXPECT_EQ(t2->operation(), ChangelogOperation::kInsert);
402-
EXPECT_EQ(t2->data_file()->file_path, "/path/to/file_b.parquet");
436+
auto insert_t2 = std::dynamic_pointer_cast<AddedRowsScanTask>(t2);
437+
ASSERT_NE(insert_t2, nullptr);
438+
EXPECT_EQ(insert_t2->data_file()->file_path, "/path/to/file_b.parquet");
403439

404440
auto t3 = tasks[2];
405441
EXPECT_EQ(t3->change_ordinal(), 2);
406442
EXPECT_EQ(t3->commit_snapshot_id(), 4000L);
407443
EXPECT_EQ(t3->operation(), ChangelogOperation::kInsert);
408-
EXPECT_EQ(t3->data_file()->file_path, "/path/to/file_c.parquet");
444+
auto insert_t3 = std::dynamic_pointer_cast<AddedRowsScanTask>(t3);
445+
ASSERT_NE(insert_t3, nullptr);
446+
EXPECT_EQ(insert_t3->data_file()->file_path, "/path/to/file_c.parquet");
409447
}
410448

411449
TEST_P(IncrementalChangelogScanTest, DeleteFilesAreNotSupported) {

0 commit comments

Comments
 (0)