Skip to content

Commit b313290

Browse files
Add static channel
1 parent 65a88c5 commit b313290

6 files changed

Lines changed: 450 additions & 14 deletions

File tree

.github/workflows/cmake.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ jobs:
7676

7777
- name: Build
7878
working-directory: ${{github.workspace}}/build
79-
run: cmake --build . --config ${{ matrix.config.build_type }} --target tests -j
79+
run: cmake --build . --config ${{ matrix.config.build_type }} --target channel_tests -j
8080

8181
- name: Test
8282
working-directory: ${{github.workspace}}/build
83-
run: ctest -C ${{ matrix.config.build_type }} --verbose -R channel_test --output-on-failure -j
83+
run: ctest -C ${{ matrix.config.build_type }} --verbose -L channel_tests --output-on-failure -j
8484

8585
- name: Run examples
8686
working-directory: ${{github.workspace}}/build
@@ -106,7 +106,7 @@ jobs:
106106

107107
- name: Test
108108
working-directory: ${{github.workspace}}/build
109-
run: ctest -C Debug --verbose -R channel_test -j
109+
run: ctest -C Debug --verbose -L channel_test -j
110110

111111
- name: Upload coverage reports to Codecov
112112
uses: codecov/codecov-action@v5

include/msd/blocking_iterator.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class blocking_iterator {
6464
*/
6565
reference operator*()
6666
{
67-
chan_ >> value_;
67+
chan_.read(value_);
6868

6969
return value_;
7070
}

include/msd/channel.hpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,9 @@ class channel {
134134
return false;
135135
}
136136

137-
if (!(size_ == 0)) {
138-
out = std::move(queue_.front());
139-
queue_.pop();
140-
--size_;
141-
}
137+
out = std::move(queue_.front());
138+
queue_.pop();
139+
--size_;
142140
}
143141

144142
cnd_.notify_one();
@@ -247,8 +245,6 @@ class channel {
247245
cnd_.wait(lock, [this]() { return size_ < cap_; });
248246
}
249247
}
250-
251-
friend class blocking_iterator<channel>;
252248
};
253249

254250
template <typename T>

