Skip to content

Commit 988f363

Browse files
authored
feat: parallelize update and scan processing (apache#770)
1 parent 3c9d13d commit 988f363

14 files changed

Lines changed: 649 additions & 347 deletions

src/iceberg/delete_file_index.cc

Lines changed: 132 additions & 83 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,9 @@
2222
#include <algorithm>
2323
#include <cstdint>
2424
#include <iterator>
25+
#include <mutex>
2526
#include <ranges>
27+
#include <shared_mutex>
2628
#include <vector>
2729

2830
#include "iceberg/expression/expression.h"
@@ -37,6 +39,7 @@
3739
#include "iceberg/schema.h"
3840
#include "iceberg/util/checked_cast.h"
3941
#include "iceberg/util/content_file_util.h"
42+
#include "iceberg/util/executor_util_internal.h"
4043
#include "iceberg/util/macros.h"
4144

4245
namespace iceberg {
@@ -528,107 +531,153 @@ DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
528531
return *this;
529532
}
530533

534+
DeleteFileIndex::Builder& DeleteFileIndex::Builder::PlanWith(OptionalExecutor executor) {
535+
executor_ = executor;
536+
return *this;
537+
}
538+
531539
Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
532-
// Build expression caches per spec ID
533-
std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
540+
// TODO(zehua): Replace with a thread-safe LRU cache.
541+
std::shared_mutex projected_expr_cache_mutex;
542+
std::unordered_map<int32_t, std::shared_ptr<Expression>> projected_expr_cache;
543+
std::shared_mutex eval_cache_mutex;
534544
std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;
535545

536546
auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;
537547

538-
// Filter and read manifests into manifest entries
539-
std::vector<ManifestEntry> files;
540-
for (const auto& manifest : delete_manifests_) {
541-
if (manifest.content != ManifestContent::kDeletes) {
542-
continue;
548+
auto and_filters =
549+
[](std::shared_ptr<Expression> left,
550+
std::shared_ptr<Expression> right) -> Result<std::shared_ptr<Expression>> {
551+
if (left && right) {
552+
return And::MakeFolded(std::move(left), std::move(right));
543553
}
544-
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
545-
continue;
554+
return right ? std::move(right) : std::move(left);
555+
};
556+
557+
auto get_projected_expr = [&](int32_t spec_id,
558+
const std::shared_ptr<PartitionSpec>& spec)
559+
-> Result<std::shared_ptr<Expression>> {
560+
if (!data_filter_) {
561+
return std::shared_ptr<Expression>();
546562
}
547563

548-
const int32_t spec_id = manifest.partition_spec_id;
549-
auto spec_iter = specs_by_id_.find(spec_id);
550-
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
551-
"Partition spec ID {} not found when loading delete files", spec_id);
552-
553-
const auto& spec = spec_iter->second;
554-
555-
// Get or compute projected partition expression
556-
if (!part_expr_cache.contains(spec_id) && data_filter_) {
557-
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
558-
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
559-
part_expr_cache[spec_id] = std::move(projected);
564+
{
565+
std::shared_lock lock(projected_expr_cache_mutex);
566+
auto iter = projected_expr_cache.find(spec_id);
567+
if (iter != projected_expr_cache.end()) {
568+
return iter->second;
569+
}
560570
}
561571

562-
// Get or create manifest evaluator
563-
if (!eval_cache.contains(spec_id)) {
564-
auto filter = partition_filter_;
565-
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
566-
if (filter) {
567-
ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second));
568-
} else {
569-
filter = it->second;
570-
}
571-
}
572-
if (filter) {
573-
ICEBERG_ASSIGN_OR_RAISE(auto evaluator,
574-
ManifestEvaluator::MakePartitionFilter(
575-
std::move(filter), spec, *schema_, case_sensitive_));
576-
eval_cache[spec_id] = std::move(evaluator);
577-
}
572+
std::lock_guard lock(projected_expr_cache_mutex);
573+
auto iter = projected_expr_cache.find(spec_id);
574+
if (iter != projected_expr_cache.end()) {
575+
return iter->second;
578576
}
579577

580-
// Evaluate manifest against filter
581-
if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
582-
ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest));
583-
if (!should_match) {
584-
continue; // Manifest doesn't match filter
585-
}
578+
auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
579+
ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
580+
auto [inserted_iter, _] = projected_expr_cache.emplace(spec_id, std::move(projected));
581+
return inserted_iter->second;
582+
};
583+
584+
auto get_manifest_evaluator =
585+
[&](int32_t spec_id, const std::shared_ptr<PartitionSpec>& spec,
586+
const std::shared_ptr<Expression>& filter) -> Result<ManifestEvaluator*> {
587+
if (!filter) {
588+
return nullptr;
586589
}
587590

588-
// Read manifest entries
589-
ICEBERG_ASSIGN_OR_RAISE(auto reader,
590-
ManifestReader::Make(manifest, io_, schema_, spec));
591-
592-
auto partition_filter = partition_filter_;
593-
if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
594-
if (partition_filter) {
595-
ICEBERG_ASSIGN_OR_RAISE(partition_filter,
596-
And::Make(partition_filter, it->second));
597-
} else {
598-
partition_filter = it->second;
591+
{
592+
std::shared_lock lock(eval_cache_mutex);
593+
auto iter = eval_cache.find(spec_id);
594+
if (iter != eval_cache.end()) {
595+
return iter->second.get();
599596
}
600597
}
601-
if (partition_filter) {
602-
reader->FilterPartitions(std::move(partition_filter));
603-
}
604-
if (partition_set_) {
605-
reader->FilterPartitions(partition_set_);
606-
}
607-
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();
608-
609-
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
610-
files.reserve(files.size() + entries.size());
611-
612-
for (auto& entry : entries) {
613-
ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
614-
ICEBERG_CHECK(entry.sequence_number.has_value(),
615-
"Missing sequence number from delete file: {}",
616-
entry.data_file->file_path);
617-
if (entry.sequence_number.value() > min_sequence_number_) {
618-
auto& file = *entry.data_file;
619-
// keep minimum stats to avoid memory pressure
620-
std::unordered_set<int32_t> columns =
621-
file.content == DataFile::Content::kPositionDeletes
622-
? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
623-
: std::unordered_set<int32_t>(file.equality_ids.begin(),
624-
file.equality_ids.end());
625-
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
626-
files.emplace_back(std::move(entry));
627-
}
598+
599+
std::lock_guard lock(eval_cache_mutex);
600+
auto iter = eval_cache.find(spec_id);
601+
if (iter != eval_cache.end()) {
602+
return iter->second.get();
628603
}
629-
}
630604

631-
return files;
605+
ICEBERG_ASSIGN_OR_RAISE(auto evaluator, ManifestEvaluator::MakePartitionFilter(
606+
filter, spec, *schema_, case_sensitive_));
607+
auto [inserted_iter, _] = eval_cache.emplace(spec_id, std::move(evaluator));
608+
return inserted_iter->second.get();
609+
};
610+
611+
return ParallelCollect(
612+
executor_, delete_manifests_,
613+
[&](const ManifestFile& manifest) -> Result<std::vector<ManifestEntry>> {
614+
std::vector<ManifestEntry> manifest_result;
615+
if (manifest.content != ManifestContent::kDeletes) {
616+
return manifest_result;
617+
}
618+
if (!manifest.has_added_files() && !manifest.has_existing_files()) {
619+
return manifest_result;
620+
}
621+
622+
const int32_t spec_id = manifest.partition_spec_id;
623+
auto spec_iter = specs_by_id_.find(spec_id);
624+
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
625+
"Partition spec ID {} not found when loading delete files",
626+
spec_id);
627+
628+
const auto& spec = spec_iter->second;
629+
630+
ICEBERG_ASSIGN_OR_RAISE(auto projected_data_filter,
631+
get_projected_expr(spec_id, spec));
632+
ICEBERG_ASSIGN_OR_RAISE(auto delete_partition_filter,
633+
and_filters(partition_filter_, projected_data_filter));
634+
ICEBERG_ASSIGN_OR_RAISE(
635+
auto manifest_evaluator,
636+
get_manifest_evaluator(spec_id, spec, delete_partition_filter));
637+
if (manifest_evaluator != nullptr) {
638+
ICEBERG_ASSIGN_OR_RAISE(auto should_match,
639+
manifest_evaluator->Evaluate(manifest));
640+
if (!should_match) {
641+
return manifest_result;
642+
}
643+
}
644+
645+
// Read manifest entries
646+
ICEBERG_ASSIGN_OR_RAISE(auto reader,
647+
ManifestReader::Make(manifest, io_, schema_, spec));
648+
649+
if (delete_partition_filter) {
650+
reader->FilterPartitions(std::move(delete_partition_filter));
651+
}
652+
if (partition_set_) {
653+
reader->FilterPartitions(partition_set_);
654+
}
655+
reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();
656+
657+
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
658+
manifest_result.reserve(entries.size());
659+
660+
for (auto& entry : entries) {
661+
ICEBERG_CHECK(entry.data_file != nullptr,
662+
"ManifestEntry must have a data file");
663+
ICEBERG_CHECK(entry.sequence_number.has_value(),
664+
"Missing sequence number from delete file: {}",
665+
entry.data_file->file_path);
666+
if (entry.sequence_number.value() > min_sequence_number_) {
667+
auto& file = *entry.data_file;
668+
// keep minimum stats to avoid memory pressure
669+
std::unordered_set<int32_t> columns =
670+
file.content == DataFile::Content::kPositionDeletes
671+
? std::unordered_set<
672+
int32_t>{MetadataColumns::kDeleteFilePathColumnId}
673+
: std::unordered_set<int32_t>(file.equality_ids.begin(),
674+
file.equality_ids.end());
675+
ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
676+
manifest_result.emplace_back(std::move(entry));
677+
}
678+
}
679+
return manifest_result;
680+
});
632681
}
633682

