Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 41ed8f1

Browse files
author
Enrico Olivelli
authored
[cleanup] Remove static LOOKUP_CLIENT_MAP (#1720)
### Motivation There is no need to use a static Map to access the LookupClient. ### Modifications Keep only a reference to the field in the ProtocolHandler and pass it. This way the lifecycle is cleaner. Also we close all the "clients" as last step, otherwise other components may see unexpected errors due to the clients being shutdown.
1 parent 5398f77 commit 41ed8f1

8 files changed

Lines changed: 40 additions & 36 deletions

File tree

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
5050
private final KopBrokerLookupManager kopBrokerLookupManager;
5151
@Getter
5252
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
53+
private final LookupClient lookupClient;
5354

5455
private final AdminManager adminManager;
5556
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
@@ -80,13 +81,15 @@ public KafkaChannelInitializer(PulsarService pulsarService,
8081
boolean skipMessagesWithoutIndex,
8182
RequestStats requestStats,
8283
OrderedScheduler sendResponseScheduler,
83-
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) {
84+
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
85+
LookupClient lookupClient) {
8486
super();
8587
this.pulsarService = pulsarService;
8688
this.kafkaConfig = kafkaConfig;
8789
this.tenantContextManager = tenantContextManager;
8890
this.replicaManager = replicaManager;
8991
this.kopBrokerLookupManager = kopBrokerLookupManager;
92+
this.lookupClient = lookupClient;
9093
this.adminManager = adminManager;
9194
this.producePurgatory = producePurgatory;
9295
this.fetchPurgatory = fetchPurgatory;
@@ -127,7 +130,7 @@ public KafkaRequestHandler newCnx() throws Exception {
127130
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
128131
producePurgatory, fetchPurgatory,
129132
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
130-
kafkaTopicManagerSharedState);
133+
kafkaTopicManagerSharedState, lookupClient);
131134
}
132135

133136
@VisibleForTesting
@@ -138,6 +141,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
138141
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
139142
requestStats,
140143
sendResponseScheduler,
141-
kafkaTopicManagerSharedState);
144+
kafkaTopicManagerSharedState, lookupClient);
142145
}
143146
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.concurrent.ConcurrentHashMap;
4444
import java.util.concurrent.TimeUnit;
4545
import lombok.Getter;
46-
import lombok.NonNull;
4746
import lombok.extern.slf4j.Slf4j;
4847
import org.apache.bookkeeper.common.util.OrderedScheduler;
4948
import org.apache.commons.configuration.Configuration;
@@ -52,7 +51,6 @@
5251
import org.apache.kafka.common.record.CompressionType;
5352
import org.apache.kafka.common.utils.Time;
5453
import org.apache.pulsar.broker.PulsarServerException;
55-
import org.apache.pulsar.broker.PulsarService;
5654
import org.apache.pulsar.broker.ServiceConfiguration;
5755
import org.apache.pulsar.broker.namespace.NamespaceService;
5856
import org.apache.pulsar.broker.protocol.ProtocolHandler;
@@ -71,8 +69,6 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
7169

7270
public static final String PROTOCOL_NAME = "kafka";
7371
public static final String TLS_HANDLER = "tls";
74-
private static final Map<PulsarService, LookupClient> LOOKUP_CLIENT_MAP = new ConcurrentHashMap<>();
75-
7672
@Getter
7773
private RequestStats requestStats;
7874
private PrometheusMetricsProvider statsProvider;
@@ -84,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
8480
private SystemTopicClient txnTopicClient;
8581
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
8682
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
83+
private LookupClient lookupClient;
8784
@VisibleForTesting
8885
@Getter
8986
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
@@ -202,12 +199,12 @@ public void start(BrokerService service) {
202199
throw new IllegalStateException(e);
203200
}
204201

205-
LOOKUP_CLIENT_MAP.put(brokerService.pulsar(), new LookupClient(brokerService.pulsar(), kafkaConfig));
202+
lookupClient = new LookupClient(brokerService.pulsar(), kafkaConfig);
206203
offsetTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
207204
txnTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
208205

209206
try {
210-
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar());
207+
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar(), lookupClient);
211208
} catch (Exception ex) {
212209
log.error("Failed to get kopBrokerLookupManager", ex);
213210
throw new IllegalStateException(ex);
@@ -402,7 +399,8 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
402399
kafkaConfig.isSkipMessagesWithoutIndex(),
403400
requestStats,
404401
sendResponseScheduler,
405-
kafkaTopicManagerSharedState);
402+
kafkaTopicManagerSharedState,
403+
lookupClient);
406404
}
407405

408406
// this is called after initialize, and with kafkaConfig, brokerService all set.
@@ -456,16 +454,6 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
456454

