Skip to content

Commit 879cc01

Browse files
Shawyeokclaude
authored andcommitted
[fix][bk] Fix NPE in IsolatedBookieEnsemblePlacementPolicy when policy class does not match (#25825)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> (cherry picked from commit b93fe9e)
1 parent c13b393 commit 879cc01

2 files changed

Lines changed: 152 additions & 12 deletions

File tree

pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.bookie.rackawareness;
2020

21+
import static java.util.Collections.emptySet;
2122
import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
2223
import com.google.common.annotations.VisibleForTesting;
2324
import com.google.common.collect.Sets;
@@ -164,33 +165,36 @@ private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolic
164165
return Optional.empty();
165166
}
166167

167-
private static Pair<Set<String>, Set<String>> getIsolationGroup(
168+
@VisibleForTesting
169+
Pair<Set<String>, Set<String>> getIsolationGroup(
168170
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
169-
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
170-
String className = IsolatedBookieEnsemblePlacementPolicy.class.getName();
171-
if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
171+
// Retain compatibility with ZkIsolatedBookieEnsemblePlacementPolicy
172+
Class<?> policyClass = ensemblePlacementPolicyConfig.getPolicyClass();
173+
if (IsolatedBookieEnsemblePlacementPolicy.class.isAssignableFrom(policyClass)) {
174+
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(emptySet(), emptySet());
172175
Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties();
173176
String primaryIsolationGroupString = ConfigurationStringUtil
174177
.castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
175178
String secondaryIsolationGroupString = ConfigurationStringUtil
176179
.castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
177180
if (!primaryIsolationGroupString.isEmpty()) {
178181
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
179-
} else {
180-
pair.setLeft(Collections.emptySet());
181182
}
182183
if (!secondaryIsolationGroupString.isEmpty()) {
183184
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
184-
} else {
185-
pair.setRight(Collections.emptySet());
186185
}
186+
return pair;
187+
} else {
188+
log.info("The ensemble placement policy class [{}] is not compatible with "
189+
+ "IsolatedBookieEnsemblePlacementPolicy, fallback to use defaultIsolationGroups",
190+
ensemblePlacementPolicyConfig.getPolicyClass().getName());
191+
return defaultIsolationGroups;
187192
}
188-
return pair;
189193
}
190194

191195
@VisibleForTesting
192196
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
193-
Pair<Set<String>, Set<String>> isolationGroups) {
197+
Pair<Set<String>, Set<String>> isolationGroups) {
194198
Set<BookieId> excludedBookies = new HashSet<>();
195199
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
196200
return excludedBookies;
@@ -213,8 +217,8 @@ Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
213217
return excludedBookies;
214218
}
215219
int totalAvailableBookiesInPrimaryGroup = 0;
216-
Set<String> primaryIsolationGroup = Collections.emptySet();
217-
Set<String> secondaryIsolationGroup = Collections.emptySet();
220+
Set<String> primaryIsolationGroup = emptySet();
221+
Set<String> secondaryIsolationGroup = emptySet();
218222
Set<BookieId> primaryGroupBookies = new HashSet<>();
219223
if (isolationGroups != null) {
220224
primaryIsolationGroup = isolationGroups.getLeft();

pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@
4242
import java.util.Set;
4343
import java.util.concurrent.CompletableFuture;
4444
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
45+
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
4546
import org.apache.bookkeeper.conf.ClientConfiguration;
4647
import org.apache.bookkeeper.feature.SettableFeatureProvider;
4748
import org.apache.bookkeeper.net.BookieId;
4849
import org.apache.bookkeeper.net.BookieSocketAddress;
4950
import org.apache.bookkeeper.stats.NullStatsLogger;
5051
import org.apache.commons.lang3.tuple.MutablePair;
52+
import org.apache.commons.lang3.tuple.Pair;
5153
import org.apache.pulsar.common.policies.data.BookieInfo;
5254
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
5355
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
@@ -57,6 +59,7 @@
5759
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
5860
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
5961
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
62+
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
6063
import org.awaitility.Awaitility;
6164
import org.testng.Assert;
6265
import org.testng.annotations.AfterMethod;
@@ -844,6 +847,139 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
844847
assertTrue(blacklist.isEmpty());
845848
}
846849