634683
Status DeleteFileIndex::Builder::AddDV(

src/iceberg/delete_file_index.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "iceberg/result.h"
3636
#include "iceberg/type_fwd.h"
3737
#include "iceberg/util/error_collector.h"
38+
#include "iceberg/util/executor.h"
3839
#include "iceberg/util/partition_value_util.h"
3940

4041
namespace iceberg {
@@ -356,6 +357,12 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
356357
/// \brief Ignore residual expressions after partition filtering.
357358
Builder& IgnoreResiduals();
358359

360+
/// \brief Configure an optional executor for reading delete manifests.
361+
///
362+
/// \param executor Executor to use, or std::nullopt to read manifests serially.
363+
/// \return Reference to this for method chaining.
364+
Builder& PlanWith(OptionalExecutor executor);
365+
359366
/// \brief Build the DeleteFileIndex.
360367
Result<std::unique_ptr<DeleteFileIndex>> Build();
361368

@@ -388,6 +395,7 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
388395
std::shared_ptr<Expression> data_filter_;
389396
std::shared_ptr<Expression> partition_filter_;
390397
std::shared_ptr<PartitionSet> partition_set_;
398+
OptionalExecutor executor_;
391399
bool case_sensitive_ = true;
392400
bool ignore_residuals_ = false;
393401
};

src/iceberg/manifest/manifest_group.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set<int32_t> col
194194

195195
ManifestGroup& ManifestGroup::PlanWith(OptionalExecutor executor) {
196196
executor_ = executor;
197+
delete_index_builder_.PlanWith(executor);
197198
return *this;
198199
}
199200

@@ -314,8 +315,7 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
314315

315316
auto columns = columns_;
316317
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
317-
!columns.empty() &&
318-
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
318+
!columns.empty() && !std::ranges::contains(columns, Schema::kAllColumns)) {
319319
auto data_file_schema = DataFileFilterSchema();
320320
ICEBERG_ASSIGN_OR_RAISE(
321321
auto bound_file_filter,

src/iceberg/table_scan.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,12 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::MinRowsRequested(
288288
return *this;
289289
}
290290

291+
template <typename ScanType>
292+
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::PlanWith(Executor& executor) {
293+
context_.plan_executor = std::ref(executor);
294+
return *this;
295+
}
296+
291297
template <typename ScanType>
292298
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseSnapshot(int64_t snapshot_id) {
293299
ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
@@ -538,7 +544,8 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
538544
.Select(ScanColumns())
539545
.FilterData(filter())
540546
.IgnoreDeleted()
541-
.ColumnsToKeepStats(context_.columns_to_keep_stats);
547+
.ColumnsToKeepStats(context_.columns_to_keep_stats)
548+
.PlanWith(context_.plan_executor);
542549
if (context_.ignore_residuals) {
543550
manifest_group->IgnoreResiduals();
544551
}
@@ -641,7 +648,8 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFi
641648
entry.status == ManifestStatus::kAdded;
642649
})
643650
.IgnoreDeleted()
644-
.ColumnsToKeepStats(context_.columns_to_keep_stats);
651+
.ColumnsToKeepStats(context_.columns_to_keep_stats)
652+
.PlanWith(context_.plan_executor);
645653

646654
if (context_.ignore_residuals) {
647655
manifest_group->IgnoreResiduals();
@@ -737,7 +745,8 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
737745
snapshot_ids.contains(entry.snapshot_id.value());
738746
})
739747
.IgnoreExisting()
740-
.ColumnsToKeepStats(context_.columns_to_keep_stats);
748+
.ColumnsToKeepStats(context_.columns_to_keep_stats)
749+
.PlanWith(context_.plan_executor);
741750

