Skip to content

Commit 06742c5

Browse files
authored
lock-free prefetch enqueue (#7)
1 parent 38119b5 commit 06742c5

4 files changed

Lines changed: 67 additions & 4 deletions

File tree

CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@ FetchContent_Declare(
2525
FetchContent_MakeAvailable(sqlitecpp)
2626
include_directories(${sqlitecpp_SOURCE_DIR}/include)
2727

28+
FetchContent_Declare(
29+
concurrentqueue
30+
GIT_REPOSITORY https://github.com/cameron314/readerwriterqueue.git
31+
GIT_TAG v1.0.3
32+
)
33+
FetchContent_MakeAvailable(concurrentqueue)
34+
include_directories(${concurrentqueue_SOURCE_DIR})
35+
2836
project(sqlite_nested_vfs VERSION 1.0
2937
DESCRIPTION "SQLite VFS extension storing database pages in...a SQLite database"
3038
LANGUAGES C CXX)

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ RUN apt-get -qq update && \
1414
ADD . /work
1515
WORKDIR /work
1616

17-
RUN cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc)
17+
RUN rm -rf build && cmake -DCMAKE_BUILD_TYPE=$build_type . -B build && cmake --build build -j $(nproc)
1818

1919
WORKDIR /work/build
2020
CMD ctest -V

src/SQLiteNestedVFS.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
296296

297297
const size_t MAX_FETCH_CURSORS = 4;
298298
std::vector<std::unique_ptr<FetchJob>> fetch_jobs_;
299-
ThreadPool fetch_thread_pool_;
299+
ThreadPoolWithEnqueueFast fetch_thread_pool_;
300300
std::mutex seek_lock_; // serializes outer db interactions among fetch background threads
301301
std::atomic<bool> seek_interrupt_; // broadcast that main thread wants seek_lock_
302302

@@ -445,7 +445,7 @@ class InnerDatabaseFile : public SQLiteVFS::File {
445445
if (active_jobs + 2 <= fetch_thread_pool_.MaxThreads()) {
446446
job->pageno = pageno_hint;
447447
job->PutState(FetchJob::State::QUEUE);
448-
fetch_thread_pool_.Enqueue(
448+
fetch_thread_pool_.EnqueueFast(
449449
job, [this](void *job) { return this->BackgroundFetchJob(job); }, nullptr);
450450
} else {
451451
assert(active_jobs + 1 == fetch_thread_pool_.MaxThreads());

src/ThreadPool.h

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <functional>
1414
#include <mutex>
1515
#include <queue>
16+
#include <readerwriterqueue.h>
1617
#include <thread>
1718

1819
namespace SQLiteNested {
@@ -88,7 +89,7 @@ class ThreadPool {
8889
public:
8990
ThreadPool(size_t max_threads, size_t max_jobs)
9091
: max_threads_(max_threads), max_jobs_(max_jobs), ser_queue_(job_greater_) {}
91-
~ThreadPool() {
92+
virtual ~ThreadPool() {
9293
{
9394
std::lock_guard<std::mutex> lock(mutex_);
9495
shutdown_ = true;
@@ -140,4 +141,58 @@ class ThreadPool {
140141
}
141142
};
142143

144+
// Adds lock-free EnqueueFast() for use on critical paths.
145+
// - EnqueueFast() never blocks (!)
146+
// - Only one thread should ever use it
147+
class ThreadPoolWithEnqueueFast : public ThreadPool {
148+
// concept: foreground thread adds job onto a lock-free queue, which a single background thread
149+
// consumes to Enqueue()
150+
151+
struct EnqueueFastJob {
152+
bool shutdown = false;
153+
void *x = nullptr;
154+
std::function<void *(void *) noexcept> par;
155+
std::function<void(void *) noexcept> ser;
156+
};
157+
158+
moodycamel::BlockingReaderWriterQueue<EnqueueFastJob> fast_queue_;
159+
std::unique_ptr<std::thread> worker_thread_;
160+
161+
void EnqueueFastWorker() {
162+
EnqueueFastJob job;
163+
while (true) {
164+
fast_queue_.wait_dequeue(job);
165+
if (job.shutdown) {
166+
break;
167+
}
168+
this->Enqueue(job.x, job.par, job.ser);
169+
}
170+
}
171+
172+
public:
173+
ThreadPoolWithEnqueueFast(size_t max_threads, size_t max_jobs)
174+
: ThreadPool(max_threads, max_jobs), fast_queue_(max_jobs) {}
175+
176+
~ThreadPoolWithEnqueueFast() {
177+
if (worker_thread_) {
178+
EnqueueFastJob job;
179+
job.shutdown = true;
180+
fast_queue_.enqueue(job);
181+
worker_thread_->join();
182+
}
183+
}
184+
185+
void EnqueueFast(void *x, std::function<void *(void *) noexcept> par,
186+
std::function<void(void *) noexcept> ser) {
187+
EnqueueFastJob job;
188+
job.x = x;
189+
job.par = par;
190+
job.ser = ser;
191+
fast_queue_.enqueue(job);
192+
if (!worker_thread_) {
193+
worker_thread_.reset(new std::thread([this]() { this->EnqueueFastWorker(); }));
194+
}
195+
}
196+
};
197+
143198
} // namespace SQLiteNested

0 commit comments

Comments
 (0)