Skip to content

Commit 1d53d36

Browse files
TakaHiR07fanjianye
andauthored
[fix][broker] fix broker may lost rack information (#23331)
Co-authored-by: fanjianye <fanjianye@bigo.sg>
1 parent 5812084 commit 1d53d36

1 file changed

Lines changed: 12 additions & 2 deletions

File tree

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,13 @@ private CompletableFuture<Versioned<Set<BookieId>>> getBookiesThenFreshCache(Str
181181
@Override
182182
public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
183183
writableBookiesWatchers.add(registrationListener);
184+
// trigger all listeners in writableBookiesWatchers one by one. It aims to keep a sync way
185+
// to make sure the previous listener has finished when a new listener is register.
186+
// Though it would bring duplicate trigger listener problem, but since watchWritableBookies
187+
// is only executed when bookieClient construct, the duplicate problem is acceptable.
184188
return getWritableBookies()
185-
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
189+
.thenAcceptAsync(bookies ->
190+
writableBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor);
186191
}
187192

188193
@Override
@@ -193,8 +198,13 @@ public void unwatchWritableBookies(RegistrationListener registrationListener) {
193198
@Override
194199
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
195200
readOnlyBookiesWatchers.add(registrationListener);
201+
// trigger all listeners in readOnlyBookiesWatchers one by one. It aims to keep a sync way
202+
// to make sure the previous listener has finished when a new listener is register.
203+
// Though it would bring duplicate trigger listener problem, but since watchReadOnlyBookies
204+
// is only executed when bookieClient construct, the duplicate problem is acceptable.
196205
return getReadOnlyBookies()
197-
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
206+
.thenAcceptAsync(bookies ->
207+
readOnlyBookiesWatchers.forEach(w -> w.onBookiesChanged(bookies)), executor);
198208
}
199209

200210
@Override

0 commit comments

Comments
 (0)