forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_buffer.cpp
More file actions
68 lines (57 loc) · 2.34 KB
/
send_buffer.cpp
File metadata and controls
68 lines (57 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "send_buffer.h"
#include "consumer_queue.h"
#include <memory>
using namespace lsl;
/**
 * Create a new consumer queue attached to this send buffer.
 *
 * Every consumer receives all samples pushed into the buffer (the oldest samples are dropped
 * once the buffer capacity is overrun).
 *
 * @param max_buffered Requested upper bound on the queue size of this consumer. A value of zero
 * means "no per-consumer limit"; in that case the send buffer's max_capacity_ is used. A
 * non-zero value is still capped at max_capacity_, which therefore acts as a global limit.
 * @return Shared pointer to the newly created queue.
 */
std::shared_ptr<consumer_queue> send_buffer::new_consumer(int max_buffered) {
    // Start from the buffer-wide capacity and only shrink it if the caller asked
    // for something non-zero and smaller.
    int queue_size = max_capacity_;
    if (max_buffered != 0 && max_buffered < max_capacity_) queue_size = max_buffered;
    return std::make_shared<consumer_queue>(queue_size, shared_from_this());
}
/**
* Push a sample onto the send buffer.
* Will subsequently be seen by all consumers.
*/
void send_buffer::push_sample(const sample_p &s) {
std::lock_guard<std::mutex> lock(consumers_mut_);
for (auto &consumer : consumers_) consumer->push_sample(s);
}
/// Register a new consumer.
/// A queue that is already registered is not added a second time; a warning is logged instead.
/// Waiters on some_registered_ are notified in either case, after consumers_mut_ is released.
void send_buffer::register_consumer(consumer_queue *q) {
    {
        std::lock_guard<std::mutex> guard(consumers_mut_);
        const bool already_known =
            std::find(consumers_.begin(), consumers_.end(), q) != consumers_.end();
        if (!already_known) {
            consumers_.push_back(q);
        } else {
            LOG_F(WARNING, "Duplicate consumer queue in send buffer");
        }
    }
    // The lock guard above has gone out of scope, so woken threads can take the mutex right away.
    some_registered_.notify_all();
}
/// Unregister a previously registered consumer.
/// If the queue is not registered, an error is logged and the list of consumers is left
/// untouched.
void send_buffer::unregister_consumer(consumer_queue *q) {
    std::lock_guard<std::mutex> lock(consumers_mut_);
    auto pos = std::find(consumers_.begin(), consumers_.end(), q);
    if (pos == consumers_.end()) {
        LOG_F(ERROR, "Trying to remove consumer queue not in send buffer");
        // Bail out: dereferencing the end iterator is undefined behaviour, and pop_back()
        // would either remove an unrelated consumer or be called on an empty vector.
        return;
    }
    // Put the element to be removed at the end (if it isn't there already) and
    // remove the last element. This avoids shifting the remaining elements; the
    // order of the consumers is not preserved.
    if (*pos != consumers_.back()) std::swap(*pos, consumers_.back());
    consumers_.pop_back();
}
/// Check whether there currently are consumers.
bool send_buffer::have_consumers() {
std::lock_guard<std::mutex> lock(consumers_mut_);
return some_registered();
}
/// Wait until some consumers are present.
/// @param timeout Maximum time to wait, in seconds.
/// @return The value of some_registered() when the wait ends, i.e. false if the timeout
/// expired without any consumer being present.
bool send_buffer::wait_for_consumers(double timeout) {
    const std::chrono::duration<double> max_wait(timeout);
    std::unique_lock<std::mutex> guard(consumers_mut_);
    // The predicate is evaluated with the mutex held.
    auto consumers_present = [this]() { return some_registered(); };
    return some_registered_.wait_for(guard, max_wait, consumers_present);
}