Skip to content

Commit 9d73001

Browse files
committed
feat: Add incremental scan API with IncrementalAppendScan and IncrementalChangelogScan
1 parent 43b83c5 commit 9d73001

File tree

6 files changed

+355
-97
lines changed

6 files changed

+355
-97
lines changed

src/iceberg/table.cc

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -152,6 +152,16 @@ Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
152152
return TableScanBuilder::Make(metadata_, io_);
153153
}
154154

155+
// Creates a builder for an incremental append scan over this table.
// Forwards this table's metadata_ and io_ members to
// IncrementalAppendScanBuilder::Make and returns whatever that factory returns.
Result<std::unique_ptr<IncrementalAppendScanBuilder>> Table::NewIncrementalAppendScan()
156+
const {
157+
return IncrementalAppendScanBuilder::Make(metadata_, io_);
158+
}
159+
160+
// Creates a builder for an incremental changelog scan over this table.
// Forwards this table's metadata_ and io_ members to
// IncrementalChangelogScanBuilder::Make and returns whatever that factory returns.
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
161+
Table::NewIncrementalChangelogScan() const {
162+
return IncrementalChangelogScanBuilder::Make(metadata_, io_);
163+
}
164+
155165
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
156166
// Create a brand new transaction object for the table. Users are expected to commit the
157167
// transaction manually.

src/iceberg/table.h

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -129,6 +129,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
129129
/// filter data.
130130
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
131131

132+
/// \brief Create a new incremental append scan builder for this table
///
/// \return A Result wrapping a unique_ptr to the new IncrementalAppendScanBuilder.
133+
virtual Result<std::unique_ptr<IncrementalAppendScanBuilder>> NewIncrementalAppendScan()
134+
const;
135+
136+
/// \brief Create a new incremental changelog scan builder for this table
///
/// \return A Result wrapping a unique_ptr to the new IncrementalChangelogScanBuilder.
137+
virtual Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
138+
NewIncrementalChangelogScan() const;
139+
132140
/// \brief Create a new Transaction to commit multiple table operations at once.
133141
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
134142

src/iceberg/table_scan.cc

Lines changed: 69 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -313,29 +313,6 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
313313
return UseSnapshot(snapshot_id);
314314
}
315315

316-
TableScanBuilder& TableScanBuilder::FromSnapshot(
317-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
318-
return AddError(NotImplemented("Incremental scan is not implemented"));
319-
}
320-
321-
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
322-
[[maybe_unused]] bool inclusive) {
323-
return AddError(NotImplemented("Incremental scan is not implemented"));
324-
}
325-
326-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
327-
return AddError(NotImplemented("Incremental scan is not implemented"));
328-
}
329-
330-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
331-
return AddError(NotImplemented("Incremental scan is not implemented"));
332-
}
333-
334-
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
335-
context_.branch = branch;
336-
return *this;
337-
}
338-
339316
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
340317
TableScanBuilder::ResolveSnapshotSchema() {
341318
if (snapshot_schema_ == nullptr) {
@@ -352,18 +329,10 @@ TableScanBuilder::ResolveSnapshotSchema() {
352329
return snapshot_schema_;
353330
}
354331

355-
bool TableScanBuilder::IsIncrementalScan() const {
356-
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
357-
}
358-
359-
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
332+
Result<std::unique_ptr<DataTableScan>> TableScanBuilder::Build() {
360333
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
361334
ICEBERG_RETURN_UNEXPECTED(context_.Validate());
362335

363-
if (IsIncrementalScan()) {
364-
return NotImplemented("Incremental scan is not yet implemented");
365-
}
366-
367336
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
368337
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
369338
}
@@ -442,12 +411,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
442411
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
443412
}
444413

445-
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
446-
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
447-
internal::TableScanContext context)
448-
: TableScan(std::move(metadata), std::move(schema), std::move(io),
449-
std::move(context)) {}
450-
451414
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
452415
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
453416
if (!snapshot) {
@@ -477,4 +440,72 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
477440
return manifest_group->PlanFiles();
478441
}
479442

