Skip to content

Commit cd0ab9d

Browse files
authored
[improve][metadata] Add streaming scanChildren to MetadataStore (apache#25701)
1 parent 35aa105 commit cd0ab9d

10 files changed

Lines changed: 452 additions & 0 deletions

File tree

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,33 @@ default CompletableFuture<List<GetResult>> findByIndex(
287287
new MetadataStoreException("Secondary index queries not supported by this store"));
288288
}
289289

290+
/**
291+
* Stream all direct children of {@code parentPath} together with their values.
292+
*
293+
* <p>This is the value-bearing counterpart to {@link #getChildren} — same semantics for
294+
* what counts as a "child" (one hierarchical level below {@code parentPath}, no
295+
* descendants), but each record carries the value and {@link Stat} alongside the path.
296+
* Results are delivered to {@code consumer} as they become available so callers don't
297+
* have to materialize a potentially-large list in memory.
298+
*
299+
* <p>The consumer's {@link ScanConsumer#onNext} is invoked for each child, then either
300+
* {@link ScanConsumer#onCompleted} (success) or {@link ScanConsumer#onError} (failure)
301+
* exactly once. The returned future completes when the scan terminates and mirrors the
302+
* terminal callback — callers may rely on either.
303+
*
304+
* <p>Backends with a native range-scan primitive (Oxia, RocksDB, in-memory NavigableMap)
305+
* issue a single store-side scan. Other backends fall back to {@link #getChildren} +
306+
* sequential {@link #get}, at the cost of one extra round trip per child.
307+
*
308+
* @param parentPath path whose direct children should be streamed
309+
* @param consumer callback that receives records, completion, or an error
310+
* @return a future that completes when the scan terminates
311+
*/
312+
default CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
313+
return CompletableFuture.failedFuture(
314+
new MetadataStoreException("scanChildren not supported by this store"));
315+
}
316+
290317
/**
291318
* Returns the default metadata cache config.
292319
*
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.metadata.api;
20+
21+
/**
22+
* Streaming consumer for {@link MetadataStore#scanChildren} results.
23+
*
24+
* <p>The store invokes {@link #onNext} for each child record (in key order), and then either
25+
* {@link #onCompleted} (success) or {@link #onError} (failure) exactly once. Implementations must
26+
* be safe to invoke from a metadata-store internal thread; back-pressure is the consumer's
27+
* responsibility (long blocking work in {@code onNext} can stall the scan).
28+
*/
29+
public interface ScanConsumer {
30+
31+
/**
32+
* Called once per record. The result's {@link Stat#getPath()} carries the full key.
33+
*
34+
* @param result a child record under the requested parent path
35+
*/
36+
void onNext(GetResult result);
37+
38+
/**
39+
* Called at most once when the scan fails. After this call no further callbacks are made.
40+
*
41+
* @param throwable the cause of the failure
42+
*/
43+
void onError(Throwable throwable);
44+
45+
/**
46+
* Called at most once when the scan finishes without error. After this call no further
47+
* callbacks are made.
48+
*/
49+
void onCompleted();
50+
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Optional;
4242
import java.util.Set;
4343
import java.util.concurrent.CompletableFuture;
44+
import java.util.concurrent.CompletionException;
4445
import java.util.concurrent.CopyOnWriteArrayList;
4546
import java.util.concurrent.Executor;
4647
import java.util.concurrent.ExecutorService;
@@ -69,6 +70,7 @@
6970
import org.apache.pulsar.metadata.api.MetadataStoreException;
7071
import org.apache.pulsar.metadata.api.Notification;
7172
import org.apache.pulsar.metadata.api.NotificationType;
73+
import org.apache.pulsar.metadata.api.ScanConsumer;
7274
import org.apache.pulsar.metadata.api.Stat;
7375
import org.apache.pulsar.metadata.api.extended.CreateOption;
7476
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -529,6 +531,55 @@ public CompletableFuture<List<GetResult>> findByIndex(
529531
return storeFindByIndex(scanPathPrefix, indexName, secondaryKey, fallbackFilter);
530532
}
531533

534+
@Override
535+
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
536+
if (isClosed()) {
537+
CompletableFuture<Void> failed = alreadyClosedFailedFuture();
538+
failed.whenComplete((__, ex) -> {
539+
if (ex != null) {
540+
consumer.onError(ex);
541+
}
542+
});
543+
return failed;
544+
}
545+
if (parentPath == null) {
546+
MetadataStoreException ex = new MetadataStoreException("parentPath must be non-null");
547+
consumer.onError(ex);
548+
return FutureUtil.failedFuture(ex);
549+
}
550+
return storeScanChildren(parentPath, consumer);
551+
}
552+
553+
/**
554+
* Backend hook for {@link #scanChildren}. The default implementation lists the parent's
555+
* children with {@link #getChildrenFromStore} and fetches each value sequentially with
556+
* {@link #storeGet}. Backends with a native range-scan primitive (Oxia, RocksDB,
557+
* in-memory NavigableMap) override this method for a single store-side scan.
558+
*/
559+
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
560+
CompletableFuture<Void> result = new CompletableFuture<>();
561+
getChildrenFromStore(parentPath).thenCompose(children -> {
562+
CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
563+
for (String child : children) {
564+
String childPath = parentPath.equals("/") ? "/" + child : parentPath + "/" + child;
565+
chain = chain.thenCompose(__ -> storeGet(childPath))
566+
.thenAccept(opt -> opt.ifPresent(consumer::onNext));
567+
}
568+
return chain;
569+
}).whenComplete((v, ex) -> {
570+
if (ex != null) {
571+
Throwable cause = ex instanceof CompletionException && ex.getCause() != null
572+
? ex.getCause() : ex;
573+
consumer.onError(cause);
574+
result.completeExceptionally(cause);
575+
} else {
576+
consumer.onCompleted();
577+
result.complete(null);
578+
}
579+
});
580+
return result;
581+
}
582+
532583
protected CompletableFuture<List<GetResult>> storeFindByIndex(
533584
String scanPathPrefix, String indexName, String secondaryKey,
534585
Predicate<GetResult> fallbackFilter) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.pulsar.metadata.api.MetadataStoreException;
5252
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
5353
import org.apache.pulsar.metadata.api.Notification;
54+
import org.apache.pulsar.metadata.api.ScanConsumer;
5455
import org.apache.pulsar.metadata.api.Stat;
5556
import org.apache.pulsar.metadata.api.extended.CreateOption;
5657
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -309,6 +310,16 @@ public CompletableFuture<List<GetResult>> findByIndex(
309310
};
310311
}
311312

