Skip to content

Commit 7aca270

Browse files
Adopting latest Beast classes (tcp_stream and asio::ssl::stream)
Store the identifier/UUID of the session Clean sending queue at disconnection time Update the Jupyter notebook to put a dataspace with ACLs and LegalTags
1 parent cd25ad3 commit 7aca270

9 files changed

Lines changed: 113 additions & 33 deletions

File tree

example/withFesapi/etpClient.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ int main(int argc, char **argv)
737737

738738
std::cout << "Creating a client session..." << std::endl;
739739
auto clientSession = ETP_NS::ClientSessionLaunchers::createClientSession(&initializationParams, authorization);
740+
clientSession->setVerbose(true);
740741

741742
repo.setHdfProxyFactory(new ETP_NS::FesapiHdfProxyFactory(clientSession.get()));
742743

@@ -752,7 +753,6 @@ int main(int argc, char **argv)
752753
}
753754

754755
clientSession->setTimeOut(60000);
755-
clientSession->setVerbose(false);
756756
askUser(clientSession, repo);
757757

758758
sessionThread.join();

python/example/PutHorizon.ipynb

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,26 @@
155155
"dataspace.uri = \"eml:///dataspace('\" + dataspace.path + \"')\"\n",
156156
"dataspace.customData = fetpapi.MapStringDataValue()\n",
157157
"dataValue = fetpapi.DataValue()\n",
158-
"aos = fetpapi.ArrayOfString\n",
158+
"aos = fetpapi.ArrayOfString()\n",
159159
"dataValue.item = fetpapi.DataValueitem_t()\n",
160-
"aos.values = [\"data.default.viewers@osdu.example.com\"]\n",
160+
"tmp = fetpapi.StringVector()\n",
161+
"tmp.push_back(\"data.default.viewers@osdu.example.com\")\n",
162+
"aos.values = tmp\n",
161163
"dataValue.item.set_ArrayOfString(aos)\n",
162164
"dataspace.customData[\"viewers\"] = dataValue\n",
163-
"aos.values = [\"data.default.owners@osdu.example.com\"]\n",
165+
"tmp.clear()\n",
166+
"tmp.push_back(\"data.default.owners@osdu.example.com\")\n",
167+
"aos.values = tmp\n",
164168
"dataValue.item.set_ArrayOfString(aos)\n",
165169
"dataspace.customData[\"owners\"] = dataValue\n",
166-
"aos.values = [\"osdu-public-usa-dataset\"]\n",
170+
"tmp.clear()\n",
171+
"tmp.push_back(\"osdu-public-usa-dataset\")\n",
172+
"aos.values = tmp\n",
167173
"dataValue.item.set_ArrayOfString(aos)\n",
168174
"dataspace.customData[\"legaltags\"] = dataValue\n",
169-
"aos.values = [\"US\"]\n",
175+
"tmp.clear()\n",
176+
"tmp.push_back(\"US\")\n",
177+
"aos.values = tmp\n",
170178
"dataValue.item.set_ArrayOfString(aos)\n",
171179
"dataspace.customData[\"otherRelevantDataCountries\"] = dataValue"
172180
]

src/etp/AbstractClientSessionCRTP.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace ETP_NS
3030

3131
virtual ~AbstractClientSessionCRTP() = default;
3232

