Skip to content

Commit 6ce15e1

Browse files
authored
[feat] PIP-466: Add V5 client API for scalable topics (#25489)
1 parent 9e4e7e1 commit 6ce15e1

57 files changed

Lines changed: 5048 additions & 0 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
// Build setup for this module comes from the "pulsar.java-conventions" plugin.
plugins {
    id("pulsar.java-conventions")
}

// Both libraries are declared as compileOnly: Gradle puts them on the compile classpath
// only, so they are not packaged as runtime dependencies of this module.
// NOTE(review): `libs` is presumably the project's version catalog accessor; confirm.
dependencies {
    compileOnly(libs.protobuf.java)
    compileOnly(libs.opentelemetry.api)
}
Lines changed: 95 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import java.io.IOException;
22+
import java.time.Instant;
23+
import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;
24+
25+
/**
26+
* An opaque, serializable position vector representing a consistent point across all
27+
* internal hash-range segments of a topic.
28+
*
29+
* <p>Checkpoints are created via {@link CheckpointConsumer#checkpoint()} and can be
30+
* serialized for external storage (e.g. Flink state, S3) using {@link #toByteArray()}.
31+
*
32+
* <p>This is the sole position type used with {@link CheckpointConsumer} — for initial
33+
* positioning use the static factories {@link #earliest()}, {@link #latest()},
34+
* {@link #atTimestamp(Instant)}, or {@link #fromByteArray(byte[])} to restore from
35+
* a previously saved checkpoint.
36+
*/
37+
public interface Checkpoint {
38+
39+
/**
40+
* Serialize this checkpoint for external storage.
41+
*
42+
* @return a serializable byte representation of this checkpoint that can be restored
43+
* via {@link #fromByteArray(byte[])}
44+
*/
45+
byte[] toByteArray();
46+
47+
/**
48+
* The time at which this checkpoint was created.
49+
*
50+
* @return the creation timestamp of this checkpoint as an {@link Instant}
51+
*/
52+
Instant creationTime();
53+
54+
// --- Static factories ---
55+
56+
/**
57+
* A sentinel checkpoint representing the beginning of the topic (oldest available data).
58+
*
59+
* @return a sentinel {@link Checkpoint} representing the earliest position in the topic
60+
*/
61+
static Checkpoint earliest() {
62+
return PulsarClientProvider.get().earliestCheckpoint();
63+
}
64+
65+
/**
66+
* A sentinel checkpoint representing the end of the topic (next message to be published).
67+
*
68+
* @return a sentinel {@link Checkpoint} representing the latest position in the topic
69+
*/
70+
static Checkpoint latest() {
71+
return PulsarClientProvider.get().latestCheckpoint();
72+
}
73+
74+
/**
75+
* A checkpoint that positions at the first message published at or after the given timestamp.
76+
*
77+
* @param timestamp the timestamp to position at
78+
* @return a {@link Checkpoint} that will start consuming from the first message at or after
79+
* the given timestamp
80+
*/
81+
static Checkpoint atTimestamp(Instant timestamp) {
82+
return PulsarClientProvider.get().checkpointAtTimestamp(timestamp);
83+
}
84+
85+
/**
86+
* Deserialize a checkpoint from a byte array previously obtained via {@link #toByteArray()}.
87+
*
88+
* @param data the byte array previously obtained from {@link #toByteArray()}
89+
* @return the deserialized {@link Checkpoint}
90+
* @throws IOException if the byte array is malformed or cannot be deserialized
91+
*/
92+
static Checkpoint fromByteArray(byte[] data) throws IOException {
93+
return PulsarClientProvider.get().checkpointFromBytes(data);
94+
}
95+
}
Lines changed: 123 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import java.io.Closeable;
22+
import java.time.Duration;
23+
import org.apache.pulsar.client.api.v5.async.AsyncCheckpointConsumer;
24+
25+
/**
26+
* An unmanaged consumer designed for connector frameworks (Flink, Spark, etc.).
27+
*
28+
* <p>Unlike {@link StreamConsumer} and {@link QueueConsumer}, this consumer has no
29+
* broker-managed subscription — position tracking is entirely external. The connector
30+
* framework stores checkpoints in its own state backend and uses them to restore
31+
* on failure.
32+
*
33+
* <p>Internally, the consumer reads from all hash-range segments of a topic.
34+
* {@link #checkpoint()} creates an atomic snapshot of positions across all segments,
35+
* returned as an opaque {@link Checkpoint} that can be serialized and stored externally.
36+
*
37+
* <p>This interface provides synchronous (blocking) operations. For non-blocking
38+
* usage, obtain an {@link AsyncCheckpointConsumer} via {@link #async()}.
39+
*
40+
* @param <T> the type of message values
41+
*/
42+
public interface CheckpointConsumer<T> extends Closeable {
43+
44+
/**
45+
* The topic this consumer reads from.
46+
*
47+
* @return the fully qualified topic name
48+
*/
49+
String topic();
50+
51+
// --- Receive ---
52+
53+
/**
54+
* Receive a single message, blocking indefinitely.
55+
*
56+
* @return the received {@link Message}
57+
* @throws PulsarClientException if the consumer is closed or a connection error occurs
58+
*/
59+
Message<T> receive() throws PulsarClientException;
60+
61+
/**
62+
* Receive a single message, blocking up to the given timeout.
63+
* Returns {@code null} if the timeout elapses without a message.
64+
*
65+
* @param timeout the maximum time to wait for a message
66+
* @return the received {@link Message}, or {@code null} if the timeout elapses
67+
* @throws PulsarClientException if the consumer is closed or a connection error occurs
68+
*/
69+
Message<T> receive(Duration timeout) throws PulsarClientException;
70+
71+
/**
72+
* Receive a batch of messages, blocking up to the given timeout.
73+
*
74+
* @param maxMessages the maximum number of messages to return
75+
* @param timeout the maximum time to wait for messages
76+
* @return the received {@link Messages} batch
77+
* @throws PulsarClientException if the consumer is closed or a connection error occurs
78+
*/
79+
Messages<T> receiveMulti(int maxMessages, Duration timeout) throws PulsarClientException;
80+
81+
// --- Checkpoint ---
82+
83+
/**
84+
* Create a consistent checkpoint — an atomic snapshot of positions across all
85+
* internal hash-range segments.
86+
*
87+
* <p>The returned {@link Checkpoint} can be serialized via {@link Checkpoint#toByteArray()}
88+
* and stored in the connector framework's state backend.
89+
*
90+
* @return an opaque {@link Checkpoint} representing the current read positions
91+
*/
92+
Checkpoint checkpoint();
93+
94+
// --- Seek ---
95+
96+
/**
97+
* Seek to a previously saved checkpoint, or to a sentinel position such as
98+
* {@link Checkpoint#earliest()} or {@link Checkpoint#latest()}.
99+
*
100+
* @param checkpoint the checkpoint to seek to
101+
* @throws PulsarClientException if the seek fails or a connection error occurs
102+
*/
103+
void seek(Checkpoint checkpoint) throws PulsarClientException;
104+
105+
// --- Async ---
106+
107+
/**
108+
* Return the asynchronous view of this consumer.
109+
*
110+
* @return the {@link AsyncCheckpointConsumer} counterpart of this consumer
111+
*/
112+
AsyncCheckpointConsumer<T> async();
113+
114+
// --- Lifecycle ---
115+
116+
/**
117+
* Close the consumer and release all resources.
118+
*
119+
* @throws PulsarClientException if an error occurs while closing the consumer
120+
*/
121+
@Override
122+
void close() throws PulsarClientException;
123+
}
Lines changed: 113 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
24+
25+
/**
26+
* Builder for configuring and creating a {@link CheckpointConsumer}.
27+
*
28+
* <p>Since this is an unmanaged consumer (no subscription), the terminal method is
29+
* {@link #create()} rather than {@code subscribe()}.
30+
*
31+
* @param <T> the type of message values the consumer will receive
32+
*/
33+
public interface CheckpointConsumerBuilder<T> {
34+
35+
/**
36+
* Create the checkpoint consumer, blocking until it is ready.
37+
*
38+
* @return the created {@link CheckpointConsumer}
39+
* @throws PulsarClientException if the creation fails or a connection error occurs
40+
*/
41+
CheckpointConsumer<T> create() throws PulsarClientException;
42+
43+
/**
44+
* Create the checkpoint consumer asynchronously.
45+
*
46+
* @return a {@link CompletableFuture} that completes with the created {@link CheckpointConsumer}
47+
*/
48+
CompletableFuture<CheckpointConsumer<T>> createAsync();
49+
50+
// --- Required ---
51+
52+
/**
53+
* The topic to consume from.
54+
*
55+
* @param topicName the topic name
56+
* @return this builder instance for chaining
57+
*/
58+
CheckpointConsumerBuilder<T> topic(String topicName);
59+
60+
// --- Start position ---
61+
62+
/**
63+
* Set the initial position for this consumer.
64+
*
65+
* <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()},
66+
* {@link Checkpoint#atTimestamp}, or {@link Checkpoint#fromByteArray} to
67+
* create the appropriate starting position.
68+
*
69+
* <p>Defaults to {@link Checkpoint#latest()} if not specified.
70+
*
71+
* @param checkpoint the checkpoint representing the desired start position
72+
* @return this builder instance for chaining
73+
*/
74+
CheckpointConsumerBuilder<T> startPosition(Checkpoint checkpoint);
75+
76+
// --- Optional ---
77+
78+
/**
79+
* A custom name for this consumer instance.
80+
*
81+
* @param name the consumer name
82+
* @return this builder instance for chaining
83+
*/
84+
CheckpointConsumerBuilder<T> consumerName(String name);
85+
86+
/**
87+
* Configure end-to-end message encryption for decryption.
88+
*
89+
* @param policy the encryption policy to use
90+
* @return this builder instance for chaining
91+
* @see EncryptionPolicy#forConsumer
92+
*/
93+
CheckpointConsumerBuilder<T> encryptionPolicy(EncryptionPolicy policy);
94+
95+
// --- Metadata ---
96+
97+
/**
98+
* Add a single property to the consumer metadata.
99+
*
100+
* @param key the property key
101+
* @param value the property value
102+
* @return this builder instance for chaining
103+
*/
104+
CheckpointConsumerBuilder<T> property(String key, String value);
105+
106+
/**
107+
* Add multiple properties to the consumer metadata.
108+
*
109+
* @param properties the properties to add
110+
* @return this builder instance for chaining
111+
*/
112+
CheckpointConsumerBuilder<T> properties(Map<String, String> properties);
113+
}

0 commit comments

Comments
 (0)