Skip to content

Commit 4f89097

Browse files
shangxinliclaude
andcommitted
refactor: use a dedicated ThreadPool instead of std::async for parallel deletes
Addresses review feedback on apache#649: std::async spins up a fresh std::thread per DeleteFiles call (and per worker), which thrashes thread creation when CleanFiles invokes DeleteFiles 3-4 times in a row. Replace with a per-strategy ThreadPool that owns its workers for the lifetime of the strategy. - New util/thread_pool_internal.h / .cc: minimal worker pool with eager thread start, mutex+cv task queue, Submit returning a future<void>, and a RunAndWait fan-out helper for span-of-items workloads. Drained on destruction. - FileCleanupStrategy now holds a ThreadPool sized once at construction (min(8, hardware_concurrency)). DeleteFiles short-circuits empty/single-item batches and otherwise delegates to pool_.RunAndWait. The pool member is declared last so workers are joined before file_io_ and delete_func_ are destroyed. - Drops the RunInParallel free template, the per-call WorkerCount, and the <future>/<span> includes in expire_snapshots.cc. - Adds util_test::ThreadPoolTest covering ctor validation, single submit, fan-out, empty no-op, observed concurrency, exception isolation, and dtor drain. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 994f4e4 commit 4f89097

6 files changed

Lines changed: 332 additions & 57 deletions

File tree

