Skip to content

Commit 9aec03a

Browse files
committed
Merge branch 'master' into bewaremypower/improve-topic-name-parse
2 parents 355140a + d95cf3f commit 9aec03a

129 files changed

Lines changed: 5954 additions & 3489 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

bin/proto/MLDataFormats_pb2.py

Lines changed: 51 additions & 330 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

buildtools/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
<log4j2.version>2.23.1</log4j2.version>
5252
<slf4j.version>2.0.13</slf4j.version>
5353
<testng.version>7.7.1</testng.version>
54-
<commons-lang3.version>3.17.0</commons-lang3.version>
54+
<commons-lang3.version>3.18.0</commons-lang3.version>
5555
<license-maven-plugin.version>4.1</license-maven-plugin.version>
5656
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
5757
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>

conf/broker.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true
733733
# Longer patterns are rejected to avoid patterns that are crafted to overload the broker.
734734
subscriptionPatternMaxLength=50
735735

736+
# The regular expression implementation to use for topic pattern matching.
737+
# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and falls back to
738+
# the JDK implementation for backwards compatibility reasons when the pattern compilation fails
739+
# with the RE2/j library.
740+
# RE2J is more performant but does not support all regex features (e.g. negative lookaheads).
741+
# JDK uses the standard Java regex implementation which supports all features but can be slower.
742+
# Bad or malicious regex patterns requiring extensive backtracking could cause high resource usage
743+
# with RE2J_WITH_JDK_FALLBACK or JDK implementations.
744+
topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK
745+
736746
### --- Authentication --- ###
737747
# Role names that are treated as "proxy roles". If the broker sees a request with
738748
#role as proxyRoles - it will demand to see a valid original principal.

conf/standalone.conf

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,16 @@ enableBrokerSideSubscriptionPatternEvaluation=true
386386
# Longer patterns are rejected to avoid patterns that are crafted to overload the broker.
387387
subscriptionPatternMaxLength=50
388388

389+
# The regular expression implementation to use for topic pattern matching.
390+
# RE2J_WITH_JDK_FALLBACK is the default. It uses the RE2J implementation and falls back to
391+
# the JDK implementation for backwards compatibility reasons when the pattern compilation fails
392+
# with the RE2/j library.
393+
# RE2J is more performant but does not support all regex features (e.g. negative lookaheads).
394+
# JDK uses the standard Java regex implementation which supports all features but can be slower.
395+
# Bad or malicious regex patterns requiring extensive backtracking could cause high resource usage
396+
# with RE2J_WITH_JDK_FALLBACK or JDK implementations.
397+
topicsPatternRegexImplementation=RE2J_WITH_JDK_FALLBACK
398+
389399
### --- Metadata Store --- ###
390400

391401
# Whether we should enable metadata operations batching

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ The Apache Software License, Version 2.0
260260
- com.fasterxml.jackson.datatype-jackson-datatype-jdk8-2.17.2.jar
261261
- com.fasterxml.jackson.datatype-jackson-datatype-jsr310-2.17.2.jar
262262
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.17.2.jar
263-
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
263+
* Caffeine -- com.github.ben-manes.caffeine-caffeine-3.2.1.jar
264264
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
265265
* Fastutil -- it.unimi.dsi-fastutil-8.5.14.jar
266266
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.51.0.jar
@@ -293,7 +293,7 @@ The Apache Software License, Version 2.0
293293
- org.apache.commons-commons-collections4-4.4.jar
294294
- org.apache.commons-commons-compress-1.27.1.jar
295295
- org.apache.commons-commons-configuration2-2.12.0.jar
296-
- org.apache.commons-commons-lang3-3.17.0.jar
296+
- org.apache.commons-commons-lang3-3.18.0.jar
297297
- org.apache.commons-commons-text-1.13.1.jar
298298
* Netty
299299
- io.netty-netty-buffer-4.1.122.Final.jar
@@ -512,7 +512,7 @@ The Apache Software License, Version 2.0
512512
- com.google.http-client-google-http-client-gson-1.41.0.jar
513513
- com.google.http-client-google-http-client-1.41.0.jar
514514
- com.google.auto.value-auto-value-annotations-1.11.0.jar
515-
- com.google.re2j-re2j-1.7.jar
515+
- com.google.re2j-re2j-1.8.jar
516516
* Jetcd - shaded
517517
* IPAddress
518518
- com.github.seancfoley-ipaddress-5.5.0.jar

