Skip to content

Commit cdf05d6

Browse files
authored
feat: implement IncrementalAppendScan (#590)
1 parent 133742d commit cdf05d6

File tree

7 files changed

+1118
-144
lines changed

7 files changed

+1118
-144
lines changed

src/iceberg/manifest/manifest_list.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ struct ICEBERG_EXPORT ManifestFile {
230230
kFirstRowIdFieldId, "first_row_id", int64(),
231231
"Starting row ID to assign to new rows in ADDED data files");
232232

233-
bool operator==(const ManifestFile& other) const = default;
233+
/// \brief Equality based solely on `manifest_path`; no other field is compared.
///
/// NOTE(review): this is narrower than the defaulted member-wise comparison it
/// replaces -- confirm that no caller relies on full field equality.
bool operator==(const ManifestFile& other) const {
234+
return manifest_path == other.manifest_path;
235+
}
234236

235237
static const std::shared_ptr<StructType>& Type();
236238
};
@@ -272,3 +274,12 @@ ICEBERG_EXPORT inline constexpr Result<ManifestContent> ManifestContentFromStrin
272274
}
273275

274276
} // namespace iceberg
277+
278+
namespace std {
279+
template <>
280+
struct hash<iceberg::ManifestFile> {
281+
size_t operator()(const iceberg::ManifestFile& manifest_file) const {
282+
return std::hash<std::string>{}(manifest_file.manifest_path);
283+
}
284+
};
285+
} // namespace std

src/iceberg/table_scan.cc

Lines changed: 210 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,90 @@ Status TableScanContext::Validate() const {
171171
return {};
172172
}
173173

174+
bool IsScanCurrentLineage(const TableScanContext& context) {
175+
return !context.from_snapshot_id.has_value() && !context.to_snapshot_id.has_value();
176+
}
177+
178+
// Resolves the inclusive end snapshot ID of an incremental scan.
//
// Resolution order:
//   1. `context.to_snapshot_id`, when set. If a branch is also set, the ID must be
//      an ancestor of the branch's snapshot, otherwise an error is returned.
//   2. The snapshot referenced by `context.branch`, when a branch is set.
//   3. The snapshot returned by `metadata.Snapshot()`.
//
// Returns an error if the branch cannot be found in `metadata.refs`, if any
// snapshot lookup fails, or if no end snapshot can be determined.
Result<int64_t> ToSnapshotIdInclusive(const TableScanContext& context,
                                      const TableMetadata& metadata) {
  // Snapshot the requested branch points at; remains null when no branch is set.
  std::shared_ptr<Snapshot> branch_head;
  if (!context.branch.empty()) {
    const auto ref_it = metadata.refs.find(context.branch);
    ICEBERG_CHECK(ref_it != metadata.refs.end() && ref_it->second != nullptr,
                  "Cannot find branch: {}", context.branch);
    ICEBERG_ASSIGN_OR_RAISE(branch_head,
                            metadata.SnapshotById(ref_it->second->snapshot_id));
  }
  const bool has_branch = branch_head != nullptr;

  if (!context.to_snapshot_id.has_value()) {
    // No explicit end snapshot: prefer the branch's snapshot when available.
    if (has_branch) {
      return branch_head->snapshot_id;
    }

    // Otherwise fall back to the table's current snapshot.
    std::shared_ptr<Snapshot> table_head;
    ICEBERG_ASSIGN_OR_RAISE(table_head, metadata.Snapshot());
    ICEBERG_CHECK(table_head != nullptr,
                  "End snapshot is not set and table has no current snapshot");
    return table_head->snapshot_id;
  }

  int64_t end_snapshot_id = *context.to_snapshot_id;
  if (has_branch) {
    // The explicit end snapshot must be on the requested branch.
    ICEBERG_ASSIGN_OR_RAISE(
        bool on_branch,
        SnapshotUtil::IsAncestorOf(metadata, branch_head->snapshot_id, end_snapshot_id));
    ICEBERG_CHECK(on_branch,
                  "End snapshot is not a valid snapshot on the current branch: {}",
                  context.branch);
  }
  return end_snapshot_id;
}
220+
221+
// Resolves the exclusive start snapshot ID of an incremental scan.
//
// - No `from_snapshot_id` in the context: returns std::nullopt.
// - Exclusive start: `from_snapshot_id` must be the parent of some ancestor of
//   `to_snapshot_id_inclusive`; the ID itself is returned.
// - Inclusive start: `from_snapshot_id` must be an ancestor of
//   `to_snapshot_id_inclusive`; the `parent_snapshot_id` of that snapshot is
//   returned so that the start snapshot itself is included in the range.
//
// Returns an error when validation or a snapshot lookup fails.
Result<std::optional<int64_t>> FromSnapshotIdExclusive(const TableScanContext& context,
                                                       const TableMetadata& metadata,
                                                       int64_t to_snapshot_id_inclusive) {
  if (!context.from_snapshot_id.has_value()) {
    return std::nullopt;
  }
  int64_t start_snapshot_id = *context.from_snapshot_id;

  if (!context.from_snapshot_id_inclusive) {
    // Exclusive start: some ancestor of the end snapshot must have this ID as parent.
    ICEBERG_ASSIGN_OR_RAISE(bool reachable_as_parent,
                            SnapshotUtil::IsParentAncestorOf(
                                metadata, to_snapshot_id_inclusive, start_snapshot_id));
    ICEBERG_CHECK(
        reachable_as_parent,
        "Starting snapshot (exclusive) {} is not a parent ancestor of end snapshot {}",
        start_snapshot_id, to_snapshot_id_inclusive);
    return start_snapshot_id;
  }

  // Inclusive start: the snapshot itself must be an ancestor of the end snapshot.
  ICEBERG_ASSIGN_OR_RAISE(
      bool reachable,
      SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive, start_snapshot_id));
  ICEBERG_CHECK(reachable,
                "Starting snapshot (inclusive) {} is not an ancestor of end snapshot {}",
                start_snapshot_id, to_snapshot_id_inclusive);

  // Convert to an exclusive bound by stepping to the parent (may be absent).
  ICEBERG_ASSIGN_OR_RAISE(auto start_snapshot, metadata.SnapshotById(start_snapshot_id));
  return start_snapshot->parent_snapshot_id;
}
257+
174258
} // namespace internal
175259

