11package de .marcelsauer .profiler .processor ;
22
3+ import de .marcelsauer .profiler .config .Config ;
34import org .apache .log4j .Logger ;
45
56import java .util .ArrayList ;
67import java .util .Collection ;
8+ import java .util .HashSet ;
79import java .util .List ;
10+ import java .util .Set ;
811import java .util .concurrent .*;
912import java .util .concurrent .atomic .AtomicBoolean ;
1013import java .util .concurrent .atomic .AtomicInteger ;
@@ -28,6 +31,9 @@ public abstract class AbstractAsyncStackProcessor implements StackProcessor {
2831 private final BlockingQueue <RecordingEvent > workerQueue = new ArrayBlockingQueue <>(CAPACITY );
2932 private final AtomicBoolean started = new AtomicBoolean (false );
3033 private final List <RecordingEvent > drainBuffer = new ArrayList <>(DRAIN_BATCH_SIZE );
34+ private final List <RecordingEvent > dedupBuffer = new ArrayList <>(DRAIN_BATCH_SIZE );
35+ private final Set <Integer > seenStackHashes = new HashSet <>();
36+ private long nextStackHashResetAtMillis = 0L ;
3137
3238 /**
3339 * acting like cron
@@ -56,6 +62,7 @@ public void process(RecordingEvent event) {
5662 @ Override
5763 public void start () {
5864 logger .info ("starting " + this .getClass ().getName ());
65+ resetDedupWindow ();
5966 doStart ();
6067 startScheduler ();
6168 }
@@ -112,11 +119,17 @@ private void flushQueue() {
112119 if (snapshot .isEmpty ()) {
113120 return ;
114121 }
115- doProcess (snapshot );
116- successfullyProcessedStacksCounter .addAndGet (snapshot .size ());
122+
123+ Collection <RecordingEvent > reportableSnapshot = filterReportableStacks (snapshot );
124+ if (reportableSnapshot .isEmpty ()) {
125+ continue ;
126+ }
127+
128+ doProcess (reportableSnapshot );
129+ successfullyProcessedStacksCounter .addAndGet (reportableSnapshot .size ());
117130 logger .info (
118131 String .format ("delegated %d stacks to %s. successfully processed so far %d" ,
119- snapshot .size (),
132+ reportableSnapshot .size (),
120133 this .getClass ().getSimpleName (),
121134 successfullyProcessedStacksCounter .intValue ()));
122135 }
@@ -128,6 +141,49 @@ private Collection<RecordingEvent> drainNextBatch() {
128141 return drainBuffer ;
129142 }
130143
144+ private Collection <RecordingEvent > filterReportableStacks (Collection <RecordingEvent > snapshot ) {
145+ if (isReportAllStacks ()) {
146+ return snapshot ;
147+ }
148+
149+ maybeResetSeenStackHashes ();
150+
151+ dedupBuffer .clear ();
152+ for (RecordingEvent event : snapshot ) {
153+ if (seenStackHashes .add (event .getStackHash ())) {
154+ dedupBuffer .add (event );
155+ }
156+ }
157+ return dedupBuffer ;
158+ }
159+
160+ private void maybeResetSeenStackHashes () {
161+ long now = currentTimeMillis ();
162+ if (now < nextStackHashResetAtMillis ) {
163+ return ;
164+ }
165+
166+ seenStackHashes .clear ();
167+ nextStackHashResetAtMillis = now + getStackHashResetIntervalMillis ();
168+ }
169+
170+ private void resetDedupWindow () {
171+ seenStackHashes .clear ();
172+ nextStackHashResetAtMillis = currentTimeMillis () + getStackHashResetIntervalMillis ();
173+ }
174+
175+ protected boolean isReportAllStacks () {
176+ return Config .get ().processor .reportAllStacks ;
177+ }
178+
179+ protected long getStackHashResetIntervalMillis () {
180+ return Math .max (1_000L , Config .get ().processor .stackHashResetIntervalMillis );
181+ }
182+
183+ protected long currentTimeMillis () {
184+ return System .currentTimeMillis ();
185+ }
186+
131187 class WorkQueueProcessorTask implements Runnable {
132188
133189 @ Override
0 commit comments