11package com .wavefront .agent .queueing ;
22
33import com .google .common .annotations .VisibleForTesting ;
4- import com .google .common .util .concurrent .RateLimiter ;
54import com .wavefront .agent .data .DataSubmissionTask ;
65import com .wavefront .agent .handlers .HandlerKey ;
76import com .wavefront .common .Managed ;
7+ import com .wavefront .common .NamedThreadFactory ;
88import com .wavefront .common .Pair ;
99import com .wavefront .common .TaggedMetricName ;
1010import com .yammer .metrics .Metrics ;
1111import com .yammer .metrics .core .Gauge ;
1212import java .io .IOException ;
1313import java .util .Comparator ;
1414import java .util .List ;
15- import java .util .Timer ;
1615import java .util .TimerTask ;
16+ import java .util .concurrent .Executors ;
17+ import java .util .concurrent .ScheduledExecutorService ;
18+ import java .util .concurrent .TimeUnit ;
1719import java .util .concurrent .atomic .AtomicBoolean ;
20+ import java .util .concurrent .atomic .AtomicInteger ;
21+ import java .util .concurrent .atomic .AtomicLong ;
1822import java .util .function .Consumer ;
1923import java .util .function .Supplier ;
2024import java .util .logging .Logger ;
@@ -32,23 +36,18 @@ public class QueueController<T extends DataSubmissionTask<T>> extends TimerTask
3236
3337 // min difference in queued timestamps for the schedule adjuster to kick in
3438 private static final int TIME_DIFF_THRESHOLD_SECS = 60 ;
35- private static final int REPORT_QUEUE_STATS_DELAY_SECS = 15 ;
3639 private static final double MIN_ADJ_FACTOR = 0.25d ;
3740 private static final double MAX_ADJ_FACTOR = 1.5d ;
41+ private static ScheduledExecutorService executor ;
3842
3943 protected final HandlerKey handlerKey ;
4044 protected final List <QueueProcessor <T >> processorTasks ;
41- @ Nullable private final Consumer <Integer > backlogSizeSink ;
4245 protected final Supplier <Long > timeProvider ;
43- protected final Timer timer ;
44-
45- @ SuppressWarnings ("UnstableApiUsage" )
46- protected final RateLimiter reportRateLimiter = RateLimiter .create (0.1 );
47-
48- private long currentWeight ;
49- private int queueSize ;
46+ private final Consumer <Integer > backlogSizeSink ;
5047
5148 private final AtomicBoolean isRunning = new AtomicBoolean (false );
49+ private final AtomicLong currentWeight = new AtomicLong ();
50+ private final AtomicInteger queueSize = new AtomicInteger ();
5251
5352 /**
5453 * @param handlerKey Pipeline handler key
@@ -79,7 +78,6 @@ public QueueController(
7978 this .processorTasks = processorTasks ;
8079 this .backlogSizeSink = backlogSizeSink ;
8180 this .timeProvider = timeProvider == null ? System ::currentTimeMillis : timeProvider ;
82- this .timer = new Timer ("timer-queuedservice-" + handlerKey .toString ());
8381
8482 Metrics .newGauge (
8583 new TaggedMetricName (
@@ -92,7 +90,7 @@ public QueueController(
9290 new Gauge <Integer >() {
9391 @ Override
9492 public Integer value () {
95- return queueSize ;
93+ return queueSize . get () ;
9694 }
9795 });
9896 Metrics .newGauge (
@@ -101,48 +99,11 @@ public Integer value() {
10199 new Gauge <Long >() {
102100 @ Override
103101 public Long value () {
104- return currentWeight ;
102+ return currentWeight . get () ;
105103 }
106104 });
107105 }
108106
109- @ Override
110- public void run () {
111- // 1. grab current queue sizes (tasks count) and report to EntityProperties
112- int backlog = processorTasks .stream ().mapToInt (x -> x .getTaskQueue ().size ()).sum ();
113- queueSize = backlog ;
114- if (backlogSizeSink != null ) {
115- backlogSizeSink .accept (backlog );
116- }
117-
118- // 2. grab queue sizes (points/etc count)
119- long totalWeight = 0L ;
120- for (QueueProcessor <T > task : processorTasks ) {
121- TaskQueue <T > taskQueue = task .getTaskQueue ();
122- if ((taskQueue != null ) && (taskQueue .weight () != null )) {
123- totalWeight += taskQueue .weight ();
124- }
125- }
126- long previousWeight = currentWeight ;
127- currentWeight = totalWeight ;
128-
129- // 3. adjust timing
130- adjustTimingFactors (processorTasks );
131-
132- // 4. print stats when there's backlog
133- if ((previousWeight != 0 ) || (currentWeight != 0 )) {
134- printQueueStats ();
135- if (currentWeight == 0 ) {
136- logger .info (
137- "["
138- + handlerKey .getHandle ()
139- + "] "
140- + handlerKey .getEntityType ()
141- + " backlog has been cleared!" );
142- }
143- }
144- }
145-
146107 /**
147108 * Compares timestamps of tasks at the head of all backing queues. If the time difference between
148109 * most recently queued head and the oldest queued head (across all backing queues) is less than
@@ -177,16 +138,34 @@ static <T extends DataSubmissionTask<T>> void adjustTimingFactors(
177138 }
178139 }
179140
141+ @ Override
142+ public void run () {
143+ // 1. grab current queue sizes (tasks count) and report to EntityProperties
144+ int backlog = processorTasks .stream ().mapToInt (x -> x .getTaskQueue ().size ()).sum ();
145+ if (backlogSizeSink != null ) {
146+ backlogSizeSink .accept (backlog );
147+ }
148+ // 2. adjust timing
149+ adjustTimingFactors (processorTasks );
150+ }
151+
180152 private void printQueueStats () {
181- long oldestTaskTimestamp =
182- processorTasks .stream ()
183- .filter (x -> x .getTaskQueue ().size () > 0 )
184- .mapToLong (QueueProcessor ::getHeadTaskTimestamp )
185- .min ()
186- .orElse (Long .MAX_VALUE );
187- //noinspection UnstableApiUsage
188- if ((oldestTaskTimestamp < timeProvider .get () - REPORT_QUEUE_STATS_DELAY_SECS * 1000 )
189- && (reportRateLimiter .tryAcquire ())) {
153+ // 1. grab current queue sizes (tasks count)
154+ int backlog = processorTasks .stream ().mapToInt (x -> x .getTaskQueue ().size ()).sum ();
155+ queueSize .set (backlog );
156+
157+ // 2. grab queue sizes (points/etc count)
158+ long actualWeight = 0L ;
159+ for (QueueProcessor <T > task : processorTasks ) {
160+ TaskQueue <T > taskQueue = task .getTaskQueue ();
161+ if ((taskQueue != null ) && (taskQueue .weight () != null )) {
162+ actualWeight += taskQueue .weight ();
163+ }
164+ }
165+ long previousWeight = currentWeight .getAndSet (actualWeight );
166+
167+ // 4. print stats when there's backlog
168+ if ((previousWeight != 0 ) || (actualWeight != 0 )) {
190169 logger .info (
191170 "["
192171 + handlerKey .getHandle ()
@@ -198,35 +177,59 @@ private void printQueueStats() {
198177 + currentWeight
199178 + " "
200179 + handlerKey .getEntityType ());
180+ if (actualWeight == 0 ) {
181+ logger .info (
182+ "["
183+ + handlerKey .getHandle ()
184+ + "] "
185+ + handlerKey .getEntityType ()
186+ + " backlog has been cleared!" );
187+ }
201188 }
202189 }
203190
204191 @ Override
205192 public void start () {
206193 if (isRunning .compareAndSet (false , true )) {
207- timer .scheduleAtFixedRate (this , 1000 , 1000 );
194+ if ((executor == null ) || (executor .isShutdown ())) { // need it for unittests
195+ executor = Executors .newScheduledThreadPool (2 , new NamedThreadFactory ("QueueController" ));
196+ }
197+ executor .scheduleAtFixedRate (this ::run , 1 , 1 , TimeUnit .SECONDS );
198+ executor .scheduleAtFixedRate (this ::printQueueStats , 10 , 10 , TimeUnit .SECONDS );
208199 processorTasks .forEach (QueueProcessor ::start );
209200 }
210201 }
211202
212203 @ Override
213204 public void stop () {
214205 if (isRunning .compareAndSet (true , false )) {
215- timer . cancel ();
206+ executor . shutdown ();
216207 processorTasks .forEach (QueueProcessor ::stop );
217208 }
218209 }
219210
220211 public void truncateBuffers () {
221212 processorTasks .forEach (
222213 tQueueProcessor -> {
223- System .out .print ("-- size: " + tQueueProcessor .getTaskQueue ().size ());
214+ logger .info (
215+ "["
216+ + handlerKey .getHandle ()
217+ + "] "
218+ + handlerKey .getEntityType ()
219+ + "-- size before truncate: "
220+ + tQueueProcessor .getTaskQueue ().size ());
224221 try {
225222 tQueueProcessor .getTaskQueue ().clear ();
226223 } catch (IOException e ) {
227224 e .printStackTrace ();
228225 }
229- System .out .println ("--> size: " + tQueueProcessor .getTaskQueue ().size ());
226+ logger .info (
227+ "["
228+ + handlerKey .getHandle ()
229+ + "] "
230+ + handlerKey .getEntityType ()
231+ + "--> size after truncate: "
232+ + tQueueProcessor .getTaskQueue ().size ());
230233 });
231234 }
232235}
0 commit comments