Skip to content

Commit 6c9954e

Browse files
closeAndBlock now force closing after timeout by default
Add a forceClose(bool) capability: closeAndBlock now accepts an optional forceCloseAfterTimeOut parameter (default true) and will call a new forceClose() when the websocket doesn't close before timeout. Fix #30 Replace usages of std::chrono::high_resolution_clock with std::chrono::steady_clock for timeout/duration calculations to ensure monotonic timing and avoid issues with system clock adjustments.
1 parent 30c102d commit 6c9954e

15 files changed

Lines changed: 96 additions & 52 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
# Prepare the dependencies
1212
Download (build and install if necessary) third party libraries:
1313
- BOOST : All versions from version 1.66 should be ok but you may experience some [min/max build issues](https://github.com/boostorg/beast/issues/1980) using version 1.72 or 1.73.
14-
- AVRO : https://avro.apache.org/releases.html#Download (starting from version 1.9.0 [except 1.11.1](https://issues.apache.org/jira/browse/AVRO-3601), build it with the above boost library.)
14+
- AVRO : https://avro.apache.org/releases.html#Download (starting from version 1.9.0 [except 1.11.1](https://issues.apache.org/jira/browse/AVRO-3601), build it with the above boost library if necessary.)
1515
- (OPTIONALLY) OpenSSL : version 3.4 is known to work. OpenSSL is mandatory on Unix environement if you want SSL/TLS support. On Windows environment, you can depends on [Wintls](https://github.com/laudrup/boost-wintls/tree/master) instead
1616
- (OPTIONALLY) [Wintls](https://github.com/laudrup/boost-wintls/tree/master) : Starting from version 0.9.9. Only on Windows and only if you don't want to use OpenSSL to support SSL/TLS. FYI, Wintls uses native Windows API [(SSPI/Schannel)](https://docs.microsoft.com/en-us/windows-server/security/tls/tls-ssl-schannel-ssp-overview) functionality instead of OpenSSL for providing TLS encrypted stream functionality.
1717
- (OPTIONALLY) [FESAPI](https://github.com/F2I-Consulting/fesapi/releases) : All versions from version 2.7.0.0 should be ok but a minimal version of 2.11.0.0 is recommended to automatically recognize FESAPI CMake Variables using CMake find Module and build silently the EtpClient example.

cmake/swigEtp1_2Include.i.in

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1734,8 +1734,12 @@ namespace ETP_NS
17341734
* Send a message for closing to the server after having sent all previous messages.
17351735
* The session would really close only after all messages have been sent and responded.
17361736
* This method does block.
1737+
*
1738+
* @param forceCloseAfterTimeOut If true, the websocket session will be closed
1739+
* even if the ETP server did not close the session before the time out.
1740+
* By default, it is true.
17371741
*/
1738-
void closeAndBlock();
1742+
void closeAndBlock(bool forceCloseAfterTimeOut = true);
17391743

17401744
/**
17411745
* Check if the ETP session (starting after the Core.OpenSession or Core.RequestSession message) is not opened yet or has been closed.

example/withFesapi/etpClient.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,14 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
302302
repo.addOrReplaceGsoapProxy(dataobjectEntry.second.data, ETP_NS::EtpHelpers::getDataObjectType(dataobjectEntry.second.resource.uri), ETP_NS::EtpHelpers::getDataspaceUri(dataobjectEntry.second.resource.uri));
303303
}
304304
// Parse reps
305-
auto global_start = std::chrono::high_resolution_clock::now();
305+
auto global_start = std::chrono::steady_clock::now();
306306
for (auto* rep : repo.getDataObjects<RESQML2_NS::AbstractRepresentation>()) {
307307
std::cout << "Representation " << rep->getTitle() << std::endl;
308308
std::unique_ptr<double[]> ijkGridPoints(new double[rep->getXyzPointCountOfAllPatches() * 3]);
309-
auto t_start = std::chrono::high_resolution_clock::now();
309+
auto t_start = std::chrono::steady_clock::now();
310310
try {
311311
rep->getXyzPointsOfAllPatches(ijkGridPoints.get());
312-
std::cout << "XYZ POINTS IN " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
312+
std::cout << "XYZ POINTS IN " << std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() << " ms" << std::endl;
313313
}
314314
catch (...) {
315315
std::cerr << "Error reading XYZ points." << std::endl;
@@ -322,20 +322,20 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
322322
if (dynamic_cast<RESQML2_NS::ContinuousProperty*>(prop) != nullptr) {
323323
std::cout << "Continuous Prop " << propIndex++ << "/" << allProps.size() << " : " << prop->getTitle() << std::endl;
324324
std::unique_ptr<double[]> propValues(new double[valuesCount]);
325-
t_start = std::chrono::high_resolution_clock::now();
325+
t_start = std::chrono::steady_clock::now();
326326
prop->getDoubleValuesOfPatch(0, propValues.get());
327-
std::cout << "Continuous Prop IN " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
327+
std::cout << "Continuous Prop IN " << std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() << " ms" << std::endl;
328328
}
329329
else {
330330
std::cout << "Non Continuous Prop " << propIndex++ << "/" << allProps.size() << " : " << prop->getTitle() << std::endl;
331331
std::unique_ptr<int[]> propValues(new int[valuesCount]);
332-
t_start = std::chrono::high_resolution_clock::now();
332+
t_start = std::chrono::steady_clock::now();
333333
prop->getInt32ValuesOfPatch(0, propValues.get());
334-
std::cout << "Non Continuous Prop IN " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
334+
std::cout << "Non Continuous Prop IN " << std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() << " ms" << std::endl;
335335
}
336336
}
337337
}
338-
std::cout << "GLOBALLY DONE IN " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - global_start).count() << " ms" << std::endl;
338+
std::cout << "GLOBALLY DONE IN " << std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - global_start).count() << " ms" << std::endl;
339339
}
340340
else {
341341
std::cout << "There is no dataobject in this dataspace" << std::endl;
@@ -546,7 +546,7 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
546546
std::vector< std::string > dataspacesToLock;
547547
dataspacesToLock.push_back(dataspace.uri);
548548

549-
auto transaction_start = std::chrono::high_resolution_clock::now();
549+
auto transaction_start = std::chrono::steady_clock::now();
550550

551551
std::string result = session->startTransaction(dataspacesToLock);
552552
if (result.empty()) {
@@ -562,13 +562,13 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
562562
0.0, 1.0, 0.0, 50.0);
563563

564564
for (size_t propIndex = 0; propIndex < 100; ++propIndex) {
565-
auto t_start = std::chrono::high_resolution_clock::now();
565+
auto t_start = std::chrono::steady_clock::now();
566566
auto* prop = tmpRepo.createContinuousProperty(horizon_grid_2d_representation, "", "", 1, gsoap_eml2_3::eml23__IndexableElement::nodes, gsoap_resqml2_0_1::resqml20__ResqmlUom::m,
567567
gsoap_resqml2_0_1::resqml20__ResqmlPropertyKind::length);
568568
std::unique_ptr<double[]> prop_values(new double[ni * nj]);
569569
prop->pushBackDoubleHdf5Array2dOfValues(prop_values.get(), ni, nj, hdf_proxy);
570-
std::cout << " Pushed prop " << propIndex << " in " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
571-
std::cout << " Global time " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - transaction_start).count() << " s" << std::endl;
570+
std::cout << " Pushed prop " << propIndex << " in " << std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() << " ms" << std::endl;
571+
std::cout << " Global time " << std::chrono::duration<double>(std::chrono::steady_clock::now() - transaction_start).count() << " s" << std::endl;
572572
}
573573

574574
tmpRepo.setUriSource(dataspace.uri);
@@ -706,7 +706,7 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
706706
for (auto* hdfProxy : repo.getHdfProxySet()) {
707707
hdfProxy->close();
708708
}
709-
session->close();
709+
session->closeAndBlock();
710710
}
711711

712712
int main(int argc, char **argv)
@@ -750,10 +750,10 @@ int main(int argc, char **argv)
750750
std::thread sessionThread(&ETP_NS::ClientSession::run, clientSession);
751751

752752
// Wait for the ETP session to be opened
753-
auto t_start = std::chrono::high_resolution_clock::now();
753+
auto t_start = std::chrono::steady_clock::now();
754754
while (clientSession->isEtpSessionClosed()) {
755-
auto timeOut = std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count();
756-
if (timeOut > 50000) {
755+
auto timeOut = std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count();
756+
if (timeOut > 10000) {
757757
throw std::invalid_argument("Time out : " + std::to_string(timeOut) + " ms.\n");
758758
}
759759
}

src/fetpapi/etp/AbstractSession.h

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,10 @@ namespace ETP_NS
187187
// If the request message is itself multipart, the correlationId of each message of the multipart
188188
// response MUST be set to the messageId of the FIRST message in the multipart request.
189189

190-
auto t_start = std::chrono::high_resolution_clock::now();
190+
auto t_start = std::chrono::steady_clock::now();
191191
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
192192
std::this_thread::sleep_for(std::chrono::milliseconds(100));
193-
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
193+
if (std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() > _timeOut) {
194194
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
195195
}
196196
}
@@ -285,10 +285,10 @@ namespace ETP_NS
285285
// If the request message is itself multipart, the correlationId of each message of the multipart
286286
// response MUST be set to the messageId of the FIRST message in the multipart request.
287287

288-
const auto t_start = std::chrono::high_resolution_clock::now();
288+
const auto t_start = std::chrono::steady_clock::now();
289289
while (isMessageStillProcessing(correlationId == 0 ? msgId : correlationId)) {
290290
std::this_thread::sleep_for(std::chrono::milliseconds(100));
291-
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
291+
if (std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() > _timeOut) {
292292
throw std::runtime_error("Time out waiting for a response of message id " + std::to_string(msgId));
293293
}
294294
}
@@ -385,14 +385,24 @@ namespace ETP_NS
385385
* Send a message for closing to the server after having sent all previous messages.
386386
* The session would really close only after all messages have been sent and responded.
387387
* This method does block.
388+
*
389+
* @param forceCloseAfterTimeOut If true, the websocket session will be closed
390+
* even if the ETP server did not close the session before the time out.
391+
* By default, it is true.
388392
*/
389-
FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock() {
393+
FETPAPI_DLL_IMPORT_OR_EXPORT void closeAndBlock(bool forceCloseAfterTimeOut = true) {
390394
close();
391-
auto t_start = std::chrono::high_resolution_clock::now();
395+
auto t_start = std::chrono::steady_clock::now();
392396
while (!webSocketSessionClosed) {
393397
std::this_thread::sleep_for(std::chrono::milliseconds(100));
394-
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > _timeOut) {
395-
throw std::runtime_error("Time out waiting for closing");
398+
if (std::chrono::duration<double, std::milli>(std::chrono::steady_clock::now() - t_start).count() > _timeOut) {
399+
if (forceCloseAfterTimeOut) {
400+
fesapi_log("Time out waiting for closing. Forcefully closing.");
401+
forceClose();
402+
}
403+
else {
404+
throw std::runtime_error("Time out waiting for closing");
405+
}
396406
}
397407
}
398408
}
@@ -691,6 +701,11 @@ namespace ETP_NS
691701
*/
692702
virtual void do_write() = 0;
693703

704+
/**
705+
* Force closing of the session
706+
*/
707+
virtual void forceClose() = 0;
708+
694709
/**
695710
* Reads the message header currently stored in the decoder.
696711
* @param decoder Must be initialized with stream containing a coded message header.

src/fetpapi/etp/ClientSession.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,17 +139,16 @@ namespace ETP_NS
139139

140140
/**
141141
* @param initializationParams The initialization parameters of the session including IP host, port, requestedProtocols, supportedDataObjects
142-
* @param target usually "/" but a server can decide to serve etp on a particular target
143142
* @param etpServerAuth The HTTP authorization attribute to send to the ETP server. It may be empty if not needed.
144143
* @param proxyAuth The HTTP authorization attribute to send to the proxy server. It may be empty if not needed.
145144
*/
146145
ClientSession(
147-
InitializationParameters const* initializationParams, const std::string& target, const std::string& etpServerAuth, const std::string& proxyAuth = "") :
146+
InitializationParameters const* initializationParams, const std::string& etpServerAuth, const std::string& proxyAuth = "") :
148147
ioc(),
149148
resolver(ioc),
150149
etpServerHost(initializationParams->getEtpServerHost()),
151150
etpServerPort(std::to_string(initializationParams->getEtpServerPort())),
152-
etpServerTarget(target),
151+
etpServerTarget(initializationParams->getEtpServerUrlPath()),
153152
etpServerAuthorization(etpServerAuth),
154153
proxyHost(initializationParams->getProxyHost()),
155154
proxyPort(std::to_string(initializationParams->getProxyPort())),

src/fetpapi/etp/ClientSessionLaunchers.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ std::shared_ptr<ETP_NS::ClientSession> ETP_NS::ClientSessionLaunchers::createCli
119119

120120
std::size_t preferredMaxFrameSize = getNegotiatedMaxWebSocketFramePayloadSize(restClientSession->getResponse().body(), initializationParams->getPreferredMaxFrameSize());
121121

122-
result = std::make_shared<SslClientSession>(std::move(ctx), initializationParams, "/" + initializationParams->getEtpServerUrlPath(),
122+
result = std::make_shared<SslClientSession>(std::move(ctx), initializationParams,
123123
authorization, proxyAuthorization,
124124
initializationParams->getAdditionalHandshakeHeaderFields(), preferredMaxFrameSize);
125125
}
@@ -134,7 +134,7 @@ std::shared_ptr<ETP_NS::ClientSession> ETP_NS::ClientSessionLaunchers::createCli
134134

135135
std::size_t preferredMaxFrameSize = getNegotiatedMaxWebSocketFramePayloadSize(restClientSession->getResponse().body(), initializationParams->getPreferredMaxFrameSize());
136136

137-
result = std::make_shared<PlainClientSession>(initializationParams, "/" + initializationParams->getEtpServerUrlPath(),
137+
result = std::make_shared<PlainClientSession>(initializationParams,
138138
authorization, proxyAuthorization,
139139
initializationParams->getAdditionalHandshakeHeaderFields(), preferredMaxFrameSize);
140140
#if WITH_ETP_SSL

src/fetpapi/etp/InitializationParameters.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ namespace {
5454
else {
5555
hostEnd = portStart++;
5656
portEnd = url.find("/", portStart);
57-
uint16_t readPort = static_cast<uint16_t>(stoi(url.substr(portStart, portEnd - portStart)));
57+
uint16_t readPort = static_cast<uint16_t>(stoi(portEnd == std::string::npos
58+
? url.substr(portStart)
59+
: url.substr(portStart, portEnd - portStart)));
5860
if (readPort < 1 || readPort > (std::numeric_limits<uint16_t>::max)()) {
5961
throw std::out_of_range("The port " + std::to_string(readPort) + " is out of the allowed range for TCP ports (0,2^16)");
6062
}
@@ -66,7 +68,7 @@ namespace {
6668
std::get<0>(result) = url.substr(hostStart);
6769
}
6870
else {
69-
std::get<2>(result) = portEnd < url.size() - 1 ? url.substr(portEnd + 1) : "";
71+
std::get<2>(result) = portEnd == std::string::npos ? "" : url.substr(portEnd);
7072
std::get<0>(result) = url.substr(hostStart, hostEnd - hostStart);
7173
}
7274

src/fetpapi/etp/InitializationParameters.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,16 @@ namespace ETP_NS
110110
* @param port The port number to connect to.
111111
* @param urlPath The rest of the locator consists of data specific to the scheme, and is known as the "url-path".
112112
* It supplies the details of how the specified resource can be accessed.
113-
* Note that the "/" between the host (or port) and the url-path is NOT part of the url-path.
113+
* It must start with a slash or be empty.
114114
*/
115115
InitializationParameters(boost::uuids::uuid instanceUuid,
116116
const std::string& host, uint16_t port, const std::string& urlPath = "") :
117117
identifier(instanceUuid), etpServerHost(host), etpServerPort(port), etpServerUrlPath(urlPath)
118-
{}
118+
{
119+
if (!etpServerUrlPath.empty() && etpServerUrlPath[0] != '/') {
120+
throw std::invalid_argument("urlPath must start with a slash or be empty");
121+
}
122+
}
119123

120124
/**
121125
* Only to be used for direct connection to the ETP server URL (not whenpassing through a proxy)
@@ -126,14 +130,17 @@ namespace ETP_NS
126130
* @param port The port number to connect to.
127131
* @param urlPath The rest of the locator consists of data specific to the scheme, and is known as the "url-path".
128132
* It supplies the details of how the specified resource can be accessed.
129-
* Note that the "/" between the host (or port) and the url-path is NOT part of the url-path.
133+
* It must start with a slash or be empty.
130134
*/
131135
InitializationParameters(const std::string & instanceUuid,
132136
const std::string& host, uint16_t port, const std::string& urlPath = "") :
133137
etpServerHost(host), etpServerPort(port), etpServerUrlPath(urlPath)
134138
{
135139
std::stringstream ss(instanceUuid);
136140
ss >> identifier;
141+
if (!etpServerUrlPath.empty() && etpServerUrlPath[0] != '/') {
142+
throw std::invalid_argument("urlPath must start with a slash or be empty");
143+
}
137144
}
138145

139146
virtual ~InitializationParameters() = default;

src/fetpapi/etp/PlainClientSession.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ under the License.
2222
using namespace ETP_NS;
2323

2424
PlainClientSession::PlainClientSession(
25-
InitializationParameters const* initializationParams, const std::string & target, const std::string & authorization, const std::string& proxyAuthorization,
25+
InitializationParameters const* initializationParams,const std::string & authorization, const std::string& proxyAuthorization,
2626
const std::map<std::string, std::string>& additionalHandshakeHeaderFields, std::size_t frameSize) :
27-
AbstractClientSessionCRTP<PlainClientSession>(initializationParams, target, authorization, proxyAuthorization), frameSize_(frameSize)
27+
AbstractClientSessionCRTP<PlainClientSession>(initializationParams, authorization, proxyAuthorization), frameSize_(frameSize)
2828
{
2929
additionalHandshakeHeaderFields_ = additionalHandshakeHeaderFields;
3030
}

src/fetpapi/etp/PlainClientSession.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ namespace ETP_NS
2929
* @param frameSize Sets the size of the write buffer used by the implementation to send frames : https://www.boost.org/doc/libs/1_75_0/libs/beast/doc/html/beast/ref/boost__beast__websocket__stream/write_buffer_bytes/overload1.html.
3030
*/
3131
FETPAPI_DLL_IMPORT_OR_EXPORT PlainClientSession(
32-
InitializationParameters const* initializationParams, const std::string & target, const std::string & authorization, const std::string& proxyAuthorization = "",
32+
InitializationParameters const* initializationParams, const std::string & authorization, const std::string& proxyAuthorization = "",
3333
const std::map<std::string, std::string>& additionalHandshakeHeaderFields = {}, std::size_t frameSize = 4096);
3434

3535
virtual ~PlainClientSession() = default;
@@ -91,5 +91,13 @@ namespace ETP_NS
9191
std::unique_ptr<websocket::stream<boost::beast::tcp_stream>> ws_;
9292
#endif
9393
std::size_t frameSize_;
94+
95+
/**
96+
* Force closing of the session
97+
*/
98+
void forceClose() {
99+
ws_->next_layer().close();
100+
webSocketSessionClosed = true;
101+
}
94102
};
95103
}

0 commit comments

Comments
 (0)