Skip to content

Commit 1c908dd

Browse files
committed
feat: Impl Incremental changelog scan
1 parent 5666e67 commit 1c908dd

File tree

5 files changed

+802
-10
lines changed

5 files changed

+802
-10
lines changed

src/iceberg/table_scan.cc

Lines changed: 147 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424

2525
#include "iceberg/expression/binder.h"
2626
#include "iceberg/expression/expression.h"
27+
#include "iceberg/expression/residual_evaluator.h"
2728
#include "iceberg/file_reader.h"
2829
#include "iceberg/manifest/manifest_entry.h"
2930
#include "iceberg/manifest/manifest_group.h"
3031
#include "iceberg/result.h"
3132
#include "iceberg/schema.h"
3233
#include "iceberg/snapshot.h"
3334
#include "iceberg/table_metadata.h"
35+
#include "iceberg/util/content_file_util.h"
3436
#include "iceberg/util/macros.h"
3537
#include "iceberg/util/snapshot_util_internal.h"
3638
#include "iceberg/util/timepoint.h"
@@ -294,6 +296,24 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
294296
return MakeArrowArrayStream(std::move(reader));
295297
}
296298

299+
// ChangelogScanTask implementation
300+
301+
int64_t ChangelogScanTask::size_bytes() const {
302+
int64_t total_size = data_file_->file_size_in_bytes;
303+
for (const auto& delete_file : delete_files_) {
304+
total_size +=
305+
(delete_file->IsDeletionVector() ? delete_file->content_size_in_bytes.value_or(0)
306+
: delete_file->file_size_in_bytes);
307+
}
308+
return total_size;
309+
}
310+
311+
int32_t ChangelogScanTask::files_count() const { return 1 + delete_files_.size(); }
312+
313+
int64_t ChangelogScanTask::estimated_row_count() const {
314+
return data_file_->record_count;
315+
}
316+
297317
// Generic template implementation for Make
298318
template <typename ScanType>
299319
Result<std::unique_ptr<TableScanBuilder<ScanType>>> TableScanBuilder<ScanType>::Make(
@@ -747,11 +767,13 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFi
747767
// IncrementalChangelogScan implementation
748768

749769
Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
750-
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
751-
[[maybe_unused]] std::shared_ptr<Schema> schema,
752-
[[maybe_unused]] std::shared_ptr<FileIO> io,
753-
[[maybe_unused]] internal::TableScanContext context) {
754-
return NotImplemented("IncrementalChangelogScan is not implemented");
770+
std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
771+
std::shared_ptr<FileIO> io, internal::TableScanContext context) {
772+
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
773+
ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
774+
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
775+
return std::unique_ptr<IncrementalChangelogScan>(new IncrementalChangelogScan(
776+
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
755777
}
756778

757779
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
@@ -762,7 +784,126 @@ IncrementalChangelogScan::PlanFiles() const {
762784
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
763785
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
764786
int64_t to_snapshot_id_inclusive) const {
765-
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
787+
ICEBERG_ASSIGN_OR_RAISE(
788+
auto ancestors_snapshots,
789+
SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
790+
from_snapshot_id_exclusive));
791+
792+
std::vector<std::pair<std::shared_ptr<Snapshot>, std::unique_ptr<SnapshotCache>>>
793+
changelog_snapshots;
794+
795+
for (const auto& snapshot : std::ranges::reverse_view(ancestors_snapshots)) {
796+
auto operation = snapshot->Operation();
797+
if (!operation.has_value() || operation.value() != DataOperation::kReplace) {
798+
auto snapshot_cache = std::make_unique<SnapshotCache>(snapshot.get());
799+
ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
800+
snapshot_cache->DeleteManifests(io_));
801+
if (!delete_manifests.empty()) {
802+
return NotSupported(
803+
"Delete files are currently not supported in changelog scans");
804+
}
805+
changelog_snapshots.emplace_back(snapshot, std::move(snapshot_cache));
806+
}
807+
}
808+
if (changelog_snapshots.empty()) {
809+
return std::vector<std::shared_ptr<ChangelogScanTask>>{};
810+
}
811+
812+
std::unordered_set<int64_t> snapshot_ids;
813+
std::unordered_map<int64_t, int32_t> snapshot_ordinals;
814+
int32_t ordinal = 0;
815+
for (const auto& snapshot : changelog_snapshots) {
816+
snapshot_ids.insert(snapshot.first->snapshot_id);
817+
snapshot_ordinals[snapshot.first->snapshot_id] = ordinal++;
818+
}
819+
820+
std::vector<ManifestFile> data_manifests;
821+
std::unordered_set<std::string> seen_manifest_paths;
822+
for (const auto& snapshot : changelog_snapshots) {
823+
ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot.second->DataManifests(io_));
824+
for (auto& manifest : manifests) {
825+
if (manifest.added_snapshot_id == snapshot.first->snapshot_id &&
826+
seen_manifest_paths.insert(manifest.manifest_path).second) {
827+
data_manifests.push_back(manifest);
828+
}
829+
}
830+
}
831+
if (data_manifests.empty()) {
832+
return std::vector<std::shared_ptr<ChangelogScanTask>>{};
833+
}
834+
835+
TableMetadataCache metadata_cache(metadata_.get());
836+
ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());
837+
838+
ICEBERG_ASSIGN_OR_RAISE(
839+
auto manifest_group,
840+
ManifestGroup::Make(io_, schema_, specs_by_id, std::move(data_manifests), {}));
841+
842+
manifest_group->CaseSensitive(context_.case_sensitive)
843+
.Select(ScanColumns())
844+
.FilterData(filter())
845+
.FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
846+
return entry.snapshot_id.has_value() &&
847+
snapshot_ids.contains(entry.snapshot_id.value());
848+
})
849+
.IgnoreExisting()
850+
.ColumnsToKeepStats(context_.columns_to_keep_stats);
851+
852+
if (context_.ignore_residuals) {
853+
manifest_group->IgnoreResiduals();
854+
}
855+
856+
auto create_tasks_func =
857+
[&snapshot_ordinals](
858+
std::vector<ManifestEntry>&& entries,
859+
const TaskContext& ctx) -> Result<std::vector<std::shared_ptr<ScanTask>>> {
860+
std::vector<std::shared_ptr<ScanTask>> tasks;
861+
tasks.reserve(entries.size());
862+
863+
for (auto& entry : entries) {
864+
if (!entry.snapshot_id.has_value() || entry.data_file == nullptr) {
865+
continue;
866+
}
867+
868+
int64_t commit_snapshot_id = entry.snapshot_id.value();
869+
auto ordinal_it = snapshot_ordinals.find(commit_snapshot_id);
870+
if (ordinal_it == snapshot_ordinals.end()) {
871+
continue;
872+
}
873+
int32_t change_ordinal = ordinal_it->second;
874+
875+
if (ctx.drop_stats) {
876+
ContentFileUtil::DropAllStats(*entry.data_file);
877+
} else if (!ctx.columns_to_keep_stats.empty()) {
878+
ContentFileUtil::DropUnselectedStats(*entry.data_file, ctx.columns_to_keep_stats);
879+
}
880+
881+
ICEBERG_ASSIGN_OR_RAISE(auto residual,
882+
ctx.residuals->ResidualFor(entry.data_file->partition));
883+
884+
switch (entry.status) {
885+
case ManifestStatus::kAdded:
886+
tasks.push_back(std::make_shared<AddedRowsScanTask>(
887+
change_ordinal, commit_snapshot_id, std::move(entry.data_file),
888+
std::vector<std::shared_ptr<DataFile>>{}, std::move(residual)));
889+
break;
890+
case ManifestStatus::kDeleted:
891+
tasks.push_back(std::make_shared<DeletedDataFileScanTask>(
892+
change_ordinal, commit_snapshot_id, std::move(entry.data_file),
893+
std::vector<std::shared_ptr<DataFile>>{}, std::move(residual)));
894+
break;
895+
case ManifestStatus::kExisting:
896+
return InvalidArgument("Unexpected entry status: EXISTING");
897+
}
898+
}
899+
return tasks;
900+
};
901+
902+
ICEBERG_ASSIGN_OR_RAISE(auto tasks, manifest_group->Plan(create_tasks_func));
903+
return tasks | std::views::transform([](const auto& task) {
904+
return std::static_pointer_cast<ChangelogScanTask>(task);
905+
}) |
906+
std::ranges::to<std::vector>();
766907
}
767908

