Skip to content

Commit 58ff020

Browse files
authored
Enable draining in READY for input proxies (#1426)
1 parent 3469e1f commit 58ff020

1 file changed

Lines changed: 28 additions & 10 deletions

File tree

Framework/src/InfrastructureGenerator.cxx

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@ struct DataSamplingPolicySpec {
6868
std::string remoteMachine;
6969
};
7070

71+
void enableDraining(framework::Options& options)
72+
{
73+
if (auto readyStatePolicy = std::find_if(options.begin(), options.end(), [](const auto& option) { return option.name == "ready-state-policy"; });
74+
readyStatePolicy != options.end()) {
75+
readyStatePolicy->defaultValue = "drain";
76+
} else {
77+
ILOG(Error) << "Could not find 'ready-state-policy' option to enable draining in READY" << ENDM;
78+
}
79+
}
80+
7181
framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(const boost::property_tree::ptree& configurationTree)
7282
{
7383
printVersion();
@@ -385,12 +395,15 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyConnect(frame
385395
std::string channelConfig = "name=" + channelName + ",type=sub,method=connect,address=tcp://" +
386396
localMachine + ":" + localPort + ",rateLogging=60,transport=zeromq,rcvBufSize=1";
387397

388-
workflow.emplace_back(specifyExternalFairMQDeviceProxy(
398+
auto proxy = specifyExternalFairMQDeviceProxy(
389399
proxyName.c_str(),
390400
outputSpecs,
391401
channelConfig.c_str(),
392-
dplModelAdaptor()));
393-
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
402+
dplModelAdaptor());
403+
proxy.labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
404+
// if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
405+
enableDraining(proxy.options);
406+
workflow.emplace_back(std::move(proxy));
394407
}
395408

396409
void InfrastructureGenerator::generateDataSamplingPolicyLocalProxyConnect(framework::WorkflowSpec& workflow,
@@ -428,12 +441,15 @@ void InfrastructureGenerator::generateDataSamplingPolicyRemoteProxyBind(framewor
428441

429442
std::string channelConfig = "name=" + channelName + ",type=sub,method=bind,address=tcp://*:" + remotePort + ",rateLogging=60,transport=zeromq,rcvBufSize=1";
430443

431-
workflow.emplace_back(specifyExternalFairMQDeviceProxy(
444+
auto proxy = specifyExternalFairMQDeviceProxy(
432445
proxyName.c_str(),
433446
outputSpecs,
434447
channelConfig.c_str(),
435-
dplModelAdaptor()));
436-
workflow.back().labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
448+
dplModelAdaptor());
449+
proxy.labels.emplace_back(control == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
450+
// if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
451+
enableDraining(proxy.options);
452+
workflow.emplace_back(std::move(proxy));
437453
}
438454

439455
void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpec& workflow, size_t id,
@@ -471,14 +487,16 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
471487
std::string channelConfig = "name=" + channelName + ",type=sub,method=bind,address=tcp://*:" + remotePort +
472488
",rateLogging=60,transport=zeromq,rcvBufSize=1";
473489

474-
workflow.emplace_back(specifyExternalFairMQDeviceProxy(
490+
auto proxy = specifyExternalFairMQDeviceProxy(
475491
proxyName.c_str(),
476492
proxyOutputs,
477493
channelConfig.c_str(),
478-
dplModelAdaptor()));
479-
workflow.back().labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
494+
dplModelAdaptor());
495+
proxy.labels.emplace_back(taskSpec.localControl == "odc" ? ecs::preserveRawChannelsLabel : ecs::uniqueProxyLabel);
496+
// if not in RUNNING, we should drop all the incoming messages, we set the corresponding proxy option.
497+
enableDraining(proxy.options);
498+
workflow.emplace_back(std::move(proxy));
480499
}
481-
482500
void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow, std::string taskName,
483501
size_t numberOfLocalMachines, double cycleDurationSeconds,
484502
std::string mergingMode, size_t resetAfterCycles, std::string monitoringUrl,

0 commit comments

Comments
 (0)