Skip to content

Commit c6db59e

Browse files
committed
Add PullConsumer APIs
1 parent 1184903 commit c6db59e

6 files changed

Lines changed: 280 additions & 2 deletions

File tree

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.time.Duration;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import org.apache.rocketmq.client.apis.ClientException;
27+
import org.apache.rocketmq.client.apis.message.MessageQueue;
28+
import org.apache.rocketmq.client.apis.message.MessageView;
29+
30+
public interface PullConsumer extends Closeable {
31+
/**
32+
* Get the consumer group of the consumer.
33+
*/
34+
String getConsumerGroup();
35+
36+
/**
37+
* @param topic the topic that needs to be monitored.
38+
* @param listener the callback to detect the message queue changes.
39+
*/
40+
void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener);
41+
42+
/**
43+
* Fetch message queues of the topic.
44+
*/
45+
Collection<MessageQueue> fetchMessageQueues(String topic) throws ClientException;
46+
47+
/**
48+
* Manually assign a list of message queues to this consumer.
49+
*
50+
* <p>This interface does not allow for incremental assignment and will replace the previous assignment (if
51+
* previous assignment existed).
52+
*
53+
* @param messageQueues the list of message queues that are to be assigned to this consumer.
54+
*/
55+
void assign(Collection<MessageQueue> messageQueues);
56+
57+
/**
58+
* Fetch messages from assigned message queues specified by {@link #assign(Collection)}.
59+
*
60+
* @param timeout the maximum time to block.
61+
* @return list of fetched messages.
62+
*/
63+
List<MessageView> poll(Duration timeout);
64+
65+
/**
66+
* Overrides the fetch offsets that the consumer will use on the next poll. If this method is invoked for the same
67+
* message queue more than once, the latest offset will be used on the next {@link #poll(Duration)}.
68+
*
69+
* @param messageQueue the message queue to override the fetch offset.
70+
* @param offset message offset.
71+
*/
72+
void seek(MessageQueue messageQueue, long offset);
73+
74+
/**
75+
* Suspending message pulling from the message queues.
76+
*
77+
* @param messageQueues message queues that need to be suspended.
78+
*/
79+
void pause(Collection<MessageQueue> messageQueues);
80+
81+
/**
82+
* Resuming message pulling from the message queues.
83+
*
84+
* @param messageQueues message queues that need to be resumed.
85+
*/
86+
void resume(Collection<MessageQueue> messageQueues);
87+
88+
/**
89+
* Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the
90+
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message
91+
* queue.
92+
*
93+
* @param messageQueue message queue that needs to be looked up.
94+
* @param timestamp the timestamp for which to search.
95+
* @return the offset of the message queue, or {@link Optional#empty()} if there is no message.
96+
*/
97+
Optional<Long> offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
98+
99+
/**
100+
* Get the latest committed offset for the given message queue.
101+
*
102+
* @return the latest committed offset, or {@link Optional#empty()} if there was no prior commit.
103+
*/
104+
Optional<Long> committed(MessageQueue messageQueue);
105+
106+
/**
107+
* Commit offset manually.
108+
*/
109+
void commit() throws ClientException;
110+
111+
/**
112+
* Overrides the fetch offsets with the beginning offset that the consumer will use on the next poll. If this
113+
* method is invoked for the same message queue more than once, the latest offset will be used on the next
114+
* {@link #poll(Duration)}.
115+
*
116+
* @param messageQueue the message queue to seek.
117+
*/
118+
void seekToBegin(MessageQueue messageQueue) throws ClientException;
119+
120+
/**
121+
* Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this method is
122+
* invoked for the same message queue more than once, the latest offset will be used on the next
123+
* {@link #poll(Duration)}.
124+
*
125+
* @param messageQueue the message queue to seek.
126+
*/
127+
void seekToEnd(MessageQueue messageQueue) throws ClientException;
128+
129+
/**
130+
* Close the pull consumer and release all related resources.
131+
*/
132+
@Override
133+
void close() throws IOException;
134+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.time.Duration;
21+
import org.apache.rocketmq.client.apis.ClientConfiguration;
22+
import org.apache.rocketmq.client.apis.ClientException;
23+
24+
public interface PullConsumerBuilder {
25+
/**
26+
* Set the client configuration for the consumer.
27+
*
28+
* @param clientConfiguration client's configuration.
29+
* @return the consumer builder instance.
30+
*/
31+
PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
32+
33+
/**
34+
* Set the load balancing group for the consumer.
35+
*
36+
* @param consumerGroup consumer load balancing group.
37+
* @return the consumer builder instance.
38+
*/
39+
PullConsumerBuilder setConsumerGroup(String consumerGroup);
40+
41+
/**
42+
* Automate the consumer's offset commit.
43+
*
44+
* @return the consumer builder instance.
45+
*/
46+
PullConsumerBuilder enableAutoCommit(boolean enable);
47+
48+
/**
49+
* Set the consumer's offset commit interval if auto commit is enabled.
50+
*
51+
* @param duration offset commit interval
52+
* @return the consumer builder instance.
53+
*/
54+
PullConsumerBuilder setAutoCommitInterval(Duration duration);
55+
56+
/**
57+
* Set the maximum number of messages cached locally.
58+
*
59+
* @param count message count.
60+
* @return the consumer builder instance.
61+
*/
62+
PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count);
63+
64+
/**
65+
* Set the maximum bytes of messages cached locally.
66+
*
67+
* @param bytes message size.
68+
* @return the consumer builder instance.
69+
*/
70+
PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes);
71+
72+
/**
73+
* Finalize the build of {@link PullConsumer} and start.
74+
*
75+
* <p>This method will block until the pull consumer starts successfully.
76+
*
77+
* <p>Especially, if this method is invoked more than once, different pull consumer will be created and started.
78+
*
79+
* @return the pull consumer instance.
80+
*/
81+
PullConsumer build() throws ClientException;
82+
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public interface PushConsumer extends Closeable {
9393
*
9494
* <p>Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer.
9595
*
96-
* @param topic the topic to remove the subscription.
96+
* @param topic the topic to remove from the subscription.
9797
* @return push consumer instance.
9898
*/
9999
PushConsumer unsubscribe(String topic) throws ClientException;

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public interface PushConsumerBuilder {
8686
*
8787
* <p>This method will block until the push consumer starts successfully.
8888
*
89-
* <p>Especially, if this method is invoked more than once, different push consumers will be created and started.
89+
* <p>Especially, if this method is invoked more than once, different push consumer will be created and started.
9090
*
9191
* @return the push consumer instance.
9292
*/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.util.Set;
21+
import org.apache.rocketmq.client.apis.message.MessageQueue;
22+
23+
public interface TopicMessageQueueChangeListener {
24+
/**
25+
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
26+
* expanded or shrunk.
27+
*
28+
* @param topic the topic to listen.
29+
* @param messageQueues latest message queues of the topic.
30+
*/
31+
void onChanged(String topic, Set<MessageQueue> messageQueues);
32+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.message;
19+
20+
public interface MessageQueue {
21+
/**
22+
* Topic of the current message queue.
23+
*/
24+
String getTopic();
25+
26+
/**
27+
* Get the identifier of the current message queue.
28+
*/
29+
String getId();
30+
}

0 commit comments

Comments
 (0)