176260
// Out-of-line definition of the defaulted ScanTask destructor.
ScanTask::~ScanTask() = default;
@@ -340,10 +424,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
340424

341425
template <typename ScanType>
342426
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
343-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
427+
int64_t from_snapshot_id, bool inclusive)
344428
requires IsIncrementalScan<ScanType>
345429
{
346-
AddError(NotImplemented("Incremental scan is not implemented"));
430+
if (inclusive) {
431+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
432+
metadata_->SnapshotById(from_snapshot_id));
433+
}
434+
this->context_.from_snapshot_id = from_snapshot_id;
435+
this->context_.from_snapshot_id_inclusive = inclusive;
347436
return *this;
348437
}
349438

@@ -352,31 +441,45 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
352441
const std::string& ref, bool inclusive)
353442
requires IsIncrementalScan<ScanType>
354443
{
355-
AddError(NotImplemented("Incremental scan is not implemented"));
356-
return *this;
444+
auto iter = metadata_->refs.find(ref);
445+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
446+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
447+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
448+
"Ref {} is not a tag", ref);
449+
return FromSnapshot(iter->second->snapshot_id, inclusive);
357450
}
358451

359452
template <typename ScanType>
360453
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t to_snapshot_id)
361454
requires IsIncrementalScan<ScanType>
362455
{
363-
AddError(NotImplemented("Incremental scan is not implemented"));
456+
ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, metadata_->SnapshotById(to_snapshot_id));
457+
context_.to_snapshot_id = to_snapshot_id;
364458
return *this;
365459
}
366460

367461
template <typename ScanType>
368462
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const std::string& ref)
369463
requires IsIncrementalScan<ScanType>
370464
{
371-
AddError(NotImplemented("Incremental scan is not implemented"));
372-
return *this;
465+
auto iter = metadata_->refs.find(ref);
466+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", ref);
467+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
468+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
469+
"Ref {} is not a tag", ref);
470+
return ToSnapshot(iter->second->snapshot_id);
373471
}
374472

375473
template <typename ScanType>
376474
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
377475
const std::string& branch)
378476
requires IsIncrementalScan<ScanType>
379477
{
478+
auto iter = metadata_->refs.find(branch);
479+
ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", branch);
480+
ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
481+
ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
482+
"Ref {} is not a branch", branch);
380483
context_.branch = branch;
381484
return *this;
382485
}
@@ -536,20 +639,109 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
536639
return manifest_group->PlanFiles();
537640
}
538641

