Skip to content

Commit 7abed3c

Browse files
Session survivability experimental support
1 parent b4e50cb commit 7abed3c

29 files changed

Lines changed: 2694 additions & 2317 deletions

example/withFesapi/etpClient.cpp

Lines changed: 65 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -360,15 +360,14 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
360360
else if (commandTokens[0] == "PutDataObject") {
361361
auto* dataObj = repo.getDataObjectByUuid(commandTokens[1]);
362362
if (dataObj != nullptr) {
363-
Energistics::Etp::v12::Protocol::Store::PutDataObjects putDataObjects;
364363
Energistics::Etp::v12::Datatypes::Object::DataObject dataObject = ETP_NS::FesapiHelpers::buildEtpDataObjectFromEnergisticsObject(dataObj);
365-
putDataObjects.dataObjects["0"] = dataObject;
366-
367-
session->send(putDataObjects, 0, 0x10 | 0x02); // 0x10 requires Acknowledge from the store
364+
std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> dataObjects;
365+
dataObjects["0"] = dataObject;
366+
session->putDataObjects(dataObjects);
368367
}
369368
}
370369
else if (commandTokens[0] == "SubscribeNotif") {
371-
Energistics::Etp::v12::Protocol::StoreNotification::SubscribeNotifications mb;
370+
auto mb = std::make_shared<Energistics::Etp::v12::Protocol::StoreNotification::SubscribeNotifications>();
372371
Energistics::Etp::v12::Datatypes::Object::SubscriptionInfo subscriptionInfo;
373372
subscriptionInfo.context.uri = commandTokens[1];
374373
subscriptionInfo.scope = Energistics::Etp::v12::Datatypes::Object::ContextScopeKind::self;
@@ -402,7 +401,7 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
402401
}
403402
}
404403

405-
mb.request["0"] = subscriptionInfo;
404+
mb->request["0"] = subscriptionInfo;
406405

407406
session->send(mb, 0, 0x02);
408407

