Skip to content

Commit 1ba5080

Browse files
committed
feat: Add incremental scan API with IncrementalAppendScan and IncrementalChangelogScan
1 parent 43b83c5 commit 1ba5080

6 files changed

Lines changed: 320 additions & 92 deletions

File tree

src/iceberg/table.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
152152
return TableScanBuilder::Make(metadata_, io_);
153153
}
154154

155+
Result<std::unique_ptr<IncrementalAppendScanBuilder>> Table::NewIncrementalAppendScan()
156+
const {
157+
return IncrementalAppendScanBuilder::Make(metadata_, io_);
158+
}
159+
160+
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
161+
Table::NewIncrementalChangelogScan() const {
162+
return IncrementalChangelogScanBuilder::Make(metadata_, io_);
163+
}
164+
155165
Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
156166
// Create a brand new transaction object for the table. Users are expected to commit the
157167
// transaction manually.

src/iceberg/table.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
129129
/// filter data.
130130
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;
131131

132+
/// \brief Create a new incremental append scan builder for this table
133+
virtual Result<std::unique_ptr<IncrementalAppendScanBuilder>> NewIncrementalAppendScan()
134+
const;
135+
136+
/// \brief Create a new incremental changelog scan builder for this table
137+
virtual Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
138+
NewIncrementalChangelogScan() const;
139+
132140
/// \brief Create a new Transaction to commit multiple table operations at once.
133141
virtual Result<std::shared_ptr<Transaction>> NewTransaction();
134142

src/iceberg/table_scan.cc

Lines changed: 109 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -313,29 +313,6 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
313313
return UseSnapshot(snapshot_id);
314314
}
315315

316-
TableScanBuilder& TableScanBuilder::FromSnapshot(
317-
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
318-
return AddError(NotImplemented("Incremental scan is not implemented"));
319-
}
320-
321-
TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
322-
[[maybe_unused]] bool inclusive) {
323-
return AddError(NotImplemented("Incremental scan is not implemented"));
324-
}
325-
326-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
327-
return AddError(NotImplemented("Incremental scan is not implemented"));
328-
}
329-
330-
TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
331-
return AddError(NotImplemented("Incremental scan is not implemented"));
332-
}
333-
334-
TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
335-
context_.branch = branch;
336-
return *this;
337-
}
338-
339316
Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
340317
TableScanBuilder::ResolveSnapshotSchema() {
341318
if (snapshot_schema_ == nullptr) {
@@ -352,18 +329,10 @@ TableScanBuilder::ResolveSnapshotSchema() {
352329
return snapshot_schema_;
353330
}
354331

355-
bool TableScanBuilder::IsIncrementalScan() const {
356-
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
357-
}
358-
359-
Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
332+
Result<std::unique_ptr<DataTableScan>> TableScanBuilder::Build() {
360333
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
361334
ICEBERG_RETURN_UNEXPECTED(context_.Validate());
362335

363-
if (IsIncrementalScan()) {
364-
return NotImplemented("Incremental scan is not yet implemented");
365-
}
366-
367336
ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
368337
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
369338
}
@@ -445,8 +414,7 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
445414
DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
446415
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
447416
internal::TableScanContext context)
448-
: TableScan(std::move(metadata), std::move(schema), std::move(io),
449-
std::move(context)) {}
417+
: Scan(std::move(metadata), std::move(schema), std::move(io), std::move(context)) {}
450418

451419
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
452420
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
@@ -477,4 +445,111 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
477445
return manifest_group->PlanFiles();
478446
}
479447

