Skip to content

Commit 55d4b7b

Browse files
authored
Merge fe755c4 into sapling-pr-archive-ktf
2 parents b7b1016 + fe755c4 commit 55d4b7b

3 files changed

Lines changed: 60 additions & 0 deletions

File tree

Framework/Core/src/CommonServices.cxx

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,46 @@ auto decongestionCallbackOrdered = [](AsyncTask& task, size_t id) -> void {
589589
}
590590
};
591591

592+
// Callback for consumeWhenPastOldestPossibleTimeframe.
593+
// Runs in the async queue at the beginning of the next iteration,
594+
// after Retry slots unblocked by an oldestPossibleInput change have
595+
// been consumed and freed. Rescans all slots and forwards the
596+
// (now up-to-date) oldestPossibleOutput downstream.
597+
auto decongestionCallbackPastOldest = [](AsyncTask& task, size_t id) -> void {
598+
auto& ref = task.user<DecongestionContext>().ref;
599+
600+
auto& decongestion = ref.get<DecongestionService>();
601+
auto& timesliceIndex = ref.get<TimesliceIndex>();
602+
auto& relayer = ref.get<DataRelayer>();
603+
auto& proxy = ref.get<FairMQDeviceProxy>();
604+
O2_SIGNPOST_ID_GENERATE(cid, async_queue);
605+
606+
timesliceIndex.rescan();
607+
timesliceIndex.updateOldestPossibleOutput(decongestion.nextEnumerationTimesliceRewinded);
608+
auto oldestPossibleOutput = relayer.getOldestPossibleOutput();
609+
610+
if (oldestPossibleOutput.timeslice.value <= decongestion.lastTimeslice) {
611+
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
612+
"consumeWhenPastOldestPossibleTimeframe: not forwarding already sent value %" PRIu64,
613+
(uint64_t)oldestPossibleOutput.timeslice.value);
614+
return;
615+
}
616+
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "oldest_possible_timeslice",
617+
"consumeWhenPastOldestPossibleTimeframe: forwarding oldest possible timeslice %" PRIu64,
618+
(uint64_t)oldestPossibleOutput.timeslice.value);
619+
DataProcessingHelpers::broadcastOldestPossibleTimeslice(ref, oldestPossibleOutput.timeslice.value);
620+
621+
for (int fi = 0; fi < proxy.getNumForwardChannels(); fi++) {
622+
auto& info = proxy.getForwardChannelInfo(ChannelIndex{fi});
623+
auto& state = proxy.getForwardChannelState(ChannelIndex{fi});
624+
if (info.channelType != ChannelAccountingType::DPL) {
625+
continue;
626+
}
627+
DataProcessingHelpers::sendOldestPossibleTimeframe(ref, info, state, oldestPossibleOutput.timeslice.value);
628+
}
629+
decongestion.lastTimeslice = oldestPossibleOutput.timeslice.value;
630+
};
631+
592632
// Decongestion service
593633
// If we do not have any Timeframe input, it means we must be creating timeslices
594634
// in order and that we should propagate the oldest possible timeslice at the end
@@ -712,6 +752,22 @@ o2::framework::ServiceSpec
712752
timesliceIndex.rescan();
713753
}
714754

755+
// When consumeWhenPastOldestPossibleTimeframe is active, we always
756+
// schedule the callback even when oldestPossibleOutput has not changed
757+
// yet. Retry slots held by this policy will be consumed after this
758+
// domainInfoUpdated call (once getReadyToProcess re-checks them), and
759+
// the callback — running in the next iteration — will recompute
760+
// oldestPossibleOutput and forward the updated value downstream.
761+
if (decongestion.consumeWhenPastOldestPossibleTimeframeActive) {
762+
auto& queue = services.get<AsyncQueue>();
763+
AsyncQueueHelpers::post(
764+
queue, AsyncTask{.timeslice = TimesliceId{oldestPossibleTimeslice},
765+
.id = decongestion.oldestPossibleTimesliceTask,
766+
.debounce = -1,
767+
.callback = decongestionCallbackPastOldest}
768+
.user<DecongestionContext>({.ref = services, .oldestPossibleOutput = oldestPossibleOutput}));
769+
}
770+
715771
if (oldestPossibleOutput.timeslice.value == decongestion.lastTimeslice) {
716772
O2_SIGNPOST_EVENT_EMIT(data_processor_context, cid, "oldest_possible_timeslice", "Synchronous: Not sending already sent value: %" PRIu64, (uint64_t)oldestPossibleOutput.timeslice.value);
717773
return;

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyZeroCount(const char* na
267267
CompletionPolicy CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher)
268268
{
269269
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const&, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
270+
auto& decongestionService = ref.get<DecongestionService>();
271+
decongestionService.consumeWhenPastOldestPossibleTimeframeActive = true;
270272
size_t currentTimeslice = -1;
271273
for (auto& input : inputs) {
272274
if (input.header == nullptr) {

Framework/Core/src/DecongestionService.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ struct DecongestionService {
3333
int64_t nextTimeslice = 0;
3434
/// Ordered completion policy is active.
3535
bool orderedCompletionPolicyActive = false;
36+
/// consumeWhenPastOldestPossibleTimeframe completion policy is active.
37+
bool consumeWhenPastOldestPossibleTimeframeActive = false;
3638
// Task to enqueue the oldest possible timeslice propagation
3739
// at the end of any processing chain.
3840
o2::framework::AsyncTaskId oldestPossibleTimesliceTask = {0};

0 commit comments

Comments
 (0)