Skip to content

Commit acd0888

Browse files
committed
fix the exec::repeat_effect algorithm
1 parent 3b32a65 commit acd0888

3 files changed

Lines changed: 72 additions & 25 deletions

File tree

include/exec/repeat_effect_until.hpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,10 +241,9 @@ namespace exec {
241241

242242
template <class _Sender>
243243
auto transform_sender(_Sender &&__sndr, __ignore) {
244-
return __sexpr_apply(
245-
static_cast<_Sender &&>(__sndr), []<class _Child>(__ignore, __ignore, _Child __child) {
246-
return repeat_effect_until_t{}(stdexec::then(std::move(__child)), _never{});
247-
});
244+
return __sexpr_apply(static_cast<_Sender &&>(__sndr), [](__ignore, __ignore, auto __child) {
245+
return repeat_effect_until_t{}(stdexec::then(std::move(__child), _never{}));
246+
});
248247
}
249248
};
250249
} // namespace __repeat_effect

test/exec/test_repeat_effect_until.cpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
*/
1717

1818
#include "exec/repeat_effect_until.hpp"
19-
#include "exec/on.hpp"
20-
#include "exec/trampoline_scheduler.hpp"
2119
#include "exec/static_thread_pool.hpp"
22-
#include "stdexec/concepts.hpp"
2320
#include "stdexec/execution.hpp"
2421

2522
#include <test_common/schedulers.hpp>
@@ -163,7 +160,9 @@ namespace {
163160
REQUIRE(called);
164161
}
165162

166-
TEST_CASE("repeat_effect_until works with bulk on a static_thread_pool", "[adaptors][repeat_n]") {
163+
TEST_CASE(
164+
"repeat_effect_until works with bulk on a static_thread_pool",
165+
"[adaptors][repeat_effect_until]") {
167166
exec::static_thread_pool pool{2};
168167
std::atomic<bool> failed{false};
169168
const auto tid = std::this_thread::get_id();
@@ -180,4 +179,22 @@ namespace {
180179
stdexec::sync_wait(std::move(snd));
181180
REQUIRE(called);
182181
}
182+
183+
TEST_CASE("repeat_effect repeats until an error is encountered", "[adaptors][repeat_effect]") {
184+
int counter = 0;
185+
ex::sender auto snd = exec::repeat_effect(
186+
succeed_n_sender(10, ex::set_error, std::string("error")) | ex::then([&] { ++counter; }));
187+
auto op = ex::connect(std::move(snd), expect_error_receiver{std::string("error")});
188+
ex::start(op);
189+
REQUIRE(counter == 10);
190+
}
191+
192+
TEST_CASE("repeat_effect repeats until stopped is encountered", "[adaptors][repeat_effect]") {
193+
int counter = 0;
194+
ex::sender auto snd = exec::repeat_effect(
195+
succeed_n_sender(10, ex::set_stopped) | ex::then([&] { ++counter; }));
196+
auto op = ex::connect(std::move(snd), expect_stopped_receiver{});
197+
ex::start(op);
198+
REQUIRE(counter == 10);
199+
}
183200
} // namespace

test/test_common/senders.hpp

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <catch2/catch.hpp>
2020
#include <test_common/type_helpers.hpp>
2121
#include <stdexec/execution.hpp>
22+
#include <memory>
2223

2324
namespace ex = stdexec;
2425

@@ -35,8 +36,8 @@ namespace {
3536
};
3637

3738
template <class Receiver>
38-
friend auto tag_invoke(ex::connect_t, a_sender_of, Receiver&&) noexcept {
39-
return operation();
39+
auto connect(Receiver rcvr) const noexcept {
40+
return operation{};
4041
}
4142
};
4243

@@ -56,20 +57,14 @@ namespace {
5657
Receiver rcvr_;
5758

5859
void start() & noexcept {
59-
STDEXEC_TRY {
60-
std::apply(
61-
[&](Values&... ts) { ex::set_value(std::move(rcvr_), std::move(ts)...); }, values_);
62-
}
63-
STDEXEC_CATCH_ALL {
64-
ex::set_error(std::move(rcvr_), std::current_exception());
65-
}
60+
std::apply(
61+
[&](Values&... ts) { ex::set_value(std::move(rcvr_), std::move(ts)...); }, values_);
6662
}
6763
};
6864

6965
template <class Receiver>
70-
friend auto tag_invoke(ex::connect_t, fallible_just&& self, Receiver&& rcvr)
71-
-> operation<std::decay_t<Receiver>> {
72-
return {{}, std::move(self.values_), std::forward<Receiver>(rcvr)};
66+
auto connect(Receiver rcvr) && -> operation<std::decay_t<Receiver>> {
67+
return {{}, std::move(values_), std::forward<Receiver>(rcvr)};
7368
}
7469

7570
std::tuple<Values...> values_;
@@ -114,9 +109,8 @@ namespace {
114109
};
115110

116111
template <class Receiver>
117-
friend auto tag_invoke(ex::connect_t, just_with_env&& self, Receiver&& rcvr)
118-
-> operation<std::decay_t<Receiver>> {
119-
return {{}, std::move(self.values_), std::forward<Receiver>(rcvr)};
112+
auto connect(Receiver rcvr) && -> operation<Receiver> {
113+
return {{}, std::move(values_), std::forward<Receiver>(rcvr)};
120114
}
121115

122116
auto get_env() const noexcept -> Env {
@@ -180,8 +174,7 @@ namespace {
180174
};
181175

182176
template <ex::__decays_to<completes_if> Self, class Receiver>
183-
friend auto tag_invoke(ex::connect_t, Self&& self, Receiver&& rcvr) noexcept
184-
-> operation<std::decay_t<Receiver>> {
177+
static auto connect(Self&& self, Receiver rcvr) noexcept -> operation<Receiver> {
185178
return {self.condition_, std::forward<Receiver>(rcvr)};
186179
}
187180
};
@@ -199,4 +192,42 @@ namespace {
199192
return lhs.x == rhs.x;
200193
}
201194
};
195+
196+
template <class Tag, class... Args>
197+
struct succeed_n_sender {
198+
using sender_concept = stdexec::sender_t;
199+
using completion_signatures = ex::completion_signatures<ex::set_value_t(), Tag(Args...)>;
200+
201+
explicit succeed_n_sender(int count, Tag, Args... args)
202+
: args_(std::move(args)...)
203+
, counter_(std::make_shared<std::atomic<int>>(count)) {
204+
}
205+
206+
template <class Receiver>
207+
struct operation {
208+
void start() noexcept {
209+
if (--*counter_ == -1) {
210+
std::apply(
211+
[&](Args&... args) -> void { Tag{}(std::move(rcvr_), std::move(args)...); }, args_);
212+
} else {
213+
ex::set_value(std::move(rcvr_));
214+
}
215+
}
216+
217+
std::tuple<Args...> args_;
218+
std::shared_ptr<std::atomic<int>> counter_;
219+
Receiver rcvr_;
220+
};
221+
222+
template <class Receiver>
223+
auto connect(Receiver rcvr) const -> operation<Receiver> {
224+
return {args_, counter_, std::forward<Receiver>(rcvr)};
225+
}
226+
227+
private:
228+
std::tuple<Args...> args_;
229+
std::shared_ptr<std::atomic<int>> counter_;
230+
};
231+
232+
202233
} // namespace

0 commit comments

Comments
 (0)