forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDataProcessingContext.h
More file actions
140 lines (121 loc) · 6.5 KB
/
Copy pathDataProcessingContext.h
File metadata and controls
140 lines (121 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_DATAPROCESSORCONTEXT_H_
#define O2_FRAMEWORK_DATAPROCESSORCONTEXT_H_
#include "Framework/DataRelayer.h"
#include "Framework/AlgorithmSpec.h"
#include <atomic>
#include <functional>
namespace o2::framework
{
struct DeviceContext;
struct ServiceRegistry;
struct DataAllocator;
struct DataProcessorSpec;
enum struct ForwardPolicy {
AtInjection,
AtCompletionPolicySatisified,
AfterProcessing
};
struct DataProcessorContext {
DataProcessorContext(DataProcessorContext const&) = delete;
DataProcessorContext() = default;
std::atomic<bool> allDone = false;
/// Latest run number we processed globally for this DataProcessor.
int64_t lastRunNumberProcessed = -1;
// These are pointers to the one owned by the DataProcessingDevice
// but they are fully reentrant / thread safe and therefore can
// be accessed without a lock.
// FIXME: move stuff here from the list below... ;-)
ServiceRegistry* registry = nullptr;
std::vector<ExpirationHandler> expirationHandlers;
AlgorithmSpec::InitCallback init;
AlgorithmSpec::ProcessCallback statefulProcess;
AlgorithmSpec::ProcessCallback statelessProcess;
AlgorithmSpec::ErrorCallback error = nullptr;
AlgorithmSpec::InitErrorCallback initError = nullptr;
DataProcessorSpec* spec = nullptr; /// Invoke callbacks to be executed in PreRun(), before the User Start callbacks
/// Invoke callbacks to be executed before starting the processing loop
void preStartCallbacks(ServiceRegistryRef);
/// Invoke callbacks to be executed before every process method invokation
void preProcessingCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed after the outputs have been created
/// by the processing, but before the post processing callbacks.
void finaliseOutputsCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed after every process method invokation
void postProcessingCallbacks(ProcessingContext&);
/// Invoke callbacks to be executed before every dangling check
void preDanglingCallbacks(DanglingContext&);
/// Invoke callbacks to be executed after every dangling check
void postDanglingCallbacks(DanglingContext&);
/// Invoke callbacks to be executed before every EOS user callback invokation
void preEOSCallbacks(EndOfStreamContext&);
/// Invoke callbacks to be executed after every EOS user callback invokation
void postEOSCallbacks(EndOfStreamContext&);
/// Invoke callbacks to monitor inputs after dispatching, regardless of them
/// being discarded, consumed or processed.
void postDispatchingCallbacks(ProcessingContext&);
/// Callback invoked after the late forwarding has been done
void postForwardingCallbacks(ProcessingContext&);
/// Invoke callbacks on stop.
void postStopCallbacks(ServiceRegistryRef);
/// Invoke callbacks before we enter the event loop
void preLoopCallbacks(ServiceRegistryRef);
/// Invoke callbacks on exit.
/// Note how this is a static helper because otherwise we would need to
/// handle differently the deletion of the DataProcessingContext itself.
static void preExitCallbacks(std::vector<ServiceExitHandle>, ServiceRegistryRef);
/// Invoke whenever we get a new DomainInfo message
void domainInfoUpdatedCallback(ServiceRegistryRef, size_t oldestPossibleTimeslice, ChannelIndex channelIndex);
/// Invoke before sending messages @a parts on a channel @a channelindex
void preSendingMessagesCallbacks(ServiceRegistryRef, fair::mq::Parts& parts, ChannelIndex channelindex);
/// Callback for services to be executed before every processing.
/// The callback MUST BE REENTRANT and threadsafe.
mutable std::vector<ServiceProcessingHandle> preProcessingHandlers;
/// Callback for services to be executed after every processing.
/// The callback MUST BE REENTRANT and threadsafe.
mutable std::vector<ServiceProcessingHandle> finaliseOutputsHandles;
/// Callback for services to be executed after every processing.
/// The callback MUST BE REENTRANT and threadsafe.
mutable std::vector<ServiceProcessingHandle> postProcessingHandlers;
/// Callbacks for services to be executed before every dangling check
mutable std::vector<ServiceDanglingHandle> preDanglingHandles;
/// Callbacks for services to be executed after every dangling check
mutable std::vector<ServiceDanglingHandle> postDanglingHandles;
/// Callbacks for services to be executed before every EOS user callback invokation
mutable std::vector<ServiceEOSHandle> preEOSHandles;
/// Callbacks for services to be executed after every EOS user callback invokation
mutable std::vector<ServiceEOSHandle> postEOSHandles;
/// Callbacks for services to be executed after every dispatching
mutable std::vector<ServiceDispatchingHandle> postDispatchingHandles;
/// Callbacks for services to be executed after every dispatching
mutable std::vector<ServiceForwardingHandle> postForwardingHandles;
/// Callbacks for services to be executed before Start
mutable std::vector<ServiceStartHandle> preStartHandles;
/// Callbacks for services to be executed on the Stop transition
mutable std::vector<ServiceStopHandle> postStopHandles;
/// Callbacks for services to be executed on exit
mutable std::vector<ServiceExitHandle> preExitHandles;
/// Callbacks for services to be executed on exit
mutable std::vector<ServiceDomainInfoHandle> domainInfoHandles;
/// Callbacks for services to be executed before sending messages
mutable std::vector<ServicePreSendingMessagesHandle> preSendingMessagesHandles;
/// Callbacks for services to be executed before we enter the event loop
mutable std::vector<ServicePreLoopHandle> preLoopHandles;
/// Wether or not the associated DataProcessor can forward things early
ForwardPolicy forwardPolicy = ForwardPolicy::AtInjection;
bool isSink = false;
bool balancingInputs = true;
std::function<void(o2::framework::RuntimeErrorRef e, InputRecord& record)> errorHandling;
std::function<void(o2::framework::RuntimeErrorRef e)> initErrorHandling;
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_DATAPROCESSINGCONTEXT_H_