Skip to content

Commit f16bb4a

Browse files
committed
Async reading
1 parent a1adc36 commit f16bb4a

7 files changed

Lines changed: 93 additions & 36 deletions

File tree

include/openPMD/toolkit/Aws.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ struct ExternalBlockStorageAws : ExternalBlockStorageBackend
5252
auto
5353
put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len)
5454
-> std::string override;
55-
void get(std::string const &external_ref, void *data, size_t len) override;
55+
void
56+
get(std::string const &external_ref,
57+
std::shared_ptr<void> data,
58+
size_t len) override;
5659
[[nodiscard]] auto externalStorageLocation() const
5760
-> nlohmann::json override;
5861
void sync() override;

include/openPMD/toolkit/ExternalBlockStorage.hpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <nlohmann/json.hpp>
99

1010
#include <cstddef>
11-
#include <cstdint>
1211
#include <memory>
1312
#include <optional>
1413
#include <string>
@@ -26,7 +25,9 @@ struct ExternalBlockStorageBackend
2625
put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len)
2726
-> std::string = 0;
2827
virtual void
29-
get(std::string const &external_ref, void *data, size_t len) = 0;
28+
get(std::string const &external_ref,
29+
std::shared_ptr<void> data,
30+
size_t len) = 0;
3031
[[nodiscard]] virtual auto externalStorageLocation() const
3132
-> nlohmann::json = 0;
3233

@@ -98,15 +99,15 @@ class ExternalBlockStorage
9899
std::string const &identifier,
99100
nlohmann::json const &fullJsonDataset,
100101
nlohmann::json::json_pointer const &path,
101-
T *data);
102+
std::shared_ptr<void> &data);
102103

103104
template <typename DatatypeHandling, typename T>
104105
void read(
105106
Offset const &blockOffset,
106107
Extent const &blockExtent,
107108
nlohmann::json const &fullJsonDataset,
108109
nlohmann::json::json_pointer const &path,
109-
T *data);
110+
std::shared_ptr<void> &data);
110111

111112
void sync();
112113

include/openPMD/toolkit/Stdio.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ struct ExternalBlockStorageStdio : ExternalBlockStorageBackend
1515
auto
1616
put(std::string const &identifier, auxiliary::WriteBuffer data, size_t len)
1717
-> std::string override;
18-
void get(std::string const &external_ref, void *data, size_t len) override;
18+
void
19+
get(std::string const &external_ref,
20+
std::shared_ptr<void> data,
21+
size_t len) override;
1922
[[nodiscard]] auto externalStorageLocation() const
2023
-> nlohmann::json override;
2124
~ExternalBlockStorageStdio() override;

src/IO/JSON/JSONIOHandlerImpl.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,11 +1605,10 @@ namespace
16051605
struct RetrieveExternally
16061606
{
16071607
template <typename T, typename... Args>
1608-
static void
1609-
call(ExternalBlockStorage &blockStorage, void *ptr, Args &&...args)
1608+
static void call(ExternalBlockStorage &blockStorage, Args &&...args)
16101609
{
16111610
blockStorage.read<internal::JsonDatatypeHandling, T>(
1612-
std::forward<Args>(args)..., static_cast<T *>(ptr));
1611+
std::forward<Args>(args)...);
16131612
}
16141613

16151614
static constexpr char const *errorMsg = "RetrieveExternally";
@@ -1654,11 +1653,11 @@ void JSONIOHandlerImpl::readDataset(
16541653
switchDatasetType<RetrieveExternally>(
16551654
parameters.dtype,
16561655
*external,
1657-
parameters.data.get(),
16581656
parameters.offset,
16591657
parameters.extent,
16601658
jsonRoot,
1661-
filePosition->id);
1659+
filePosition->id,
1660+
parameters.data);
16621661
}},
16631662
localMode.as_base());
16641663
}