313+
@Override
314+
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
315+
return switch (migrationState.getPhase()) {
316+
case NOT_STARTED, PREPARATION, COPYING, FAILED ->
317+
sourceStore.scanChildren(parentPath, consumer);
318+
case COMPLETED ->
319+
targetStore.scanChildren(parentPath, consumer);
320+
};
321+
}
322+
312323
@Override
313324
public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
314325
EnumSet<CreateOption> options, Map<String, String> secondaryIndexes) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.pulsar.metadata.api.MetadataSerde;
3939
import org.apache.pulsar.metadata.api.MetadataStoreException;
4040
import org.apache.pulsar.metadata.api.Notification;
41+
import org.apache.pulsar.metadata.api.ScanConsumer;
4142
import org.apache.pulsar.metadata.api.Stat;
4243
import org.apache.pulsar.metadata.api.extended.CreateOption;
4344
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -60,6 +61,7 @@ public enum OperationType {
6061
EXISTS,
6162
PUT,
6263
DELETE,
64+
SCAN_CHILDREN,
6365
}
6466

6567
@Data
@@ -155,6 +157,17 @@ public CompletableFuture<Void> deleteRecursive(String path) {
155157
return store.deleteRecursive(path);
156158
}
157159

160+
@Override
161+
public CompletableFuture<Void> scanChildren(String parentPath, ScanConsumer consumer) {
162+
Optional<MetadataStoreException> ex = programmedFailure(OperationType.SCAN_CHILDREN, parentPath);
163+
if (ex.isPresent()) {
164+
consumer.onError(ex.get());
165+
return FutureUtil.failedFuture(ex.get());
166+
}
167+
168+
return store.scanChildren(parentPath, consumer);
169+
}
170+
158171
@Override
159172
public void registerListener(Consumer<Notification> listener) {
160173
store.registerListener(listener);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
4646
import org.apache.pulsar.metadata.api.Notification;
4747
import org.apache.pulsar.metadata.api.NotificationType;
48+
import org.apache.pulsar.metadata.api.ScanConsumer;
4849
import org.apache.pulsar.metadata.api.Stat;
4950
import org.apache.pulsar.metadata.api.extended.CreateOption;
5051
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
@@ -121,6 +122,39 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
121122
}
122123
}
123124