642+
// Friend function template for IncrementalScan that implements the shared PlanFiles
643+
// logic. It resolves the from/to snapshot range from the scan context and delegates
644+
// to the two-arg virtual PlanFiles() override in the concrete subclass.
645+
// Defined as a friend to access the protected two-arg PlanFiles().
646+
template <typename ScanTaskType>
647+
Result<std::vector<std::shared_ptr<ScanTaskType>>> ResolvePlanFiles(
648+
const IncrementalScan<ScanTaskType>& scan) {
649+
if (IsScanCurrentLineage(scan.context())) {
650+
if (scan.metadata()->current_snapshot_id == kInvalidSnapshotId) {
651+
return std::vector<std::shared_ptr<ScanTaskType>>{};
652+
}
653+
}
654+
655+
ICEBERG_ASSIGN_OR_RAISE(
656+
int64_t to_snapshot_id_inclusive,
657+
internal::ToSnapshotIdInclusive(scan.context(), *scan.metadata()));
658+
ICEBERG_ASSIGN_OR_RAISE(
659+
std::optional<int64_t> from_snapshot_id_exclusive,
660+
internal::FromSnapshotIdExclusive(scan.context(), *scan.metadata(),
661+
to_snapshot_id_inclusive));
662+
663+
return scan.PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
664+
}
665+
539666
// IncrementalAppendScan implementation
540667

541668
// Creates an IncrementalAppendScan.
//
// Fails the precondition checks when `metadata`, `schema`, or `io` is null.
// The scan is constructed with `new` rather than std::make_unique.
// NOTE(review): presumably because the constructor is not publicly accessible
// -- confirm against the class declaration.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");

  using ScanPtr = std::unique_ptr<IncrementalAppendScan>;
  return ScanPtr(new IncrementalAppendScan(std::move(metadata), std::move(schema),
                                           std::move(io), std::move(context)));
}
677+
678+
// Plans the scan by delegating to the shared ResolvePlanFiles() helper, which
// receives this scan as its IncrementalScan<FileScanTask> base.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles()
    const {
  const IncrementalScan<FileScanTask>& self = *this;
  return ResolvePlanFiles<FileScanTask>(self);
}
548682

549683
// Plans file scan tasks for data files appended in the snapshot range
// (from_snapshot_id_exclusive, to_snapshot_id_inclusive].
//
// Steps:
//   1. Collect the ancestors of the end snapshot down to (excluding) the start
//      snapshot and keep only those whose operation is an append.
//   2. Gather the data manifests of those snapshots that were added by one of
//      them, de-duplicated by manifest path.
//   3. Plan files from those manifests, keeping only entries with status ADDED
//      whose snapshot ID belongs to one of the append snapshots.
//
// Manifests are kept in first-seen order (a vector plus a set of seen paths)
// rather than in a hash set, so the manifest list handed to ManifestGroup does
// not depend on hash iteration order.
//
// Returns an empty task list when the range contains no append snapshot or no
// matching data manifest; returns an error if any lookup or planning step fails.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
    std::optional<int64_t> from_snapshot_id_exclusive,
    int64_t to_snapshot_id_inclusive) const {
  ICEBERG_ASSIGN_OR_RAISE(
      auto ancestors_snapshots,
      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
                                     from_snapshot_id_exclusive));

  // Only append snapshots contribute files to an incremental append scan.
  std::vector<std::shared_ptr<Snapshot>> append_snapshots;
  std::ranges::copy_if(ancestors_snapshots, std::back_inserter(append_snapshots),
                       [](const auto& snapshot) {
                         if (snapshot == nullptr) {
                           return false;
                         }
                         const auto operation = snapshot->Operation();
                         return operation.has_value() &&
                                operation.value() == DataOperation::kAppend;
                       });
  if (append_snapshots.empty()) {
    return std::vector<std::shared_ptr<FileScanTask>>{};
  }

  std::unordered_set<int64_t> snapshot_ids;
  std::ranges::transform(append_snapshots,
                         std::inserter(snapshot_ids, snapshot_ids.end()),
                         [](const auto& snapshot) { return snapshot->snapshot_id; });

  // Keep each manifest once (keyed by path), in the order it is first seen.
  std::vector<ManifestFile> data_manifests;
  std::unordered_set<std::string> seen_manifest_paths;
  for (const auto& snapshot : append_snapshots) {
    SnapshotCache snapshot_cache(snapshot.get());
    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
    for (const ManifestFile& manifest : manifests) {
      // Skip manifests that were not added by a snapshot in the range.
      if (!snapshot_ids.contains(manifest.added_snapshot_id)) {
        continue;
      }
      if (seen_manifest_paths.insert(manifest.manifest_path).second) {
        data_manifests.push_back(manifest);
      }
    }
  }
  if (data_manifests.empty()) {
    return std::vector<std::shared_ptr<FileScanTask>>{};
  }

  TableMetadataCache metadata_cache(metadata_.get());
  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

  ICEBERG_ASSIGN_OR_RAISE(
      auto manifest_group,
      ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));

  // `snapshot_ids` is captured by reference; it outlives the PlanFiles() call below.
  manifest_group->CaseSensitive(context_.case_sensitive)
      .Select(ScanColumns())
      .FilterData(filter())
      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
        return entry.snapshot_id.has_value() &&
               snapshot_ids.contains(entry.snapshot_id.value()) &&
               entry.status == ManifestStatus::kAdded;
      })
      .IgnoreDeleted()
      .ColumnsToKeepStats(context_.columns_to_keep_stats);

  if (context_.ignore_residuals) {
    manifest_group->IgnoreResiduals();
  }

  return manifest_group->PlanFiles();
}
554746

