Skip to content

Commit 6d415c6

Browse files
[improve][meta] PIP-453: Improve the metadata store threading model (#25187)
1 parent 5aab2f0 commit 6d415c6

20 files changed

Lines changed: 316 additions & 232 deletions

File tree

conf/broker.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -954,6 +954,8 @@ metadataStoreBatchingMaxOperations=1000
954954
# Maximum size of a batch
955955
metadataStoreBatchingMaxSizeKb=128
956956

957+
# The number of threads used for serializing and deserializing data to and from the metadata store
958+
metadataStoreSerDesThreads=1
957959

958960
### --- Authentication --- ###
959961

conf/standalone.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,9 @@ metadataStoreBatchingMaxOperations=1000
430430
# Maximum size of a batch
431431
metadataStoreBatchingMaxSizeKb=128
432432

433+
# The number of threads used for serializing and deserializing data to and from the metadata store
434+
metadataStoreSerDesThreads=1
435+
433436
### --- TLS --- ###
434437
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
435438
tlsEnabled=false

pip/pip-453.md

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ Additionally, some code paths execute the compute intensive tasks in the metadat
4040

4141
# High Level Design
4242

43-
Create 3 set of threads:
43+
Create 4 sets of threads:
4444
- `<name>-event`: the original metadata store thread, which is now only responsible for handling notifications. This executor won't be a `ScheduledExecutorService` anymore.
45+
- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations.
4546
- `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false.
4647
- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path.
4748

@@ -53,25 +54,6 @@ The only concern is that introducing a new thread to execute callbacks allows wa
5354
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());
5455
```
5556

56-
Other tasks like the retry on failure is executed in JVM's common `ForkJoinPool` by `CompletableFuture` APIs. For example:
57-
58-
```diff
59-
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
60-
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
61-
@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
62-
countsByType, totalSize, opsForLog);
63-
64-
// Retry with the individual operations
65-
- executor.schedule(() -> {
66-
- ops.forEach(o -> batchOperation(Collections.singletonList(o)));
67-
- }, 100, TimeUnit.MILLISECONDS);
68-
+ CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
69-
+ ops.forEach(o -> batchOperation(Collections.singletonList(o))));
70-
} else {
71-
MetadataStoreException e = getException(code, path);
72-
ops.forEach(o -> o.getFuture().completeExceptionally(e));
73-
```
74-
7557
# Detailed Design
7658

7759
## Public-facing Changes
@@ -85,9 +67,11 @@ Add a configurations to specify the number of worker threads for `MetadataCache`
8567
category = CATEGORY_SERVER,
8668
doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
8769
)
88-
private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors();
70+
private int metadataStoreSerDesThreads = 1;
8971
```
9072

73+
Use 1 as the default value since serialization and deserialization tasks are not frequent. This separate thread pool is mainly added to avoid blocking the metadata store callback thread.
74+
9175
### Metrics
9276

9377
The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute any tasks other than flushing.

pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Optional;
3030
import java.util.Set;
31+
import lombok.Getter;
3132
import lombok.extern.slf4j.Slf4j;
3233
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
3334
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
@@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
5758
// the secondary group.
5859
private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;
5960

61+
@Getter
62+
@VisibleForTesting
6063
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;
6164

6265
private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
492492
)
493493
private boolean metadataStoreAllowReadOnlyOperations;
494494

495+
@FieldContext(
496+
category = CATEGORY_SERVER,
497+
doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
498+
)
499+
private int metadataStoreSerDesThreads = 1;
500+
495501
@Deprecated
496502
@FieldContext(
497503
category = CATEGORY_SERVER,

pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import static org.mockito.Mockito.when;
2323
import static org.testng.Assert.assertEquals;
2424
import static org.testng.Assert.assertFalse;
25+
import static org.testng.Assert.assertNotEquals;
26+
import static org.testng.Assert.assertNotNull;
2527
import static org.testng.Assert.assertTrue;
2628
import static org.testng.Assert.fail;
2729
import com.fasterxml.jackson.databind.ObjectMapper;
2830
import com.google.common.collect.Sets;
2931
import io.netty.util.HashedWheelTimer;
3032
import java.nio.charset.StandardCharsets;
33+
import java.time.Duration;
3134
import java.util.ArrayList;
3235
import java.util.Arrays;
3336
import java.util.Collections;
@@ -288,8 +291,7 @@ public void testBasic() throws Exception {
288291
secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());
289292
bookieMapping.put("group2", secondaryBookieGroup);
290293

291-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
292-
Optional.empty()).join();
294+
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
293295

294296
ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
295297
null).getResult();
@@ -340,8 +342,7 @@ public void testNoBookieInfo() throws Exception {
340342
+ "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4
341343
+ "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}";
342344

343-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8),
344-
Optional.empty()).join();
345+
updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8));
345346

346347
List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
347348
new HashSet<>()).getResult();
@@ -399,8 +400,7 @@ public void testBookieInfoChange() throws Exception {
399400
bookieMapping.put("group1", mainBookieGroup);
400401
bookieMapping.put("group2", secondaryBookieGroup);
401402

402-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
403-
Optional.empty()).join();
403+
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
404404

405405
ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
406406
new HashSet<>()).getResult();
@@ -784,8 +784,7 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
784784
bookieMapping.put(isolationGroup2, group2);
785785
bookieMapping.put(isolationGroup3, group3);
786786

787-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
788-
Optional.empty()).join();
787+
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
789788

790789
groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
791790
groups.setRight(Sets.newHashSet(""));
@@ -808,8 +807,7 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
808807
bookieMapping.put(isolationGroup1, group1);
809808
bookieMapping.put(isolationGroup2, group2);
810809

811-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
812-
Optional.empty()).join();
810+
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
813811

814812
groups.setLeft(Sets.newHashSet(isolationGroup1));
815813
groups.setRight(Sets.newHashSet(isolationGroup2));
@@ -831,12 +829,24 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
831829
bookieMapping.put(isolationGroup1, group1);
832830
bookieMapping.put(isolationGroup2, group2);
833831

834-
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
835-
Optional.empty()).join();
832+
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));
836833

837834
groups.setLeft(Sets.newHashSet(isolationGroup1));
838835
groups.setRight(Sets.newHashSet(isolationGroup2));
839836
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
840837
assertTrue(blacklist.isEmpty());
841838
}
839+
840+
// The policy gets the bookie info asynchronously before each query or update. When putting the bookie info into
841+
// the metadata store, the cache needs some time to receive the notification and update accordingly.
842+
/**
 * Writes {@code bookieInfo} to {@link BookieRackAffinityMapping#BOOKIE_INFO_ROOT_PATH} in {@code store} and then
 * waits (up to 1 second) until the policy's bookie mapping cache no longer returns the value it had cached
 * before the write.
 *
 * @param isolationPolicy the policy whose bookie mapping cache is observed; its cache must be non-null
 * @param bookieInfo the serialized bookie info to store
 */
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
843+
final var cache = isolationPolicy.getBookieMappingCache();
844+
assertNotNull(cache); // the policy must have been initialized
845+
846+
final var key = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
// Snapshot what the cache currently holds so that the change can be detected after the write.
847+
final var previousBookieInfo = cache.getIfCached(key);
848+
store.put(key, bookieInfo, Optional.empty()).join();
// The put has completed here, but the cache is refreshed asynchronously: poll until the cached value differs
// from the snapshot taken above.
849+
Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(() ->
850+
assertNotEquals(cache.getIfCached(key), previousBookieInfo));
851+
}
842852
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
434434
.synchronizer(synchronizer)
435435
.openTelemetry(openTelemetry)
436436
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
437+
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
437438
.build());
438439
}
439440

@@ -1328,6 +1329,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
13281329
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
13291330
.openTelemetry(openTelemetry)
13301331
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
1332+
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
13311333
.build());
13321334
}
13331335

pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,15 @@
2525
import static org.testng.Assert.assertTrue;
2626
import static org.testng.Assert.fail;
2727
import static org.testng.AssertJUnit.assertSame;
28+
import java.io.IOException;
2829
import java.util.ArrayList;
2930
import java.util.List;
31+
import java.util.Map;
3032
import java.util.Optional;
33+
import java.util.Set;
34+
import java.util.concurrent.ConcurrentHashMap;
3135
import java.util.concurrent.TimeUnit;
36+
import java.util.function.BiConsumer;
3237
import lombok.Cleanup;
3338
import lombok.extern.slf4j.Slf4j;
3439
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -38,6 +43,10 @@
3843
import org.apache.pulsar.common.naming.TopicName;
3944
import org.apache.pulsar.functions.worker.WorkerConfig;
4045
import org.apache.pulsar.functions.worker.WorkerService;
46+
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
47+
import org.apache.pulsar.metadata.api.MetadataSerde;
48+
import org.apache.pulsar.metadata.api.MetadataStore;
49+
import org.apache.pulsar.metadata.api.Stat;
4150
import org.testng.annotations.AfterMethod;
4251
import org.testng.annotations.Test;
4352

@@ -339,4 +348,60 @@ public void testShutdownViaAdminApi() throws Exception {
339348
assertTrue(e instanceof PulsarClientException.TimeoutException);
340349
}
341350
}
351+
352+
// Starts a PulsarService configured with 5 metadata SerDes threads and in-memory metadata stores, then checks
// for both the local and the configuration metadata store that serialization and deserialization are spread
// over exactly that many threads and that no path is handled by more than one thread.
@Test
353+
public void testMetadataSerDesThreads() throws Exception {
354+
final var numSerDesThreads = 5;
355+
final var config = new ServiceConfiguration();
356+
config.setMetadataStoreSerDesThreads(numSerDesThreads);
357+
config.setClusterName("test");
358+
config.setMetadataStoreUrl("memory:local");
359+
config.setConfigurationMetadataStoreUrl("memory:local");
360+
361+
@Cleanup final var pulsar = new PulsarService(config);
362+
pulsar.start();
363+
364+
BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
365+
final var serDes = new CustomMetadataSerDes();
366+
final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build());
// Keep creating new paths until every SerDes thread has been observed at least once; give up after
// 100 paths so that the test cannot loop forever.
367+
for (int i = 0; i < 100 && serDes.threadNameToSerializedPaths.size() < numSerDesThreads; i++) {
368+
cache.create(prefix + i, "value-" + i).join();
369+
final var value = cache.get(prefix + i).join();
370+
assertEquals(value.orElseThrow(), "value-" + i);
371+
final var newValue = cache.readModifyUpdate(prefix + i, s -> s + "-updated").join();
372+
assertEquals(newValue, "value-" + i + "-updated");
373+
// Verify the serialization and deserialization are handled by the same thread
374+
assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths);
375+
}
376+
log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths);
377+
assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), numSerDesThreads);
378+
// Verify a path cannot be handled by multiple threads
// (all per-thread path sets are flattened into one sorted list, which must equal its de-duplicated form)
379+
final var paths = serDes.threadNameToSerializedPaths.values().stream()
380+
.flatMap(Set::stream).sorted().toList();
381+
assertEquals(paths.stream().distinct().toList(), paths);
382+
};
383+
384+
verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
385+
verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/");
386+
}
387+
388+
/**
 * Test-only {@link MetadataSerde} for {@code String} values that records, keyed by the name of the calling
 * thread, the set of paths that thread has serialized and deserialized. Both maps and the path sets are
 * concurrent collections because the callbacks may run on several threads.
 */
private static class CustomMetadataSerDes implements MetadataSerde<String> {
389+
390+
final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>();
391+
final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>();
392+
393+
@Override
394+
public byte[] serialize(String path, String value) throws IOException {
395+
threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
396+
__ -> ConcurrentHashMap.newKeySet()).add(path);
// Use an explicit charset so that the round trip does not depend on the platform default encoding.
397+
return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
398+
}
399+
400+
@Override
401+
public String deserialize(String path, byte[] data, Stat stat) throws IOException {
402+
threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
403+
__ -> ConcurrentHashMap.newKeySet()).add(path);
// Must match the charset used in serialize().
404+
return new String(data, java.nio.charset.StandardCharsets.UTF_8);
405+
}
406+
}
342407
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryMetadataStoreStatsTest.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
2222
import static org.assertj.core.api.Assertions.assertThat;
2323
import io.opentelemetry.api.common.Attributes;
24-
import java.util.concurrent.ExecutorService;
2524
import lombok.Cleanup;
2625
import org.apache.commons.lang3.reflect.FieldUtils;
2726
import org.apache.pulsar.broker.BrokerTestUtil;
2827
import org.apache.pulsar.broker.service.BrokerTestBase;
2928
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
3029
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
3130
import org.apache.pulsar.metadata.api.MetadataStore;
32-
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
3331
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
3432
import org.testng.annotations.AfterMethod;
3533
import org.testng.annotations.BeforeMethod;
@@ -53,14 +51,6 @@ protected void setup() throws Exception {
5351
var newStats = new MetadataStoreStats(
5452
localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
5553
FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true);
56-
57-
var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore,
58-
"batchMetadataStoreStats", true);
59-
currentBatchedStats.close();
60-
var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
61-
var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
62-
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
63-
FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
6454
}
6555

6656
@AfterMethod(alwaysRun = true)
@@ -89,7 +79,5 @@ public void testMetadataStoreStats() throws Exception {
8979
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
9080
assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
9181
attributes, value -> assertThat(value).isPositive());
92-
assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
93-
value -> assertThat(value).isPositive());
9482
}
9583
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,4 +104,7 @@ public class MetadataStoreConfig {
104104
* The estimator to estimate the payload length of metadata node, which used to limit the batch size requested.
105105
*/
106106
private MetadataNodeSizeStats nodeSizeStats;
107+
108+
@Builder.Default
109+
private final int numSerDesThreads = 1;
107110
}

0 commit comments

Comments
 (0)