Skip to content

Commit 8e0a2ea

Browse files
Fix infinite loop when a disconnection occurs during DataArray reading.
1 parent c57a227 commit 8e0a2ea

4 files changed

Lines changed: 37 additions & 46 deletions

File tree

cmake/swigEtp1_2Include.i.in

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1639,17 +1639,7 @@ namespace ETP_NS
16391639
void setDataspaceProtocolHandlers(std::shared_ptr<DataspaceHandlers> dataspaceHandlers);
16401640
void setDataspaceOSDUProtocolHandlers(std::shared_ptr<DataspaceOSDUHandlers> dataspaceOSDUHandlers);
16411641

1642-
template<typename T> int64_t sendWithSpecificHandler(const T & mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
1643-
{
1644-
int64_t msgId = encode(mb, correlationId, messageFlags); // put the message to write in the queue
1645-
1646-
if (sendingQueue.size() == 1) {
1647-
do_write();
1648-
}
1649-
specificProtocolHandlers[msgId] = specificHandler;
1650-
1651-
return msgId;
1652-
}
1642+
template<typename T> int64_t sendWithSpecificHandler(const T & mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0) {}
16531643
%template(sendWithSpecificHandler) sendWithSpecificHandler<Energistics::Etp::v12::Protocol::Core::RequestSession>;
16541644
%template(sendWithSpecificHandler) sendWithSpecificHandler<Energistics::Etp::v12::Protocol::Core::OpenSession>;
16551645
%template(sendWithSpecificHandler) sendWithSpecificHandler<Energistics::Etp::v12::Protocol::Core::CloseSession>;
@@ -1724,15 +1714,7 @@ namespace ETP_NS
17241714
%template(sendWithSpecificHandler) sendWithSpecificHandler<Energistics::Etp::v12::Protocol::StoreNotification::SubscribeNotifications>;
17251715
%template(sendWithSpecificHandler) sendWithSpecificHandler<Energistics::Etp::v12::Protocol::StoreNotification::UnsolicitedStoreNotifications>;
17261716

1727-
template<typename T> int64_t send(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
1728-
{
1729-
if (protocolHandlers.size() > mb.protocolId) {
1730-
return sendWithSpecificHandler(mb, protocolHandlers[mb.protocolId], correlationId, messageFlags);
1731-
}
1732-
else {
1733-
throw std::logic_error("The agent has no registered handler at all for the protocol " + std::to_string(mb.protocolId));
1734-
}
1735-
}
1717+
template<typename T> int64_t send(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0) {}
17361718
%template(send) send<Energistics::Etp::v12::Protocol::Core::RequestSession>;
17371719
%template(send) send<Energistics::Etp::v12::Protocol::Core::OpenSession>;
17381720
%template(send) send<Energistics::Etp::v12::Protocol::Core::CloseSession>;
@@ -1807,12 +1789,7 @@ namespace ETP_NS
18071789
%template(send) send<Energistics::Etp::v12::Protocol::StoreNotification::SubscribeNotifications>;
18081790
%template(send) send<Energistics::Etp::v12::Protocol::StoreNotification::UnsolicitedStoreNotifications>;
18091791

1810-
template<typename T> void sendAndBlock(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
1811-
{
1812-
int64_t msgId = send(mb, correlationId, messageFlags);
1813-
while (isMessageStillProcessing(msgId)) {}
1814-
}
1815-
1792+
template<typename T> int64_t sendAndBlock(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0) {}
18161793
%template(sendAndBlock) sendAndBlock<Energistics::Etp::v12::Protocol::Core::RequestSession>;
18171794
%template(sendAndBlock) sendAndBlock<Energistics::Etp::v12::Protocol::Core::OpenSession>;
18181795
%template(sendAndBlock) sendAndBlock<Energistics::Etp::v12::Protocol::Core::CloseSession>;

src/etp/AbstractSession.h

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ namespace ETP_NS
169169
* @param messageFlags The message flags to be sent within the header
170170
* @return The ID of the message that has been put in the sending queue.
171171
*/
172-
template<typename T> void sendAndBlock(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
172+
template<typename T> int64_t sendAndBlock(const T & mb, int64_t correlationId = 0, int32_t messageFlags = 0)
173173
{
174174
int64_t msgId = send(mb, correlationId, messageFlags);
175175
// The correlationId of the first message MUST be set to 0 and the correlationId of all successive
@@ -184,6 +184,8 @@ namespace ETP_NS
184184
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
185185
}
186186
}
187+
188+
return msgId;
187189
}
188190

189191
/**
@@ -223,6 +225,34 @@ namespace ETP_NS
223225
return std::get<0>(queueItem);
224226
}
225227

228+
/**
229+
* Send a message to the server and register a specific handler for the response and block the thread until the answer of the server has been processed by the handlers
230+
* Please look at setTimeOut if you want to set the default timeout value which is 10 000 ms.
231+
*
232+
* @param mb The ETP message body to send
233+
* @param correlationId The ID of the message which this message is answering to.
234+
* @param messageFlags The message flags to be sent within the header
235+
* @return The ID of the message that has been put in the sending queue.
236+
*/
237+
template<typename T> int64_t sendWithSpecificHandlerAndBlock(const T& mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
238+
{
239+
int64_t msgId = sendWithSpecificHandler(mb, specificHandler, correlationId, messageFlags);
240+
// The correlationId of the first message MUST be set to 0 and the correlationId of all successive
241+
// messages in the same multipart request or notification MUST be set to the messageId of the first
242+
// message of the multipart request or notification.
243+
// If the request message is itself multipart, the correlationId of each message of the multipart
244+
// response MUST be set to the messageId of the FIRST message in the multipart request.
245+
246+
auto t_start = std::chrono::high_resolution_clock::now();
247+
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
248+
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
249+
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
250+
}
251+
}
252+
253+
return msgId;
254+
}
255+
226256
/**
227257
* Close the web socket session (without sending any ETP message)
228258
*/

src/etp/fesapi/FesapiHdfProxy.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,21 +53,11 @@ Energistics::Etp::v12::Datatypes::DataArrayTypes::DataArrayMetadata FesapiHdfPro
5353
// We don't care about the template parameter in this particular case
5454
auto handlers = std::make_shared<GetFullDataArrayHandlers<int64_t>>(session_, nullptr);
5555

56-
const int64_t msgId = session_->sendWithSpecificHandler(
56+
const int64_t msgId = session_->sendWithSpecificHandlerAndBlock(
5757
buildGetDataArrayMetadataMessage(datasetName),
5858
handlers,
5959
0, 0x02);
6060

61-
// Blocking loop
62-
auto t_start = std::chrono::high_resolution_clock::now();
63-
// Use timeOut value for session.
64-
auto timeOut = session_->getTimeOut();
65-
while (session_->isMessageStillProcessing(msgId)) {
66-
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > timeOut) {
67-
throw std::runtime_error("Time out waiting for a response of GetDataArrayMetadata message id " + std::to_string(msgId));
68-
}
69-
}
70-
7161
return handlers->getDataArrayMetadata();
7262
}
7363

