Skip to content

Commit 7c92257

Browse files
Adopting latest Beast classes (tcp_stream and asio::ssl::stream)
Store the identifier/UUID of the session Clean sending queue at disconnection time Update the Jupyter notebook to put a dataspace with ACLs and LegalTags
1 parent cd25ad3 commit 7c92257

10 files changed

Lines changed: 121 additions & 57 deletions

File tree

example/withFesapi/etpClient.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ int main(int argc, char **argv)
737737

738738
std::cout << "Creating a client session..." << std::endl;
739739
auto clientSession = ETP_NS::ClientSessionLaunchers::createClientSession(&initializationParams, authorization);
740+
clientSession->setVerbose(true);
740741

741742
repo.setHdfProxyFactory(new ETP_NS::FesapiHdfProxyFactory(clientSession.get()));
742743

@@ -752,7 +753,6 @@ int main(int argc, char **argv)
752753
}
753754

754755
clientSession->setTimeOut(60000);
755-
clientSession->setVerbose(false);
756756
askUser(clientSession, repo);
757757

758758
sessionThread.join();

python/example/PutHorizon.ipynb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,26 @@
155155
"dataspace.uri = \"eml:///dataspace('\" + dataspace.path + \"')\"\n",
156156
"dataspace.customData = fetpapi.MapStringDataValue()\n",
157157
"dataValue = fetpapi.DataValue()\n",
158-
"aos = fetpapi.ArrayOfString\n",
158+
"aos = fetpapi.ArrayOfString()\n",
159159
"dataValue.item = fetpapi.DataValueitem_t()\n",
160-
"aos.values = [\"data.default.viewers@osdu.example.com\"]\n",
160+
"tmp = fetpapi.StringVector()\n",
161+
"tmp.push_back(\"data.default.viewers@osdu.example.com\")\n",
162+
"aos.values = tmp\n",
161163
"dataValue.item.set_ArrayOfString(aos)\n",
162164
"dataspace.customData[\"viewers\"] = dataValue\n",
163-
"aos.values = [\"data.default.owners@osdu.example.com\"]\n",
165+
"tmp.clear()\n",
166+
"tmp.push_back(\"data.default.owners@osdu.example.com\")\n",
167+
"aos.values = tmp\n",
164168
"dataValue.item.set_ArrayOfString(aos)\n",
165169
"dataspace.customData[\"owners\"] = dataValue\n",
166-
"aos.values = [\"osdu-public-usa-dataset\"]\n",
170+
"tmp.clear()\n",
171+
"tmp.push_back(\"osdu-public-usa-dataset\")\n",
172+
"aos.values = tmp\n",
167173
"dataValue.item.set_ArrayOfString(aos)\n",
168174
"dataspace.customData[\"legaltags\"] = dataValue\n",
169-
"aos.values = [\"US\"]\n",
175+
"tmp.clear()\n",
176+
"tmp.push_back(\"US\")\n",
177+
"aos.values = tmp\n",
170178
"dataValue.item.set_ArrayOfString(aos)\n",
171179
"dataspace.customData[\"otherRelevantDataCountries\"] = dataValue"
172180
]

src/etp/AbstractClientSessionCRTP.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace ETP_NS
3030

3131
virtual ~AbstractClientSessionCRTP() = default;
3232