33-
void on_connect(boost::system::error_code ec) {
33+
void on_ssl_handshake(boost::system::error_code ec) {
3434
if (ec) {
3535
std::cerr << "ERROR at Websocket connection : " << ec.message() << std::endl;
3636
return;

src/etp/AbstractSession.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
5050
// This indicates that the web socket (and consequently etp) session was closed
5151
std::cerr << "The other endpoint closed the web socket (and consequently etp) connection" << std::endl;
5252
}
53+
else if (ec == boost::beast::error::timeout) {
54+
// This indicates that the web socket (and consequently etp) session was closed
55+
std::cerr << "Beast timeout has been reached" << std::endl;
56+
}
5357
else {
5458
// This indicates an unexpected error
5559
std::cerr << "on_read : error code number " << ec.value() << std::endl;
@@ -63,6 +67,9 @@ void AbstractSession::on_read(boost::system::error_code ec, std::size_t bytes_tr
6367

6468
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
6569
specificProtocolHandlers.clear();
70+
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
71+
std::queue< std::tuple<int64_t, std::vector<uint8_t>, std::shared_ptr<ETP_NS::ProtocolHandlers>> > empty;
72+
std::swap(sendingQueue, empty);
6673
webSocketSessionClosed = true;
6774
etpSessionClosed = true;
6875

src/etp/AbstractSession.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,7 @@ namespace ETP_NS
5757
* If the ETP session is not set up, it returns the nil UUID.
5858
*/
5959
const boost::uuids::uuid& getIdentifier() {
60-
if (isEtpSessionClosed()) {
61-
identifier = boost::uuids::nil_uuid();
62-
}
63-
60+
std::lock_guard<std::mutex> lock(identifierMutex);
6461
return identifier;
6562
}
6663

@@ -246,14 +243,14 @@ namespace ETP_NS
246243
*/
247244
template<typename T> int64_t sendWithSpecificHandlerAndBlock(const T& mb, std::shared_ptr<ETP_NS::ProtocolHandlers> specificHandler, int64_t correlationId = 0, int32_t messageFlags = 0)
248245
{
249-
int64_t msgId = sendWithSpecificHandler(mb, specificHandler, correlationId, messageFlags);
246+
const int64_t msgId = sendWithSpecificHandler(mb, specificHandler, correlationId, messageFlags);
250247
// The correlationId of the first message MUST be set to 0 and the correlationId of all successive
251248
// messages in the same multipart request or notification MUST be set to the messageId of the first
252249
// message of the multipart request or notification.
253250
// If the request message is itself multipart, the correlationId of each message of the multipart
254251
// response MUST be set to the messageId of the FIRST message in the multipart request.
255252

256-
auto t_start = std::chrono::high_resolution_clock::now();
253+
const auto t_start = std::chrono::high_resolution_clock::now();
257254
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
258255
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
259256
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
@@ -633,7 +630,8 @@ namespace ETP_NS
633630
/// The next available message id.
634631
std::atomic<int64_t> messageId;
635632
/// The identifier of the session
636-
boost::uuids::uuid identifier;
633+
boost::uuids::uuid identifier{ boost::uuids::nil_uuid() };
634+
std::mutex identifierMutex;
637635
/// Indicates that the endpoint request to close the websocket session
638636
bool isCloseRequested_{ false };
639637
size_t reconnectionTryCount_ = 0;
@@ -762,5 +760,7 @@ namespace ETP_NS
762760

763761
protocolHandlers[protocolId] = coreHandlers;
764762
}
763+
764+
friend void CoreHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::MessageHeader& mh, avro::DecoderPtr d);
765765
};
766766
}

src/etp/ClientSession.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,10 @@ namespace ETP_NS
8585
std::cerr << "on_resolve : " << ec.message() << std::endl;
8686
}
8787

88-
// Reality check: IPv6 is unlikely to be available yet
89-
endpoints = std::vector<tcp::endpoint>(results.begin(), results.end());
90-
std::stable_partition(endpoints.begin(), endpoints.end(), [](auto entry) {return entry.protocol() == tcp::v4(); });
91-
92-
asyncConnect();
88+
asyncConnect(results);
9389
}
9490

95-
virtual void asyncConnect() = 0;
91+
virtual void asyncConnect(const tcp::resolver::results_type& results) = 0;
9692

9793
virtual bool isTls() const = 0;
9894

@@ -129,7 +125,6 @@ namespace ETP_NS
129125
std::string proxyAuthorization;
130126
std::map<std::string, std::string> additionalHandshakeHeaderFields_;
131127
websocket::response_type responseType; // In order to check handshake sec_websocket_protocol
132-
std::vector<tcp::endpoint> endpoints; // Store the resolved endpoints to prevent calling resolve once again when reconnecting
133128
Energistics::Etp::v12::Protocol::Core::RequestSession requestSession;
134129