768909
} // namespace iceberg

src/iceberg/table_scan.h

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,87 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
102102
std::shared_ptr<Expression> residual_filter_;
103103
};
104104

105+
enum class ChangelogOperation : uint8_t {
106+
kInsert,
107+
kDelete,
108+
kUpdateBefore,
109+
kUpdateAfter,
110+
};
111+
105112
/// \brief A scan task for reading changelog entries between snapshots.
106113
class ICEBERG_EXPORT ChangelogScanTask : public ScanTask {
107114
public:
115+
/// \brief Construct an AddedRowsScanTask.
116+
///
117+
/// \param change_ordinal Position in the changelog order (0-based).
118+
/// \param commit_snapshot_id The snapshot ID that committed this change.
119+
/// \param data_file The data file containing the added rows.
120+
/// \param delete_files Delete files that apply to this data file.
121+
/// \param residual_filter Optional residual filter to apply after reading.
122+
ChangelogScanTask(int32_t change_ordinal, int64_t commit_snapshot_id,
123+
std::shared_ptr<DataFile> data_file,
124+
std::vector<std::shared_ptr<DataFile>> delete_files = {},
125+
std::shared_ptr<Expression> residual_filter = nullptr)
126+
: change_ordinal_(change_ordinal),
127+
commit_snapshot_id_(commit_snapshot_id),
128+
data_file_(std::move(data_file)),
129+
delete_files_(std::move(delete_files)),
130+
residual_filter_(std::move(residual_filter)) {}
131+
108132
Kind kind() const override { return Kind::kChangelogScanTask; }
109-
// TODO(): Return actual values once member fields are implemented
110-
int64_t size_bytes() const override { return 0; }
111-
int32_t files_count() const override { return 0; }
112-
int64_t estimated_row_count() const override { return 0; }
133+
134+
int64_t size_bytes() const override;
135+
int32_t files_count() const override;
136+
int64_t estimated_row_count() const override;
137+
138+
virtual ChangelogOperation operation() const = 0;
139+
140+
/// \brief The position of this change in the changelog order (0-based).
141+
virtual int32_t change_ordinal() const { return change_ordinal_; }
142+
143+
/// \brief The snapshot ID that committed this change.
144+
virtual int64_t commit_snapshot_id() const { return commit_snapshot_id_; }
145+
146+
/// \brief The data file containing the added rows.
147+
const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
148+
149+
/// \brief Delete files that apply to this data file.
150+
const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
151+
return delete_files_;
152+
}
153+
154+
/// \brief Residual filter to apply after reading.
155+
const std::shared_ptr<Expression>& residual_filter() const { return residual_filter_; }
156+
157+
protected:
158+
int32_t change_ordinal_;
159+
int64_t commit_snapshot_id_;
160+
std::shared_ptr<DataFile> data_file_;
161+
std::vector<std::shared_ptr<DataFile>> delete_files_;
162+
std::shared_ptr<Expression> residual_filter_;
163+
};
164+
165+
/// \brief A scan task for reading rows that were added between snapshots.
166+
///
167+
/// This task represents data files that were added to the table, along with any
168+
/// delete files that should be applied when reading the data.
169+
class ICEBERG_EXPORT AddedRowsScanTask : public ChangelogScanTask {
170+
public:
171+
using ChangelogScanTask::ChangelogScanTask;
172+
173+
ChangelogOperation operation() const override { return ChangelogOperation::kInsert; }
174+
};
175+
176+
/// \brief A scan task for reading data files that were deleted between snapshots.
177+
///
178+
/// This task represents data files that were removed from the table. Unlike
179+
/// AddedRowsScanTask, delete files are not applicable here since the entire
180+
/// data file was deleted.
181+
class ICEBERG_EXPORT DeletedDataFileScanTask : public ChangelogScanTask {
182+
public:
183+
using ChangelogScanTask::ChangelogScanTask;
184+
185+
ChangelogOperation operation() const override { return ChangelogOperation::kDelete; }
113186
};
114187