125+
@Override
126+
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
127+
// Snapshot the immediate children under the lock, then dispatch outside it so a slow
128+
// consumer can't stall other store operations.
129+
List<GetResult> snapshot = new ArrayList<>();
130+
synchronized (map) {
131+
String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
132+
String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
133+
map.subMap(firstKey, false, lastKey, false).forEach((key, value) -> {
134+
// Filter to direct children only — paths with no further "/" beyond the
135+
// parent's level. Same scoping `getChildrenFromStore` applies.
136+
int relStart = firstKey.length();
137+
if (key.indexOf('/', relStart) >= 0) {
138+
return;
139+
}
140+
snapshot.add(new GetResult(
141+
value.data,
142+
new Stat(key, value.version, value.createdTimestamp, value.modifiedTimestamp,
143+
value.isEphemeral(), true)));
144+
});
145+
}
146+
try {
147+
for (GetResult r : snapshot) {
148+
consumer.onNext(r);
149+
}
150+
consumer.onCompleted();
151+
return CompletableFuture.completedFuture(null);
152+
} catch (Throwable t) {
153+
consumer.onError(t);
154+
return FutureUtil.failedFuture(t);
155+
}
156+
}
157+
124158
@Override
125159
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
126160
if (!isValidPath(path)) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
5454
import org.apache.pulsar.metadata.api.Notification;
5555
import org.apache.pulsar.metadata.api.NotificationType;
56+
import org.apache.pulsar.metadata.api.ScanConsumer;
5657
import org.apache.pulsar.metadata.api.Stat;
5758
import org.apache.pulsar.metadata.api.extended.CreateOption;
5859
import org.rocksdb.ColumnFamilyDescriptor;
@@ -406,6 +407,85 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
406407
}
407408
}
408409

410+
@Override
411+
protected CompletableFuture<Void> storeScanChildren(String parentPath, ScanConsumer consumer) {
412+
// Native iterator-based scan over the parent's key range, with the same direct-child
413+
// filter getChildrenFromStore applies. Snapshot under the read lock then dispatch
414+
// outside it.
415+
List<GetResult> snapshot = new ArrayList<>();
416+
try {
417+
dbStateLock.readLock().lock();
418+
if (isClosed()) {
419+
CompletableFuture<Void> failed = alreadyClosedFailedFuture();
420+
failed.whenComplete((__, ex) -> {
421+
if (ex != null) {
422+
consumer.onError(ex);
423+
}
424+
});
425+
return failed;
426+
}
427+
String firstKey = parentPath.equals("/") ? "/" : parentPath + "/";
428+
String lastKey = parentPath.equals("/") ? "0" : parentPath + "0";
429+
byte[] endBytes = toBytes(lastKey);
430+
try (RocksIterator iterator = db.newIterator(optionDontCache)) {
431+
for (iterator.seek(toBytes(firstKey)); iterator.isValid(); iterator.next()) {
432+
byte[] keyBytes = iterator.key();
433+
if (compareUnsigned(keyBytes, endBytes) >= 0) {
434+
break;
435+
}
436+
String currentPath = toString(keyBytes);
437+
// Direct children only.
438+
if (currentPath.indexOf('/', firstKey.length()) >= 0) {
439+
continue;
440+
}
441+
byte[] value = iterator.value();
442+
if (value == null) {
443+
continue;
444+
}
445+
MetaValue metaValue = MetaValue.parse(value);
446+
if (metaValue.ephemeral && metaValue.owner != instanceId) {
447+
// Ephemeral record left behind by a different session; skip.
448+
continue;
449+
}
450+
snapshot.add(new GetResult(metaValue.getData(),
451+
new Stat(currentPath,
452+
metaValue.getVersion(),
453+
metaValue.getCreatedTimestamp(),
454+
metaValue.getModifiedTimestamp(),
455+
metaValue.ephemeral,
456+
metaValue.getOwner() == instanceId)));
457+
}
458+
}
459+
} catch (Throwable e) {
460+
MetadataStoreException ex = MetadataStoreException.wrap(e);
461+
consumer.onError(ex);
462+
return FutureUtil.failedFuture(ex);
463+
} finally {
464+
dbStateLock.readLock().unlock();
465+
}
466+
try {
467+
for (GetResult r : snapshot) {
468+
consumer.onNext(r);
469+
}
470+
consumer.onCompleted();
471+
return CompletableFuture.completedFuture(null);
472+
} catch (Throwable t) {
473+
consumer.onError(t);
474+
return FutureUtil.failedFuture(t);
475+
}
476+
}
477+
478+
private static int compareUnsigned(byte[] a, byte[] b) {
479+
int len = Math.min(a.length, b.length);
480+
for (int i = 0; i < len; i++) {
481+
int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
482+
if (diff != 0) {
483+
return diff;
484+
}
485+
}
486+
return a.length - b.length;
487+
}
488+
409489
@Override
410490
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
411491
log.debug().attr("path", path).attr("instanceId", instanceId).log("getChildrenFromStore");

0 commit comments

Comments
 (0)