diff --git a/src/extract/extract.cpp b/src/extract/extract.cpp
index c46e7044..60787e6c 100644
--- a/src/extract/extract.cpp
+++ b/src/extract/extract.cpp
@@ -29,28 +29,108 @@ along with this program. If not, see .
#include
#include
-void Extract::open_file(const osmium::io::Header& header, osmium::io::overwrite output_overwrite, osmium::io::fsync sync, OptionClean const* clean) {
+void Extract::writer_loop() {
+ try {
+ while (true) {
+ std::unique_lock lock{m_mutex};
+ m_cv.wait(lock, [this]{ return m_flush_pending || m_shutdown; });
+
+ if (m_shutdown && !m_flush_pending) {
+ break;
+ }
+
+ // Reset m_flush_pending under the lock so that swap_and_flush()
+ // can observe the transition to false only after the buffer has
+ // been moved out and is no longer shared.
+ m_clean->apply_to(m_flush_buffer);
+ auto buf = std::move(m_flush_buffer);
+ m_flush_buffer = osmium::memory::Buffer{buffer_size,
+ osmium::memory::Buffer::auto_grow::no};
+ m_flush_pending = false;
+ lock.unlock();
+ m_cv.notify_one();
+
+ // The osmium Writer call (compression + I/O) runs outside the lock
+ // so the main thread can keep filling m_fill_buffer concurrently.
+ (*m_writer)(std::move(buf));
+ }
+ } catch (...) {
+ std::unique_lock lock{m_mutex};
+ m_writer_exception = std::current_exception();
+ m_flush_pending = false;
+ m_shutdown = true;
+ lock.unlock();
+ m_cv.notify_all();
+ }
+}
+
+void Extract::check_writer_exception() {
+ if (m_writer_exception) {
+ std::rethrow_exception(m_writer_exception);
+ }
+}
+
+void Extract::swap_and_flush() {
+ std::unique_lock lock{m_mutex};
+ m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; });
+ check_writer_exception();
+
+ std::swap(m_fill_buffer, m_flush_buffer);
+ m_fill_buffer = osmium::memory::Buffer{buffer_size,
+ osmium::memory::Buffer::auto_grow::no};
+ m_flush_pending = true;
+ lock.unlock();
+ m_cv.notify_one();
+}
+
+void Extract::open_file(const osmium::io::Header& header,
+ osmium::io::overwrite output_overwrite,
+ osmium::io::fsync sync,
+ OptionClean const* clean) {
m_clean = clean;
- m_writer = std::make_unique(m_output_file, header, output_overwrite, sync);
+ m_writer = std::make_unique(m_output_file, header,
+ output_overwrite, sync);
+ m_writer_thread = std::thread{&Extract::writer_loop, this};
}
void Extract::close_file() {
- if (m_writer) {
- if (m_buffer.committed() > 0) {
- m_clean->apply_to(m_buffer);
- (*m_writer)(std::move(m_buffer));
+ if (!m_writer) {
+ return;
+ }
+
+ if (m_fill_buffer.committed() > 0) {
+ swap_and_flush();
+ }
+
+ {
+ std::unique_lock lock{m_mutex};
+ m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; });
+ check_writer_exception();
+ m_shutdown = true;
+ }
+ m_cv.notify_one();
+ m_writer_thread.join();
+
+ check_writer_exception();
+ m_writer->close();
+}
+
+Extract::~Extract() {
+ if (m_writer_thread.joinable()) {
+ {
+ std::lock_guard lock{m_mutex};
+ m_shutdown = true;
}
- m_writer->close();
+ m_cv.notify_one();
+ m_writer_thread.join();
}
}
void Extract::write(const osmium::memory::Item& item) {
- if (m_buffer.capacity() - m_buffer.committed() < item.padded_size()) {
- m_clean->apply_to(m_buffer);
- (*m_writer)(std::move(m_buffer));
- m_buffer = osmium::memory::Buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+ if (m_fill_buffer.capacity() - m_fill_buffer.committed() < item.padded_size()) {
+ swap_and_flush();
}
- m_buffer.push_back(item);
+ m_fill_buffer.push_back(item);
}
std::string Extract::envelope_as_text() const {
@@ -58,4 +138,3 @@ std::string Extract::envelope_as_text() const {
ss << m_envelope;
return ss.str();
}
-
diff --git a/src/extract/extract.hpp b/src/extract/extract.hpp
index 782bafc1..9d00616d 100644
--- a/src/extract/extract.hpp
+++ b/src/extract/extract.hpp
@@ -29,12 +29,17 @@ along with this program. If not, see .
#include
#include
#include
+#include
#include
#include
#include
+#include
+#include
#include
+#include
#include
+#include
#include
#include
@@ -46,20 +51,35 @@ class Extract {
std::string m_description;
std::vector m_header_options;
osmium::Box m_envelope;
- osmium::memory::Buffer m_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+
+ // Double-buffering: the main thread fills m_fill_buffer while the writer
+ // thread flushes m_flush_buffer to disk asynchronously.
+ osmium::memory::Buffer m_fill_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+ osmium::memory::Buffer m_flush_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+
std::unique_ptr m_writer;
const OptionClean* m_clean = nullptr;
+ std::thread m_writer_thread;
+ std::mutex m_mutex;
+ std::condition_variable m_cv;
+ bool m_flush_pending = false;
+ bool m_shutdown = false;
+ std::exception_ptr m_writer_exception;
+
+ void writer_loop();
+ void swap_and_flush();
+ void check_writer_exception();
+
public:
Extract(const osmium::io::File& output_file, const std::string& description, const osmium::Box& envelope) :
m_output_file(output_file),
m_description(description),
- m_envelope(envelope),
- m_writer(nullptr) {
+ m_envelope(envelope) {
}
- virtual ~Extract() = default;
+ virtual ~Extract();
const std::string& output() const noexcept {
return m_output_file.filename();
diff --git a/src/extract/strategy.hpp b/src/extract/strategy.hpp
index e9fe60a1..47188749 100644
--- a/src/extract/strategy.hpp
+++ b/src/extract/strategy.hpp
@@ -104,9 +104,7 @@ class Pass {
break;
case osmium::item_type::way:
self().way(static_cast(object));
- for (auto& e : extracts()) {
- self().eway(&e, static_cast(object));
- }
+ self().eway_all(extracts(), static_cast(object));
break;
case osmium::item_type::relation:
self().relation(static_cast(object));
@@ -155,6 +153,15 @@ class Pass {
void erelation(extract_data*, const osmium::Relation&) {
}
+ // Default implementation: call eway() for each extract separately.
+ // Subclasses may override this to process all extracts in a single
+ // pass over way.nodes().
+ void eway_all(std::vector& exts, const osmium::Way& way) {
+ for (auto& e : exts) {
+ self().eway(&e, way);
+ }
+ }
+
public:
explicit Pass(TStrategy* strategy) :
diff --git a/src/extract/strategy_complete_ways.cpp b/src/extract/strategy_complete_ways.cpp
index 96a65594..e9b855d9 100644
--- a/src/extract/strategy_complete_ways.cpp
+++ b/src/extract/strategy_complete_ways.cpp
@@ -27,6 +27,7 @@ along with this program. If not, see .
#include
#include
+#include
#include
#include
#include
@@ -100,6 +101,46 @@ namespace strategy_complete_ways {
}
}
+ // Override that scans way.nodes() at most twice for all extracts
+ // combined instead of up to twice per extract as the default does.
+ // Pass A finds which extracts claim this way (bitmask, max 64 extracts).
+ // Pass B records all node refs into extra_node_ids for matched extracts.
+ void eway_all(std::vector& exts, const osmium::Way& way) {
+ const std::size_t n = exts.size();
+ std::uint64_t found_mask = 0;
+ std::size_t remaining = n;
+
+ for (const auto& nr : way.nodes()) {
+ const auto node_id = nr.positive_ref();
+ for (std::size_t i = 0; i < n; ++i) {
+ if (!(found_mask & (std::uint64_t{1} << i)) &&
+ exts[i].node_ids.get(node_id)) {
+ found_mask |= std::uint64_t{1} << i;
+ exts[i].way_ids.set(way.positive_id());
+ if (--remaining == 0) {
+ break;
+ }
+ }
+ }
+ if (remaining == 0) {
+ break;
+ }
+ }
+
+ if (found_mask == 0) {
+ return;
+ }
+
+ for (const auto& nr : way.nodes()) {
+ const auto node_ref = nr.ref();
+ for (std::size_t i = 0; i < n; ++i) {
+ if (found_mask & (std::uint64_t{1} << i)) {
+ exts[i].extra_node_ids.set(node_ref);
+ }
+ }
+ }
+ }
+
void relation(const osmium::Relation& relation) {
m_check_order.relation(relation);
m_relations_map_stash.add_members(relation);