Skip to content

Commit 99fdca8

Browse files
authored
[fix][proxy] Fix memory leaks in ParserProxyHandler (apache#25142)
1 parent 39dbbf0 commit 99fdca8

4 files changed

Lines changed: 155 additions & 50 deletions

File tree

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,26 +426,27 @@ private void startDirectProxying(CommandConnected connected) {
426426
} else {
427427
// Enable parsing feature, proxyLogLevel(1 or 2)
428428
// Add parser handler
429+
ParserProxyHandler.Context parserContext = ParserProxyHandler.createContext();
429430
if (connected.hasMaxMessageSize()) {
430431
FrameDecoderUtil.replaceFrameDecoder(inboundChannel.pipeline(),
431432
connected.getMaxMessageSize());
432433
FrameDecoderUtil.replaceFrameDecoder(outboundChannel.pipeline(),
433434
connected.getMaxMessageSize());
434435
inboundChannel.pipeline().addBefore("handler", "inboundParser",
435-
new ParserProxyHandler(service,
436+
new ParserProxyHandler(parserContext, service,
436437
ParserProxyHandler.FRONTEND_CONN,
437438
connected.getMaxMessageSize(), outboundChannel.id()));
438439
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
439-
new ParserProxyHandler(service,
440+
new ParserProxyHandler(parserContext, service,
440441
ParserProxyHandler.BACKEND_CONN,
441442
connected.getMaxMessageSize(), inboundChannel.id()));
442443
} else {
443444
inboundChannel.pipeline().addBefore("handler", "inboundParser",
444-
new ParserProxyHandler(service,
445+
new ParserProxyHandler(parserContext, service,
445446
ParserProxyHandler.FRONTEND_CONN,
446447
Commands.DEFAULT_MAX_MESSAGE_SIZE, outboundChannel.id()));
447448
outboundChannel.pipeline().addBefore("proxyOutboundHandler", "outboundParser",
448-
new ParserProxyHandler(service,
449+
new ParserProxyHandler(parserContext, service,
449450
ParserProxyHandler.BACKEND_CONN,
450451
Commands.DEFAULT_MAX_MESSAGE_SIZE, inboundChannel.id()));
451452
}

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ParserProxyHandler.java

Lines changed: 82 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@
3131
import java.util.List;
3232
import java.util.Map;
3333
import java.util.concurrent.ConcurrentHashMap;
34+
import lombok.Getter;
3435
import org.apache.commons.lang3.mutable.MutableLong;
3536
import org.apache.pulsar.common.api.proto.BaseCommand;
3637
import org.apache.pulsar.common.api.raw.MessageParser;
3738
import org.apache.pulsar.common.api.raw.RawMessage;
3839
import org.apache.pulsar.common.naming.TopicName;
40+
import org.apache.pulsar.common.util.StringInterner;
3941
import org.apache.pulsar.proxy.stats.TopicStats;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
@@ -53,27 +55,41 @@ public class ParserProxyHandler extends ChannelInboundHandlerAdapter {
5355

5456
private final int maxMessageSize;
5557
private final ChannelId peerChannelId;
58+
@Getter
59+
private final Context context;
5660
private final ProxyService service;
5761

5862

59-
/**
60-
* producerid + channelid as key.
61-
*/
62-
private static final Map<String, String> producerHashMap = new ConcurrentHashMap<>();
63+
public static class Context {
64+
/**
65+
* producerid as key.
66+
*/
67+
@Getter
68+
private final Map<Long, String> producerIdToTopicName = new ConcurrentHashMap<>();
6369

64-
/**
65-
* consumerid + channelid as key.
66-
*/
67-
private static final Map<String, String> consumerHashMap = new ConcurrentHashMap<>();
70+
/**
71+
* consumerid as key.
72+
*/
73+
@Getter
74+
private final Map<Long, String> consumerIdToTopicName = new ConcurrentHashMap<>();
6875

69-
public ParserProxyHandler(ProxyService service, String type, int maxMessageSize,
76+
private Context() {
77+
}
78+
}
79+
80+
public ParserProxyHandler(Context context, ProxyService service, String type, int maxMessageSize,
7081
ChannelId peerChannelId) {
82+
this.context = context;
7183
this.service = service;
7284
this.connType = type;
7385
this.maxMessageSize = maxMessageSize;
7486
this.peerChannelId = peerChannelId;
7587
}
7688

89+
public static Context createContext() {
90+
return new Context();
91+
}
92+
7793
private void logging(Channel conn, BaseCommand.Type cmdtype, String info, List<RawMessage> messages) {
7894

7995
if (messages != null) {
@@ -115,64 +131,86 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
115131

116132
switch (cmd.getType()) {
117133
case PRODUCER:
118-
ParserProxyHandler.producerHashMap.put(cmd.getProducer().getProducerId() + "," + ctx.channel().id(),
119-
cmd.getProducer().getTopic());
134+
topicName = StringInterner.intern(cmd.getProducer().getTopic());
135+
context.producerIdToTopicName.put(cmd.getProducer().getProducerId(), topicName);
120136

121137
String producerName = "";
122138
if (cmd.getProducer().hasProducerName()){
123139
producerName = cmd.getProducer().getProducerName();
124140
}
125141
logging(ctx.channel(), cmd.getType(), "{producer:" + producerName
126-
+ ",topic:" + cmd.getProducer().getTopic() + "}", null);
142+
+ ",topic:" + topicName + "}", null);
143+
break;
144+
case CLOSE_PRODUCER:
145+
context.producerIdToTopicName.remove(cmd.getCloseProducer().getProducerId());
146+
logging(ctx.channel(), cmd.getType(), "", null);
127147
break;
128-
129148
case SEND:
130149
if (service.getProxyLogLevel() != 2) {
131150
logging(ctx.channel(), cmd.getType(), "", null);
132151
break;
133152
}
134-
topicName = TopicName.toFullTopicName(ParserProxyHandler.producerHashMap.get(
135-
cmd.getSend().getProducerId() + "," + ctx.channel().id()));
136-
MutableLong msgBytes = new MutableLong(0);
137-
MessageParser.parseMessage(topicName, -1L,
138-
-1L, buffer, (message) -> {
139-
messages.add(message);
140-
msgBytes.add(message.getData().readableBytes());
141-
}, maxMessageSize);
142-
// update topic stats
143-
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName,
144-
topic -> new TopicStats());
145-
topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
146-
logging(ctx.channel(), cmd.getType(), "", messages);
153+
long producerId = cmd.getSend().getProducerId();
154+
String topicForProducer = context.producerIdToTopicName.get(producerId);
155+
if (topicForProducer != null) {
156+
topicName = TopicName.toFullTopicName(topicForProducer);
157+
MutableLong msgBytes = new MutableLong(0);
158+
MessageParser.parseMessage(topicName, -1L,
159+
-1L, buffer, (message) -> {
160+
messages.add(message);
161+
msgBytes.add(message.getData().readableBytes());
162+
}, maxMessageSize);
163+
// update topic stats
164+
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName,
165+
topic -> new TopicStats());
166+
topicStats.getMsgInRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
167+
logging(ctx.channel(), cmd.getType(), "", messages);
168+
} else {
169+
logging(ctx.channel(), cmd.getType(),
170+
"Cannot find topic name for producerId " + producerId, null);
171+
}
147172
break;
148173

149174
case SUBSCRIBE:
150-
ParserProxyHandler.consumerHashMap.put(cmd.getSubscribe().getConsumerId() + ","
151-
+ ctx.channel().id(), cmd.getSubscribe().getTopic());
175+
topicName = StringInterner.intern(cmd.getSubscribe().getTopic());
176+
context.consumerIdToTopicName.put(cmd.getSubscribe().getConsumerId(), topicName);
152177

153178
logging(ctx.channel(), cmd.getType(), "{consumer:" + cmd.getSubscribe().getConsumerName()
154-
+ ",topic:" + cmd.getSubscribe().getTopic() + "}", null);
179+
+ ",topic:" + topicName + "}", null);
180+
break;
181+
case CLOSE_CONSUMER:
182+
context.consumerIdToTopicName.remove(cmd.getCloseConsumer().getConsumerId());
183+
logging(ctx.channel(), cmd.getType(), "", null);
184+
break;
185+
case UNSUBSCRIBE:
186+
context.consumerIdToTopicName.remove(cmd.getUnsubscribe().getConsumerId());
187+
logging(ctx.channel(), cmd.getType(), "", null);
155188
break;
156-
157189
case MESSAGE:
158190
if (service.getProxyLogLevel() != 2) {
159191
logging(ctx.channel(), cmd.getType(), "", null);
160192
break;
161193
}
162-
topicName = TopicName.toFullTopicName(ParserProxyHandler.consumerHashMap.get(
163-
cmd.getMessage().getConsumerId() + "," + peerChannelId));
164-
165-
msgBytes = new MutableLong(0);
166-
MessageParser.parseMessage(topicName, -1L,
167-
-1L, buffer, (message) -> {
168-
messages.add(message);
169-
msgBytes.add(message.getData().readableBytes());
170-
}, maxMessageSize);
171-
// update topic stats
172-
topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
173-
topic -> new TopicStats());
174-
topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
175-
logging(ctx.channel(), cmd.getType(), "", messages);
194+
long consumerId = cmd.getMessage().getConsumerId();
195+
String topicForConsumer = context.consumerIdToTopicName.get(consumerId);
196+
if (topicForConsumer != null) {
197+
topicName = TopicName.toFullTopicName(topicForConsumer);
198+
199+
MutableLong msgBytes = new MutableLong(0);
200+
MessageParser.parseMessage(topicName, -1L,
201+
-1L, buffer, (message) -> {
202+
messages.add(message);
203+
msgBytes.add(message.getData().readableBytes());
204+
}, maxMessageSize);
205+
// update topic stats
206+
TopicStats topicStats = this.service.getTopicStats().computeIfAbsent(topicName.toString(),
207+
topic -> new TopicStats());
208+
topicStats.getMsgOutRate().recordMultipleEvents(messages.size(), msgBytes.longValue());
209+
logging(ctx.channel(), cmd.getType(), "", messages);
210+
} else {
211+
logging(ctx.channel(), cmd.getType(), "Cannot find topic name for consumerId " + consumerId,
212+
null);
213+
}
176214
break;
177215

178216
default:

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.TimeUnit;
5656
import java.util.concurrent.atomic.AtomicReference;
5757
import lombok.Getter;
58-
import lombok.Setter;
5958
import org.apache.pulsar.broker.ServiceConfigurationUtils;
6059
import org.apache.pulsar.broker.authentication.AuthenticationService;
6160
import org.apache.pulsar.broker.authorization.AuthorizationService;
@@ -116,7 +115,6 @@ public class ProxyService implements Closeable {
116115
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
117116

118117
@Getter
119-
@Setter
120118
protected int proxyLogLevel;
121119

122120
@Getter
@@ -590,4 +588,14 @@ public boolean isGracefulShutdown() {
590588
public void setGracefulShutdown(boolean gracefulShutdown) {
591589
this.gracefulShutdown = gracefulShutdown;
592590
}
591+
592+
public void setProxyLogLevel(int proxyLogLevel) {
593+
this.proxyLogLevel = proxyLogLevel;
594+
// clear the topic stats when proxy log level is changed to < 2
595+
// this is a way to avoid the proxy consuming too much memory when there are a lot of topics and log level
596+
// has been temporarily set to 2
597+
if (proxyLogLevel < 2) {
598+
topicStats.clear();
599+
}
600+
}
593601
}

pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static java.util.Objects.requireNonNull;
2222
import static org.mockito.Mockito.doReturn;
2323
import static org.testng.Assert.assertEquals;
24+
import static org.testng.Assert.assertFalse;
2425
import static org.testng.Assert.assertNotNull;
26+
import static org.testng.Assert.assertTrue;
2527
import com.google.gson.Gson;
2628
import com.google.gson.reflect.TypeToken;
2729
import java.util.List;
@@ -214,6 +216,62 @@ public void testTopicStats() throws Exception {
214216

215217
consumer.close();
216218
consumer2.close();
219+
220+
// check that topic stats are cleared after setting proxy log level to 0
221+
assertFalse(proxyService.getTopicStats().isEmpty());
222+
proxyService.setProxyLogLevel(0);
223+
assertTrue(proxyService.getTopicStats().isEmpty());
224+
}
225+
226+
@Test
227+
public void testMemoryLeakFixed() throws Exception {
228+
proxyService.setProxyLogLevel(2);
229+
final String topicName = "persistent://sample/test/local/topic-stats";
230+
final String topicName2 = "persistent://sample/test/local/topic-stats-2";
231+
232+
@Cleanup
233+
PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()).build();
234+
Producer<byte[]> producer1 = client.newProducer(Schema.BYTES).topic(topicName).enableBatching(false)
235+
.producerName("producer1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
236+
237+
Producer<byte[]> producer2 = client.newProducer(Schema.BYTES).topic(topicName2).enableBatching(false)
238+
.producerName("producer2").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
239+
240+
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
241+
Consumer<byte[]> consumer2 = client.newConsumer().topic(topicName2).subscriptionName("my-sub")
242+
.subscribe();
243+
244+
int totalMessages = 10;
245+
for (int i = 0; i < totalMessages; i++) {
246+
producer1.send("test".getBytes());
247+
producer2.send("test".getBytes());
248+
}
249+
250+
for (int i = 0; i < totalMessages; i++) {
251+
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
252+
requireNonNull(msg);
253+
consumer.acknowledge(msg);
254+
msg = consumer2.receive(1, TimeUnit.SECONDS);
255+
}
256+
257+
ParserProxyHandler.Context context = proxyService.getClientCnxs().stream().map(proxyConnection -> {
258+
ParserProxyHandler parserProxyHandler = proxyConnection.ctx().pipeline().get(ParserProxyHandler.class);
259+
return parserProxyHandler != null ? parserProxyHandler.getContext() : null;
260+
}).filter(c -> c != null && !c.getConsumerIdToTopicName().isEmpty()).findFirst().get();
261+
262+
assertEquals(context.getConsumerIdToTopicName().size(), 2);
263+
assertEquals(context.getProducerIdToTopicName().size(), 2);
264+
265+
266+
consumer.close();
267+
assertEquals(context.getConsumerIdToTopicName().size(), 1);
268+
consumer2.close();
269+
assertEquals(context.getConsumerIdToTopicName().size(), 0);
270+
271+
producer1.close();
272+
assertEquals(context.getProducerIdToTopicName().size(), 1);
273+
producer2.close();
274+
assertEquals(context.getProducerIdToTopicName().size(), 0);
217275
}
218276

219277
/**

0 commit comments

Comments
 (0)