-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathMultithreadedMeasurement.cpp
More file actions
146 lines (127 loc) · 4.62 KB
/
Copy pathMultithreadedMeasurement.cpp
File metadata and controls
146 lines (127 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/** Copyright © 2019 Université de Genève, LMU Munich - Faculty of Physics, IAP-CNRS/Sorbonne Université
*
* This library is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation; either version 3.0 of the License, or (at your option)
* any later version.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this library; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
* MultithreadedMeasurement.cpp
*
* Created on: May 23, 2018
* Author: mschefer
*/
#include <chrono>
#include <csignal>
#include <exception>
#include <ElementsKernel/Logging.h>
#include "SEImplementation/Plugin/SourceIDs/SourceID.h"
#include "SEImplementation/Measurement/MultithreadedMeasurement.h"
using namespace SourceXtractor;
static Elements::Logging logger = Elements::Logging::getLogger("Multithreading");
/**
 * Destructor. If an output thread object exists and is still joinable, it is
 * joined here so the std::thread is never destroyed while joinable.
 */
MultithreadedMeasurement::~MultithreadedMeasurement() {
  // No output thread object: nothing to wait for
  if (!m_output_thread) {
    return;
  }
  if (m_output_thread->joinable()) {
    m_output_thread->join();
  }
}
/**
 * Creates the output thread. The new thread executes outputThreadStatic,
 * receiving this instance as its only argument.
 */
void MultithreadedMeasurement::startThreads() {
  auto output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
  m_output_thread = std::move(output_thread);
}
/**
 * Signals that no more input will arrive, blocks on the thread pool, and then
 * joins the output thread (when there is one and it is joinable).
 */
void MultithreadedMeasurement::stopThreads() {
  // The output loop checks this flag as part of its termination condition
  m_input_done = true;

  m_thread_pool->block();

  const bool must_join = m_output_thread && m_output_thread->joinable();
  if (must_join) {
    m_output_thread->join();
  }

  logger.debug() << "All worker threads done!";
}
/**
 * Blocks on the thread pool and then polls, every 100 ms, until the output
 * queue has been emptied.
 *
 * While the queue still holds entries, the pool is asked whether a worker
 * raised an exception; if so it is logged and checkForException(true) is
 * called (presumably rethrowing it - confirm against the thread pool API).
 *
 * @throws Elements::Exception
 *  if the queue is not empty but the pool reports no active threads
 */
void MultithreadedMeasurement::synchronizeThreads() {
  // First let the worker threads finish
  m_thread_pool->block();

  // Then poll until the output queue has been drained
  for (;;) {
    {
      // The queue is only inspected while holding its mutex; the guard is
      // released before sleeping
      std::lock_guard<std::mutex> queue_guard(m_output_queue_mutex);
      if (m_output_queue.empty()) {
        return;
      }
      if (m_thread_pool->checkForException(false)) {
        logger.fatal() << "An exception was thrown from a worker thread";
        m_thread_pool->checkForException(true);
      }
      else if (m_thread_pool->activeThreads() == 0) {
        throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
      }
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
}
void MultithreadedMeasurement::receiveSource(std::unique_ptr<SourceGroupInterface> source_group) {
// Force computation of SourceID here, where the order is still deterministic
for (auto& source : *source_group) {
source.getProperty<SourceID>();
}
// Put the new SourceGroup into the input queue
auto order_number = m_group_counter;
auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
// Trigger measurements
for (auto& source : *source_group) {
m_source_to_row(source);
}
// Pass to the output thread
{
std::lock_guard<std::mutex> output_lock(m_output_queue_mutex);
m_output_queue.emplace_back(order_number, std::move(source_group));
}
};
auto lambda_copyable = [lambda = std::make_shared<decltype(lambda)>(std::move(lambda))](){
(*lambda)();
};
m_thread_pool->submit(lambda_copyable);
++m_group_counter;
}
/**
 * Entry point of the output thread: runs the output loop of the given
 * measurement and makes sure no exception escapes the thread function.
 *
 * An exception leaving the function executed by a std::thread ends the process
 * through std::terminate, without any log message. For that reason every
 * exception type is intercepted here, not only Elements::Exception: the
 * failure is logged and, unless an abort was already flagged through
 * m_abort_raised, SIGTERM is raised.
 *
 * @param measurement
 *  Instance whose outputThreadLoop is executed
 */
void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement *measurement) {
  logger.debug() << "Starting output thread";

  // Shared failure path: log the reason and raise SIGTERM at most once
  auto abort_execution = [measurement](const char* reason) {
    logger.fatal() << "Output thread got an exception!";
    logger.fatal() << reason;
    // exchange() returns the previous value, so only the first caller raises
    if (!measurement->m_abort_raised.exchange(true)) {
      logger.fatal() << "Aborting the execution";
      ::raise(SIGTERM);
    }
  };

  try {
    measurement->outputThreadLoop();
  }
  catch (const Elements::Exception& e) {
    abort_execution(e.what());
  }
  catch (const std::exception& e) {
    // Standard exceptions (e.g. std::bad_alloc) would otherwise terminate the process
    abort_execution(e.what());
  }
  catch (...) {
    abort_execution("Unknown exception type");
  }

  logger.debug() << "Stopping output thread";
}
/**
 * Body of the output thread. While the pool reports active threads, it drains
 * the output queue, forwarding each group through sendSource, and then sleeps
 * for 5 ms before checking again.
 *
 * The loop also ends once input is flagged as done, the pool has nothing
 * running or queued, and the output queue is empty.
 */
void MultithreadedMeasurement::outputThreadLoop() {
  while (m_thread_pool->activeThreads() > 0) {
    bool all_done = false;
    {
      std::unique_lock<std::mutex> queue_lock(m_output_queue_mutex);
      while (!m_output_queue.empty()) {
        auto ready_group = std::move(m_output_queue.front().second);
        m_output_queue.pop_front();
        // sendSource runs without the mutex held, so other threads can keep
        // using the queue in the meantime
        queue_lock.unlock();
        sendSource(std::move(ready_group));
        queue_lock.lock();
      }
      // Evaluated while still holding the queue mutex
      all_done = m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
                 m_output_queue.empty();
    }
    if (all_done) {
      break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  }
}
void MultithreadedMeasurement::receiveProcessSignal(const ProcessSourcesEvent& event) {
sendProcessSignal(event);
}