Skip to content

Commit 3214b82

Browse files
committed
feat: Add incremental scan API with IncrementalAppendScan and IncrementalChangelogScan
1 parent 43b83c5 commit 3214b82

File tree

6 files changed

+248
-97
lines changed

6 files changed

+248
-97
lines changed

src/iceberg/table.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
152152
return TableScanBuilder::Make(metadata_, io_);
153153
}
154154

155+
// Creates a builder for an incremental append scan over this table.
// Forwards the table's metadata_ and io_ to
// IncrementalScanBuilder<IncrementalAppendScan>::Make and returns that
// factory's result unchanged.
Result<std::unique_ptr<IncrementalScanBuilder<IncrementalAppendScan>>>
156+
Table::NewIncrementalAppendScan() const {
157+
return IncrementalScanBuilder<IncrementalAppendScan>::Make(metadata_, io_);
158+
}
159+
160+
// Creates a builder for an incremental changelog scan over this table.
// Forwards the table's metadata_ and io_ to
// IncrementalScanBuilder<IncrementalChangelogScan>::Make and returns that
// factory's result unchanged.
Result<std::unique_ptr<IncrementalScanBuilder<IncrementalChangelogScan>>>
161+
Table::NewIncrementalChangelogScan() const {
162+
return IncrementalScanBuilder<IncrementalChangelogScan>::Make(metadata_, io_);
163+
}
164+
155165
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
156166
// Create a brand new transaction object for the table. Users are expected to commit the
157167
// transaction manually.

src/iceberg/table.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
129129
/// filter data.
130130
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
131131

132+
/// \brief Create a new incremental append scan builder for this table
///
/// \return A Result holding a unique_ptr to an
/// IncrementalScanBuilder<IncrementalAppendScan>, or an error.
133+
virtual Result<std::unique_ptr<IncrementalScanBuilder<IncrementalAppendScan>>>
134+
NewIncrementalAppendScan() const;
135+
136+
/// \brief Create a new incremental changelog scan builder for this table
///
/// \return A Result holding a unique_ptr to an
/// IncrementalScanBuilder<IncrementalChangelogScan>, or an error.
137+
virtual Result<std::unique_ptr<IncrementalScanBuilder<IncrementalChangelogScan>>>
138+
NewIncrementalChangelogScan() const;
139+
132140
/// \brief Create a new Transaction to commit multiple table operations at once.
133141
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
134142

src/iceberg/table_scan.cc

Lines changed: 85 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -313,29 +313,6 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
313313
return UseSnapshot(snapshot_id);
314314
}
315315

316-
TableScanBuilder& TableScanBuilder::FromSnapshot(
317-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
318-
return AddError(NotImplemented("Incremental scan is not implemented"));
319-
}
320-
321-
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
322-
[[maybe_unused]] bool inclusive) {
323-
return AddError(NotImplemented("Incremental scan is not implemented"));
324-
}
325-
326-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
327-
return AddError(NotImplemented("Incremental scan is not implemented"));
328-
}
329-
330-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
331-
return AddError(NotImplemented("Incremental scan is not implemented"));
332-
}
333-
334-
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
335-
context_.branch = branch;
336-
return *this;
337-
}
338-
339316
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
340317
TableScanBuilder::ResolveSnapshotSchema() {
341318
if (snapshot_schema_ == nullptr) {
@@ -352,18 +329,10 @@ TableScanBuilder::ResolveSnapshotSchema() {
352329
return snapshot_schema_;
353330
}
354331

355-
bool TableScanBuilder::IsIncrementalScan() const {
356-
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
357-
}
358-
359-
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
332+
Result<std::unique_ptr<DataTableScan>> TableScanBuilder::Build() {
360333
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
361334
ICEBERG_RETURN_UNEXPECTED(context_.Validate());
362335

363-
if (IsIncrementalScan()) {
364-
return NotImplemented("Incremental scan is not yet implemented");
365-
}
366-
367336
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
368337
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
369338
}
@@ -442,12 +411,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
442411
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
443412
}
444413

445-
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
446-
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
447-
internal::TableScanContext context)
448-
: TableScan(std::move(metadata), std::move(schema), std::move(io),
449-
std::move(context)) {}
450-
451414
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
452415
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
453416
if (!snapshot) {
@@ -477,4 +440,88 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
477440
return manifest_group->PlanFiles();
478441
}
479442