src/iceberg/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ set(ICEBERG_SOURCES
110110
util/string_util.cc
111111
util/struct_like_set.cc
112112
util/temporal_util.cc
113+
util/thread_pool_internal.cc
113114
util/timepoint.cc
114115
util/transform_util.cc
115116
util/truncate_util.cc

src/iceberg/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ add_iceberg_test(util_test
132132
retry_util_test.cc
133133
string_util_test.cc
134134
struct_like_set_test.cc
135+
thread_pool_test.cc
135136
transform_util_test.cc
136137
truncate_util_test.cc
137138
url_encoder_test.cc
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <atomic>
21+
#include <chrono>
22+
#include <span>
23+
#include <stdexcept>
24+
#include <thread>
25+
#include <vector>
26+
27+
#include <gtest/gtest.h>
28+
29+
#include "iceberg/util/thread_pool_internal.h"
30+
31+
namespace iceberg {
32+
33+
TEST(ThreadPoolTest, ZeroWorkersThrows) {
34+
EXPECT_THROW(ThreadPool(0), std::invalid_argument);
35+
}
36+
37+
TEST(ThreadPoolTest, SubmitRunsTask) {
38+
ThreadPool pool(2);
39+
std::atomic<int> n{0};
40+
auto fut = pool.Submit([&] { n.fetch_add(1, std::memory_order_relaxed); });
41+
fut.wait();
42+
EXPECT_EQ(n.load(), 1);
43+
}
44+
45+
TEST(ThreadPoolTest, RunAndWaitProcessesEveryItem) {
46+
ThreadPool pool(4);
47+
std::vector<int> items(100);
48+
for (int i = 0; i < 100; ++i) items[i] = i;
49+
50+
std::atomic<long> sum{0};
51+
pool.RunAndWait<int>(std::span<const int>(items),
52+
[&](int v) { sum.fetch_add(v, std::memory_order_relaxed); });
53+
54+
// 0 + 1 + ... + 99 = 4950
55+
EXPECT_EQ(sum.load(), 4950);
56+
}
57+
58+
TEST(ThreadPoolTest, RunAndWaitEmptyIsNoOp) {
59+
ThreadPool pool(2);
60+
std::vector<int> items;
61+
std::atomic<int> seen{0};
62+
pool.RunAndWait<int>(std::span<const int>(items),
63+
[&](int) { seen.fetch_add(1, std::memory_order_relaxed); });
64+
EXPECT_EQ(seen.load(), 0);
65+
}
66+
67+
TEST(ThreadPoolTest, RunAndWaitExecutesConcurrently) {
68+
// Use a barrier-style check: each task increments an in-flight counter, sleeps
69+
// briefly, decrements, and records the peak. With multiple workers the peak
70+
// should exceed 1.
71+
constexpr int kWorkers = 4;
72+
constexpr int kItems = 8;
73+
ThreadPool pool(kWorkers);
74+
75+
std::vector<int> items(kItems, 0);
76+
std::atomic<int> in_flight{0};
77+
std::atomic<int> peak{0};
78+
79+
pool.RunAndWait<int>(std::span<const int>(items), [&](int) {
80+
int now = in_flight.fetch_add(1, std::memory_order_acq_rel) + 1;
81+
int prev = peak.load(std::memory_order_relaxed);
82+
while (now > prev &&
83+
!peak.compare_exchange_weak(prev, now, std::memory_order_relaxed)) {
84+
}
85+
std::this_thread::sleep_for(std::chrono::milliseconds(20));
86+
in_flight.fetch_sub(1, std::memory_order_acq_rel);
87+
});
88+
89+
EXPECT_GT(peak.load(), 1) << "expected concurrent execution across workers";
90+
}
91+
92+
TEST(ThreadPoolTest, ExceptionInTaskDoesNotKillWorker) {
93+
ThreadPool pool(1);
94+
// packaged_task captures the exception into the future; the worker loop must
95+
// continue and process the next submission.
96+
auto bad = pool.Submit([] { throw std::runtime_error("boom"); });
97+
EXPECT_THROW(bad.get(), std::runtime_error);
98+
99+
std::atomic<int> ok{0};
100+
auto good = pool.Submit([&] { ok.store(1, std::memory_order_relaxed); });
101+
good.wait();
102+
EXPECT_EQ(ok.load(), 1);
103+
}
104+
105+
TEST(ThreadPoolTest, DestructorJoinsAllPendingWork) {
106+
std::atomic<int> done{0};
107+
{
108+
ThreadPool pool(2);
109+
for (int i = 0; i < 16; ++i) {
110+
// Discard futures: we rely on the destructor to drain queued work.
111+
(void)pool.Submit([&] {
112+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
113+
done.fetch_add(1, std::memory_order_relaxed);
114+
});
115+
}
116+
}
117+
EXPECT_EQ(done.load(), 16);
118+
}
119+
120+
} // namespace iceberg

src/iceberg/update/expire_snapshots.cc

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@
2222
#include <algorithm>
2323
#include <chrono>
2424
#include <cstdint>
25-
#include <future>
2625
#include <iterator>
2726
#include <memory>
2827
#include <optional>
29-
#include <span>
3028
#include <string>
3129
#include <thread>
3230
#include <unordered_set>
@@ -44,6 +42,7 @@
4442
#include "iceberg/util/error_collector.h"
4543
#include "iceberg/util/macros.h"
4644
#include "iceberg/util/snapshot_util_internal.h"
45+
#include "iceberg/util/thread_pool_internal.h"
4746

4847
namespace iceberg {
4948

@@ -58,70 +57,28 @@ Result<std::shared_ptr<ManifestReader>> MakeManifestReader(
5857
return ManifestReader::Make(manifest, file_io, std::move(schema), std::move(spec));
5958
}
6059

61-
/// \brief Cap on per-CleanFiles worker concurrency.
60+
/// \brief Cap on per-strategy worker concurrency.
6261
///
6362
/// Java's RemoveSnapshots takes ExecutorServices from the table operations layer.
64-
/// C++ has no shared executor today, so each strategy spins up an ad-hoc pool via
65-
/// std::async. Cap concurrency to avoid swamping FileIO with hundreds of in-flight
66-
/// requests on hosts with very high core counts.
63+
/// C++ has no shared executor today, so each strategy owns a private ThreadPool.
64+
/// Cap concurrency to avoid swamping FileIO with hundreds of in-flight requests
65+
/// on hosts with very high core counts.
6766
constexpr std::size_t kMaxParallelism = 8;
6867

69-
std::size_t WorkerCount(std::size_t item_count) {
70-
if (item_count <= 1) return 1;
68+
std::size_t WorkerCount() {
7169
std::size_t hw = std::thread::hardware_concurrency();
7270
if (hw == 0) hw = 2;
73-
return std::min({item_count, kMaxParallelism, hw});
74-
}
75-
76-
/// \brief Run `work` over `items` using up to WorkerCount(items) std::async workers.
77-
///
78-
/// Each worker drains a contiguous slice of `items`. Exceptions thrown by `work` are
79-
/// swallowed -- callers that need to know whether a unit succeeded should record it
80-
/// inside `work` (e.g. via an atomic counter or a thread-safe collection).
81-
template <typename Item, typename Fn>
82-
void RunInParallel(std::span<const Item> items, Fn&& work) {
83-
if (items.empty()) return;
84-
std::size_t n = WorkerCount(items.size());
85-
if (n <= 1) {
86-
for (const auto& item : items) {
87-
try {
88-
work(item);
89-
} catch (...) {
90-
// best-effort
91-
}
92-
}
93-
return;
94-
}
95-
96-
std::vector<std::future<void>> futures;
97-
futures.reserve(n);
98-
std::size_t per = (items.size() + n - 1) / n;
99-
for (std::size_t i = 0; i < n; ++i) {
100-
std::size_t begin = i * per;
101-
if (begin >= items.size()) break;
102-
std::size_t end = std::min(begin + per, items.size());
103-
auto slice = items.subspan(begin, end - begin);
104-
futures.emplace_back(std::async(std::launch::async, [slice, &work]() {
105-
for (const auto& item : slice) {
106-
try {
107-
work(item);
108-
} catch (...) {
109-
// best-effort: see RunInParallel doc
110-
}
111-
}
112-
}));
113-
}
114-
for (auto& f : futures) {
115-
f.wait();
116-
}
71+
return std::min(kMaxParallelism, hw);
11772
}
11873

11974
/// \brief Abstract strategy for cleaning up files after snapshot expiration.
12075
class FileCleanupStrategy {
12176
public:
12277
FileCleanupStrategy(std::shared_ptr<FileIO> file_io,
12378
std::function<void(const std::string&)> delete_func)
124-
: file_io_(std::move(file_io)), delete_func_(std::move(delete_func)) {}
79+
: file_io_(std::move(file_io)),
80+
delete_func_(std::move(delete_func)),
81+
pool_(WorkerCount()) {}
12582

12683
virtual ~FileCleanupStrategy() = default;
12784

@@ -174,16 +131,20 @@ class FileCleanupStrategy {
174131
}
175132
}
176133

177-
/// \brief Delete a batch of files, parallelized via RunInParallel.
134+
/// \brief Delete a batch of files in parallel via the strategy's worker pool.
178135
///
179136
/// TODO(shangxinli): When FileIO grows a SupportsBulkOperations-style
180137
/// `DeleteFiles(span<string>)` API, prefer the bulk path here (mirroring
181138
/// Java's FileCleanupStrategy.deleteFiles).
182139
void DeleteFiles(const std::unordered_set<std::string>& paths) {
183140
if (paths.empty()) return;
141+
if (paths.size() == 1) {
142+
DeleteFile(*paths.begin());
143+
return;
144+
}
184145
std::vector<std::string> as_vec(paths.begin(), paths.end());
185-
RunInParallel<std::string>(std::span<const std::string>(as_vec),
186-
[this](const std::string& p) { DeleteFile(p); });
146+
pool_.RunAndWait<std::string>(std::span<const std::string>(as_vec),
147+
[this](const std::string& p) { DeleteFile(p); });
187148
}
188149

189150
bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
@@ -225,6 +186,13 @@ class FileCleanupStrategy {
225186

226187
std::shared_ptr<FileIO> file_io_;
227188
std::function<void(const std::string&)> delete_func_;
189+
// Worker pool for parallel best-effort deletion. Must be declared after the
190+
// members it does NOT touch -- the pool's worker threads only run tasks
191+
// submitted via Submit/RunAndWait, which capture `this` and dereference
192+
// file_io_ / delete_func_, so those members must outlive the pool. Since
193+
// destruction order is reverse declaration order, listing pool_ last ensures
194+
// workers are joined before file_io_ and delete_func_ are destroyed.
195+
ThreadPool pool_;
228196
};
229197

230198
/// \brief File cleanup strategy that determines safe deletions via full reachability.
@@ -233,7 +201,7 @@ class FileCleanupStrategy {
233201
/// still referenced by retained snapshots, then deletes orphaned manifests, data
234202
/// files, and manifest lists.
235203
///
236-
/// TODO(shangxinli): Add multi-threaded manifest reading and file deletion support.
204+
/// TODO(shangxinli): Add multi-threaded manifest reading.
237205
class ReachableFileCleanup : public FileCleanupStrategy {
238206
public:
239207
using FileCleanupStrategy::FileCleanupStrategy;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/util/thread_pool_internal.h"
21+
22+
#include <memory>
23+
#include <stdexcept>
24+
25+
namespace iceberg {
26+
27+
ThreadPool::ThreadPool(std::size_t num_workers) {
28+
if (num_workers == 0) {
29+
throw std::invalid_argument("ThreadPool num_workers must be > 0");
30+
}
31+
workers_.reserve(num_workers);
32+
for (std::size_t i = 0; i < num_workers; ++i) {
33+
workers_.emplace_back([this] { WorkerLoop(); });
34+
}
35+
}
36+
37+
ThreadPool::~ThreadPool() {
38+
{
39+
std::lock_guard<std::mutex> lock(mu_);
40+
stop_ = true;
41+
}
42+
cv_.notify_all();
43+
for (auto& w : workers_) {
44+
if (w.joinable()) w.join();
45+
}
46+
}
47+
48+
std::future<void> ThreadPool::Submit(std::function<void()> task) {
49+
auto pkg = std::make_shared<std::packaged_task<void()>>(std::move(task));
50+
std::future<void> fut = pkg->get_future();
51+
{
52+
std::unique_lock<std::mutex> lock(mu_);
53+
if (stop_) {
54+
// Pool is shutting down. Run the task on the calling thread so the future
55+
// becomes ready and callers don't deadlock waiting on it.
56+
lock.unlock();
57+
(*pkg)();
58+
return fut;
59+
}
60+
queue_.emplace([pkg]() { (*pkg)(); });
61+
}
62+
cv_.notify_one();
63+
return fut;
64+
}
65+
66+
void ThreadPool::WorkerLoop() {
67+
while (true) {
68+
std::function<void()> task;
69+
{
70+
std::unique_lock<std::mutex> lock(mu_);
71+
cv_.wait(lock, [this] { return stop_ || !queue_.empty(); });
72+
if (queue_.empty()) {
73+
// Drain mode: only exit once all queued work has been pulled.
74+
return;
75+
}
76+
task = std::move(queue_.front());
77+
queue_.pop();
78+
}
79+
// packaged_task captures exceptions into the future, so this won't throw.
80+
task();
81+
}
82+
}
83+
84+
} // namespace iceberg

0 commit comments

Comments
 (0)