3333#include < Framework/DataSpecUtils.h>
3434#include < Framework/ExternalFairMQDeviceProxy.h>
3535#include < Framework/DataDescriptorQueryBuilder.h>
36- #include < Framework/O2ControlLabels .h>
36+ #include < Framework/O2ControlParameters .h>
3737#include < Mergers/MergerInfrastructureBuilder.h>
3838#include < Mergers/MergerBuilder.h>
3939#include < DataSampling/DataSampling.h>
@@ -55,7 +55,8 @@ using SubSpec = o2::header::DataHeader::SubSpecificationType;
5555namespace o2 ::quality_control::core
5656{
5757
58- uint16_t defaultPolicyPort = 42349 ;
58+ constexpr uint16_t defaultPolicyPort = 42349 ;
59+ constexpr auto proxyMemoryKillThresholdMB = " 5000" ;
5960
6061struct DataSamplingPolicySpec {
6162 DataSamplingPolicySpec (std::string name, std::string control, std::string remoteMachine = " " )
@@ -386,6 +387,9 @@ void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyBind(framework
386387 channelConfig.c_str (),
387388 channelSelector));
388389 workflow.back ().labels .emplace_back (control == " odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
390+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
391+ workflow.back ().metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
392+ }
389393}
390394
391395void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyConnect (framework::WorkflowSpec& workflow,
@@ -410,6 +414,9 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyConnect(frame
410414 // if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
411415 enableDraining (proxy.options );
412416 workflow.emplace_back (std::move (proxy));
417+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
418+ workflow.back ().metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
419+ }
413420}
414421
415422void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyConnect (framework::WorkflowSpec& workflow,
@@ -434,6 +441,9 @@ void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyConnect(framew
434441 channelConfig.c_str (),
435442 channelSelector));
436443 workflow.back ().labels .emplace_back (control == " odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
444+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
445+ workflow.back ().metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
446+ }
437447}
438448
439449void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyBind (framework::WorkflowSpec& workflow,
@@ -455,6 +465,9 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyBind(framewor
455465 proxy.labels .emplace_back (control == " odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
456466 // if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
457467 enableDraining (proxy.options );
468+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
469+ proxy.metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
470+ }
458471 workflow.emplace_back (std::move (proxy));
459472}
460473
@@ -475,6 +488,9 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe
475488 { proxyInput },
476489 channelConfig.c_str ()));
477490 workflow.back ().labels .emplace_back (taskSpec.localControl == " odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
491+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
492+ workflow.back ().metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
493+ }
478494}
479495
480496void InfrastructureGenerator::generateLocalTaskRemoteProxy (framework::WorkflowSpec& workflow, const TaskSpec& taskSpec, size_t numberOfLocalMachines)
@@ -501,6 +517,9 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
501517 proxy.labels .emplace_back (taskSpec.localControl == " odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
502518 // if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
503519 enableDraining (proxy.options );
520+ if (getenv (" O2_QC_KILL_PROXIES" ) != nullptr ) {
521+ proxy.metadata .push_back (DataProcessorMetadata{ ecs::privateMemoryKillThresholdMB, proxyMemoryKillThresholdMB });
522+ }
504523 workflow.emplace_back (std::move (proxy));
505524}
506525void InfrastructureGenerator::generateMergers (framework::WorkflowSpec& workflow, const std::string& taskName,
0 commit comments