Skip to content

Commit 261a809

Browse files
authored
Merge e112181 into sapling-pr-archive-ktf
2 parents 6cfeb29 + e112181 commit 261a809

4 files changed

Lines changed: 9 additions & 6 deletions

File tree

Framework/AnalysisSupport/src/AODWriterHelpers.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
154154
}
155155

156156
// skip non-AOD refs
157-
if (!DataSpecUtils::partialMatch(*ref.spec, AODOrigins)) {
157+
if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) {
158158
continue;
159159
}
160160
startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;

Framework/Core/include/Framework/AnalysisSupportHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
namespace o2::framework
2222
{
2323
static constexpr std::array<header::DataOrigin, 5> AODOrigins{header::DataOrigin{"AOD"}, header::DataOrigin{"AOD1"}, header::DataOrigin{"AOD2"}, header::DataOrigin{"EMB"}, header::DataOrigin{"AMD"}};
24+
static constexpr std::array<header::DataOrigin, 3> writableAODOrigins{header::DataOrigin{"AOD"}, header::DataOrigin{"AOD1"}, header::DataOrigin{"AOD2"}};
2425

2526
class DataOutputDirector;
2627
struct ConfigContext;

Framework/Core/src/AnalysisSupportHelpers.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ std::shared_ptr<DataOutputDirector> AnalysisSupportHelpers::getDataOutputDirecto
102102
// use the dangling outputs
103103
std::vector<InputSpec> danglingOutputs;
104104
for (auto ii = 0u; ii < OutputsInputs.size(); ii++) {
105-
if (DataSpecUtils::partialMatch(OutputsInputs[ii], AODOrigins) && isDangling[ii]) {
105+
if (DataSpecUtils::partialMatch(OutputsInputs[ii], writableAODOrigins) && isDangling[ii]) {
106106
danglingOutputs.emplace_back(OutputsInputs[ii]);
107107
}
108108
}

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef running, ServiceRegi
212212
});
213213
}
214214

215-
// Callback to execute the processing. Notice how the data is
216-
// is a vector of DataProcessorContext so that we can index the correct
217-
// one with the thread id. For the moment we simply use the first one.
215+
// Callback to execute the processing. Receives and relays data (doPrepare)
216+
// happens on the main thread before this is queued, so we only dispatch here.
218217
void run_callback(uv_work_t* handle)
219218
{
220219
auto* task = (TaskStreamInfo*)handle->data;
@@ -223,7 +222,6 @@ void run_callback(uv_work_t* handle)
223222
auto& dataProcessorContext = ref.get<DataProcessorContext>();
224223
O2_SIGNPOST_ID_FROM_POINTER(sid, device, &dataProcessorContext);
225224
O2_SIGNPOST_START(device, sid, "run_callback", "Starting run callback on stream %d", task->id.index);
226-
DataProcessingDevice::doPrepare(ref);
227225
DataProcessingDevice::doRun(ref);
228226
O2_SIGNPOST_END(device, sid, "run_callback", "Done processing data for stream %d", task->id.index);
229227
}
@@ -1333,6 +1331,10 @@ void DataProcessingDevice::Run()
13331331
handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
13341332
}
13351333

1334+
// Receive and relay incoming data on the main thread so that I/O
1335+
// overlaps with computation running concurrently on work threads.
1336+
DataProcessingDevice::doPrepare(ref);
1337+
13361338
assert(mStreams.size() == mHandles.size());
13371339
/// Decide which task to use
13381340
TaskStreamRef streamRef{-1};

0 commit comments

Comments
 (0)