Skip to content

Commit 20f53e1

Browse files
committed
Add pull API
1 parent 1184903 commit 20f53e1

5 files changed

Lines changed: 202 additions & 1 deletion

File tree

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.time.Duration;
23+
import java.util.Collection;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import org.apache.rocketmq.client.apis.ClientException;
27+
import org.apache.rocketmq.client.apis.message.MessageQueue;
28+
import org.apache.rocketmq.client.apis.message.MessageView;
29+
30+
public interface PullConsumer extends Closeable {
31+
String getConsumerGroup();
32+
33+
void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener);
34+
35+
Collection<MessageQueue> fetchMessageQueues(String topic) throws ClientException;
36+
37+
PullConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException;
38+
39+
PullConsumer unsubscribe(String topic);
40+
41+
void assign(Collection<MessageQueue> messageQueues);
42+
43+
List<MessageView> poll(Duration timeout);
44+
45+
void seek(MessageQueue messageQueue, long offset);
46+
47+
void pause(Collection<MessageQueue> messageQueues);
48+
49+
void resume(Collection<MessageQueue> messageQueues);
50+
51+
Optional<Long> offsetForTimestamp(MessageQueue messageQueue, Long timestamp);
52+
53+
Optional<Long> committed(MessageQueue messageQueue);
54+
55+
void commit();
56+
57+
void seekToBegin(MessageQueue messageQueue) throws ClientException;
58+
59+
void seekToEnd(MessageQueue messageQueue) throws ClientException;
60+
61+
@Override
62+
void close() throws IOException;
63+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.time.Duration;
21+
import org.apache.rocketmq.client.apis.ClientConfiguration;
22+
import org.apache.rocketmq.client.apis.ClientException;
23+
24+
public interface PullConsumerBuilder {
25+
/**
26+
* Set the client configuration for the consumer.
27+
*
28+
* @param clientConfiguration client's configuration.
29+
* @return the consumer builder instance.
30+
*/
31+
PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration);
32+
33+
/**
34+
* Set the load balancing group for the consumer.
35+
*
36+
* @param consumerGroup consumer load balancing group.
37+
* @return the consumer builder instance.
38+
*/
39+
PullConsumerBuilder setConsumerGroup(String consumerGroup);
40+
41+
/**
42+
* Automate the consumer's offset commit.
43+
*
44+
* @return the consumer builder instance.
45+
*/
46+
PullConsumerBuilder enableAutoCommit(boolean enable);
47+
48+
/**
49+
* Set the consumer's offset commit interval if auto commit is enabled.
50+
*
51+
* @param duration offset commit interval
52+
* @return the consumer builder instance.
53+
*/
54+
PullConsumerBuilder setAutoCommitInterval(Duration duration);
55+
56+
/**
57+
* Set the maximum number of messages cached locally.
58+
*
59+
* @param count message count.
60+
* @return the consumer builder instance.
61+
*/
62+
PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count);
63+
64+
/**
65+
* Set the maximum bytes of messages cached locally.
66+
*
67+
* @param bytes message size.
68+
* @return the consumer builder instance.
69+
*/
70+
PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes);
71+
72+
/**
73+
* Finalize the build of {@link PullConsumer} and start.
74+
*
75+
* <p>This method will block until the pull consumer starts successfully.
76+
*
77+
* <p>Especially, if this method is invoked more than once, different pull consumer will be created and started.
78+
*
79+
* @return the pull consumer instance.
80+
*/
81+
PullConsumer build() throws ClientException;
82+
}

java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public interface PushConsumerBuilder {
8686
*
8787
* <p>This method will block until the push consumer starts successfully.
8888
*
89-
* <p>Especially, if this method is invoked more than once, different push consumers will be created and started.
89+
* <p>Especially, if this method is invoked more than once, different push consumer will be created and started.
9090
*
9191
* @return the push consumer instance.
9292
*/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.consumer;
19+
20+
import java.util.Set;
21+
import org.apache.rocketmq.client.apis.message.MessageQueue;
22+
23+
public interface TopicMessageQueueChangeListener {
24+
/**
25+
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
26+
* expanded or shrunk.
27+
*
28+
* @param topic the topic to listen.
29+
* @param messageQueues
30+
*/
31+
void onChanged(String topic, Set<MessageQueue> messageQueues);
32+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.apis.message;
19+
20+
public interface MessageQueue {
21+
String getTopic();
22+
23+
String getId();
24+
}

0 commit comments

Comments
 (0)