443+
// Zero-argument PlanFiles for IncrementalScan<ScanTaskType>.
// Placeholder: unconditionally returns a NotImplemented error.
// NOTE(review): this is a member of a class template defined in a .cc file, and
// no explicit instantiation is visible in this diff. Confirm one exists
// elsewhere, otherwise callers in other translation units may fail to link.
template <typename ScanTaskType>
444+
Result<std::vector<std::shared_ptr<ScanTaskType>>>
445+
IncrementalScan<ScanTaskType>::PlanFiles() const {
446+
return NotImplemented("IncrementalScan::PlanFiles is not implemented");
447+
}
448+
449+
// IncrementalAppendScan implementation
450+
451+
// Factory for IncrementalAppendScan.
// Placeholder: every parameter is marked [[maybe_unused]] and the function
// unconditionally returns a NotImplemented error; no scan object is created.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
452+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
453+
[[maybe_unused]] std::shared_ptr<Schema> schema,
454+
[[maybe_unused]] std::shared_ptr<FileIO> io,
455+
[[maybe_unused]] internal::TableScanContext context) {
456+
return NotImplemented("IncrementalAppendScan is not implemented");
457+
}
458+
459+
// Range-based PlanFiles for IncrementalAppendScan, taking an optional starting
// snapshot id and an ending snapshot id (exclusive/inclusive per the
// parameter names).
// Placeholder: both parameters are ignored and a NotImplemented error is
// returned unconditionally.
// NOTE(review): unlike the sibling Make(), these unused parameters are not
// marked [[maybe_unused]]; this may trigger unused-parameter warnings
// depending on the build flags — confirm.
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
460+
std::optional<int64_t> from_snapshot_id_exclusive,
461+
int64_t to_snapshot_id_inclusive) const {
462+
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
463+
}
464+
465+
// IncrementalChangelogScan implementation
466+
467+
// Factory for IncrementalChangelogScan.
// Placeholder: every parameter is marked [[maybe_unused]] and the function
// unconditionally returns a NotImplemented error; no scan object is created.
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
468+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
469+
[[maybe_unused]] std::shared_ptr<Schema> schema,
470+
[[maybe_unused]] std::shared_ptr<FileIO> io,
471+
[[maybe_unused]] internal::TableScanContext context) {
472+
return NotImplemented("IncrementalChangelogScan is not implemented");
473+
}
474+
475+
// Range-based PlanFiles for IncrementalChangelogScan, taking an optional
// starting snapshot id and an ending snapshot id (exclusive/inclusive per the
// parameter names).
// Placeholder: both parameters are ignored and a NotImplemented error is
// returned unconditionally.
// NOTE(review): unlike the sibling Make(), these unused parameters are not
// marked [[maybe_unused]]; this may trigger unused-parameter warnings
// depending on the build flags — confirm.
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
476+
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
477+
int64_t to_snapshot_id_inclusive) const {
478+
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
479+
}
480+
481+
// IncrementalAppendScanBuilder implementation
482+
483+
// Factory for IncrementalAppendScanBuilder.
// Checks that both `metadata` and `io` are non-null via ICEBERG_PRECHECK
// (presumably this returns an error Result when the condition is false —
// confirm against the macro definition), then moves both into a newly
// allocated builder owned by the returned unique_ptr.
Result<std::unique_ptr<IncrementalAppendScanBuilder>> IncrementalAppendScanBuilder::Make(
484+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
485+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
486+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
487+
// NOTE(review): `new` is used instead of std::make_unique, presumably because
// the constructor is not publicly accessible — confirm in the header.
return std::unique_ptr<IncrementalAppendScanBuilder>(
488+
new IncrementalAppendScanBuilder(std::move(metadata), std::move(io)));
489+
}
490+
491+
// Builds the IncrementalAppendScan configured on this builder.
// Placeholder: unconditionally returns a NotImplemented error, so a builder
// obtained from Make() cannot yet produce a scan.
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScanBuilder::Build() {
492+
return NotImplemented("IncrementalAppendScanBuilder is not implemented");
493+
}
494+
495+
// IncrementalChangelogScanBuilder implementation
496+
497+
// Factory for IncrementalChangelogScanBuilder.
// Checks that both `metadata` and `io` are non-null via ICEBERG_PRECHECK
// (presumably this returns an error Result when the condition is false —
// confirm against the macro definition), then moves both into a newly
// allocated builder owned by the returned unique_ptr.
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
498+
IncrementalChangelogScanBuilder::Make(std::shared_ptr<TableMetadata> metadata,
499+
std::shared_ptr<FileIO> io) {
500+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
501+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
502+
// NOTE(review): `new` is used instead of std::make_unique, presumably because
// the constructor is not publicly accessible — confirm in the header.
return std::unique_ptr<IncrementalChangelogScanBuilder>(
503+
new IncrementalChangelogScanBuilder(std::move(metadata), std::move(io)));
504+
}
505+
506+
// Builds the IncrementalChangelogScan configured on this builder.
// Placeholder: unconditionally returns a NotImplemented error, so a builder
// obtained from Make() cannot yet produce a scan.
Result<std::unique_ptr<IncrementalChangelogScan>>
507+
IncrementalChangelogScanBuilder::Build() {
508+
return NotImplemented("IncrementalChangelogScanBuilder is not implemented");
509+
}
510+
480511
} // namespace iceberg

0 commit comments

Comments
 (0)