diff --git a/src/extract/extract.cpp b/src/extract/extract.cpp
index c46e7044..60787e6c 100644
--- a/src/extract/extract.cpp
+++ b/src/extract/extract.cpp
@@ -29,28 +29,132 @@ along with this program. If not, see .
 #include
 #include
 
-void Extract::open_file(const osmium::io::Header& header, osmium::io::overwrite output_overwrite, osmium::io::fsync sync, OptionClean const* clean) {
+// Body of the background writer thread: waits until swap_and_flush() has
+// handed over a buffer or shutdown was requested, cleans the buffer and
+// passes it to the osmium Writer. An exception is stored in
+// m_writer_exception instead of escaping from the thread.
+void Extract::writer_loop() {
+    try {
+        while (true) {
+            std::unique_lock<std::mutex> lock{m_mutex};
+            m_cv.wait(lock, [this]{ return m_flush_pending || m_shutdown; });
+
+            // A pending buffer is still written after shutdown was requested;
+            // the loop ends only once nothing is left to flush.
+            if (m_shutdown && !m_flush_pending) {
+                break;
+            }
+
+            // Still under the lock: clean the pending buffer, move it out and
+            // put a fresh empty buffer in its place. m_flush_pending is reset
+            // only after that, so swap_and_flush() sees the flag drop once
+            // m_flush_buffer is no longer shared with this thread.
+            m_clean->apply_to(m_flush_buffer);
+            auto buf = std::move(m_flush_buffer);
+            m_flush_buffer = osmium::memory::Buffer{buffer_size,
+                                                    osmium::memory::Buffer::auto_grow::no};
+            m_flush_pending = false;
+            lock.unlock();
+            m_cv.notify_one();
+
+            // The osmium Writer call (compression + I/O) runs outside the lock
+            // so the main thread can keep filling m_fill_buffer concurrently.
+            (*m_writer)(std::move(buf));
+        }
+    } catch (...) {
+        // Record the failure and wake every waiter; m_shutdown makes their
+        // wait predicates true so they rethrow instead of blocking.
+        std::unique_lock<std::mutex> lock{m_mutex};
+        m_writer_exception = std::current_exception();
+        m_flush_pending = false;
+        m_shutdown = true;
+        lock.unlock();
+        m_cv.notify_all();
+    }
+}
+
+// Rethrows the exception captured by writer_loop(), if there is one.
+void Extract::check_writer_exception() {
+    if (m_writer_exception) {
+        std::rethrow_exception(m_writer_exception);
+    }
+}
+
+// Hands the filled m_fill_buffer to the writer thread. Blocks while the
+// previous hand-over is still pending and rethrows a stored writer error.
+void Extract::swap_and_flush() {
+    std::unique_lock<std::mutex> lock{m_mutex};
+    m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; });
+    check_writer_exception();
+
+    // m_flush_buffer is a fresh, empty buffer at this point (set up by the
+    // member initializer or by writer_loop() after it moved the previous
+    // one out), so the swap alone leaves m_fill_buffer ready for new items.
+    std::swap(m_fill_buffer, m_flush_buffer);
+    m_flush_pending = true;
+    lock.unlock();
+    m_cv.notify_one();
+}
+
+// Creates the Writer for the output file and starts the writer thread.
+void Extract::open_file(const osmium::io::Header& header,
+                        osmium::io::overwrite output_overwrite,
+                        osmium::io::fsync sync,
+                        OptionClean const* clean) {
     m_clean = clean;
-    m_writer = std::make_unique<osmium::io::Writer>(m_output_file, header, output_overwrite, sync);
+    m_writer = std::make_unique<osmium::io::Writer>(m_output_file, header,
+                                                    output_overwrite, sync);
+    m_writer_thread = std::thread{&Extract::writer_loop, this};
 }
 