distribution/shell/src/assemble/LICENSE.bin.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ The Apache Software License, Version 2.0
324324
- jackson-datatype-jdk8-2.17.2.jar
325325
- jackson-datatype-jsr310-2.17.2.jar
326326
- jackson-module-parameter-names-2.17.2.jar
327-
* Caffeine -- caffeine-2.9.1.jar
327+
* Caffeine -- caffeine-3.2.1.jar
328328
* simpleclient_caffeine-0.16.0.jar
329329
* Conscrypt -- conscrypt-openjdk-uber-2.5.2.jar
330330
* Gson
@@ -343,7 +343,7 @@ The Apache Software License, Version 2.0
343343
- commons-codec-1.18.0.jar
344344
- commons-io-2.19.0.jar
345345
- commons-logging-1.2.jar
346-
- commons-lang3-3.17.0.jar
346+
- commons-lang3-3.18.0.jar
347347
- commons-text-1.13.1.jar
348348
- commons-compress-1.27.1.jar
349349
- commons-beanutils-1.11.0.jar
@@ -419,7 +419,7 @@ The Apache Software License, Version 2.0
419419
* Apache Avro
420420
- avro-1.11.4.jar
421421
- avro-protobuf-1.11.4.jar
422-
* RE2j -- re2j-1.7.jar
422+
* RE2j -- re2j-1.8.jar
423423
* Spotify completable-futures -- completable-futures-0.3.6.jar
424424
* RoaringBitmap -- RoaringBitmap-1.2.0.jar
425425
* Fastutil -- fastutil-8.5.14.jar

docker/pulsar/Dockerfile

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ RUN apk add --no-cache \
8888
bash \
8989
python3 \
9090
py3-pip \
91-
py3-grpcio \
9291
py3-yaml \
9392
gcompat \
9493
libgcc \
@@ -105,9 +104,15 @@ RUN apk add --no-cache \
105104
RUN apk upgrade --no-cache
106105

