@@ -47,14 +47,7 @@ using namespace AliceO2::Common;
4747
4848TaskRunner::TaskRunner (const std::string& taskName, const std::string& configurationSource, size_t id)
4949 : mDeviceName (createTaskRunnerIdString() + " -" + taskName),
50- mTask (nullptr ),
51- mResetAfterPublish(false ),
52- mMonitorObjectsSpec({ " mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id),
53- mNumberBlocks(0 ),
54- mLastNumberObjects(0 ),
55- mCycleOn(false ),
56- mCycleNumber(0 ),
57- mTotalNumberObjectsPublished(0 )
50+ mMonitorObjectsSpec ({ " mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id)
5851{
5952 // setup configuration
6053 mConfigFile = ConfigurationFactory::getConfiguration (configurationSource);
@@ -114,15 +107,6 @@ void TaskRunner::run(ProcessingContext& pCtx)
114107 startCycle ();
115108 }
116109 }
117-
118- // if 10 s we publish stats
119- if (mStatsTimer .isTimeout ()) {
120- double current = mStatsTimer .getTime ();
121- int objectsPublished = (mTotalNumberObjectsPublished - mLastNumberObjects );
122- mLastNumberObjects = mTotalNumberObjectsPublished ;
123- mCollector ->send ({ objectsPublished / current, " QC_task_Rate_objects_published_per_10_seconds" });
124- mStatsTimer .increment ();
125- }
126110}
127111
128112CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback (gsl::span<PartRef const > const & inputs)
@@ -191,9 +175,6 @@ void TaskRunner::start()
191175{
192176 startOfActivity ();
193177
194- mStatsTimer .reset (10000000 ); // 10 s.
195- mLastNumberObjects = 0 ;
196-
197178 if (mTaskConfig .maxNumberCycles >= 0 && mCycleNumber >= mTaskConfig .maxNumberCycles ) {
198179 LOG (INFO ) << " The maximum number of cycles (" << mTaskConfig .maxNumberCycles << " ) has been reached." ;
199180 return ;
@@ -343,52 +324,28 @@ void TaskRunner::endOfActivity()
343324
344325 double rate = mTotalNumberObjectsPublished / mTimerTotalDurationActivity .getTime ();
345326 mCollector ->send ({ rate, " QC_task_Rate_objects_published_per_second_whole_run" });
346- // mCollector->send({ ba::mean(mPCpus), "QC_task_Mean_pcpu_whole_run" });
347- // mCollector->send({ ba::mean(mPMems), "QC_task_Mean_pmem_whole_run" });
348327}
349328
350329void TaskRunner::startCycle ()
351330{
352331 QcInfoLogger::GetInstance () << " cycle " << mCycleNumber << AliceO2::InfoLogger::InfoLogger::endm;
353332 mTask ->startOfCycle ();
354333 mNumberBlocks = 0 ;
334+ mNumberObjectsPublishedInCycle = 0 ;
335+ mTimerDurationCycle .reset ();
355336 mCycleOn = true ;
356337}
357338
358339void TaskRunner::finishCycle (DataAllocator& outputs)
359340{
360341 mTask ->endOfCycle ();
361342
362- double durationCycle = 0 ; // (boost::posix_time::seconds(mTaskConfig.cycleDurationSeconds) -
363- // mCycleTimer->expires_from_now()).total_nanoseconds() / double(1e9);
364- // mCycleTimer->expires_at(mCycleTimer->expires_at() +
365- // boost::posix_time::seconds(mTaskConfig.cycleDurationSeconds));
343+ mNumberObjectsPublishedInCycle += publish (outputs);
344+ mTotalNumberObjectsPublished += mNumberObjectsPublishedInCycle ;
366345
367- // publication
368- unsigned long numberObjectsPublished = publish (outputs);
346+ publishCycleStats ();
369347 mObjectsManager ->updateServiceDiscovery ();
370348
371- // monitoring metrics
372- double durationPublication = 0 ; // (boost::posix_time::seconds(mTaskConfig.cycleDurationSeconds) -
373- // mCycleTimer->expires_from_now()).total_nanoseconds() / double(1e9);
374- mCollector ->send ({ mNumberBlocks , " QC_task_Numberofblocks_in_cycle" });
375- mCollector ->send ({ durationCycle, " QC_task_Module_cycle_duration" });
376- mCollector ->send ({ durationPublication, " QC_task_Publication_duration" });
377- mCollector ->send ({ (int )numberObjectsPublished,
378- " QC_task_Number_objects_published_in_cycle" }); // cast due to Monitoring accepting only int
379- double rate = numberObjectsPublished / (durationCycle + durationPublication);
380- mCollector ->send ({ rate, " QC_task_Rate_objects_published_per_second" });
381- mTotalNumberObjectsPublished += numberObjectsPublished;
382- // std::vector<std::string> pidStatus = mMonitor->getPIDStatus(::getpid());
383- // mPCpus(std::stod(pidStatus[3]));
384- // mPMems(std::stod(pidStatus[4]));
385- double whole_run_rate = mTotalNumberObjectsPublished / mTimerTotalDurationActivity .getTime ();
386- mCollector ->send ({ mTotalNumberObjectsPublished , " QC_task_Total_objects_published_whole_run" });
387- mCollector ->send ({ mTimerTotalDurationActivity .getTime (), " QC_task_Total_duration_activity_whole_run" });
388- mCollector ->send ({ whole_run_rate, " QC_task_Rate_objects_published_per_second_whole_run" });
389- // mCollector->send({std::stod(pidStatus[3]), "QC_task_Mean_pcpu_whole_run"});
390- // mCollector->send({ ba::mean(mPMems), "QC_task_Mean_pmem_whole_run" });
391-
392349 mCycleNumber ++;
393350 mCycleOn = false ;
394351
@@ -398,17 +355,41 @@ void TaskRunner::finishCycle(DataAllocator& outputs)
398355 }
399356}
400357
401- unsigned long TaskRunner::publish (DataAllocator& outputs)
358+ void TaskRunner::publishCycleStats ()
359+ {
360+ double cycleDuration = mTimerDurationCycle .getTime ();
361+ double rate = mNumberObjectsPublishedInCycle / (cycleDuration + mLastPublicationDuration );
362+ double wholeRunRate = mTotalNumberObjectsPublished / mTimerTotalDurationActivity .getTime ();
363+ double totalDurationActivity = mTimerTotalDurationActivity .getTime ();
364+
365+ // monitoring metrics
366+ mCollector ->send ({ mNumberBlocks , " QC_task_Numberofblocks_in_cycle" });
367+ mCollector ->send ({ cycleDuration, " QC_task_Module_cycle_duration" });
368+ mCollector ->send ({ mLastPublicationDuration , " QC_task_Publication_duration" });
369+ mCollector ->send ({ mNumberObjectsPublishedInCycle , " QC_task_Number_objects_published_in_cycle" });
370+ mCollector ->send ({ rate, " QC_task_Rate_objects_published_per_second" });
371+ mCollector ->send ({ mTotalNumberObjectsPublished , " QC_task_Total_objects_published_whole_run" });
372+ mCollector ->send ({ totalDurationActivity, " QC_task_Total_duration_activity_whole_run" });
373+ mCollector ->send ({ wholeRunRate, " QC_task_Rate_objects_published_per_second_whole_run" });
374+ }
375+
376+ int TaskRunner::publish (DataAllocator& outputs)
402377{
378+ AliceO2::Common::Timer publicationDurationTimer;
379+
380+ TObjArray* array = mObjectsManager ->getNonOwningArray ();
381+ int objectsPublished = array->GetEntries ();
382+
403383 auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher (mMonitorObjectsSpec );
404384 outputs.adopt (
405385 Output{ concreteOutput.origin ,
406386 concreteOutput.description ,
407387 concreteOutput.subSpec ,
408388 mMonitorObjectsSpec .lifetime },
409- dynamic_cast <TObject*>(mObjectsManager -> getNonOwningArray () ));
389+ dynamic_cast <TObject*>(array ));
410390
411- return 1 ;
391+ mLastPublicationDuration = publicationDurationTimer.getTime ();
392+ return objectsPublished;
412393}
413394
414395} // namespace o2::quality_control::core
0 commit comments