Skip to content

Commit 66ac4b1

Browse files
Now PutDataArray slabs in an async mode.
1 parent 8e0a2ea commit 66ac4b1

2 files changed

Lines changed: 57 additions & 4 deletions

File tree

src/etp/fesapi/FesapiHdfProxy.cpp

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,33 @@ void FesapiHdfProxy::writeArrayNdSlab(
326326
const uint64_t* numValuesInEachDimension,
327327
const uint64_t* offsetInEachDimension,
328328
unsigned int numDimensions)
329+
{
330+
std::set<int64_t>stillProcessingMsgIds = async_writeArrayNdSlab(groupName, datasetName,
331+
datatype, values, numValuesInEachDimension,
332+
offsetInEachDimension, numDimensions);
333+
334+
auto t_start = std::chrono::high_resolution_clock::now();
335+
while (!stillProcessingMsgIds.empty()) {
336+
for (int64_t msgId : stillProcessingMsgIds) {
337+
if (!session_->isMessageStillProcessing(msgId)) {
338+
stillProcessingMsgIds.erase(msgId);
339+
}
340+
}
341+
if (std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() > session_->getTimeOut()) {
342+
throw std::runtime_error("Time out waiting for a writeArrayNdSlab response");
343+
}
344+
}
345+
std::cerr << "writeArrayNdSlab Response got in " << std::chrono::duration<double, std::milli>(std::chrono::high_resolution_clock::now() - t_start).count() << " ms" << std::endl;
346+
}
347+
348+
std::set<int64_t> FesapiHdfProxy::async_writeArrayNdSlab(
349+
const string& groupName,
350+
const string& datasetName,
351+
COMMON_NS::AbstractObject::numericalDatatypeEnum datatype,
352+
const void* values,
353+
const uint64_t* numValuesInEachDimension,
354+
const uint64_t* offsetInEachDimension,
355+
unsigned int numDimensions)
329356
{
330357
if (!isOpened())
331358
open();
@@ -370,6 +397,7 @@ void FesapiHdfProxy::writeArrayNdSlab(
370397
"You need to give a COMMON_NS::AbstractObject::numericalDatatypeEnum as the datatype");
371398
}
372399

400+
std::set<int64_t> sentMessageIds;
373401
if (totalCount * valueSize <= maxArraySize_) {
374402
std::vector<int64_t> counts;
375403
std::vector<int64_t> starts;
@@ -389,7 +417,7 @@ void FesapiHdfProxy::writeArrayNdSlab(
389417
pdsa.dataSubarrays["0"].data = convertVoidArrayIntoAvroAnyArray(datatype, values, totalCount);
390418

391419
// Send putDataSubarrays Message
392-
session_->sendAndBlock(pdsa, 0, 0x02);
420+
sentMessageIds.insert(session_->send(pdsa, 0, 0x02));
393421
}
394422
else {
395423
std::unique_ptr<uint64_t[]> counts(new uint64_t[numDimensions]);
@@ -405,10 +433,11 @@ void FesapiHdfProxy::writeArrayNdSlab(
405433
if (numValuesInEachDimension[dimIdx] > 1) {
406434
uint64_t previousCount = counts[dimIdx];
407435
counts[dimIdx] /= 2;
408-
409-
writeArrayNdSlab(groupName, datasetName,
436+
437+
std::set<int64_t> intermediateResult = async_writeArrayNdSlab(groupName, datasetName,
410438
datatype, values, counts.get(),
411439
starts.get(), numDimensions);
440+
sentMessageIds.insert(intermediateResult.begin(), intermediateResult.end());
412441

413442
writtenTotalCount = std::accumulate(counts.get(), counts.get() + numDimensions, 1, std::multiplies<size_t>());
414443

@@ -424,10 +453,13 @@ void FesapiHdfProxy::writeArrayNdSlab(
424453
+ std::to_string(maxArraySize_) + " bytes.");
425454
}
426455

427-
writeArrayNdSlab(groupName, datasetName, datatype,
456+
std::set<int64_t> intermediateResult = async_writeArrayNdSlab(groupName, datasetName, datatype,
428457
(int8_t*)values + (writtenTotalCount * valueSize), counts.get(),
429458
starts.get(), numDimensions);
459+
sentMessageIds.insert(intermediateResult.begin(), intermediateResult.end());
430460
}
461+
462+
return sentMessageIds;
431463
}
432464

433465
void FesapiHdfProxy::readArrayNdOfDoubleValues(

src/etp/fesapi/FesapiHdfProxy.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ under the License.
2323
#include "../AbstractSession.h"
2424
#include "../ProtocolHandlers/GetFullDataArrayHandlers.h"
2525

26+
#include <set>
2627
#include <type_traits>
2728

2829
namespace ETP_NS
@@ -184,6 +185,26 @@ namespace ETP_NS
184185
const uint64_t* offsetValuesInEachDimension,
185186
unsigned int numDimensions) final;
186187

188+
/**
189+
* Find the array associated with @p groupName and @p name and write to it asynchronously.
190+
* @param groupName The name of the group associated with the array.
191+
* @param name The name of the array (potentially with multi dimensions).
192+
* @param datatype The specific datatype of the values to write.
193+
* @param values 1d array of specific datatype ordered firstly by fastest direction.
194+
* @param numValuesInEachDimension Number of values in each dimension of the array to write. They are ordered from fastest index to slowest index.
195+
* @param offsetValuesInEachDimension Offset values in each dimension of the array to write. They are ordered from fastest index to slowest index.
196+
* @param numDimensions The number of the dimensions of the array to write.
197+
* @return All message ids which have been sent to the ETP server
198+
*/
199+
std::set<int64_t> async_writeArrayNdSlab(
200+
const std::string& groupName,
201+
const std::string& name,
202+
COMMON_NS::AbstractObject::numericalDatatypeEnum datatype,
203+
const void* values,
204+
const uint64_t* numValuesInEachDimension,
205+
const uint64_t* offsetValuesInEachDimension,
206+
unsigned int numDimensions);
207+
187208
/**
188209
* Write some string attributes into a group
189210
*/

0 commit comments

Comments
 (0)