742751
if (context_.ignore_residuals) {
743752
manifest_group->IgnoreResiduals();

src/iceberg/table_scan.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "iceberg/table_metadata.h"
3333
#include "iceberg/type_fwd.h"
3434
#include "iceberg/util/error_collector.h"
35+
#include "iceberg/util/executor.h"
3536

3637
namespace iceberg {
3738

@@ -228,6 +229,7 @@ struct TableScanContext {
228229
std::optional<int64_t> to_snapshot_id;
229230
std::string branch{};
230231
std::optional<int64_t> min_rows_requested;
232+
OptionalExecutor plan_executor;
231233

232234
// Validate the context parameters to see if they have conflicts.
233235
[[nodiscard]] Status Validate() const;
@@ -302,6 +304,12 @@ class ICEBERG_TEMPLATE_CLASS_EXPORT TableScanBuilder : public ErrorCollector {
302304
/// \param num_rows The minimum number of rows requested
303305
TableScanBuilder& MinRowsRequested(int64_t num_rows);
304306

307+
/// \brief Configure an executor for manifest planning.
308+
///
309+
/// \param executor Executor to use while planning manifests.
310+
/// \return Reference to this for method chaining.
311+
TableScanBuilder& PlanWith(Executor& executor);
312+
305313
/// \brief Request this scan to use the given snapshot by ID.
306314
/// \param snapshot_id a snapshot ID
307315
/// \note InvalidArgument will be returned if the snapshot cannot be found

src/iceberg/test/arrow_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ TEST(ArrowExecutorAdapterTest, RunsTaskGroupOnThreadPool) {
638638
std::mutex mutex;
639639
std::vector<std::thread::id> thread_ids;
640640

641-
auto status = TaskGroup<>()
641+
auto status = TaskGroup()
642642
.SetExecutor(std::ref(executor))
643643
.Submit([&]() -> Status {
644644
std::lock_guard lock(mutex);

0 commit comments

Comments
 (0)