135130
/**

src/etp/PlainClientSession.h

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,33 +35,61 @@ namespace ETP_NS
3535
virtual ~PlainClientSession() = default;
3636

3737
// Called by the base class
38+
#if BOOST_VERSION < 107000
3839
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<tcp::socket>>& ws() { return ws_; }
40+
#else
41+
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<boost::beast::tcp_stream>>& ws() { return ws_; }
42+
#endif
3943

4044
bool isTls() const final{ return false; }
4145

42-
void asyncConnect()
46+
void asyncConnect(const tcp::resolver::results_type& results)
4347
{
44-
ws_.reset(new websocket::stream<tcp::socket>(ioc));
45-
ws_->binary(true);
48+
// Reality check: IPv6 is unlikely to be available yet
49+
std::vector<tcp::endpoint> endpoints = std::vector<tcp::endpoint>(results.begin(), results.end());
50+
std::stable_partition(endpoints.begin(), endpoints.end(), [](auto entry) {return entry.protocol() == tcp::v4(); });
51+
4652
#if BOOST_VERSION < 107000
53+
ws_.reset(new websocket::stream<tcp::socket>(ioc));
4754
ws_->write_buffer_size(frameSize_);
48-
#else
49-
ws_->write_buffer_bytes(frameSize_);
50-
#endif
55+
ws_->binary(true);
5156

5257
// Make the connection on the IP address we get from a lookup
5358
boost::asio::async_connect(
5459
ws_->next_layer(),
5560
endpoints.begin(),
5661
endpoints.end(),
5762
std::bind(
58-
&AbstractClientSessionCRTP::on_connect,
63+
&AbstractClientSessionCRTP::on_ssl_handshake,
5964
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
6065
std::placeholders::_1));
66+
#else
67+
ws_.reset(new websocket::stream<boost::beast::tcp_stream>(ioc));
68+
ws_->write_buffer_bytes(frameSize_);
69+
ws_->binary(true);
70+
71+
// Make the connection on the IP address we get from a lookup
72+
boost::beast::get_lowest_layer(*ws_).async_connect(
73+
endpoints,
74+
boost::beast::bind_front_handler(
75+
&PlainClientSession::on_connect,
76+
std::static_pointer_cast<PlainClientSession>(shared_from_this())));
77+
#endif
78+
}
79+
80+
#if BOOST_VERSION > 106900
81+
void on_connect(boost::beast::error_code ec, tcp::resolver::results_type::endpoint_type)
82+
{
83+
on_ssl_handshake(ec);
6184
}
85+
#endif
6286

6387
private:
88+
#if BOOST_VERSION < 107000
6489
std::unique_ptr<websocket::stream<tcp::socket>> ws_;
90+
#else
91+
std::unique_ptr<websocket::stream<boost::beast::tcp_stream>> ws_;
92+
#endif
6593
std::size_t frameSize_;
6694
};
6795
}

src/etp/ProtocolHandlers/CoreHandlers.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ void CoreHandlers::decodeMessageBody(const Energistics::Etp::v12::Datatypes::Mes
5454
}
5555

5656
session->setEtpSessionClosed(false);
57+
{
58+
std::lock_guard<std::mutex> lock(session->identifierMutex);
59+
std::copy(os.sessionId.array.begin(), os.sessionId.array.end(), session->identifier.begin());
60+
}
5761
on_OpenSession(os, mh.correlationId);
5862
}
5963
else if (mh.messageType == Energistics::Etp::v12::Protocol::Core::CloseSession::messageTypeId) {
@@ -104,7 +108,7 @@ void CoreHandlers::on_RequestSession(const Energistics::Etp::v12::Protocol::Core
104108

105109
void CoreHandlers::on_OpenSession(const Energistics::Etp::v12::Protocol::Core::OpenSession &, int64_t)
106110
{
107-
session->fesapi_log("The session has been opened with the default core protocol handlers.");
111+
session->fesapi_log("The session", session->getIdentifier(), "has been opened with the default core protocol handlers.");
108112
}
109113

110114
void CoreHandlers::on_CloseSession(const Energistics::Etp::v12::Protocol::Core::CloseSession &, int64_t)

src/etp/ssl/SslClientSession.h

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@ under the License.
2525
#include "ssl_stream.h"
2626
#elif BOOST_VERSION < 107000
2727
#include <boost/beast/experimental/core/ssl_stream.hpp>
28-
#else
28+
#elif BOOST_VERSION < 108600
2929
#include <boost/beast/http.hpp>
3030
#include <boost/beast/ssl.hpp>
3131
#include <boost/beast/websocket/ssl.hpp>
32+
#else
33+
#include <boost/asio/ssl.hpp>
34+
#include <boost/beast/websocket/ssl.hpp>
3235
#endif
3336

3437
namespace http = boost::beast::http; // from <boost/beast/http.hpp>
@@ -48,13 +51,26 @@ namespace ETP_NS
4851
virtual ~SslClientSession() {}
4952

5053
// Called by the base class
54+
#if BOOST_VERSION < 107000
5155
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<boost::beast::ssl_stream<tcp::socket>>>& ws() { return ws_; }
56+
#elif BOOST_VERSION < 108600
57+
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr<websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>>& ws() { return ws_; }
58+
#else
59+
FETPAPI_DLL_IMPORT_OR_EXPORT std::unique_ptr< websocket::stream<boost::asio::ssl::stream<boost::beast::tcp_stream>>>& ws() { return ws_; }
60+
#endif
5261

5362
bool isTls() const final { return true; }
5463

55-
void asyncConnect()
64+
void asyncConnect(const tcp::resolver::results_type& results)
5665
{
66+
#if BOOST_VERSION < 107000
5767
ws_.reset(new websocket::stream<boost::beast::ssl_stream<tcp::socket>>(ioc, sslContext_));
68+
#elif BOOST_VERSION < 108600
69+
ws_.reset(new websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>(ioc, sslContext_));
70+
#else
71+
ws_.reset(new websocket::stream<boost::asio::ssl::stream<boost::beast::tcp_stream>>(ioc, sslContext_));
72+
#endif
73+
5874
ws_->binary(true);
5975
#if BOOST_VERSION < 107000
6076
ws_->write_buffer_size(frameSize_);
@@ -69,7 +85,12 @@ namespace ETP_NS
6985
std::cerr << "Websocket on connect (SNI): " << ecSNI.message() << std::endl;
7086
}
7187

88+
// Reality check: IPv6 is unlikely to be available yet
89+
std::vector<tcp::endpoint> endpoints = std::vector<tcp::endpoint>(results.begin(), results.end());
90+
std::stable_partition(endpoints.begin(), endpoints.end(), [](auto entry) {return entry.protocol() == tcp::v4(); });
91+
7292
// Make the connection on the IP address we get from a lookup
93+
#if BOOST_VERSION < 107000
7394
boost::asio::async_connect(
7495
ws_->next_layer().next_layer(),
7596
endpoints.begin(),
@@ -78,9 +99,20 @@ namespace ETP_NS
7899
&SslClientSession::on_ssl_connect,
79100
std::static_pointer_cast<SslClientSession>(shared_from_this()),
80101
std::placeholders::_1));
102+
#else
103+
boost::beast::get_lowest_layer(*ws_).async_connect(
104+
endpoints,
105+
boost::beast::bind_front_handler(
106+
&SslClientSession::on_ssl_connect,
107+
std::static_pointer_cast<SslClientSession>(shared_from_this())));
108+
#endif
81109
}
82110

111+
#if BOOST_VERSION < 107000
83112
void on_ssl_connect(boost::system::error_code ec) {
113+
#else
114+
void on_ssl_connect(boost::beast::error_code ec, tcp::resolver::results_type::endpoint_type) {
115+
#endif
84116
if (ec) {
85117
std::cerr << "on_ssl_connect : " << ec.message() << std::endl;
86118
}
@@ -110,7 +142,7 @@ namespace ETP_NS
110142
ws_->next_layer().async_handshake(
111143
boost::asio::ssl::stream_base::client,
112144
std::bind(
113-
&AbstractClientSessionCRTP::on_connect,
145+
&AbstractClientSessionCRTP::on_ssl_handshake,
114146
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
115147
std::placeholders::_1));
116148
}
@@ -166,14 +198,20 @@ namespace ETP_NS
166198
ws_->next_layer().async_handshake(
167199
boost::asio::ssl::stream_base::client,
168200
std::bind(
169-
&AbstractClientSessionCRTP::on_connect,
201+
&AbstractClientSessionCRTP::on_ssl_handshake,
170202
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
171203
std::placeholders::_1));
172204
}
173205

174206
private:
175207
boost::asio::ssl::context sslContext_;
208+
#if BOOST_VERSION < 107000
176209
std::unique_ptr<websocket::stream<boost::beast::ssl_stream<tcp::socket>>> ws_;
210+
#elif BOOST_VERSION < 108600
211+
std::unique_ptr<websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>> ws_;
212+
#else
213+
std::unique_ptr<websocket::stream<boost::asio::ssl::stream<boost::beast::tcp_stream>>> ws_;
214+
#endif
177215
http::request<http::empty_body> proxyHandshake;
178216
http::response<http::empty_body> proxyHandshakeResponse;
179217
// use own response parser

0 commit comments

Comments
 (0)