448+
BaseIncrementalScanBuilder::BaseIncrementalScanBuilder(
449+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io)
450+
: TableScanBuilder(std::move(metadata), std::move(io)) {}
451+
452+
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
453+
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
454+
return AddError(NotImplemented("Incremental scan is not implemented"));
455+
}
456+
457+
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
458+
[[maybe_unused]] const std::string& ref, [[maybe_unused]] bool inclusive) {
459+
return AddError(NotImplemented("Incremental scan is not implemented"));
460+
}
461+
462+
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
463+
[[maybe_unused]] int64_t to_snapshot_id) {
464+
return AddError(NotImplemented("Incremental scan is not implemented"));
465+
}
466+
467+
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
468+
[[maybe_unused]] const std::string& ref) {
469+
return AddError(NotImplemented("Incremental scan is not implemented"));
470+
}
471+
472+
BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::UseBranch(
473+
const std::string& branch) {
474+
context_.branch = branch;
475+
return *this;
476+
}
477+
478+
// IncrementalAppendScanBuilder implementation
479+
Result<std::unique_ptr<IncrementalAppendScanBuilder>> IncrementalAppendScanBuilder::Make(
480+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
481+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
482+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
483+
return std::unique_ptr<IncrementalAppendScanBuilder>(
484+
new IncrementalAppendScanBuilder(std::move(metadata), std::move(io)));
485+
}
486+
487+
IncrementalAppendScanBuilder::IncrementalAppendScanBuilder(
488+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io)
489+
: IncrementalScanBuilder(std::move(metadata), std::move(io)) {}
490+
491+
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScanBuilder::Build() {
492+
return NotImplemented("IncrementalAppendScanBuilder is not implemented");
493+
}
494+
495+
// IncrementalChangelogScanBuilder implementation
496+
Result<std::unique_ptr<IncrementalChangelogScanBuilder>>
497+
IncrementalChangelogScanBuilder::Make(std::shared_ptr<TableMetadata> metadata,
498+
std::shared_ptr<FileIO> io) {
499+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
500+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
501+
return std::unique_ptr<IncrementalChangelogScanBuilder>(
502+
new IncrementalChangelogScanBuilder(std::move(metadata), std::move(io)));
503+
}
504+
505+
IncrementalChangelogScanBuilder::IncrementalChangelogScanBuilder(
506+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io)
507+
: IncrementalScanBuilder(std::move(metadata), std::move(io)) {}
508+
509+
Result<std::unique_ptr<IncrementalChangelogScan>>
510+
IncrementalChangelogScanBuilder::Build() {
511+
return NotImplemented("IncrementalChangelogScanBuilder is not implemented");
512+
}
513+
514+
// IncrementalAppendScan implementation
515+
516+
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
517+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
518+
[[maybe_unused]] std::shared_ptr<Schema> schema,
519+
[[maybe_unused]] std::shared_ptr<FileIO> io,
520+
[[maybe_unused]] internal::TableScanContext context) {
521+
return NotImplemented("IncrementalAppendScan is not implemented");
522+
}
523+
524+
IncrementalAppendScan::IncrementalAppendScan(std::shared_ptr<TableMetadata> metadata,
525+
std::shared_ptr<Schema> schema,
526+
std::shared_ptr<FileIO> io,
527+
internal::TableScanContext context)
528+
: Scan(std::move(metadata), std::move(schema), std::move(io), std::move(context)) {}
529+
530+
Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles()
531+
const {
532+
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
533+
}
534+
535+
// IncrementalChangelogScan implementation
536+
537+
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
538+
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
539+
[[maybe_unused]] std::shared_ptr<Schema> schema,
540+
[[maybe_unused]] std::shared_ptr<FileIO> io,
541+
[[maybe_unused]] internal::TableScanContext context) {
542+
return NotImplemented("IncrementalChangelogScan is not implemented");
543+
}
544+
545+
IncrementalChangelogScan::IncrementalChangelogScan(
546+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
547+
std::shared_ptr<FileIO> io, internal::TableScanContext context)
548+
: Scan(std::move(metadata), std::move(schema), std::move(io), std::move(context)) {}
549+
550+
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
551+
IncrementalChangelogScan::PlanFiles() const {
552+
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
553+
}
554+
480555
} // namespace iceberg

0 commit comments

Comments
 (0)