Skip to content

Commit bacb162

Browse files
authored
[cleanup][client] Remove unneeded sync scope in TransactionImpl (#22932)
1 parent 6a1bbe6 commit bacb162

1 file changed

Lines changed: 20 additions & 24 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,16 @@ public void run(Timeout timeout) throws Exception {
106106
public CompletableFuture<Void> registerProducedTopic(String topic) {
107107
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
108108
if (checkIfOpen(completableFuture)) {
109-
synchronized (TransactionImpl.this) {
110-
// we need to issue the request to TC to register the produced topic
111-
return registerPartitionMap.compute(topic, (key, future) -> {
112-
if (future != null) {
113-
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
114-
} else {
115-
return tcClient.addPublishPartitionToTxnAsync(
116-
txnId, Lists.newArrayList(topic))
117-
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
118-
}
119-
});
120-
}
109+
// we need to issue the request to TC to register the produced topic
110+
return registerPartitionMap.compute(topic, (key, future) -> {
111+
if (future != null) {
112+
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
113+
} else {
114+
return tcClient.addPublishPartitionToTxnAsync(
115+
txnId, Lists.newArrayList(topic))
116+
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
117+
}
118+
});
121119
}
122120
return completableFuture;
123121
}
@@ -147,18 +145,16 @@ public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
147145
public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
148146
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
149147
if (checkIfOpen(completableFuture)) {
150-
synchronized (TransactionImpl.this) {
151-
// we need to issue the request to TC to register the acked topic
152-
return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
153-
if (future != null) {
154-
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
155-
} else {
156-
return tcClient.addSubscriptionToTxnAsync(
157-
txnId, topic, subscription)
158-
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
159-
}
160-
});
161-
}
148+
// we need to issue the request to TC to register the acked topic
149+
return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
150+
if (future != null) {
151+
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
152+
} else {
153+
return tcClient.addSubscriptionToTxnAsync(
154+
txnId, topic, subscription)
155+
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
156+
}
157+
});
162158
}
163159
return completableFuture;
164160
}

0 commit comments

Comments
 (0)