457455
@Override
458456
public void close() {
459-
Optional.ofNullable(LOOKUP_CLIENT_MAP.remove(brokerService.pulsar())).ifPresent(LookupClient::close);
460-
if (offsetTopicClient != null) {
461-
offsetTopicClient.close();
462-
}
463-
if (txnTopicClient != null) {
464-
txnTopicClient.close();
465-
}
466-
if (adminManager != null) {
467-
adminManager.shutdown();
468-
}
469457
if (producePurgatory != null) {
470458
producePurgatory.shutdown();
471459
}
@@ -483,6 +471,19 @@ public void close() {
483471
kopBrokerLookupManager.close();
484472
statsProvider.stop();
485473
sendResponseScheduler.shutdown();
474+
475+
if (offsetTopicClient != null) {
476+
offsetTopicClient.close();
477+
}
478+
if (txnTopicClient != null) {
479+
txnTopicClient.close();
480+
}
481+
if (adminManager != null) {
482+
adminManager.shutdown();
483+
}
484+
if (lookupClient != null) {
485+
lookupClient.close();
486+
}
486487
}
487488

488489
@VisibleForTesting
@@ -571,8 +572,4 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd
571572

572573
return transactionCoordinator;
573574
}
574-
575-
public static @NonNull LookupClient getLookupClient(final PulsarService pulsarService) {
576-
return LOOKUP_CLIENT_MAP.computeIfAbsent(pulsarService, ignored -> new LookupClient(pulsarService));
577-
}
578575
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
213213
private final TenantContextManager tenantContextManager;
214214
private final ReplicaManager replicaManager;
215215
private final KopBrokerLookupManager kopBrokerLookupManager;
216+
217+
@Getter
218+
private final LookupClient lookupClient;
216219
@Getter
217220
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
218221

@@ -313,12 +316,14 @@ public KafkaRequestHandler(PulsarService pulsarService,
313316
boolean skipMessagesWithoutIndex,
314317
RequestStats requestStats,
315318
OrderedScheduler sendResponseScheduler,
316-
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) throws Exception {
319+
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
320+
LookupClient lookupClient) throws Exception {
317321
super(requestStats, kafkaConfig, sendResponseScheduler);
318322
this.pulsarService = pulsarService;
319323
this.tenantContextManager = tenantContextManager;
320324
this.replicaManager = replicaManager;
321325
this.kopBrokerLookupManager = kopBrokerLookupManager;
326+
this.lookupClient = lookupClient;
322327
this.clusterName = kafkaConfig.getClusterName();
323328
this.executor = pulsarService.getExecutor();
324329
this.admin = pulsarService.getAdminClient();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class KafkaTopicManager {
4646
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
4747
this.brokerService = pulsarService.getBrokerService();
4848
this.internalServerCnx = new InternalServerCnx(requestHandler);
49-
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
49+
this.lookupClient = kafkaRequestHandler.getLookupClient();
5050
this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
5151
}
5252

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,10 @@ public class KopBrokerLookupManager {
4848
public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
4949
LOOKUP_CACHE = new ConcurrentHashMap<>();
5050

51-
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService) throws Exception {
51+
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService,
52+
LookupClient lookupClient) throws Exception {
5253
this.pulsar = pulsarService;
53-
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
54+
this.lookupClient = lookupClient;
5455
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarService.getPulsarResources(),
5556
conf.getBrokerLookupTimeoutMs());
5657
this.selfAdvertisedListeners = conf.getKafkaAdvertisedListeners();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/LookupClient.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,6 @@ public LookupClient(final PulsarService pulsarService, final KafkaServiceConfigu
3030
super(createPulsarClient(pulsarService, kafkaConfig, conf -> {}));
3131
}
3232

33-
public LookupClient(final PulsarService pulsarService) {
34-
super(createPulsarClient(pulsarService));
35-
log.warn("This constructor should not be called, it's only called "
36-
+ "when the PulsarService doesn't exist in KafkaProtocolHandlers.LOOKUP_CLIENT_UP");
37-
}
38-
3933
public CompletableFuture<InetSocketAddress> getBrokerAddress(final TopicName topicName) {
4034
return getPulsarClient().getLookup().getBroker(topicName).thenApply(Pair::getLeft);
4135
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopBrokerLookupManagerTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class KopBrokerLookupManagerTest extends KopProtocolHandlerTestBase {
3030
private static final String TENANT = "test";
3131
private static final String NAMESPACE = TENANT + "/" + "kop-broker-lookup-manager-test";
3232

33+
private LookupClient lookupClient;
3334
private KopBrokerLookupManager kopBrokerLookupManager;
3435

3536
@BeforeClass
@@ -43,13 +44,16 @@ protected void setup() throws Exception {
4344
.build());
4445
admin.namespaces().createNamespace(NAMESPACE);
4546
admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
46-
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar);
47+
lookupClient = new LookupClient(pulsar, conf);
48+
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar, lookupClient);
4749
}
4850

4951
@AfterClass
5052
@Override
5153
protected void cleanup() throws Exception {
5254
super.internalCleanup();
55+
kopBrokerLookupManager.close();
56+
lookupClient.close();
5357
}
5458

5559
@Test(timeOut = 20 * 1000)

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KopProtocolHandlerTestBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ protected void createAdmin() throws Exception {
247247
}
248248

249249
protected void createClient() throws Exception {
250-
this.pulsarClient = KafkaProtocolHandler.getLookupClient(pulsar).getPulsarClient();
250+
this.pulsarClient = new LookupClient(pulsar, conf).getPulsarClient();
251251
}
252252

253253
protected String getAdvertisedAddress() {

0 commit comments

Comments
 (0)