555747
// IncrementalChangelogScan implementation
@@ -562,6 +754,11 @@ Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make
562754
return NotImplemented("IncrementalChangelogScan is not implemented");
563755
}
564756

757+
// Plans the scan by delegating to the shared ResolvePlanFiles() helper, which
// receives this scan as its IncrementalScan<ChangelogScanTask> base.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
IncrementalChangelogScan::PlanFiles() const {
  const IncrementalScan<ChangelogScanTask>& self = *this;
  return ResolvePlanFiles<ChangelogScanTask>(self);
}
761+
565762
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
566763
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
567764
int64_t to_snapshot_id_inclusive) const {

src/iceberg/table_scan.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "iceberg/arrow_c_data.h"
3131
#include "iceberg/result.h"
32+
#include "iceberg/table_metadata.h"
3233
#include "iceberg/type_fwd.h"
3334
#include "iceberg/util/error_collector.h"
3435

@@ -355,22 +356,23 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
355356
/// \brief A base template class for incremental scans that read changes between
356357
/// snapshots, and return scan tasks of the specified type.
357358
template <typename ScanTaskType>
358-
class ICEBERG_EXPORT IncrementalScan : public TableScan {
359+
class IncrementalScan : public TableScan {
359360
public:
360361
~IncrementalScan() override = default;
361362

362-
/// \brief Plans the scan tasks by resolving manifests and data files.
363-
/// \return A Result containing scan tasks or an error.
364-
Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
365-
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
366-
}
363+
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const = 0;
367364

368365
protected:
369366
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
370367
std::optional<int64_t> from_snapshot_id_exclusive,
371368
int64_t to_snapshot_id_inclusive) const = 0;
372369

373370
using TableScan::TableScan;
371+
372+
// Allow the free function ResolvePlanFiles to access protected members.
373+
template <typename T>
374+
friend Result<std::vector<std::shared_ptr<T>>> ResolvePlanFiles(
375+
const IncrementalScan<T>& scan);
374376
};
375377

376378
/// \brief A scan that reads data files added between snapshots (incremental appends).
@@ -383,6 +385,8 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask
383385

384386
~IncrementalAppendScan() override = default;
385387

388+
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const override;
389+
386390
protected:
387391
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
388392
std::optional<int64_t> from_snapshot_id_exclusive,
@@ -402,6 +406,8 @@ class ICEBERG_EXPORT IncrementalChangelogScan
402406

403407
~IncrementalChangelogScan() override = default;
404408

409+
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles() const override;
410+
405411
protected:
406412
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
407413
std::optional<int64_t> from_snapshot_id_exclusive,

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
173173
USE_BUNDLE
174174
SOURCES
175175
file_scan_task_test.cc
176+
incremental_append_scan_test.cc
176177
table_scan_test.cc)
177178

178179
add_iceberg_test(table_update_test

0 commit comments

Comments
 (0)