Skip to content

Commit 717c33c

Browse files
committed
New example empty_notificator_2.
1 parent 573bb0f commit 717c33c

7 files changed

Lines changed: 316 additions & 0 deletions

File tree

dev/sample/so_5/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,5 @@ add_subdirectory(bind_transformer)
8585
add_subdirectory(agent_name)
8686

8787
add_subdirectory(mchain_empty_notificator)
88+
add_subdirectory(mchain_empty_notificator_2)
8889

dev/sample/so_5/build_samples.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,5 @@
9191
example[ 'agent_name' ]
9292

9393
example[ 'mchain_empty_notificator' ]
94+
example[ 'mchain_empty_notificator_2' ]
9495
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
set(SAMPLE sample.so_5.mchain_empty_notificator_2)
2+
add_executable(${SAMPLE} main.cpp)
3+
target_link_libraries(${SAMPLE} sobjectizer::SharedLib)
4+
install(TARGETS ${SAMPLE} DESTINATION bin)
5+
6+
set(SAMPLE_S sample.so_5.mchain_empty_notificator_2_s)
7+
add_executable(${SAMPLE_S} main.cpp)
8+
target_link_libraries(${SAMPLE_S} sobjectizer::StaticLib)
9+
install(TARGETS ${SAMPLE_S} DESTINATION bin)
10+
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
/*
2+
* An example of using mchain with empty_notificator.
3+
*
4+
* There is a producer that tries to send a message to a mchain, but
5+
* only if the mchain is empty. The emptyness of the mchain is detected
6+
* by a empty_notificator.
7+
*
8+
* The producer sends messages to the target mchain until it becomes full
9+
* and then wait for the emptyness of the mchain to resume sending.
10+
*/
11+
12+
#include <iostream>
13+
#include <chrono>
14+
15+
#include <so_5/all.hpp>
16+
17+
//
18+
// Stuff for logging.
19+
//
20+
using log_msg = std::string;
21+
22+
so_5::mbox_t make_logger( so_5::coop_t & coop )
23+
{
24+
class logger_actor final : public so_5::agent_t {
25+
public:
26+
using so_5::agent_t::agent_t;
27+
28+
void so_define_agent() override {
29+
so_subscribe_self().event( [](mhood_t<log_msg> cmd ) {
30+
std::cout << *cmd << std::endl;
31+
} );
32+
}
33+
};
34+
35+
// Logger will work on its own working thread.
36+
auto logger = coop.make_agent_with_binder< logger_actor >(
37+
so_5::disp::one_thread::make_dispatcher(
38+
coop.environment() ).binder() );
39+
40+
return logger->so_direct_mbox();
41+
}
42+
43+
template< typename A >
44+
inline void operator<<=( const so_5::mbox_t & to, A && a )
45+
{
46+
so_5::send< log_msg >( to, std::forward< A >(a) );
47+
}
48+
49+
struct msg_maker
50+
{
51+
std::ostringstream m_os;
52+
53+
template< typename A >
54+
msg_maker & operator<<( A && a )
55+
{
56+
m_os << std::forward< A >(a);
57+
return *this;
58+
}
59+
};
60+
61+
inline void operator<<=( const so_5::mbox_t & to, msg_maker & maker )
62+
{
63+
to <<= maker.m_os.str();
64+
}
65+
66+
//
67+
// Implementation of producers.
68+
//
69+
70+
// A request to be sent for processing.
71+
struct request
72+
{
73+
std::string m_payload;
74+
};
75+
76+
/*
77+
* Producer agent will send N requests and then closes mchain.
78+
*/
79+
class producer final : public so_5::agent_t
80+
{
81+
// This signal initiates next send attempt.
82+
struct send_next final : public so_5::signal_t {};
83+
84+
// This signal indicates that the mchain is empty.
85+
struct mchain_is_empty final : public so_5::signal_t {};
86+
87+
public :
88+
producer( context_t ctx,
89+
std::string name,
90+
so_5::mbox_t logger_mbox,
91+
unsigned int requests )
92+
: so_5::agent_t{ ctx }
93+
, m_name( std::move(name) )
94+
, m_logger_mbox{ std::move(logger_mbox) }
95+
, m_attempts_left{ requests }
96+
{
97+
// Create the target mchain.
98+
m_target_mchain = so_environment().create_mchain(
99+
so_5::make_limited_without_waiting_mchain_params(
100+
// Use mchain as a buffer of fixed capacity.
101+
5,
102+
so_5::mchain_props::memory_usage_t::preallocated,
103+
so_5::mchain_props::overflow_reaction_t::throw_exception)
104+
.empty_notificator(
105+
[m = so_direct_mbox()]() {
106+
// Tell the agent that the mchain is empty now.
107+
so_5::send< mchain_is_empty >( m );
108+
} )
109+
);
110+
}
111+
112+
// Get the target mchain.
113+
[[nodiscard]]
114+
so_5::mchain_t target_mchain() const
115+
{
116+
return m_target_mchain;
117+
}
118+
119+
void so_define_agent() override
120+
{
121+
this >>= st_sending_enabled;
122+
123+
st_sending_enabled
124+
.event( &producer::evt_send_next )
125+
;
126+
127+
st_sending_blocked
128+
.event( &producer::evt_send_next_when_blocked )
129+
.event( &producer::evt_mchain_is_empty )
130+
;
131+
}
132+
133+
void so_evt_start() override
134+
{
135+
// Initiate request sending loop.
136+
m_send_timer = so_5::send_periodic< send_next >(
137+
*this,
138+
// No pause before the first sent.
139+
std::chrono::milliseconds::zero(),
140+
// Pause before next send.
141+
std::chrono::milliseconds{ 20 });
142+
}
143+
144+
private :
145+
// State in that agent can try to send messages.
146+
state_t st_sending_enabled{ this, "sending_enabled" };
147+
// State in that agent can't send messages because mchain
148+
// isn't empty yet.
149+
state_t st_sending_blocked{ this, "sending_blocked" };
150+
151+
const std::string m_name;
152+
153+
const so_5::mbox_t m_logger_mbox;
154+
155+
// How many attempts remains.
156+
unsigned int m_attempts_left;
157+
158+
// The target mchain to be used.
159+
so_5::mchain_t m_target_mchain;
160+
161+
// Timer ID for periodic send_next signal.
162+
so_5::timer_id_t m_send_timer;
163+
164+
// An event for next attempt to send another requests.
165+
void evt_send_next(mhood_t< send_next >)
166+
{
167+
const auto result = so_5::select(
168+
so_5::from_all().handle_n( 1 ).no_wait_on_empty(),
169+
so_5::send_case(
170+
m_target_mchain,
171+
so_5::message_holder_t< request >::make(
172+
m_name + "_request_" + std::to_string( m_attempts_left ) ),
173+
[this]() {
174+
m_logger_mbox <<= ( msg_maker() << m_name
175+
<< ": message stored to target mbox" );
176+
--m_attempts_left;
177+
} ) );
178+
179+
if( !result.was_sent() )
180+
{
181+
// Message wasn't send.
182+
m_logger_mbox <<= ( msg_maker{} << m_name
183+
<< ": message is not sent because mchain is full" );
184+
185+
// The sending has to be paused.
186+
this >>= st_sending_blocked;
187+
}
188+
else
189+
{
190+
if( !m_attempts_left )
191+
{
192+
// It's time to close the target mchain.
193+
so_5::close_retain_content(
194+
so_5::exceptions_enabled,
195+
m_target_mchain );
196+
197+
// Periodic message has to be cancelled because all
198+
// messages have been sent.
199+
m_send_timer.release();
200+
}
201+
}
202+
}
203+
204+
// An event for next attempt to send another requests.
205+
void evt_send_next_when_blocked(mhood_t< send_next >)
206+
{
207+
m_logger_mbox <<= ( msg_maker{} << m_name
208+
<< ": message can't be sent because mchain is full" );
209+
}
210+
211+
// Reaction to notification about emptyness of the target mbox.
212+
void evt_mchain_is_empty( mhood_t<mchain_is_empty> )
213+
{
214+
m_logger_mbox <<= ( msg_maker{} << m_name
215+
<< ": mchain_is_empty received" );
216+
this >>= st_sending_enabled;
217+
}
218+
};
219+
220+
void run_example()
221+
{
222+
so_5::mbox_t logger_mbox;
223+
so_5::mchain_t chain_to_use;
224+
225+
// Launch SObjectizer without blocking the current thread.
226+
so_5::wrapped_env_t sobjectizer{
227+
so_5::wrapped_env_t::wait_init_completion,
228+
[&logger_mbox, &chain_to_use]( so_5::environment_t & env )
229+
{
230+
chain_to_use = env.introduce_coop(
231+
[&logger_mbox]( so_5::coop_t & coop ) {
232+
// Logger will work on its own context.
233+
logger_mbox = make_logger( coop );
234+
235+
auto * a_producer = coop.make_agent< producer >(
236+
"Alice",
237+
logger_mbox,
238+
20 );
239+
return a_producer->target_mchain();
240+
});
241+
}
242+
};
243+
244+
// Loop for reading messages from chain_to_use.
245+
auto status = so_5::mchain_props::extraction_status_t::no_messages;
246+
while( so_5::mchain_props::extraction_status_t::chain_closed != status )
247+
{
248+
// Try to get a message from the chain.
249+
status = so_5::receive(
250+
so_5::from( chain_to_use )
251+
.handle_all()
252+
.no_wait_on_empty(),
253+
[&logger_mbox]( const request & req )
254+
{
255+
logger_mbox <<= "Bob: start handling of received request";
256+
257+
logger_mbox <<= ( msg_maker{} << "Bob: request payload: "
258+
<< req.m_payload );
259+
260+
// Take a pause.
261+
std::this_thread::sleep_for( std::chrono::milliseconds{ 75 } );
262+
263+
logger_mbox <<= "Bob: finish handling of received request";
264+
} ).status();
265+
}
266+
267+
// SObjectizer will be shutdown automatically.
268+
}
269+
270+
int main()
271+
{
272+
try
273+
{
274+
run_example();
275+
276+
return 0;
277+
}
278+
catch( const std::exception & x )
279+
{
280+
std::cerr << "Exception: " << x.what() << std::endl;
281+
}
282+
283+
return 2;
284+
}
285+
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
require 'mxx_ru/cpp'
2+
3+
MxxRu::Cpp::exe_target {
4+
5+
required_prj 'so_5/prj.rb'
6+
target 'sample.so_5.mchain_empty_notificator_2'
7+
8+
cpp_source 'main.cpp'
9+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
require 'mxx_ru/cpp'
2+
3+
MxxRu::Cpp::exe_target {
4+
5+
required_prj 'so_5/prj_s.rb'
6+
target 'sample.so_5.mchain_empty_notificator_2_s'
7+
8+
cpp_source 'main.cpp'
9+
}

doxygen/dox/so_5/samples.dox

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* \example so_5/make_pipeline/main.cpp
4242
* \example so_5/many_timers/main.cpp
4343
* \example so_5/mchain_empty_notificator/main.cpp
44+
* \example so_5/mchain_empty_notificator_2/main.cpp
4445
* \example so_5/mchain_fibonacci/main.cpp
4546
* \example so_5/mchain_handler_formats/main.cpp
4647
* \example so_5/mchain_multi_consumers/main.cpp

0 commit comments

Comments
 (0)