Skip to content

Commit 9715653

Browse files
[fix][broker] Fix memory leak when metrics are updated in a thread other than FastThreadLocalThread (#24719)
1 parent a2b69cc commit 9715653

4 files changed

Lines changed: 239 additions & 125 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java

Lines changed: 8 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919
package org.apache.pulsar.broker.stats.prometheus.metrics;
2020

2121
import com.yahoo.sketches.quantiles.DoublesSketch;
22-
import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
2322
import com.yahoo.sketches.quantiles.DoublesUnion;
2423
import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
25-
import io.netty.util.concurrent.FastThreadLocal;
26-
import java.util.Map;
27-
import java.util.concurrent.ConcurrentHashMap;
2824
import java.util.concurrent.TimeUnit;
2925
import java.util.concurrent.atomic.LongAdder;
30-
import java.util.concurrent.locks.StampedLock;
3126
import org.apache.bookkeeper.stats.OpStatsData;
3227
import org.apache.bookkeeper.stats.OpStatsLogger;
3328

@@ -65,15 +60,7 @@ public void registerFailedEvent(long eventLatency, TimeUnit unit) {
6560

6661
failCountAdder.increment();
6762
failSumAdder.add((long) valueMillis);
68-
69-
LocalData localData = current.localData.get();
70-
71-
long stamp = localData.lock.readLock();
72-
try {
73-
localData.failSketch.update(valueMillis);
74-
} finally {
75-
localData.lock.unlockRead(stamp);
76-
}
63+
current.getLocalData().updateFail(valueMillis);
7764
}
7865

7966
@Override
@@ -82,45 +69,21 @@ public void registerSuccessfulEvent(long eventLatency, TimeUnit unit) {
8269

8370
successCountAdder.increment();
8471
successSumAdder.add((long) valueMillis);
85-
86-
LocalData localData = current.localData.get();
87-
88-
long stamp = localData.lock.readLock();
89-
try {
90-
localData.successSketch.update(valueMillis);
91-
} finally {
92-
localData.lock.unlockRead(stamp);
93-
}
72+
current.getLocalData().updateSuccess(valueMillis);
9473
}
9574

9675
@Override
9776
public void registerSuccessfulValue(long value) {
9877
successCountAdder.increment();
9978
successSumAdder.add(value);
100-
101-
LocalData localData = current.localData.get();
102-
103-
long stamp = localData.lock.readLock();
104-
try {
105-
localData.successSketch.update(value);
106-
} finally {
107-
localData.lock.unlockRead(stamp);
108-
}
79+
current.getLocalData().updateSuccess(value);
10980
}
11081

11182
@Override
11283
public void registerFailedValue(long value) {
11384
failCountAdder.increment();
11485
failSumAdder.add(value);
115-
116-
LocalData localData = current.localData.get();
117-
118-
long stamp = localData.lock.readLock();
119-
try {
120-
localData.failSketch.update(value);
121-
} finally {
122-
localData.lock.unlockRead(stamp);
123-
}
86+
current.getLocalData().updateFail(value);
12487
}
12588

12689
@Override
@@ -141,21 +104,11 @@ public void rotateLatencyCollection() {
141104
current = replacement;
142105
replacement = local;
143106

144-
final DoublesUnion aggregateSuccesss = new DoublesUnionBuilder().build();
107+
final DoublesUnion aggregateSuccess = new DoublesUnionBuilder().build();
145108
final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
146-
local.map.forEach((localData, b) -> {
147-
long stamp = localData.lock.writeLock();
148-
try {
149-
aggregateSuccesss.update(localData.successSketch);
150-
localData.successSketch.reset();
151-
aggregateFail.update(localData.failSketch);
152-
localData.failSketch.reset();
153-
} finally {
154-
localData.lock.unlockWrite(stamp);
155-
}
156-
});
157-
158-
successResult = aggregateSuccesss.getResultAndReset();
109+
local.record(aggregateSuccess, aggregateFail);
110+
111+
successResult = aggregateSuccess.getResultAndReset();
159112
failResult = aggregateFail.getResultAndReset();
160113
}
161114

@@ -171,28 +124,4 @@ public double getQuantileValue(boolean success, double quantile) {
171124
DoublesSketch s = success ? successResult : failResult;
172125
return s != null ? s.getQuantile(quantile) : Double.NaN;
173126
}
174-
175-
private static class LocalData {
176-
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
177-
private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
178-
private final StampedLock lock = new StampedLock();
179-
}
180-
181-
private static class ThreadLocalAccessor {
182-
private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
183-
private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() {
184-
185-
@Override
186-
protected LocalData initialValue() throws Exception {
187-
LocalData localData = new LocalData();
188-
map.put(localData, Boolean.TRUE);
189-
return localData;
190-
}
191-
192-
@Override
193-
protected void onRemoval(LocalData value) throws Exception {
194-
map.remove(value);
195-
}
196-
};
197-
}
198127
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919
package org.apache.pulsar.broker.stats.prometheus.metrics;
2020

2121
import com.yahoo.sketches.quantiles.DoublesSketch;
22-
import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
2322
import com.yahoo.sketches.quantiles.DoublesUnion;
2423
import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
25-
import io.netty.util.concurrent.FastThreadLocal;
26-
import java.util.Map;
27-
import java.util.concurrent.ConcurrentHashMap;
2824
import java.util.concurrent.TimeUnit;
2925
import java.util.concurrent.atomic.LongAdder;
30-
import java.util.concurrent.locks.StampedLock;
3126

3227
public class DataSketchesSummaryLogger {
3328

@@ -55,14 +50,7 @@ public void registerEvent(long eventLatency, TimeUnit unit) {
5550
countAdder.increment();
5651
sumAdder.add((long) valueMillis);
5752

58-
LocalData localData = current.localData.get();
59-
60-
long stamp = localData.lock.readLock();
61-
try {
62-
localData.successSketch.update(valueMillis);
63-
} finally {
64-
localData.lock.unlockRead(stamp);
65-
}
53+
current.getLocalData().updateSuccess(valueMillis);
6654
}
6755

6856
public void rotateLatencyCollection() {
@@ -72,15 +60,7 @@ public void rotateLatencyCollection() {
7260
replacement = local;
7361

7462
final DoublesUnion aggregateValues = new DoublesUnionBuilder().build();
75-
local.map.forEach((localData, b) -> {
76-
long stamp = localData.lock.writeLock();
77-
try {
78-
aggregateValues.update(localData.successSketch);
79-
localData.successSketch.reset();
80-
} finally {
81-
localData.lock.unlockWrite(stamp);
82-
}
83-
});
63+
local.record(aggregateValues, null);
8464

8565
values = aggregateValues.getResultAndReset();
8666
}
@@ -97,27 +77,4 @@ public double getQuantileValue(double quantile) {
9777
DoublesSketch s = values;
9878
return s != null ? s.getQuantile(quantile) : Double.NaN;
9979
}
100-
101-
private static class LocalData {
102-
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
103-
private final StampedLock lock = new StampedLock();
104-
}
105-
106-
private static class ThreadLocalAccessor {
107-
private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
108-
private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() {
109-
110-
@Override
111-
protected LocalData initialValue() throws Exception {
112-
LocalData localData = new LocalData();
113-
map.put(localData, Boolean.TRUE);
114-
return localData;
115-
}
116-
117-
@Override
118-
protected void onRemoval(LocalData value) throws Exception {
119-
map.remove(value);
120-
}
121-
};
122-
}
123-
}
80+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.stats.prometheus.metrics;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import com.yahoo.sketches.quantiles.DoublesSketch;
23+
import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
24+
import com.yahoo.sketches.quantiles.DoublesUnion;
25+
import io.netty.util.concurrent.FastThreadLocal;
26+
import io.netty.util.concurrent.FastThreadLocalThread;
27+
import java.lang.ref.WeakReference;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.locks.StampedLock;
30+
import org.jspecify.annotations.Nullable;
31+
32+
class ThreadLocalAccessor {
33+
34+
private final ConcurrentHashMap<LocalData, Boolean> map = new ConcurrentHashMap<>();
35+
private final FastThreadLocal<LocalData> localData = new FastThreadLocal<>() {
36+
37+
@Override
38+
protected LocalData initialValue() {
39+
LocalData localData = new LocalData(Thread.currentThread());
40+
map.put(localData, Boolean.TRUE);
41+
return localData;
42+
}
43+
44+
@Override
45+
protected void onRemoval(LocalData value) {
46+
map.remove(value);
47+
}
48+
};
49+
50+
void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) {
51+
map.keySet().forEach(key -> {
52+
key.record(aggregateSuccess, aggregateFail);
53+
if (key.shouldRemove()) {
54+
map.remove(key);
55+
}
56+
});
57+
}
58+
59+
LocalData getLocalData() {
60+
return localData.get();
61+
}
62+
63+
@VisibleForTesting
64+
int getLocalDataCount() {
65+
return map.keySet().size();
66+
}
67+
68+
static class LocalData {
69+
70+
private final DoublesSketch successSketch = new DoublesSketchBuilder().build();
71+
private final DoublesSketch failSketch = new DoublesSketchBuilder().build();
72+
private final StampedLock lock = new StampedLock();
73+
// Keep a weak reference to the owner thread so that we can remove the LocalData when the thread
74+
// is not alive anymore or has been garbage collected.
75+
// This reference isn't needed when the owner thread is a FastThreadLocalThread and will be null in that case.
76+
// The removal is handled by FastThreadLocal#onRemoval when the owner thread is a FastThreadLocalThread.
77+
private final WeakReference<Thread> ownerThreadReference;
78+
79+
LocalData(Thread ownerThread) {
80+
if (ownerThread instanceof FastThreadLocalThread) {
81+
ownerThreadReference = null;
82+
} else {
83+
ownerThreadReference = new WeakReference<>(ownerThread);
84+
}
85+
}
86+
87+
private boolean shouldRemove() {
88+
if (ownerThreadReference == null) {
89+
// the owner is a FastThreadLocalThread which handles the removal using FastThreadLocal#onRemoval
90+
return false;
91+
} else {
92+
Thread ownerThread = ownerThreadReference.get();
93+
if (ownerThread == null) {
94+
// the thread has already been garbage collected, LocalData should be removed
95+
return true;
96+
} else {
97+
// the thread isn't alive anymore, LocalData should be removed
98+
return !ownerThread.isAlive();
99+
}
100+
}
101+
}
102+
103+
void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) {
104+
long stamp = lock.writeLock();
105+
try {
106+
aggregateSuccess.update(successSketch);
107+
successSketch.reset();
108+
if (aggregateFail != null) {
109+
aggregateFail.update(failSketch);
110+
failSketch.reset();
111+
}
112+
} finally {
113+
lock.unlockWrite(stamp);
114+
}
115+
}
116+
117+
void updateSuccess(double value) {
118+
long stamp = lock.readLock();
119+
try {
120+
successSketch.update(value);
121+
} finally {
122+
lock.unlockRead(stamp);
123+
}
124+
}
125+
126+
void updateFail(double value) {
127+
long stamp = lock.readLock();
128+
try {
129+
failSketch.update(value);
130+
} finally {
131+
lock.unlockRead(stamp);
132+
}
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)