src/toolkit/Aws.cpp

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -173,13 +173,14 @@ auto ExternalBlockStorageAws::put(
173173
async_counter.add_and_notify_result();
174174
};
175175
async_counter.add_task();
176-
m_client.PutObjectAsync(put_request, responseReceivedHandler);
176+
m_client.PutObjectAsync(
177+
put_request, std::move(responseReceivedHandler));
177178
}
178179
return sanitized;
179180
}
180181

181182
void ExternalBlockStorageAws::get(
182-
std::string const &external_ref, void *data, size_t len)
183+
std::string const &external_ref, std::shared_ptr<void> data, size_t len)
183184
{
184185
if (len == 0)
185186
{
@@ -190,23 +191,69 @@ void ExternalBlockStorageAws::get(
190191
get_request.SetBucket(m_bucketName);
191192
get_request.SetKey(external_ref);
192193

193-
auto get_outcome = m_client.GetObject(get_request);
194-
if (!get_outcome.IsSuccess())
194+
auto processGetOutcome = [len](
195+
Aws::S3::Model::GetObjectOutcome const
196+
&get_outcome,
197+
void *data_lambda) {
198+
auto &body = get_outcome.GetResult().GetBody();
199+
body.read(
200+
reinterpret_cast<char *>(data_lambda),
201+
static_cast<std::streamsize>(len));
202+
std::streamsize read_bytes = body.gcount();
203+
if (read_bytes != static_cast<std::streamsize>(len))
204+
{
205+
throw std::runtime_error(
206+
"ExternalBlockStorageAws: failed to read expected number of "
207+
"bytes "
208+
"from S3 object");
209+
}
210+
};
211+
212+
if (!m_async.has_value())
195213
{
196-
throw std::runtime_error(
197-
std::string("ExternalBlockStorageAws::get failed: ") +
198-
get_outcome.GetError().GetMessage());
199-
}
214+
auto get_outcome = m_client.GetObject(get_request);
215+
if (!get_outcome.IsSuccess())
216+
{
217+
throw std::runtime_error(
218+
std::string("ExternalBlockStorageAws::get failed: ") +
219+
get_outcome.GetError().GetMessage());
220+
}
200221

201-
auto &body = get_outcome.GetResult().GetBody();
202-
body.read(
203-
reinterpret_cast<char *>(data), static_cast<std::streamsize>(len));
204-
std::streamsize read_bytes = body.gcount();
205-
if (read_bytes != static_cast<std::streamsize>(len))
222+
processGetOutcome(get_outcome, data.get());
223+
}
224+
else
206225
{
207-
throw std::runtime_error(
208-
"ExternalBlockStorageAws: failed to read expected number of bytes "
209-
"from S3 object");
226+
auto &async_counter = this->m_async->shared_ptr_operations;
227+
auto responseReceivedHandler =
228+
[&async_counter,
229+
external_ref,
230+
processGetOutcome_lambda = std::move(processGetOutcome),
231+
data_lambda = std::move(data)](
232+
const Aws::S3::S3Client *,
233+
const Aws::S3::Model::GetObjectRequest &,
234+
const Aws::S3::Model::GetObjectOutcome &get_outcome,
235+
const std::shared_ptr<const Aws::Client::AsyncCallerContext>
236+
&) {
237+
if (get_outcome.IsSuccess())
238+
{
239+
// std::cout << "File asynchronously downloaded successfully
240+
// "
241+
// "from S3!"
242+
// << std::endl;
243+
}
244+
else
245+
{
246+
std::cerr << "Asynchronous download failed for '"
247+
<< external_ref
248+
<< "': " << get_outcome.GetError().GetMessage()
249+
<< std::endl;
250+
}
251+
processGetOutcome_lambda(get_outcome, data_lambda.get());
252+
async_counter.add_and_notify_result();
253+
};
254+
async_counter.add_task();
255+
m_client.GetObjectAsync(
256+
get_request, std::move(responseReceivedHandler));
210257
}
211258
}
212259

src/toolkit/ExternalBlockStorage.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "openPMD/auxiliary/Memory.hpp"
66
#include "openPMD/auxiliary/StringManip.hpp"
77

8+
#include <memory>
89
#include <nlohmann/json.hpp>
910

1011
#include <numeric>
@@ -38,12 +39,15 @@ namespace
3839
void read_impl(
3940
internal::ExternalBlockStorageBackend *backend,
4041
nlohmann::json const &external_block,
41-
T *data,
42+
std::shared_ptr<void> &data,
4243
size_t len)
4344
{
4445
auto const &external_ref =
4546
external_block.at("external_ref").get<std::string>();
46-
backend->get(external_ref, data, sizeof(T) * len);
47+
backend->get(
48+
external_ref,
49+
std::static_pointer_cast<void>(data),
50+
sizeof(T) * len);
4751
}
4852
} // namespace
4953