@@ -483,10 +482,10 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
483482
//session->deleteDataspaces({ {"0", "eml:///dataspace('project/study')"} });
484483
}
485484
else if (commandTokens[0] == "Ping") {
486-
Energistics::Etp::v12::Protocol::Core::Ping ping;
487-
ping.currentDateTime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
485+
auto ping = std::make_shared<Energistics::Etp::v12::Protocol::Core::Ping>();
486+
ping->currentDateTime = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
488487
session->send(ping, 0, 0x02);
489-
std::cout << "PING at " << ping.currentDateTime << std::endl;
488+
std::cout << "PING at " << ping->currentDateTime << std::endl;
490489
std::cout << "Please Set Verbosity to 1 if you don't see anything" << std::endl;
491490
}
492491
else if (commandTokens[0] == "PutDummyHorizon") {
@@ -549,45 +548,49 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
549548

550549
auto transaction_start = std::chrono::high_resolution_clock::now();
551550

552-
session->startTransaction(dataspacesToLock);
553-
554-
const size_t ni = 10000;
555-
const size_t nj = 1000;
556-
std::unique_ptr<double[]> resqml_points(new double[ni * nj]);
557-
for (double i = 0; i < ni * nj; ++i) {
558-
resqml_points[(int)i] = i * 100;
559-
}
560-
horizon_grid_2d_representation->setGeometryAsArray2dOfExplicitZ(resqml_points.get(), ni, nj, hdf_proxy,
561-
0.0, 0.0, 0.0,
562-
1.0, 0.0, 0.0, 25.0,
563-
0.0, 1.0, 0.0, 50.0);
564-
565-
for (size_t propIndex = 0; propIndex < 1; ++propIndex) {
566-
auto t_start = std::chrono::high_resolution_clock::now();
567-
auto* prop = tmpRepo.createContinuousProperty(horizon_grid_2d_representation, "", "", 1, gsoap_eml2_3::eml23__IndexableElement::nodes, gsoap_resqml2_0_1::resqml20__ResqmlUom::m,
568-
gsoap_resqml2_0_1::resqml20__ResqmlPropertyKind::length);
569-
std::unique_ptr<double[]> prop_values(new double[ni * nj]);
570-
prop->pushBackDoubleHdf5Array2dOfValues(prop_values.get(), ni, nj, hdf_proxy);
571-
std::cout << " Pushed prop " << propIndex << " in " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
572-
std::cout << " Global time " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - transaction_start).count() << " s" << std::endl;
573-
}
551+
std::string result = session->startTransaction(dataspacesToLock);
552+
if (result.empty()) {
553+
const size_t ni = 10000;
554+
const size_t nj = 1000;
555+
std::unique_ptr<double[]> resqml_points(new double[ni * nj]);
556+
for (double i = 0; i < ni * nj; ++i) {
557+
resqml_points[(int)i] = i * 100;
558+
}
559+
horizon_grid_2d_representation->setGeometryAsArray2dOfExplicitZ(resqml_points.get(), ni, nj, hdf_proxy,
560+
0.0, 0.0, 0.0,
561+
1.0, 0.0, 0.0, 25.0,
562+
0.0, 1.0, 0.0, 50.0);
563+
564+
for (size_t propIndex = 0; propIndex < 100; ++propIndex) {
565+
auto t_start = std::chrono::high_resolution_clock::now();
566+
auto* prop = tmpRepo.createContinuousProperty(horizon_grid_2d_representation, "", "", 1, gsoap_eml2_3::eml23__IndexableElement::nodes, gsoap_resqml2_0_1::resqml20__ResqmlUom::m,
567+
gsoap_resqml2_0_1::resqml20__ResqmlPropertyKind::length);
568+
std::unique_ptr<double[]> prop_values(new double[ni * nj]);
569+
prop->pushBackDoubleHdf5Array2dOfValues(prop_values.get(), ni, nj, hdf_proxy);
570+
std::cout << " Pushed prop " << propIndex << " in " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
571+
std::cout << " Global time " << std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - transaction_start).count() << " s" << std::endl;
572+
}
574573

575-
tmpRepo.setUriSource(dataspace.uri);
576-
std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> dataobjects;
577-
auto allUuids = tmpRepo.getUuids();
578-
int index = 0;
579-
for (auto& uuid : allUuids)
580-
dataobjects[std::to_string(index++)] = ETP_NS::FesapiHelpers::buildEtpDataObjectFromEnergisticsObject(tmpRepo, uuid);
581-
successKeys = session->putDataObjects(dataobjects);
582-
for (std::string& str : successKeys)
583-
std::cout << "successKey : " << str << std::endl;
574+
tmpRepo.setUriSource(dataspace.uri);
575+
std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> dataobjects;
576+
auto allUuids = tmpRepo.getUuids();
577+
int index = 0;
578+
for (auto& uuid : allUuids)
579+
dataobjects[std::to_string(index++)] = ETP_NS::FesapiHelpers::buildEtpDataObjectFromEnergisticsObject(tmpRepo, uuid);
580+
successKeys = session->putDataObjects(dataobjects);
581+
for (std::string& str : successKeys)
582+
std::cout << "successKey : " << str << std::endl;
584583

585-
std::cout << "commit : " << session->commitTransaction() << std::endl;
584+
std::cout << "commit : " << session->commitTransaction() << std::endl;
586585

587-
if (session != nullptr && !session->isWebSocketSessionClosed())
588-
horizon_grid_2d_representation->getZValues(resqml_points.get());
586+
if (session != nullptr && !session->isWebSocketSessionClosed())
587+
horizon_grid_2d_representation->getZValues(resqml_points.get());
589588