src/etp/fesapi/FesapiHdfProxy.h

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -515,13 +515,10 @@ namespace ETP_NS
515515
auto specializedHandler = std::make_shared<GetFullDataArrayHandlers<T>>(session_, values);
516516
if (wholeSize + (valueCount + 1) * 8 <= maxAllowedDataArraySize) { // There can be valueCount array block and there is the length of the last array block
517517
// Get all values at once
518-
const int64_t msgId = session_->sendWithSpecificHandler(
518+
const int64_t msgId = session_->sendWithSpecificHandlerAndBlock(
519519
buildGetDataArraysMessage(datasetName),
520520
specializedHandler,
521521
0, 0x02);
522-
523-
// Blocking loop
524-
while (session_->isMessageStillProcessing(msgId)) {}
525522
}
526523
else {
527524
// Get all values using several data subarrays allowing more granular streaming
@@ -581,13 +578,10 @@ namespace ETP_NS
581578
}
582579

583580
// Send message
584-
const int64_t msgId = session_->sendWithSpecificHandler(
581+
const int64_t msgId = session_->sendWithSpecificHandlerAndBlock(
585582
msg,
586583
specializedHandler,
587584
0, 0x02);
588-
589-
// Blocking loop
590-
while (session_->isMessageStillProcessing(msgId)) {}
591585
}
592586
}
593587
};

0 commit comments

Comments
 (0)