-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathBlockingQueue.h
More file actions
124 lines (97 loc) · 3.58 KB
/
BlockingQueue.h
File metadata and controls
124 lines (97 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// ===========================================================================
// BlockingQueue.h
// ===========================================================================
#pragma once
#include "../Logger/Logger.h"
#include <condition_variable> // for std::condition_variable
#include <cstddef> // for std::size_t
#include <mutex> // for std::mutex
#include <utility> // for std::move
#include <queue> // for std::queue
namespace ProducerConsumerQueue
{
// Bounded, blocking FIFO queue shared between producer and consumer threads.
//
// All access to the underlying std::queue is serialized by a single mutex.
// Two condition variables implement the blocking behavior:
//   - push() blocks while the queue already holds QueueSize elements,
//   - pop()  blocks while the queue is empty.
//
// Template parameters:
//   T          element type stored in the queue
//   QueueSize  maximum number of elements held at once (default 10);
//              must be greater than zero
template<typename T, std::size_t QueueSize = 10>
class BlockingQueue
{
    // With a capacity of zero the "not full" predicate used by push() could
    // never become true, so every push() would block forever. Reject that
    // configuration at compile time instead of deadlocking at run time.
    static_assert(QueueSize > 0, "BlockingQueue: QueueSize must be greater than zero");

private:
    std::queue<T> m_data;        // queue container used to simulate a bounded buffer
    mutable std::mutex m_mutex;  // mutable so that the const observers can lock it

    // Monitor Concept (Dijkstra)
    std::condition_variable m_conditionIsEmpty;  // consumers wait here while the queue is empty
    std::condition_variable m_conditionIsFull;   // producers wait here while the queue is full

    // Shared implementation of both push() overloads: waits for free space,
    // appends 'item' (copied or moved, depending on how it was passed in),
    // logs the new size and finally wakes up waiting consumers.
    template<typename U>
    void enqueue(U&& item)
    {
        {
            std::unique_lock<std::mutex> guard{ m_mutex };

            // wait until there's space; the predicate is re-evaluated after
            // every wakeup, which handles lost/spurious wakeups
            m_conditionIsFull.wait(
                guard,
                [this]() -> bool { return m_data.size() < QueueSize; }
            );

            m_data.push(std::forward<U>(item));
            Logger::log(std::cout, " Size: ", m_data.size());
        }

        // notify after the lock has been released, so that woken consumers
        // do not immediately block on the mutex again
        m_conditionIsEmpty.notify_all();
    }

public:
    // default c'tor
    BlockingQueue()
    {
        Logger::log(std::cout, "Using Blocking Queue with Condition Variables");
    }

    // don't need other constructors or assignment operators
    // (std::mutex and std::condition_variable are neither copyable nor movable)
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue(BlockingQueue&&) = delete;
    BlockingQueue& operator= (const BlockingQueue&) = delete;
    BlockingQueue& operator= (BlockingQueue&&) = delete;

    // public interface

    // Appends a copy of 'item' to the back of the queue.
    // Blocks the calling thread while the queue holds QueueSize elements.
    void push(const T& item)
    {
        enqueue(item);
    }

    // Appends 'item' to the back of the queue by moving from it.
    // Blocks the calling thread while the queue holds QueueSize elements.
    void push(T&& item)
    {
        enqueue(std::move(item));
    }

    // Removes the front element and move-assigns it to 'item'.
    // Blocks the calling thread while the queue is empty.
    void pop(T& item)
    {
        {
            std::unique_lock<std::mutex> guard{ m_mutex };

            // wait until there's at least one item
            m_conditionIsEmpty.wait(
                guard,
                [this]() -> bool { return !m_data.empty(); }
            );

            // retrieve and pop front
            item = std::move(m_data.front());
            m_data.pop();
            Logger::log(std::cout, " Size: ", m_data.size());
        }

        // wakeup any sleeping producers (again outside of the lock)
        m_conditionIsFull.notify_all();
    }

    // Returns true if the queue held no elements at the time of the call.
    // The result may already be outdated when the caller inspects it,
    // because other threads can push or pop as soon as the lock is released.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard{ m_mutex };
        return m_data.empty();
    }

    // Returns the number of elements held at the time of the call.
    // Like empty(), this is only a snapshot.
    std::size_t size() const
    {
        std::lock_guard<std::mutex> guard{ m_mutex };
        return m_data.size();
    }
};
}
// ===========================================================================
// End-of-File
// ===========================================================================