+// Flushes any remaining items, stops and joins the writer thread, then
+// closes the Writer. Rethrows an error stored by the writer thread. A
+// repeated call no longer tries to join the thread a second time.
 void Extract::close_file() {
-    if (m_writer) {
-        if (m_buffer.committed() > 0) {
-            m_clean->apply_to(m_buffer);
-            (*m_writer)(std::move(m_buffer));
+    if (!m_writer) {
+        return;
+    }
+
+    // join() on a thread that is not joinable throws std::system_error, so
+    // the hand-over and shutdown steps run only while the thread exists.
+    if (m_writer_thread.joinable()) {
+        if (m_fill_buffer.committed() > 0) {
+            swap_and_flush();
+        }
+
+        {
+            std::unique_lock<std::mutex> lock{m_mutex};
+            m_cv.wait(lock, [this]{ return !m_flush_pending || m_shutdown; });
+            check_writer_exception();
+            m_shutdown = true;
+        }
+        m_cv.notify_one();
+        m_writer_thread.join();
+    }
+
+    check_writer_exception();
+    m_writer->close();
+}
+
+// Stops and joins the writer thread if close_file() has not already done so.
+Extract::~Extract() {
+    if (m_writer_thread.joinable()) {
+        {
+            std::lock_guard<std::mutex> lock{m_mutex};
+            m_shutdown = true;
         }
-        m_writer->close();
+        m_cv.notify_one();
+        m_writer_thread.join();
     }
 }
 
+// Appends item to m_fill_buffer, handing the buffer to the writer thread
+// first when the item would not fit any more.
 void Extract::write(const osmium::memory::Item& item) {
-    if (m_buffer.capacity() - m_buffer.committed() < item.padded_size()) {
-        m_clean->apply_to(m_buffer);
-        (*m_writer)(std::move(m_buffer));
-        m_buffer = osmium::memory::Buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+    if (m_fill_buffer.capacity() - m_fill_buffer.committed() < item.padded_size()) {
+        swap_and_flush();
     }
-    m_buffer.push_back(item);
+    m_fill_buffer.push_back(item);
 }
 
 std::string Extract::envelope_as_text() const {
@@ -58,4 +162,3 @@ std::string Extract::envelope_as_text() const {
     ss << m_envelope;
     return ss.str();
 }
-
diff --git a/src/extract/extract.hpp b/src/extract/extract.hpp
index 782bafc1..9d00616d 100644
--- a/src/extract/extract.hpp
+++ b/src/extract/extract.hpp
@@ -29,12 +29,17 @@ along with this program. If not, see .
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 
+#include
+#include
 #include
+#include
 #include
+#include
 #include
 #include
 
@@ -46,20 +51,35 @@ class Extract {
     std::string m_description;
     std::vector m_header_options;
     osmium::Box m_envelope;
-    osmium::memory::Buffer m_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+
+    // Double-buffering: the main thread fills m_fill_buffer while the writer
+    // thread flushes m_flush_buffer to disk asynchronously.
+    osmium::memory::Buffer m_fill_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+    osmium::memory::Buffer m_flush_buffer{buffer_size, osmium::memory::Buffer::auto_grow::no};
+
     std::unique_ptr<osmium::io::Writer> m_writer;
     const OptionClean* m_clean = nullptr;
 
+    std::thread m_writer_thread;
+    std::mutex m_mutex;
+    std::condition_variable m_cv;
+    bool m_flush_pending = false;
+    bool m_shutdown = false;
+    std::exception_ptr m_writer_exception;
+
+    void writer_loop();
+    void swap_and_flush();
+    void check_writer_exception();
+
 public:
 
     Extract(const osmium::io::File& output_file, const std::string& description, const osmium::Box& envelope) :
         m_output_file(output_file),
         m_description(description),
-        m_envelope(envelope),
-        m_writer(nullptr) {
+        m_envelope(envelope) {
     }
 
-    virtual ~Extract() = default;
+    virtual ~Extract();
 
     const std::string& output() const noexcept {
         return m_output_file.filename();
diff --git a/src/extract/strategy.hpp b/src/extract/strategy.hpp
index e9fe60a1..47188749 100644
--- a/src/extract/strategy.hpp
+++ b/src/extract/strategy.hpp
@@ -104,9 +104,7 @@ class Pass {
                     break;
                 case osmium::item_type::way:
                     self().way(static_cast<const osmium::Way&>(object));
-                    for (auto& e : extracts()) {
-                        self().eway(&e, static_cast<const osmium::Way&>(object));
-                    }
+                    self().eway_all(extracts(), static_cast<const osmium::Way&>(object));
                     break;
                 case osmium::item_type::relation:
                     self().relation(static_cast<const osmium::Relation&>(object));
@@ -155,6 +153,15 @@ class Pass {
     void erelation(extract_data*, const osmium::Relation&) {
     }
 
+    // Default implementation: call eway() for each extract separately.
+    // Subclasses may override this to process all extracts in a single
+    // pass over way.nodes().
+    void eway_all(std::vector<extract_data>& exts, const osmium::Way& way) {
+        for (auto& e : exts) {
+            self().eway(&e, way);
+        }
+    }
+
 public:
 
     explicit Pass(TStrategy* strategy) :
diff --git a/src/extract/strategy_complete_ways.cpp b/src/extract/strategy_complete_ways.cpp
index 96a65594..e9b855d9 100644
--- a/src/extract/strategy_complete_ways.cpp
+++ b/src/extract/strategy_complete_ways.cpp
@@ -27,6 +27,7 @@ along with this program. If not, see .
 #include
 #include
 
+#include
 #include
 #include
 #include
@@ -100,6 +101,60 @@ namespace strategy_complete_ways {
             }
         }
 
+        // Override that scans way.nodes() at most twice for all extracts
+        // combined instead of up to twice per extract as the default does.
+        // Pass A finds which extracts claim this way (bitmask, max 64 extracts).
+        // Pass B records all node refs into extra_node_ids for matched extracts.
+        // With more than 64 extracts the bitmask cannot be used and every
+        // extract is handled through eway() instead.
+        void eway_all(std::vector<extract_data>& exts, const osmium::Way& way) {
+            constexpr std::size_t max_mask_bits = 64;
+            const std::size_t n = exts.size();
+
+            // Shifting a 64-bit value by 64 or more is undefined behaviour,
+            // so larger extract sets take the per-extract path that the
+            // base-class eway_all() uses.
+            if (n > max_mask_bits) {
+                for (auto& e : exts) {
+                    eway(&e, way);
+                }
+                return;
+            }
+
+            std::uint64_t found_mask = 0;
+            std::size_t remaining = n;
+
+            for (const auto& nr : way.nodes()) {
+                const auto node_id = nr.positive_ref();
+                for (std::size_t i = 0; i < n; ++i) {
+                    const std::uint64_t bit = std::uint64_t{1} << i;
+                    if (!(found_mask & bit) && exts[i].node_ids.get(node_id)) {
+                        found_mask |= bit;
+                        exts[i].way_ids.set(way.positive_id());
+                        if (--remaining == 0) {
+                            break;
+                        }
+                    }
+                }
+                if (remaining == 0) {
+                    break;
+                }
+            }
+
+            if (found_mask == 0) {
+                return;
+            }
+
+            for (const auto& nr : way.nodes()) {
+                const auto node_ref = nr.ref();
+                for (std::size_t i = 0; i < n; ++i) {
+                    if (found_mask & (std::uint64_t{1} << i)) {
+                        exts[i].extra_node_ids.set(node_ref);
+                    }
+                }
+            }
+        }
+
         void relation(const osmium::Relation& relation) {
             m_check_order.relation(relation);
             m_relations_map_stash.add_members(relation);