107106
# Python dependencies
107+
# The pinned grpcio and protobuf versions should be compatible with the generated Protobuf and gRPC stubs used
108+
# in Pulsar Functions Python runtime. You should also update the grpcio version in src/update_python_protobuf_stubs.sh
109+
# and regenerate the Python stubs if you change the grpcio version here. Please see
110+
# pulsar-functions/instance/src/main/python/README.md for more details.
108111
ARG PULSAR_CLIENT_PYTHON_VERSION
109112
RUN pip3 install --break-system-packages --no-cache-dir \
110-
--only-binary grpcio \
113+
--only-binary \
114+
grpcio==1.73.1 \
115+
protobuf==6.31.1 \
111116
pulsar-client[all]==${PULSAR_CLIENT_PYTHON_VERSION} \
112117
kazoo
113118

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import org.apache.pulsar.common.classification.InterfaceAudience;
23+
import org.apache.pulsar.common.classification.InterfaceStability;
24+
25+
@InterfaceAudience.LimitedPrivate
26+
@InterfaceStability.Evolving
27+
public interface EntryProcessor {
28+
29+
void process(Position position, ByteBuf buffer);
30+
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,23 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
420420
*/
421421
long getOffloadedSize();
422422

423+
/**
424+
* Resets the exception thrown by the PayloadProcessor during an add entry operation to null.
425+
* <p>
426+
* **Context:** When an add entry operation fails due to an interceptor, all subsequent incoming add entry
427+
* operations will also fail. This behavior ensures message ordering and consistency.
428+
* <p>
429+
* **Important:** This method MUST only be called after all pending add operations are fully completed
430+
* (e.g., after a Topic is unfenced). Calling it prematurely will prevent the Managed Ledger (ML)
431+
* from being able to write indefinitely.
432+
* <p>
433+
* **Implementation Note:** Downstream projects that support the ML PayloadProcessor should implement
434+
* this method. Otherwise, do not implement it.
435+
*/
436+
default void unfenceForInterceptorException() {
437+
// Default implementation does nothing
438+
}
439+
423440
/**
424441
* Get last offloaded ledgerId. If no offloaded yet, it returns 0.
425442
*
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.mledger;
20+
21+
import static org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntries;
22+
import java.util.Optional;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Executor;
25+
import lombok.Getter;
26+
import lombok.RequiredArgsConstructor;
27+
import lombok.extern.slf4j.Slf4j;
28+
import org.jspecify.annotations.Nullable;
29+
30+
/**
31+
* The task to perform replay on the whole managed ledger from a given position.
32+
*/
33+
@RequiredArgsConstructor
34+
@Slf4j
35+
public class ManagedLedgerReplayTask {
36+
37+
private final String name;
38+
private final Executor executor; // run user-provided processor on entry
39+
private final int maxEntriesPerRead;
40+
@Getter // NOTE: the getter must be called in the callback of `replay` for thread safety
41+
private int numEntriesProcessed = 0;
42+
43+
/**
44+
* This method will read entries from `cursor` until the last confirmed entry. `processor` will be applied on each
45+
* entry.
46+
*
47+
* @param cursor the managed cursor to read entries
48+
* @param processor the user-provided processor that accepts the position and data buffer of the entry
49+
* @return the future of the optional last position processed:
50+
* 1. If there are no more entries to read, return an empty optional.
51+
* 2. Otherwise, if no exception was thrown, it will always be the position of the last entry.
52+
* 3. If any exception is thrown from {@link EntryProcessor#process}, it will be the position of the last
53+
* entry that has been processed successfully.
54+
* 4. If an unexpected exception is thrown, the future will complete exceptionally.
55+
* @apiNote The implementation of `processor` should not call `release()` on the buffer because this method will
56+
* eventually release the buffer after it's processed.
57+
*/
58+
public CompletableFuture<Optional<Position>> replay(ManagedCursor cursor, EntryProcessor processor) {
59+
try {
60+
numEntriesProcessed = 0;
61+
cursor.setAlwaysInactive(); // don't cache the replayed entries
62+
if (!cursor.hasMoreEntries()) {
63+
return CompletableFuture.completedFuture(Optional.empty());
64+
}
65+
return readAndProcess(cursor, null, processor);
66+
} catch (Throwable throwable) {
67+
return CompletableFuture.failedFuture(throwable);
68+
}
69+
}
70+
71+
private CompletableFuture<Optional<Position>> readAndProcess(
72+
ManagedCursor cursor, @Nullable Position lastProcessedPosition, EntryProcessor processor) {
73+
return readEntries(cursor, maxEntriesPerRead, PositionFactory.LATEST).thenComposeAsync(entries -> {
74+
try {
75+
Position processedPosition = lastProcessedPosition;
76+
for (final var entry : entries) {
77+
final var position = entry.getPosition();
78+
final var buffer = entry.getDataBuffer();
79+
// Pass a duplicated buffer to `processor` in case the buffer is retained and stored somewhere else
80+
// and then process all buffers in batch.
81+
try {
82+
processor.process(position, buffer);
83+
} catch (Throwable throwable) {
84+
log.error("[{}] Failed to process entry {}", name, position, throwable);
85+
return CompletableFuture.completedFuture(Optional.ofNullable(processedPosition));
86+
}
87+
// It does not need to be atomic because the update happens before the future completes
88+
numEntriesProcessed++;
89+
processedPosition = position;
90+
}
91+
if (cursor.hasMoreEntries()) {
92+
return readAndProcess(cursor, processedPosition, processor);
93+
} else {
94+
return CompletableFuture.completedFuture(Optional.ofNullable(processedPosition));
95+
}
96+
} finally {
97+
entries.forEach(Entry::release);
98+
}
99+
}, executor);
100+
}
101+
}

0 commit comments

Comments
 (0)