include/msd/static_channel.hpp

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Copyright (C) 2020-2025 Andrei Avram
2+
3+
#ifndef MSD_STATIC_CHANNEL_HPP_
4+
#define MSD_STATIC_CHANNEL_HPP_
5+
6+
#include <array>
7+
#include <condition_variable>
8+
#include <cstdlib>
9+
#include <mutex>
10+
11+
#include "channel.hpp"
12+
13+
namespace msd {
14+
15+
/**
16+
* @brief Thread-safe container for sharing data between threads.
17+
*
18+
* Allocates on the stack.
19+
* Does not throw exceptions.
20+
* Implements a blocking input iterator.
21+
*
22+
* @tparam T The type of the elements.
23+
* @tparam Capacity The maximum number of elements the channel can hold before blocking.
24+
*/
25+
template <typename T, std::size_t Capacity>
26+
class static_channel {
27+
public:
28+
static_assert(Capacity > 0, "Channel capacity must be greater than zero.");
29+
30+
/**
31+
* @brief The type of elements stored in the channel.
32+
*/
33+
using value_type = T;
34+
35+
/**
36+
* @brief The iterator type used to traverse the channel.
37+
*/
38+
using iterator = blocking_iterator<static_channel<T, Capacity>>;
39+
40+
/**
41+
* @brief The type used to represent sizes and counts.
42+
*/
43+
using size_type = std::size_t;
44+
45+
/**
46+
* @brief Creates a new channel.
47+
*/
48+
constexpr static_channel() = default;
49+
50+
/**
51+
* @brief Pushes an element into the channel.
52+
*
53+
* @tparam Type The type of the elements.
54+
*
55+
* @param value The element to be pushed into the channel.
56+
*
57+
* @return true If an element was successfully pushed into the channel.
58+
* @return false If the channel is closed.
59+
*/
60+
template <typename Type>
61+
bool write(Type&& value)
62+
{
63+
{
64+
std::unique_lock<std::mutex> lock{mtx_};
65+
waitBeforeWrite(lock);
66+
67+
if (is_closed_) {
68+
return false;
69+
}
70+
71+
array_[(front_ + size_) % Capacity] = std::forward<Type>(value);
72+
++size_;
73+
}
74+
75+
cnd_.notify_one();
76+
77+
return true;
78+
}
79+
80+
/**
81+
* @brief Pops an element from the channel.
82+
*
83+
* @param out Reference to the variable where the popped element will be stored.
84+
*
85+
* @return true If an element was successfully read from the channel.
86+
* @return false If the channel is closed and empty.
87+
*/
88+
bool read(T& out)
89+
{
90+
{
91+
std::unique_lock<std::mutex> lock{mtx_};
92+
waitBeforeRead(lock);
93+
94+
if (is_closed_ && size_ == 0) {
95+
return false;
96+
}
97+
98+
out = std::move(array_[front_]);
99+
front_ = (front_ + 1) % Capacity;
100+
--size_;
101+
}
102+
103+
cnd_.notify_one();
104+
105+
return true;
106+
}
107+
108+
/**
109+
* @brief Returns the current size of the channel.
110+
*
111+
* @return The number of elements in the channel.
112+
*/
113+
NODISCARD size_type size() const noexcept
114+
{
115+
std::unique_lock<std::mutex> lock{mtx_};
116+
return size_;
117+
}
118+
119+
/**
120+
* @brief Checks if the channel is empty.
121+
*
122+
* @return true If the channel contains no elements.
123+
* @return false Otherwise.
124+
*/
125+
NODISCARD bool empty() const noexcept
126+
{
127+
std::unique_lock<std::mutex> lock{mtx_};
128+
return size_ == 0;
129+
}
130+
131+
/**
132+
* @brief Closes the channel, no longer accepting new elements.
133+
*/
134+
void close() noexcept
135+
{
136+
{
137+
std::unique_lock<std::mutex> lock{mtx_};
138+
is_closed_ = true;
139+
}
140+
cnd_.notify_all();
141+
}
142+
143+
/**
144+
* @brief Checks if the channel has been closed.
145+
*
146+
* @return true If no more elements can be added to the channel.
147+
* @return false Otherwise.
148+
*/
149+
NODISCARD bool closed() const noexcept
150+
{
151+
std::unique_lock<std::mutex> lock{mtx_};
152+
return is_closed_;
153+
}
154+
155+
/**
156+
* @brief Checks if the channel has been closed and is empty.
157+
*
158+
* @return true If nothing can be read anymore from the channel.
159+
* @return false Otherwise.
160+
*/
161+
NODISCARD bool drained() noexcept
162+
{
163+
std::unique_lock<std::mutex> lock{mtx_};
164+
return is_closed_ && size_ == 0;
165+
}
166+
167+
/**
168+
* @brief Returns an iterator to the beginning of the channel.
169+
*
170+
* @return A blocking iterator pointing to the start of the channel.
171+
*/
172+
iterator begin() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
173+
174+
/**
175+
* @brief Returns an iterator representing the end of the channel.
176+
*
177+
* @return A blocking iterator representing the end condition.
178+
*/
179+
iterator end() noexcept { return blocking_iterator<static_channel<T, Capacity>>{*this}; }
180+
181+
/**
182+
* Channel cannot be copied or moved.
183+
*/
184+
static_channel(const static_channel&) = delete;
185+
static_channel& operator=(const static_channel&) = delete;
186+
static_channel(static_channel&&) = delete;
187+
static_channel& operator=(static_channel&&) = delete;
188+
virtual ~static_channel() = default;
189+
190+
private:
191+
std::array<T, Capacity> array_{};
192+
size_type front_{0};
193+
std::size_t size_{0};
194+
const size_type cap_{Capacity};
195+
mutable std::mutex mtx_;
196+
std::condition_variable cnd_;
197+
bool is_closed_{false};
198+
199+
void waitBeforeRead(std::unique_lock<std::mutex>& lock)
200+
{
201+
cnd_.wait(lock, [this]() { return !(size_ == 0) || is_closed_; });
202+
};
203+
204+
void waitBeforeWrite(std::unique_lock<std::mutex>& lock)
205+
{
206+
if (cap_ > 0 && size_ == cap_) {
207+
cnd_.wait(lock, [this]() { return size_ < cap_; });
208+
}
209+
}
210+
};
211+
212+
} // namespace msd
213+
214+
#endif // MSD_STATIC_CHANNEL_HPP_

tests/CMakeLists.txt

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,17 @@ function(package_add_test TESTNAME)
3434
endif ()
3535

3636
add_test(NAME ${TESTNAME} COMMAND ${TESTNAME})
37+
set_tests_properties(${TESTNAME} PROPERTIES LABELS "channel_tests")
3738

38-
add_dependencies(tests ${TESTNAME})
39+
add_dependencies(channel_tests ${TESTNAME})
3940
endfunction()
4041

41-
add_custom_target(tests)
42+
add_custom_target(channel_tests)
4243

4344
# Tests
44-
package_add_test(channel_test channel_test.cpp blocking_iterator_test.cpp)
45+
package_add_test(channel_test channel_test.cpp)
46+
package_add_test(static_channel_test static_channel_test.cpp)
47+
package_add_test(blocking_iterator_test blocking_iterator_test.cpp)
4548

4649
if (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
4750
# Disable warnings about C++17 extensions

0 commit comments

Comments
 (0)