115188
namespace internal {
@@ -133,8 +206,24 @@ struct TableScanContext {
133206

134207
// Validate the context parameters to see if they have conflicts.
135208
[[nodiscard]] Status Validate() const;
209+
210+
/// \brief Returns true if this scan is a current lineage scan, which means it does not
211+
/// specify from/to snapshot IDs.
212+
bool IsScanCurrentLineage() const;
213+
214+
/// \brief Get the snapshot ID to scan up to (inclusive) based on the context.
215+
Result<int64_t> ToSnapshotIdInclusive(const TableMetadata& metadata) const;
216+
217+
/// \brief Get the snapshot ID to scan from (exclusive) based on the context.
218+
Result<std::optional<int64_t>> FromSnapshotIdExclusive(
219+
const TableMetadata& metadata, int64_t to_snapshot_id_inclusive) const;
136220
};
137221

222+
// Internal validation functions for IncrementalScanBuilder
223+
Status CheckSnapshotValid(const TableMetadata& metadata, int64_t snapshot_id);
224+
Result<int64_t> CheckRefValid(const TableMetadata& metadata, const std::string& ref);
225+
Status CheckBranchValid(const TableMetadata& metadata, const std::string& branch);
226+
138227
} // namespace internal
139228

140229
// Concept to check if a type is an incremental scan

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ if(ICEBERG_BUILD_BUNDLE)
176176
SOURCES
177177
file_scan_task_test.cc
178178
incremental_append_scan_test.cc
179+
incremental_changelog_scan_test.cc
179180
table_scan_test.cc)
180181

181182
add_iceberg_test(table_update_test

0 commit comments

Comments
 (0)