590-
tmpRepo.clear();
589+
tmpRepo.clear();
590+
}
591+
else {
592+
std::cerr << "Error when opening transaction : " << result << std::endl;
593+
}
591594
}
592595
else if (commandTokens[0] == "GetDataspaces") {
593596
const auto dataspaces = session->getDataspaces();
@@ -634,11 +637,10 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
634637
h1i1PointSetRep->pushBackXyzGeometryPatch(6, pointCoords, nullptr, crs);
635638

636639
// Now send the XML part
637-
Energistics::Etp::v12::Protocol::Store::PutDataObjects putDataObjects;
638640
Energistics::Etp::v12::Datatypes::Object::DataObject dataObject = ETP_NS::FesapiHelpers::buildEtpDataObjectFromEnergisticsObject(h1i1PointSetRep);
639-
putDataObjects.dataObjects["0"] = dataObject;
640-
641-
session->send(putDataObjects, 0, 0x02 | 0x10); // 0x10 requires Acknowledge from the store
641+
std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> dataObjects;
642+
dataObjects["0"] = dataObject;
643+
session->putDataObjects(dataObjects);
642644
}
643645
else if (commandTokens[0] == "PutAllDataObjects") {
644646
std::map<std::string, Energistics::Etp::v12::Datatypes::Object::DataObject> putDataObjectsMap;
@@ -654,34 +656,34 @@ void askUser(std::shared_ptr<ETP_NS::AbstractSession> session, COMMON_NS::DataOb
654656
}
655657
else if (commandTokens.size() == 3) {
656658
if (commandTokens[0] == "GetDataArray") {
657-
Energistics::Etp::v12::Protocol::DataArray::GetDataArrays gda;
658-
gda.dataArrays["0"].uri = commandTokens[1];
659-
gda.dataArrays["0"].pathInResource = commandTokens[2];
660-
std::cout << gda.dataArrays["0"].pathInResource << std::endl;
659+
auto gda = std::make_shared< Energistics::Etp::v12::Protocol::DataArray::GetDataArrays>();
660+
gda->dataArrays["0"].uri = commandTokens[1];
661+
gda->dataArrays["0"].pathInResource = commandTokens[2];
662+
std::cout << gda->dataArrays["0"].pathInResource << std::endl;
661663
session->send(gda, 0, 0x02);
662664
std::cout << "Please Set Verbosity to 1 if you don't see anything" << std::endl;
663665
}
664666
else if (commandTokens[0] == "GetDataArrayMetadata") {
665-
Energistics::Etp::v12::Protocol::DataArray::GetDataArrayMetadata msg;
666-
msg.dataArrays["0"].uri = commandTokens[1];
667-
msg.dataArrays["0"].pathInResource = commandTokens[2];
668-
std::cout << msg.dataArrays["0"].pathInResource << std::endl;
667+
auto msg = std::make_shared< Energistics::Etp::v12::Protocol::DataArray::GetDataArrayMetadata>();
668+
msg->dataArrays["0"].uri = commandTokens[1];
669+
msg->dataArrays["0"].pathInResource = commandTokens[2];
670+
std::cout << msg->dataArrays["0"].pathInResource << std::endl;
669671
session->send(msg, 0, 0x02);
670672
std::cout << "Please Set Verbosity to 1 if you don't see anything" << std::endl;
671673
}
672674
else if (commandTokens[0] == "PutDataArray") {
673-
Energistics::Etp::v12::Protocol::DataArray::PutDataArrays pda;
674-
pda.dataArrays["0"].uid.uri = commandTokens[1];
675-
pda.dataArrays["0"].uid.pathInResource = commandTokens[2];
675+
auto pda = std::make_shared< Energistics::Etp::v12::Protocol::DataArray::PutDataArrays>();
676+
pda->dataArrays["0"].uid.uri = commandTokens[1];
677+
pda->dataArrays["0"].uid.pathInResource = commandTokens[2];
676678

677679
std::vector<int64_t> dimensions = { 10 };
678-
pda.dataArrays["0"].array.dimensions = dimensions;
680+
pda->dataArrays["0"].array.dimensions = dimensions;
679681

680682
Energistics::Etp::v12::Datatypes::AnyArray data;
681683
Energistics::Etp::v12::Datatypes::ArrayOfInt arrayOfInt;
682684
arrayOfInt.values = { 0,1,2,3,4,5,6,7,8,9 };
683685
data.item.set_ArrayOfInt(std::move(arrayOfInt));
684-
pda.dataArrays["0"].array.data = data;
686+
pda->dataArrays["0"].array.data = data;
685687
std::cout << "Start sending the array" << std::endl;
686688

687689
session->send(pda, 0, 0x02);
@@ -751,12 +753,12 @@ int main(int argc, char **argv)
751753
auto t_start = std::chrono::high_resolution_clock::now();
752754
while (clientSession->isEtpSessionClosed()) {
753755
auto timeOut = std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count();
754-
if (timeOut > 5000) {
756+
if (timeOut > 50000) {
755757
throw std::invalid_argument("Time out : " + std::to_string(timeOut) + " ms.\n");
756758
}
757759
}
758760

759-
clientSession->setTimeOut(60000);
761+
clientSession->setTimeOut(10000);
760762
askUser(clientSession, repo);
761763

762764
sessionThread.join();

src/etp/AbstractClientSessionCRTP.h

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -127,35 +127,48 @@ namespace ETP_NS
127127
return;
128128
}
129129

130+
auto& front = sendingQueue.front();
130131
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
131-
bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(sendingQueue.front())) == specificProtocolHandlers.end();
132+
bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(front)->messageHeader.messageId) == specificProtocolHandlers.end();
132133

133134
if (!previousSentMessageCompleted) {
134-
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(sendingQueue.front())), "because the previous message has not finished to be sent.");
135+
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId), "because the previous message has not finished to be sent.");
135136
}
136137
else {
137-
fesapi_log("Sending Message id :", std::to_string(std::get<0>(sendingQueue.front())));
138-
139-
derived().ws()->async_write(
140-
boost::asio::buffer(std::get<1>(sendingQueue.front())),
141-
[this, self{ this->shared_from_this() }](boost::system::error_code ec, std::size_t)
142-
->void
143-
{
144-
if (ec) {
145-
std::cerr << "on_write : " << ec.message() << std::endl;
146-
}
147-
else {
148-
// Register the handler to respond to the sent message
149-
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
150-
specificProtocolHandlers[std::get<0>(sendingQueue.front())] = std::get<2>(sendingQueue.front());
151-
}
152-
153-
// Remove the sent message from the queue
154-
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
155-
sendingQueue.pop();
156-
157-
do_write();
158-
});
138+
fesapi_log("Sending Message id :", std::to_string(std::get<0>(front)->messageHeader.messageId));
139+
140+
auto avroBytes = std::get<0>(front)->encodeHeaderAndBody();
141+
142+
//asio::buffer is a non-owning view. We must keep the underlying storage alive until the I/O completes.
143+
if (avroBytes->size() < maxWebSocketMessagePayloadSize) {
144+
derived().ws()->async_write(
145+
boost::asio::buffer(*avroBytes),
146+
[this, self{ this->shared_from_this() }, avroBytes](boost::system::error_code ec, std::size_t)
147+
->void
148+
{
149+
150+
if (ec) {
151+
std::cerr << "on_write : " << ec.message() << std::endl;
152+
}
153+
else {
154+
// Register the handler to respond to the sent message
155+
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
156+
auto& front = sendingQueue.front();
157+
auto nextMessage = std::get<0>(front);
158+
specificProtocolHandlers[nextMessage->messageHeader.messageId] =
159+
std::make_tuple(nextMessage, std::get<1>(front), std::get<2>(front));
160+
}
161+
162+
// Remove the sent message from the queue
163+
const std::lock_guard<std::mutex> sendingQueueLock(sendingQueueMutex);
164+
sendingQueue.pop();
165+
166+
do_write();
167+
});
168+
}
169+
else {
170+
throw std::invalid_argument("You cannot send a message which is too big. Please use message part or chunk or whatever else.");
171+
}
159172
}
160173
}
161174
};

0 commit comments

Comments
 (0)