443+
// BaseIncrementalScanBuilder implementation
444+
445+
// Stub for selecting the starting snapshot by id.
// Both arguments are ignored; a NotImplemented error is passed to AddError and
// the result of that call is returned.
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
446+
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
447+
return AddError(NotImplemented("Incremental scan is not implemented"));
448+
}
449+
450+
// Stub for selecting the starting snapshot by reference name.
// Both arguments are ignored; a NotImplemented error is passed to AddError and
// the result of that call is returned.
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
451+
[[maybe_unused]] const std::string& ref, [[maybe_unused]] bool inclusive) {
452+
return AddError(NotImplemented("Incremental scan is not implemented"));
453+
}
454+
455+
// Stub for selecting the ending snapshot by id.
// The argument is ignored; a NotImplemented error is passed to AddError and the
// result of that call is returned.
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
456+
[[maybe_unused]] int64_t to_snapshot_id) {
457+
return AddError(NotImplemented("Incremental scan is not implemented"));
458+
}
459+
460+
// Stub for selecting the ending snapshot by reference name.
// The argument is ignored; a NotImplemented error is passed to AddError and the
// result of that call is returned.
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
461+
[[maybe_unused]] const std::string& ref) {
462+
return AddError(NotImplemented("Incremental scan is not implemented"));
463+
}
464+
465+
// Stores the given branch name in the scan context (context_.branch) and
// returns this builder so calls can be chained.
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::UseBranch(
466+
const std::string& branch) {
467+
context_.branch = branch;
468+
return *this;
469+
}
470+
471+
// Factory for an incremental scan builder of the given ScanType.
// Both arguments are checked for null with ICEBERG_PRECHECK before the builder
// is constructed; metadata and io are then moved into the builder's constructor.
template <typename ScanType>
472+
Result<std::unique_ptr<IncrementalScanBuilder<ScanType>>>
473+
IncrementalScanBuilder<ScanType>::Make(std::shared_ptr<TableMetadata> metadata,
474+
std::shared_ptr<FileIO> io) {
475+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
476+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
477+
// NOTE(review): uses `new` instead of std::make_unique, presumably because the
// constructor is not publicly accessible; confirm against the class declaration.
return std::unique_ptr<IncrementalScanBuilder<ScanType>>(
478+
new IncrementalScanBuilder<ScanType>(std::move(metadata), std::move(io)));
479+
}
480+
481+
// Builds the scan described by this builder.
// Not implemented yet for any ScanType: always returns a NotImplemented error.
// The message names the generic builder because this single definition is
// shared by every instantiation, not only the append scan.
template <typename ScanType>
482+
Result<std::unique_ptr<ScanType>> IncrementalScanBuilder<ScanType>::Build() {
483+
return NotImplemented("IncrementalScanBuilder::Build is not implemented");
484+
}
485+
486+
// Explicit instantiations for the append and changelog scan types, so the
// IncrementalScanBuilder member definitions above are compiled into this
// translation unit.
template class IncrementalScanBuilder<IncrementalAppendScan>;
487+
template class IncrementalScanBuilder<IncrementalChangelogScan>;
488+
489+
// Argument-less planning entry point shared by all incremental scans.
// Not implemented yet: always returns a NotImplemented error.
template <typename ScanTaskType>
490+
Result<std::vector<std::shared_ptr<ScanTaskType>>>
491+
IncrementalScan<ScanTaskType>::PlanFiles() const {
492+
return NotImplemented("Incremental scan is not implemented");
493+
}
494+
495+
// IncrementalAppendScan implementation
496+
497+
// Factory for IncrementalAppendScan.
// Not implemented yet: all four arguments are ignored and a NotImplemented
// error is returned.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
498+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
499+
[[maybe_unused]] std::shared_ptr<Schema> schema,
500+
[[maybe_unused]] std::shared_ptr<FileIO> io,
501+
[[maybe_unused]] internal::TableScanContext context) {
502+
return NotImplemented("IncrementalAppendScan is not implemented");
503+
}
504+
505+
// Plans file scan tasks for the given snapshot range.
// Not implemented yet: both arguments are ignored and a NotImplemented error is
// returned. The parameters are marked [[maybe_unused]], like the other stubs in
// this file, so the unused names do not trigger -Wunused-parameter.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
506+
[[maybe_unused]] std::optional<int64_t> from_snapshot_id_exclusive,
507+
[[maybe_unused]] int64_t to_snapshot_id_inclusive) const {
508+
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
509+
}
510+
511+
// IncrementalChangelogScan implementation
512+
513+
// Factory for IncrementalChangelogScan.
// Not implemented yet: all four arguments are ignored and a NotImplemented
// error is returned.
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
514+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
515+
[[maybe_unused]] std::shared_ptr<Schema> schema,
516+
[[maybe_unused]] std::shared_ptr<FileIO> io,
517+
[[maybe_unused]] internal::TableScanContext context) {
518+
return NotImplemented("IncrementalChangelogScan is not implemented");
519+
}
520+
521+
// Plans changelog scan tasks for the given snapshot range.
// Not implemented yet: both arguments are ignored and a NotImplemented error is
// returned. The parameters are marked [[maybe_unused]], like the other stubs in
// this file, so the unused names do not trigger -Wunused-parameter.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
522+
IncrementalChangelogScan::PlanFiles([[maybe_unused]] std::optional<int64_t> from_snapshot_id_exclusive,
523+
[[maybe_unused]] int64_t to_snapshot_id_inclusive) const {
524+
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
525+
}
526+
480527
} // namespace iceberg

0 commit comments

Comments
 (0)