33-
void on_connect(boost::system::error_code ec) {
33+
void on_ssl_handshake(boost::system::error_code ec) {
3434
if (ec) {
3535
std::cerr << "ERROR at Websocket connection : " << ec.message() << std::endl;
3636
return;

src/etp/AbstractSession.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
5050
// This indicates that the web socket (and consequently etp) session was closed
5151
std::cerr << "The other endpoint closed the web socket (and consequently etp) connection" << std::endl;
5252
}
53+
#if BOOST_VERSION > 106900
54+
else if (ec == boost::beast::error::timeout) {
55+
// This indicates that the web socket (and consequently etp) session was closed
56+
std::cerr << "Beast timeout has been reached" << std::endl;
57+
}
58+
#endif
5359
else {
5460
// This indicates an unexpected error
5561
std::cerr << "on_read : error code number " << ec.value() << std::endl;
@@ -63,6 +69,9 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
6369

6470
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
6571
specificProtocolHandlers.clear();
72+
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
73+
std::queue< std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> > empty;
74+
std::swap(sendingQueue, empty);
6675
webSocketSessionClosed = true;
6776
etpSessionClosed = true;
6877

src/etp/AbstractSession.h

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,7 @@ namespace ETP_NS
5757
* If the ETP session is not set up, it returns the nil UUID.
5858
*/
5959
const boost::uuids::uuid& getIdentifier() {
60-
if (isEtpSessionClosed()) {
61-
identifier = boost::uuids::nil_uuid();
62-
}
63-
60+
std::lock_guard<std::mutex> lock(identifierMutex);
6461
return identifier;
6562
}
6663

@@ -78,11 +75,6 @@ namespace ETP_NS
7875
return _timeOut;
7976
}
8077

81-
/**
82-
* The list of subscriptions recorded by customers on this session.
83-
*/
84-
std::unordered_map<int64_t, Energistics::Etp::v12::Datatypes::Object::SubscriptionInfo> subscriptions;
85-
8678
/**
8779
* Set the Core protocol handlers
8880
*/
@@ -246,14 +238,14 @@ namespace ETP_NS
246238
*/
247239
template<typename T> int64_t sendWithSpecificHandlerAndBlock(const T& mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
248240
{
249-
int64_t msgId = sendWithSpecificHandler(mb, specificHandler, correlationId, messageFlags);
241+
const int64_t msgId = sendWithSpecificHandler(mb, specificHandler, correlationId, messageFlags);
250242
// The correlationId of the first message MUST be set to 0 and the correlationId of all successive
251243
// messages in the same multipart request or notification MUST be set to the messageId of the first
252244
// message of the multipart request or notification.
253245
// If the request message is itself multipart, the correlationId of each message of the multipart
254246
// response MUST be set to the messageId of the FIRST message in the multipart request.
255247

256-
auto t_start = std::chrono::high_resolution_clock::now();
248+
const auto t_start = std::chrono::high_resolution_clock::now();
257249
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
258250
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
259251
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
@@ -370,11 +362,6 @@ namespace ETP_NS
370362
*/
371363
FETPAPI_DLL_IMPORT_OR_EXPORT bool isEtpSessionClosed() const { return webSocketSessionClosed || etpSessionClosed; }
372364

373-
void setEtpSessionClosed(bool etpSessionClosed_) {
374-
etpSessionClosed = etpSessionClosed_;
375-
reconnectionTryCount_ = 0;
376-
}
377-
378365
/****************
379366
*** DATASPACE ***
380367
****************/
@@ -633,7 +620,8 @@ namespace ETP_NS
633620
/// The next available message id.
634621
std::atomic<int64_t> messageId;
635622
/// The identifier of the session
636-
boost::uuids::uuid identifier;
623+
boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
624+
std::mutex identifierMutex;
637625
/// Indicates that the endpoint request to close the websocket session
638626
bool isCloseRequested_{ false };
639627
size_t reconnectionTryCount_ = 0;
@@ -646,6 +634,11 @@ namespace ETP_NS
646634
receivedBuffer.consume(receivedBuffer.size());
647635
}
648636

637+
void setEtpSessionClosed(bool etpSessionClosed_) {
638+
etpSessionClosed = etpSessionClosed_;
639+
reconnectionTryCount_ = 0;
640+
}
641+
649642
/**
650643
* Write the current buffer on the web socket
651644
*/
@@ -762,5 +755,7 @@ namespace ETP_NS
762755

763756
protocolHandlers[protocolId] = coreHandlers;
764757
}
758+
759+
friend void CoreHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);
765760
};
766761
}

src/etp/ClientSession.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,10 @@ namespace ETP_NS
8585
std::cerr << "on_resolve : " << ec.message() << std::endl;
8686
}
8787

88-
// Reality check: IPv6 is unlikely to be available yet
89-
endpoints = std::vector<tcp::endpoint>(results.begin(), results.end());
90-
std::stable_partition(endpoints.begin(), endpoints.end(), [](auto entry) {return entry.protocol() == tcp::v4(); });
91-
92-
asyncConnect();
88+
asyncConnect(results);
9389
}
9490

95-
virtual void asyncConnect() = 0;
91+
virtual void asyncConnect(const tcp::resolver::results_type& results) = 0;
9692

9793
virtual bool isTls() const = 0;
9894

@@ -129,7 +125,6 @@ namespace ETP_NS
129125
std::string proxyAuthorization;
130126
std::map<std::string, std::string> additionalHandshakeHeaderFields_;
131127
websocket::response_type responseType; // In order to check handshake sec_websocket_protocol
132-
std::vector<tcp::endpoint> endpoints; // Store the resolved endpoints to prevent calling resolve once again when reconnecting
133128
Energistics::Etp::v12::Protocol::Core::RequestSession requestSession;
134129

