Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pulsar-client-api-v5/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Applies the "pulsar.java-conventions" plugin to this module.
// NOTE(review): presumably the repository's shared Java build conventions — confirm against the plugin definition.
plugins {
id("pulsar.java-conventions")
}

// Both dependencies use the compileOnly scope: they are placed on the compile classpath of this
// module only and are not added to its runtime classpath.
// NOTE(review): `libs` is presumably the Gradle version-catalog accessor — confirm in the build settings.
dependencies {
compileOnly(libs.protobuf.java)
compileOnly(libs.opentelemetry.api)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import java.io.IOException;
import java.time.Instant;
import org.apache.pulsar.client.api.v5.internal.PulsarClientProvider;

/**
 * Opaque, serializable position vector that identifies one consistent point across
 * every internal hash-range segment of a topic.
 *
 * <p>Instances are produced by {@link CheckpointConsumer#checkpoint()}. To keep one in
 * external storage (e.g. Flink state, S3), convert it with {@link #toByteArray()}.
 *
 * <p>{@link CheckpointConsumer} works with no other position type. An initial position
 * is chosen through the static factories {@link #earliest()}, {@link #latest()} and
 * {@link #atTimestamp(Instant)}; a previously saved checkpoint is restored with
 * {@link #fromByteArray(byte[])}.
 */
public interface Checkpoint {

    /**
     * Converts this checkpoint into bytes suitable for external storage.
     *
     * @return the byte representation of this checkpoint; pass it to
     *         {@link #fromByteArray(byte[])} to obtain the checkpoint again
     */
    byte[] toByteArray();

    /**
     * Reports when this checkpoint was created.
     *
     * @return the creation timestamp of this checkpoint, expressed as an {@link Instant}
     */
    Instant creationTime();

    // --- Static factories (each delegates to the PulsarClientProvider) ---

    /**
     * Returns the sentinel checkpoint that stands for the beginning of the topic,
     * i.e. the oldest available data.
     *
     * @return a sentinel {@link Checkpoint} for the earliest position in the topic
     */
    static Checkpoint earliest() {
        return PulsarClientProvider.get()
                .earliestCheckpoint();
    }

    /**
     * Returns the sentinel checkpoint that stands for the end of the topic,
     * i.e. the next message to be published.
     *
     * @return a sentinel {@link Checkpoint} for the latest position in the topic
     */
    static Checkpoint latest() {
        return PulsarClientProvider.get()
                .latestCheckpoint();
    }

    /**
     * Returns a checkpoint positioned at the first message published at or after
     * the supplied timestamp.
     *
     * @param timestamp the point in time to position at
     * @return a {@link Checkpoint} from which consumption starts with the first message
     *         published at or after {@code timestamp}
     */
    static Checkpoint atTimestamp(Instant timestamp) {
        return PulsarClientProvider.get()
                .checkpointAtTimestamp(timestamp);
    }

    /**
     * Restores a checkpoint from bytes that were earlier produced by {@link #toByteArray()}.
     *
     * @param data the bytes previously returned by {@link #toByteArray()}
     * @return the {@link Checkpoint} decoded from {@code data}
     * @throws IOException if {@code data} is malformed or cannot be deserialized
     */
    static Checkpoint fromByteArray(byte[] data) throws IOException {
        return PulsarClientProvider.get()
                .checkpointFromBytes(data);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import java.io.Closeable;
import java.time.Duration;
import org.apache.pulsar.client.api.v5.async.AsyncCheckpointConsumer;

/**
* An unmanaged consumer designed for connector frameworks (Flink, Spark, etc.).
*
* <p>Unlike {@link StreamConsumer} and {@link QueueConsumer}, this consumer has no
* broker-managed subscription — position tracking is entirely external. The connector
* framework stores checkpoints in its own state backend and uses them to restore
* on failure.
*
* <p>Internally, the consumer reads from all hash-range segments of a topic.
* {@link #checkpoint()} creates an atomic snapshot of positions across all segments,
* returned as an opaque {@link Checkpoint} that can be serialized and stored externally.
*
* <p>This interface provides synchronous (blocking) operations. For non-blocking
* usage, obtain an {@link AsyncCheckpointConsumer} via {@link #async()}.
*
* <p>The interface extends {@link Closeable}, so an instance can be managed with a
* try-with-resources statement.
*
* @param <T> the type of message values
*/
public interface CheckpointConsumer<T> extends Closeable {

/**
* The topic this consumer reads from.
*
* @return the fully qualified topic name
*/
String topic();

// --- Receive ---

/**
* Receive a single message, blocking indefinitely until one is available.
*
* @return the received {@link Message}
* @throws PulsarClientException if the consumer is closed or a connection error occurs
*/
Message<T> receive() throws PulsarClientException;

/**
* Receive a single message, blocking for at most the given timeout.
*
* <p>Callers must check the result: {@code null} is returned when the timeout
* elapses before a message arrives.
*
* @param timeout the maximum time to wait for a message
* @return the received {@link Message}, or {@code null} if the timeout elapses
* @throws PulsarClientException if the consumer is closed or a connection error occurs
*/
Message<T> receive(Duration timeout) throws PulsarClientException;

// NOTE(review): what receiveMulti returns when the timeout elapses with no messages
// (an empty batch vs. null) is not stated here — confirm and document.
/**
* Receive a batch of messages, blocking for at most the given timeout.
*
* @param maxMessages the maximum number of messages to return
* @param timeout the maximum time to wait for messages
* @return the received {@link Messages} batch
* @throws PulsarClientException if the consumer is closed or a connection error occurs
*/
Messages<T> receiveMulti(int maxMessages, Duration timeout) throws PulsarClientException;

// --- Checkpoint ---

// NOTE(review): unlike the receive/seek/close operations, checkpoint() declares no
// checked exception — confirm how failures are meant to be reported.
/**
* Create a consistent checkpoint — an atomic snapshot of positions across all
* internal hash-range segments.
*
* <p>The returned {@link Checkpoint} can be serialized via {@link Checkpoint#toByteArray()}
* and stored in the connector framework's state backend.
*
* @return an opaque {@link Checkpoint} representing the current read positions
*/
Checkpoint checkpoint();

// --- Seek ---

/**
* Seek to a previously saved checkpoint, or to a sentinel position such as
* {@link Checkpoint#earliest()} or {@link Checkpoint#latest()}.
*
* @param checkpoint the checkpoint to seek to
* @throws PulsarClientException if the seek fails or a connection error occurs
*/
void seek(Checkpoint checkpoint) throws PulsarClientException;

// --- Async ---

/**
* Return the asynchronous (non-blocking) view of this consumer.
*
* @return the {@link AsyncCheckpointConsumer} counterpart of this consumer
*/
AsyncCheckpointConsumer<T> async();

// --- Lifecycle ---

/**
* Close the consumer and release all resources.
*
* <p>Overrides {@link Closeable#close()} so that the declared exception type is
* {@link PulsarClientException}.
*
* @throws PulsarClientException if an error occurs while closing the consumer
*/
@Override
void close() throws PulsarClientException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.v5;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;

/**
* Builder for configuring and creating a {@link CheckpointConsumer}.
*
* <p>Since this is an unmanaged consumer (no subscription), the terminal methods are
* {@link #create()} and {@link #createAsync()} rather than {@code subscribe()}.
*
* <p>Every configuration method returns the builder so that calls can be chained.
*
* @param <T> the type of message values the consumer will receive
*/
public interface CheckpointConsumerBuilder<T> {

/**
* Create the checkpoint consumer, blocking until it is ready.
*
* @return the created {@link CheckpointConsumer}
* @throws PulsarClientException if the creation fails or a connection error occurs
*/
CheckpointConsumer<T> create() throws PulsarClientException;

/**
* Create the checkpoint consumer asynchronously; the non-blocking counterpart of
* {@link #create()}.
*
* @return a {@link CompletableFuture} that completes with the created {@link CheckpointConsumer}
*/
CompletableFuture<CheckpointConsumer<T>> createAsync();

// --- Required ---

/**
* Set the topic to consume from.
*
* @param topicName the topic name
* @return this builder instance for chaining
*/
CheckpointConsumerBuilder<T> topic(String topicName);

// --- Start position ---

/**
* Set the initial position for this consumer.
*
* <p>Use {@link Checkpoint#earliest()}, {@link Checkpoint#latest()},
* {@link Checkpoint#atTimestamp}, or {@link Checkpoint#fromByteArray} to
* create the appropriate starting position.
*
* <p>Defaults to {@link Checkpoint#latest()} if not specified.
*
* @param checkpoint the checkpoint representing the desired start position
* @return this builder instance for chaining
*/
CheckpointConsumerBuilder<T> startPosition(Checkpoint checkpoint);

// --- Optional ---

/**
* Set a custom name for this consumer instance.
*
* @param name the consumer name
* @return this builder instance for chaining
*/
CheckpointConsumerBuilder<T> consumerName(String name);

/**
* Configure the end-to-end encryption policy this consumer uses to decrypt
* received messages.
*
* @param policy the encryption policy to use
* @return this builder instance for chaining
* @see EncryptionPolicy#forConsumer
*/
CheckpointConsumerBuilder<T> encryptionPolicy(EncryptionPolicy policy);

// --- Metadata ---

/**
* Add a single property to the consumer metadata.
*
* @param key the property key
* @param value the property value
* @return this builder instance for chaining
*/
CheckpointConsumerBuilder<T> property(String key, String value);

/**
* Add multiple properties to the consumer metadata.
*
* @param properties the properties to add
* @return this builder instance for chaining
*/
CheckpointConsumerBuilder<T> properties(Map<String, String> properties);
}
Loading
Loading