Skip to content

Commit 7e5916a

Browse files
committed
DPL Analysis: use shm metadata in CCDB tables instead of binary view
1 parent 27d833f commit 7e5916a

8 files changed

Lines changed: 230 additions & 22 deletions

File tree

Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "AnalysisCCDBHelpers.h"
1313
#include "CCDBFetcherHelper.h"
14+
#include "Framework/ArrowTypes.h"
1415
#include "Framework/DataProcessingStats.h"
1516
#include "Framework/DeviceSpec.h"
1617
#include "Framework/TimingInfo.h"
@@ -109,20 +110,37 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
109110
auto it = ccdbUrls.find(m.name);
110111
fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
111112
auto columnName = m.name.substr(strlen("ccdb:"));
112-
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
113+
fields.emplace_back(std::make_shared<arrow::Field>(columnName, soa::asArrowDataType<int64_t[3]>(), false, fieldMetadata));
113114
}
114115
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
115116
}
116117

118+
std::vector<std::pair<uint32_t, std::shared_ptr<arrow::FixedSizeListBuilder>>> allbuilders;
119+
allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }());
120+
auto* pool = arrow::default_memory_pool();
121+
122+
int idx = 0;
123+
int sidx = 0;
124+
for (auto const& schema : schemas) {
125+
for (auto const& _ : schema->fields()) {
126+
auto value_builder = std::make_shared<arrow::Int64Builder>();
127+
allbuilders[idx] = std::make_pair(sidx, std::make_shared<arrow::FixedSizeListBuilder>(pool, std::move(value_builder), 3));
128+
++idx;
129+
}
130+
++sidx;
131+
}
132+
117133
std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
118134
CCDBFetcherHelper::initialiseHelper(*helper, options);
119135
std::unordered_map<std::string, int> bindings;
120136
fillValidRoutes(*helper, spec.outputs, bindings);
121137

