1212import java .util .concurrent .locks .Condition ;
1313import java .util .concurrent .locks .Lock ;
1414import java .util .concurrent .locks .ReentrantLock ;
15+ import java .util .function .LongSupplier ;
1516
1617class BoundedQueue {
17- // Key represents a tuple of integers (tries, clock).
18+ // Key represents a tuple of integers (tries, clock). enqueuedAtNanos records when the
19+ // payload first entered the queue (in System.nanoTime() units) and is preserved across
20+ // requeues so that "age of oldest item" remains accurate after retries.
1821 static class Key implements Comparable <Key > {
1922 final long tries ;
2023 final long clock ;
24+ final long enqueuedAtNanos ;
2125
2226 Key (long clock ) {
23- this .tries = 0 ;
24- this .clock = clock ;
27+ this (0 , clock , System .nanoTime ());
2528 }
2629
27- private Key (long tries , long clock ) {
30+ private Key (long tries , long clock , long enqueuedAtNanos ) {
2831 this .tries = tries ;
2932 this .clock = clock ;
33+ this .enqueuedAtNanos = enqueuedAtNanos ;
3034 }
3135
3236 Key next () {
33- return new Key (tries + 1 , clock );
37+ return new Key (tries + 1 , clock , enqueuedAtNanos );
3438 }
3539
3640 @ Override
@@ -52,17 +56,36 @@ public int compareTo(Key o) {
5256
5357 final TreeMap <Key , byte []> items = new TreeMap <>();
5458
55- long droppedItems ;
56- long droppedBytes ;
59+ final Telemetry telemetry ;
60+ final LongSupplier nanos ;
5761
5862 Lock lock = new ReentrantLock ();
5963 Condition notEmpty = lock .newCondition ();
6064 Condition notFull = lock .newCondition ();
6165
62- BoundedQueue (long maxBytes , long maxTries , WhenFull whenFull ) {
66+ BoundedQueue (long maxBytes , long maxTries , WhenFull whenFull , Telemetry telemetry ) {
67+ this (maxBytes , maxTries , whenFull , telemetry , System ::nanoTime );
68+ }
69+
70+ BoundedQueue (
71+ long maxBytes ,
72+ long maxTries ,
73+ WhenFull whenFull ,
74+ Telemetry telemetry ,
75+ LongSupplier nanos ) {
6376 this .maxBytes = maxBytes ;
6477 this .maxTries = maxTries ;
6578 this .whenFull = whenFull ;
79+ this .telemetry = telemetry ;
80+ this .nanos = nanos ;
81+ }
82+
83+ long droppedPayloads ;
84+ long droppedBytes ;
85+
86+ private void recordDrop (long bytes ) {
87+ droppedPayloads ++;
88+ droppedBytes += bytes ;
6689 }
6790
6891 void add (byte [] item ) throws InterruptedException {
@@ -72,8 +95,7 @@ void add(byte[] item) throws InterruptedException {
7295 void requeue (Map .Entry <Key , byte []> item ) throws InterruptedException {
7396 Key nextKey = item .getKey ().next ();
7497 if (nextKey .tries > maxTries ) {
75- droppedItems ++;
76- droppedBytes += item .getValue ().length ;
98+ telemetry .onDrop (1 , item .getValue ().length );
7799 return ;
78100 }
79101 put (nextKey , item .getValue (), WhenFull .DROP );
@@ -82,7 +104,7 @@ void requeue(Map.Entry<Key, byte[]> item) throws InterruptedException {
82104 // Must be called when lock is held.
83105 private Key newKey () {
84106 clock ++;
85- return new Key (clock );
107+ return new Key (0 , clock , nanos . getAsLong () );
86108 }
87109
88110 private void put (Key key , byte [] item , WhenFull whenFull ) throws InterruptedException {
@@ -96,7 +118,13 @@ private void put(Key key, byte[] item, WhenFull whenFull) throws InterruptedExce
96118 bytes += item .length ;
97119 notEmpty .signal ();
98120 } finally {
121+ long droppedPayloads = this .droppedPayloads ;
122+ long droppedBytes = this .droppedBytes ;
123+ this .droppedPayloads = 0 ;
124+ this .droppedBytes = 0 ;
99125 lock .unlock ();
126+ // Avoid potential lock ordering issues.
127+ telemetry .onDrop (droppedPayloads , droppedBytes );
100128 }
101129 }
102130
@@ -108,9 +136,8 @@ private void ensureSpace(int length, WhenFull whenFull) throws InterruptedExcept
108136 switch (whenFull ) {
109137 case DROP :
110138 Map .Entry <Key , byte []> last = items .pollLastEntry ();
111- droppedItems ++;
112- droppedBytes += last .getValue ().length ;
113139 bytes -= last .getValue ().length ;
140+ recordDrop (last .getValue ().length );
114141 break ;
115142 case BLOCK :
116143 notFull .await ();
@@ -133,4 +160,23 @@ Map.Entry<Key, byte[]> next() throws InterruptedException {
133160 lock .unlock ();
134161 }
135162 }
163+
164+ void snapshot (long now , Telemetry .Snapshot s ) {
165+ lock .lock ();
166+ try {
167+ long oldestAge = 0L ;
168+ for (Key k : items .keySet ()) {
169+ long age = now - k .enqueuedAtNanos ;
170+ if (age > oldestAge ) {
171+ oldestAge = age ;
172+ }
173+ }
174+ s .queuePayloads = items .size ();
175+ s .queueBytes = bytes ;
176+ s .queueMaxBytes = maxBytes ;
177+ s .oldestEnqueuedAgeNanos = oldestAge ;
178+ } finally {
179+ lock .unlock ();
180+ }
181+ }
136182}
0 commit comments