850+
/**
851+
* Regression test for the NPE reported in the stack trace below. When custom metadata carries an
852+
* {@link EnsemblePlacementPolicyConfig} whose policy class does NOT match
853+
* {@link IsolatedBookieEnsemblePlacementPolicy}, the old {@code getIsolationGroup()} returned a
854+
* {@code MutablePair} with {@code null} left/right, which caused a {@link NullPointerException} in
855+
* {@code getExcludedBookiesWithIsolationGroups} when {@code getLeft().contains(...)} was called.
856+
*
857+
* <pre>
858+
* java.lang.NullPointerException: Cannot invoke "java.util.Set.contains(Object)"
859+
* because the return value of "org.apache.commons.lang3.tuple.Pair.getLeft()" is null
860+
* at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...)
861+
* at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...)
862+
* at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...)
863+
* </pre>
864+
*/
865+
@Test
866+
public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE() throws Exception {
867+
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
868+
Map<String, BookieInfo> group1 = new HashMap<>();
869+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
870+
group1.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
871+
group1.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
872+
group1.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
873+
bookieMapping.put("group1", group1);
874+
875+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
876+
Optional.empty()).join();
877+
878+
IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
879+
ClientConfiguration bkClientConf = new ClientConfiguration();
880+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
881+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "group1");
882+
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
883+
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
884+
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
885+
886+
// Use a policy class that does NOT match IsolatedBookieEnsemblePlacementPolicy.
887+
// In the old code this caused getIsolationGroup() to return a MutablePair with null left/right,
888+
// triggering NPE at the getLeft().contains() call in getExcludedBookiesWithIsolationGroups.
889+
EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
890+
RackawareEnsemblePlacementPolicy.class, Collections.emptyMap());
891+
Map<String, byte[]> customMetadata = new HashMap<>();
892+
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
893+
894+
BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
895+
BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();
896+
897+
// Must not throw NullPointerException; BKNotEnoughBookiesException is acceptable.
898+
isolationPolicy.replaceBookie(2, 2, 2, customMetadata,
899+
Arrays.asList(bookie1Id, bookie2Id), bookie2Id, null);
900+
}
901+
902+
/**
903+
* Verifies that {@link IsolatedBookieEnsemblePlacementPolicy#getIsolationGroup} treats
904+
* {@link ZkIsolatedBookieEnsemblePlacementPolicy} (a subclass) exactly like
905+
* {@link IsolatedBookieEnsemblePlacementPolicy} itself when reading isolation groups from
906+
* {@link EnsemblePlacementPolicyConfig} properties.
907+
*
908+
* <p>Legacy Pulsar clusters may have persisted {@code EnsemblePlacementPolicyConfig} entries whose
909+
* {@code policyClass} field is set to {@code ZkIsolatedBookieEnsemblePlacementPolicy}. The
910+
* {@code isAssignableFrom} check in {@code getIsolationGroup} must recognise this subclass so that
911+
* the isolation groups are read from the stored properties rather than falling back to the
912+
* policy-level defaults.
913+
*/
914+
@Test
915+
public void testGetIsolationGroupWithZkCompatiblePolicyClass() throws Exception {
916+
// Group1 → default isolation group configured on the policy.
917+
// Group2 → isolation group carried inside the custom metadata (ZkIsolated class).
918+
final String defaultGroup = "Group1";
919+
final String customGroup = "Group2";
920+
921+
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
922+
Map<String, BookieInfo> group1 = new HashMap<>();
923+
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
924+
group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
925+
Map<String, BookieInfo> group2 = new HashMap<>();
926+
group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
927+
group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
928+
bookieMapping.put(defaultGroup, group1);
929+
bookieMapping.put(customGroup, group2);
930+
931+
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
932+
Optional.empty()).join();
933+
934+
IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
935+
ClientConfiguration bkClientConf = new ClientConfiguration();
936+
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
937+
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, defaultGroup);
938+
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
939+
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
940+
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
941+
942+
// --- unit-level: getIsolationGroup should parse properties, not fall back to defaults ---
943+
Map<String, Object> props = new HashMap<>();
944+
props.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup);
945+
props.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "secondaryGroup");
946+
EnsemblePlacementPolicyConfig zkConfig = new EnsemblePlacementPolicyConfig(
947+
ZkIsolatedBookieEnsemblePlacementPolicy.class, props);
948+
949+
Pair<Set<String>, Set<String>> groups = isolationPolicy.getIsolationGroup(zkConfig);
950+
assertEquals(groups.getLeft(), Sets.newHashSet(customGroup),
951+
"primary group must be read from ZkIsolated config properties");
952+
assertEquals(groups.getRight(), Sets.newHashSet("secondaryGroup"),
953+
"secondary group must be read from ZkIsolated config properties");
954+
955+
// --- integration-level: newEnsemble must select bookies from the ZkIsolated config group ---
956+
Map<String, Object> placementPolicyProperties = new HashMap<>();
957+
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup);
958+
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
959+
EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
960+
ZkIsolatedBookieEnsemblePlacementPolicy.class, placementPolicyProperties);
961+
Map<String, byte[]> customMetadata = new HashMap<>();
962+
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());
963+
964+
Set<BookieId> bookieIdGroup2 = new HashSet<>();
965+
bookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
966+
bookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());
967+
968+
List<BookieId> ensemble = isolationPolicy
969+
.newEnsemble(2, 2, 2, customMetadata, new HashSet<>()).getResult();
970+
assertTrue(bookieIdGroup2.containsAll(ensemble),
971+
"ensemble should come from " + customGroup + " (ZkIsolated config), got " + ensemble);
972+
973+
// Sanity-check: without custom metadata the default group1 bookies are chosen.
974+
Set<BookieId> bookieIdGroup1 = new HashSet<>();
975+
bookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
976+
bookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
977+
List<BookieId> defaultEnsemble = isolationPolicy
978+
.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
979+
assertTrue(bookieIdGroup1.containsAll(defaultEnsemble),
980+
"default ensemble should come from " + defaultGroup + ", got " + defaultEnsemble);
981+
}
982+
847983
// The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into
848984
// the metadata store, the cache needs some time to receive the notification and update accordingly.
849985
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {

0 commit comments

Comments
 (0)