122-
return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
138+
return adaptStateless([schemas, bindings, helper, allbuilders](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
123139
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
124140
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
125-
for (auto& schema : schemas) {
141+
std::ranges::for_each(allbuilders, [](auto& builder) { builder.second->Reset(); });
142+
for (auto i = 0U; i < schemas.size(); ++i) {
143+
auto& schema = schemas[i];
126144
std::vector<CCDBFetcherHelper::FetchOp> ops;
127145
auto inputBinding = *schema->metadata()->Get("sourceTable");
128146
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
@@ -134,6 +152,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
134152
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
135153
// FIXME: make the fTimestamp column configurable.
136154
auto timestampColumn = table->GetColumnByName("fTimestamp");
155+
auto reserveSize = timestampColumn->length();
137156
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
138157
"There are %zu bindings available", bindings.size());
139158
for (auto& binding : bindings) {
@@ -143,9 +162,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
143162
}
144163
int outputRouteIndex = bindings.at(outRouteDesc);
145164
auto& spec = helper->routes[outputRouteIndex].matcher;
146-
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
147-
for (auto const& _ : schema->fields()) {
148-
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
165+
auto builders = allbuilders | std::views::filter([&i](auto const& builder) { return builder.first == i; });
166+
unsigned int numBuilders = std::ranges::count_if(allbuilders, [&i](auto const& builder) { return builder.first == i; });
167+
arrow::Status status;
168+
std::ranges::for_each(builders, [&status, &reserveSize](auto& builder) {
169+
if (reserveSize > builder.second->capacity()) {
170+
status &= builder.second->Reserve(reserveSize - builder.second->capacity());
171+
}
172+
});
173+
if (!status.ok()) {
174+
// runtime_error_f takes a printf-style format: without a conversion specifier the Arrow status text passed as argument is silently dropped.
throw framework::runtime_error_f("Failed to reserve arrays: %s", status.ToString().c_str());
149175
}
150176

151177
for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
@@ -171,15 +197,20 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
171197
O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
172198
"Got %zu responses from server.",
173199
responses.size());
174-
if (builders.size() != responses.size()) {
175-
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
200+
if (numBuilders != responses.size()) {
201+
LOGP(fatal, "Not enough responses (expected {}, found {})", numBuilders, responses.size());
176202
}
177203
arrow::Status result;
178-
for (size_t bi = 0; bi < responses.size(); bi++) {
179-
auto& builder = builders[bi];
204+
205+
int bi = 0;
206+
for (auto& builder : builders) {
180207
auto& response = responses[bi];
181-
char const* address = reinterpret_cast<char const*>(response.id.value);
182-
result &= builder->Append(std::string_view(address, response.size));
208+
result &= builder.second->Append();
209+
// The value builder of each list builder is created as an arrow::Int64Builder when allbuilders is filled, so the downcast is known to be valid; static_cast avoids an RTTI lookup per appended row and an unchecked null result.
auto* value_builder = static_cast<arrow::Int64Builder*>(builder.second->value_builder());
210+
result &= value_builder->Append(response.id.handle);
211+
result &= value_builder->Append(response.id.segment);
212+
result &= value_builder->Append(response.size);
213+
++bi;
183214
}
184215
if (!result.ok()) {
185216
LOGP(fatal, "Error adding results from CCDB");
@@ -188,9 +219,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
188219
}
189220
}
190221
arrow::ArrayVector arrays;
191-
for (auto& builder : builders) {
192-
arrays.push_back(*builder->Finish());
193-
}
222+
std::ranges::for_each(builders, [&arrays](auto& builder) { arrays.push_back(*builder.second->Finish()); });
194223
auto outTable = arrow::Table::Make(schema, arrays);
195224
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
196225
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);

Framework/Core/include/Framework/ASoA.h

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "Framework/ArrowTableSlicingCache.h" // IWYU pragma: export
2525
#include "Framework/SliceCache.h" // IWYU pragma: export
2626
#include "Framework/VariantHelpers.h" // IWYU pragma: export
27+
#include <fairmq/Version.h>
2728
#include <arrow/array/array_binary.h>
2829
#include <arrow/table.h> // IWYU pragma: export
2930
#include <arrow/array.h> // IWYU pragma: export
@@ -36,9 +37,16 @@
3637
#include <cstring>
3738
#include <gsl/span> // IWYU pragma: export
3839

40+
// Forward declaration of fair::mq::shmem::MetaHeader so that it can be named in signatures below.
// NOTE(review): DECLARE_SOA_CCDB_COLUMN_FULL brace-initialises a MetaHeader, which needs the
// complete type — confirm the defining header is included wherever that macro is expanded.
namespace fair::mq::shmem {
41+
struct MetaHeader;
42+
}
43+
3944
namespace o2::framework
4045
{
4146
using ListVector = std::vector<std::vector<int64_t>>;
47+
#if (FAIRMQ_VERSION_DEC >= 111000)
48+
/// Callable that receives a fair::mq::shmem::MetaHeader (by rvalue) and returns a std::byte*.
/// Only declared when FAIRMQ_VERSION_DEC >= 111000.
/// NOTE(review): presumably maps shared-memory metadata back to a local address — confirm against the provider of the callable.
using PointerReconstructor = std::function<std::byte*(fair::mq::shmem::MetaHeader&&)>;
49+
#endif
4250

4351
std::string cutString(std::string&& str);
4452
std::string strToUpper(std::string&& str);
@@ -1081,6 +1089,9 @@ concept can_bind = requires(T&& t) {
10811089
template <typename... C>
10821090
concept has_index = (is_indexing_column<C> || ...);
10831091

1092+
/// Satisfied by column types that expose a static boolean member `needs_ptr_rec` evaluating to true;
/// types without such a member do not satisfy the concept.
template <typename C>
1093+
concept needs_ptr_rec = C::needs_ptr_rec;
1094+
10841095
template <typename D, typename O, typename IP, typename... C>
10851096
struct TableIterator : IP, C... {
10861097
public:
@@ -1248,6 +1259,21 @@ struct TableIterator : IP, C... {
12481259
{
12491260
doSetCurrentInternal(internal_index_columns_t{}, table);
12501261
}
1262+
#if (FAIRMQ_VERSION_DEC >= 111000)
1263+
/// Registers a pointer reconstructor with every column of this iterator that asks for one.
/// For each column type CC in all_columns that satisfies needs_ptr_rec, CC::ptrRec is set to
/// the address of @a pointerReconstructor, provided the std::function is non-empty.
/// @param pointerReconstructor callable to register; an empty one leaves ptrRec unchanged.
/// @note Only the address of the referenced std::function is stored, not a copy, so the
///       referenced object has to stay alive for as long as the columns' ptrRec is used.
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
1264+
{
1265+
// Outer lambda: expands the pack of column types held by all_columns.
[&pointerReconstructor, this]<typename... Cs>(framework::pack<Cs...>) {
1266+
// Inner lambda: invoked once per column type through the comma fold below.
([&pointerReconstructor, this]<typename CC>() {
1267+
if constexpr (needs_ptr_rec<CC>) {
1268+
// An empty std::function converts to false; ptrRec is not overwritten in that case.
if (pointerReconstructor) {
1269+
CC::ptrRec = &pointerReconstructor;
1270+
}
1271+
}
1272+
}.template operator()<Cs>(),
1273+
...);
1274+
}(all_columns{});
1275+
}
1276+
#endif
12511277

12521278
private:
12531279
/// Helper to move at the end of columns which actually have an iterator.
@@ -2285,7 +2311,12 @@ class Table
22852311
{
22862312
return self_t{mTable->Slice(0, 0), 0};
22872313
}
2288-
2314+
#if (FAIRMQ_VERSION_DEC >= 111000)
2315+
/// Forwards @a pointerReconstructor to the table's mBegin iterator.
/// NOTE(review): only mBegin is updated here; whether iterators obtained elsewhere see the
/// reconstructor depends on code outside this hunk — confirm.
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
2316+
{
2317+
mBegin.setPointerReconstructor(pointerReconstructor);
2318+
}
2319+
#endif
22892320
private:
22902321
template <typename T>
22912322
arrow::ChunkedArray* lookupColumn()
@@ -2464,6 +2495,56 @@ consteval static std::string_view namespace_prefix()
24642495
}; \
24652496
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }
24662497

2498+
#if (FAIRMQ_VERSION_DEC >= 111000)
2499+
/// Declares a CCDB-backed column struct _Name_ whose stored value is an int64_t[3] triple.
/// The generated getter unpacks the triple as (handle, segment, size), builds a
/// fair::mq::shmem::MetaHeader from those values and passes it to *ptrRec to obtain a
/// std::byte*, which is wrapped in a std::span<std::byte> of `size` bytes.
///  - If _ConcreteType_ is std::span<std::byte>, that span is returned directly.
///  - Otherwise the bytes are passed to soa::extractCCDBPayload and a reference to the
///    resulting object is returned; the object is cached in function-local statics and is
///    only re-extracted (after deleting the previous one) when the payload address changes.
/// ptrRec defaults to nullptr and the getter dereferences it unconditionally, so a
/// reconstructor has to be registered on the column before the getter is called.
/// The function-local statics are shared by all objects of the generated struct and are
/// updated without any locking visible in this macro.
/// NOTE(review): the TBufferFile `f` constructed in the non-span branch is never referenced
/// afterwards — looks like a leftover; confirm and remove.
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
2500+
struct _Name_ : o2::soa::Column<int64_t[3], _Name_> { \
2501+
static constexpr const char* mLabel = _Label_; \
2502+
static constexpr const char* query = _CCDBQuery_; \
2503+
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
2504+
static constexpr bool needs_ptr_rec = true; \
2505+
std::function<std::byte*(fair::mq::shmem::MetaHeader&&)> const* ptrRec = nullptr; \
2506+
using base = o2::soa::Column<int64_t[3], _Name_>; \
2507+
using type = int64_t[3]; \
2508+
using column_t = _Name_; \
2509+
_Name_(arrow::ChunkedArray const* column) \
2510+
: o2::soa::Column<int64_t[3], _Name_>(o2::soa::ColumnIterator<int64_t[3]>(column)) \
2511+
{ \
2512+
} \
2513+
\
2514+
_Name_() = default; \
2515+
_Name_(_Name_ const& other) = default; \
2516+
_Name_& operator=(_Name_ const& other) = default; \
2517+
\
2518+
decltype(auto) _Getter_() const \
2519+
{ \
2520+
auto& [handle, segment, size] = *mColumnIterator; \
2521+
auto span = std::span<std::byte>{(*ptrRec)(fair::mq::shmem::MetaHeader{ \
2522+
static_cast<size_t>(size), \
2523+
0, handle, 0, 0, \
2524+
static_cast<uint16_t>(segment), true}), static_cast<size_t>(size)}; \
2525+
if constexpr (std::same_as<_ConcreteType_, std::span<std::byte>>) { \
2526+
return span; \
2527+
} else { \
2528+
static std::byte* payload = nullptr; \
2529+
static _ConcreteType_* deserialised = nullptr; \
2530+
static TClass* c = TClass::GetClass(#_ConcreteType_); \
2531+
if (payload != (std::byte*)span.data()) { \
2532+
payload = (std::byte*)span.data(); \
2533+
delete deserialised; \
2534+
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
2535+
deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \
2536+
} \
2537+
return *deserialised; \
2538+
} \
2539+
} \
2540+
\
2541+
decltype(auto) \
2542+
get() const \
2543+
{ \
2544+
return _Getter_(); \
2545+
} \
2546+
};
2547+
#else
24672548
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
24682549
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
24692550
static constexpr const char* mLabel = _Label_; \
@@ -2506,6 +2587,7 @@ consteval static std::string_view namespace_prefix()
25062587
return _Getter_(); \
25072588
} \
25082589
};
2590+
#endif
25092591

25102592
#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \
25112593
DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_)
@@ -3849,7 +3931,12 @@ class FilteredBase : public T
38493931
{
38503932
return mCached;
38513933
}
3852-
3934+
#if (FAIRMQ_VERSION_DEC >= 111000)
3935+
/// Forwards @a pointerReconstructor to the filtered table's mFilteredBegin iterator.
/// NOTE(review): only mFilteredBegin is updated here; whether the unfiltered base iterator also
/// needs the reconstructor depends on code outside this hunk — confirm.
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
3936+
{
3937+
mFilteredBegin.setPointerReconstructor(pointerReconstructor);
3938+
}
3939+
#endif
38533940
private:
38543941
void resetRanges()
38553942
{

0 commit comments

Comments
 (0)