135130
/**

src/etp/PlainClientSession.h

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,61 @@ namespace ETP_NS
3535
virtual ~PlainClientSession() = default;
3636

3737
// Called by the base class
38+
#if BOOST_VERSION < 107000
3839
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<tcp::socket>>& ws() { return ws_; }
40+
#else
41+
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<boost::beast::tcp_stream>>& ws() { return ws_; }
42+
#endif
3943

4044
bool isTls() const final{ return false; }
4145

42-
void asyncConnect()
46+
void asyncConnect(const tcp::resolver::results_type& results)
4347
{
44-
ws_.reset(new websocket::stream<tcp::socket>(ioc));
45-
ws_->binary(true);
48+
// Reality check: IPv6 is unlikely to be available yet
49+
std::vector<tcp::endpoint> endpoints = std::vector<tcp::endpoint>(results.begin(), results.end());
50+
std::stable_partition(endpoints.begin(), endpoints.end(), [](auto entry) {return entry.protocol() == tcp::v4(); });
51+
4652
#if BOOST_VERSION < 107000
53+
ws_.reset(new websocket::stream<tcp::socket>(ioc));
4754
ws_->write_buffer_size(frameSize_);
48-
#else
49-
ws_->write_buffer_bytes(frameSize_);
50-
#endif
55+
ws_->binary(true);
5156

5257
// Make the connection on the IP address we get from a lookup
5358
boost::asio::async_connect(
5459
ws_->next_layer(),
5560
endpoints.begin(),
5661
endpoints.end(),
5762
std::bind(
58-
&AbstractClientSessionCRTP::on_connect,
63+
&AbstractClientSessionCRTP::on_ssl_handshake,
5964
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
6065
std::placeholders::_1));
66+
#else
67+
ws_.reset(new websocket::stream<boost::beast::tcp_stream>(ioc));
68+
ws_->write_buffer_bytes(frameSize_);
69+
ws_->binary(true);
70+
71+
// Make the connection on the IP address we get from a lookup
72+
boost::beast::get_lowest_layer(*ws_).async_connect(
73+
endpoints,
74+
boost::beast::bind_front_handler(
75+
&PlainClientSession::on_connect,
76+
std::static_pointer_cast<PlainClientSession>(shared_from_this())));
77+
#endif
78+
}
79+
80+
#if BOOST_VERSION > 106900
81+
void on_connect(boost::beast::error_code ec, tcp::resolver::results_type::endpoint_type)
82+
{
83+
on_ssl_handshake(ec);
6184
}
85+
#endif
6286

6387
private:
88+
#if BOOST_VERSION < 107000
6489
std::unique_ptr<websocket::stream<tcp::socket>> ws_;
90+
#else
91+
std::unique_ptr<websocket::stream<boost::beast::tcp_stream>> ws_;
92+
#endif
6593
std::size_t frameSize_;
6694
};
6795
}

src/etp/ProtocolHandlers/CoreHandlers.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ void CoreHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::Mes
5454
}
5555

5656
session->setEtpSessionClosed(false);
57+
{
58+
std::lock_guard<std::mutex> lock(session->identifierMutex);
59+
std::copy(os.sessionId.array.begin(), os.sessionId.array.end(), session->identifier.begin());
60+
}
5761
on_OpenSession(os, mh.correlationId);
5862
}
5963
else if (mh.messageType == Energistics::Etp::v12::Protocol::Core::CloseSession::messageTypeId) {
@@ -104,7 +108,7 @@ void CoreHandlers::on_RequestSession(const Energistics::Etp::v12::Protocol::Core
104108

105109
void CoreHandlers::on_OpenSession(const Energistics::Etp::v12::Protocol::Core::OpenSession &, int64_t)
106110
{
107-
session->fesapi_log("The session has been opened with the default core protocol handlers.");
111+
session->fesapi_log("The session", session->getIdentifier(), "has been opened with the default core protocol handlers.");
108112
}
109113

110114
void CoreHandlers::on_CloseSession(const Energistics::Etp::v12::Protocol::Core::CloseSession &, int64_t)

src/etp/ProtocolHandlers/StoreNotificationHandlers.cpp

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,20 +103,7 @@ void StoreNotificationHandlers::on_UnsubscribeNotifications(const Energistics::E
103103
{
104104
session->fesapi_log("on_UnsubscribeNotifications");
105105

106-
int64_t toRemove = (std::numeric_limits<int64_t>::max)();
107-
for (const auto& pair : session->subscriptions) {
108-
if (pair.second.requestUuid.array == msg.requestUuid.array) {
109-
toRemove = pair.first;
110-
break;
111-
}
112-
}
113-
114-
if (toRemove != (std::numeric_limits<int64_t>::max)()) {
115-
session->subscriptions.erase(toRemove);
116-
}
117-
else {
118-
session->send(ETP_NS::EtpHelpers::buildSingleMessageProtocolException(5, "The subscription request UUID is unknown by the store."), messageId, 0x02);
119-
}
106+
session->send(ETP_NS::EtpHelpers::buildSingleMessageProtocolException(7, "The StoreHandlers::on_UnsubscribeNotifications method has not been overriden by the agent."), 0x02);
120107
}
121108

122109
void StoreNotificationHandlers::on_UnsolicitedStoreNotifications(const Energistics::Etp::v12::Protocol::StoreNotification::UnsolicitedStoreNotifications &, int64_t)

0 commit comments

Comments
 (0)