@@ -90,8 +90,14 @@ bool MyQueue::push(arm_cmsis_stream::Message &&event)
9090 else
9191 {
9292 LOG_ERR (" Event queue overflow for priority %d\n " , p);
93+ this ->setError (CG_EVENT_QUEUE_FULL);
9394 }
9495 }
96+ else
97+ {
98+ this ->setError (CG_OS_ERROR, CG_UNIDENTIFIED_NODE,
99+ static_cast <int32_t >(error));
100+ }
95101 CG_EXIT_CRITICAL_SECTION (queue_mutex, error);
96102
97103 notifyQueue ();
@@ -156,9 +162,9 @@ void MyQueue::end() noexcept
156162void MyQueue::execute ()
157163{
158164 CG_MUTEX_ERROR_TYPE error;
159- while (!this ->mustEnd ())
165+ while (( !this ->mustEnd ()) && (! this -> mustPause () ))
160166 {
161- while ((!this ->mustEnd ()) && (!isEmpty ()))
167+ while ((!this ->mustEnd ()) && (!this -> mustPause ()) && (! isEmpty ()))
162168 {
163169 Message msg;
164170 bool messageWasReceived = false ;
@@ -182,6 +188,11 @@ void MyQueue::execute()
182188 }
183189 }
184190 }
191+ else
192+ {
193+ this ->setError (CG_OS_ERROR, CG_UNIDENTIFIED_NODE,
194+ static_cast <int32_t >(error));
195+ }
185196 CG_EXIT_CRITICAL_SECTION (queue_mutex, error);
186197
187198 // Process event with no lock held
@@ -226,18 +237,25 @@ void MyQueue::execute()
226237 if (std::holds_alternative<LocalDestination>(msg.destination ))
227238 {
228239 LocalDestination &local = std::get<LocalDestination>(msg.destination );
229- local.dst ->processEvent (local.dstPort , std::move (msg.event ));
240+ cg_status status = local.dst ->processEvent (local.dstPort , std::move (msg.event ));
241+ if (status != CG_SUCCESS)
242+ {
243+ this ->setError (status, local.dst ->nodeID ());
244+ }
230245 }
231246 else if (std::holds_alternative<DistantDestination>(msg.destination ))
232247 {
233248 DistantDestination &dist = std::get<DistantDestination>(msg.destination );
234- this ->callAsyncHandler (dist.src_node_id , std::move (msg.event ));
249+ if (!this ->callAsyncHandler (dist.src_node_id , std::move (msg.event )))
250+ {
251+ this ->setError (CG_EVENT_QUEUE_FULL, dist.src_node_id );
252+ }
235253 }
236254 cg_eventThread_->setPriority (ThreadPriority::High); // Back to highest priority
237255 }
238256 }
239257 }
240- if (this ->mustEnd ())
258+ if (this ->mustEnd () || this -> mustPause () )
241259 {
242260 return ;
243261 }
0 commit comments