@@ -166,7 +170,7 @@ void ExternalBlockStorage::read(
166170
[[maybe_unused]] std::string const &identifier,
167171
[[maybe_unused]] nlohmann::json const &fullJsonDataset,
168172
[[maybe_unused]] nlohmann::json::json_pointer const &path,
169-
[[maybe_unused]] T *data)
173+
[[maybe_unused]] std::shared_ptr<void> &data)
170174
{
171175
throw std::runtime_error("Unimplemented!");
172176
}
@@ -177,7 +181,7 @@ void ExternalBlockStorage::read(
177181
Extent const &blockExtent,
178182
nlohmann::json const &fullJsonDataset,
179183
nlohmann::json::json_pointer const &path,
180-
T *data)
184+
std::shared_ptr<void> &data)
181185
{
182186
auto &dataset = fullJsonDataset[path];
183187
if (!DatatypeHandling::template checkDatatype<T>(dataset))
@@ -199,7 +203,7 @@ void ExternalBlockStorage::read(
199203
continue;
200204
}
201205
found_a_precise_match = true;
202-
read_impl(m_worker.get(), block, data, flat_extent(blockExtent));
206+
read_impl<T>(m_worker.get(), block, data, flat_extent(blockExtent));
203207
break;
204208
}
205209
catch (nlohmann::json::exception const &e)
@@ -256,13 +260,13 @@ void ExternalBlockStorage::sanitizeString(std::string &s)
256260
std::string const &identifier, \
257261
nlohmann::json const &fullJsonDataset, \
258262
nlohmann::json::json_pointer const &path, \
259-
type *data); \
263+
std::shared_ptr<void> &data); \
260264
template void ExternalBlockStorage::read<datatypehandling, type>( \
261265
Offset const &blockOffset, \
262266
Extent const &blockExtent, \
263267
nlohmann::json const &fullJsonDataset, \
264268
nlohmann::json::json_pointer const &path, \
265-
type *data);
269+
std::shared_ptr<void> &data);
266270
#define OPENPMD_INSTANTIATE(type) \
267271
OPENPMD_INSTANTIATE_DATATYPEHANDLING(internal::JsonDatatypeHandling, type)
268272
OPENPMD_FOREACH_DATASET_DATATYPE(OPENPMD_INSTANTIATE)

src/toolkit/Stdio.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ auto ExternalBlockStorageStdio::put(
101101
}
102102

103103
void ExternalBlockStorageStdio::get(
104-
std::string const &external_ref, void *data, size_t len)
104+
std::string const &external_ref, std::shared_ptr<void> data, size_t len)
105105
{
106106
if (len == 0)
107107
{
@@ -118,7 +118,7 @@ void ExternalBlockStorageStdio::get(
118118
filepath);
119119
}
120120

121-
size_t read = std::fread(data, 1, len, file);
121+
size_t read = std::fread(data.get(), 1, len, file);
122122
if (read != len)
123123
{
124124
std::fclose(file);

0 commit comments

Comments
 (0)