Skip to content

Commit 70ad82e

Browse files
committed
Add more tests
1 parent 1e0ece9 commit 70ad82e

2 files changed

Lines changed: 79 additions & 1 deletion

File tree

java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PullConsumerImplTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
package org.apache.rocketmq.client.java.impl.consumer;
1919

2020
import static org.junit.Assert.assertTrue;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyLong;
2123
import static org.mockito.ArgumentMatchers.anyString;
2224
import static org.mockito.Mockito.doReturn;
2325
import static org.mockito.Mockito.spy;
2426
import static org.mockito.Mockito.times;
2527
import static org.mockito.Mockito.verify;
2628

29+
import apache.rocketmq.v2.UpdateOffsetRequest;
30+
import apache.rocketmq.v2.UpdateOffsetResponse;
2731
import java.time.Duration;
2832
import java.util.ArrayList;
2933
import java.util.List;
@@ -38,14 +42,14 @@
3842
import org.apache.rocketmq.client.java.message.MessageViewImpl;
3943
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
4044
import org.apache.rocketmq.client.java.route.TopicRouteData;
45+
import org.apache.rocketmq.client.java.rpc.RpcFuture;
4146
import org.apache.rocketmq.client.java.tool.TestBase;
4247
import org.junit.Assert;
4348
import org.junit.Before;
4449
import org.junit.Test;
4550
import org.junit.runner.RunWith;
4651
import org.mockito.junit.MockitoJUnitRunner;
4752

48-
@SuppressWarnings("resource")
4953
@RunWith(MockitoJUnitRunner.class)
5054
public class PullConsumerImplTest extends TestBase {
5155

@@ -252,4 +256,62 @@ public void onChanged(String topic, Set<MessageQueue> messageQueues) {
252256
pullConsumer.registerMessageQueueChangeListenerByTopic(FAKE_TOPIC_0, listener);
253257
verify(pullConsumer, times(1)).fetchMessageQueues(anyString());
254258
}
259+
260+
@Test(expected = IllegalArgumentException.class)
261+
public void testSeekWithMessageQueueIsNotContained() {
262+
doReturn(true).when(pullConsumer).isRunning();
263+
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
264+
pullConsumer.seek(mq, 1);
265+
}
266+
267+
@Test
268+
public void testSeek() {
269+
doReturn(true).when(pullConsumer).isRunning();
270+
MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
271+
List<MessageQueue> mqs = new ArrayList<>();
272+
mqs.add(mq);
273+
pullConsumer.assign(mqs);
274+
pullConsumer.seek(mq, 1);
275+
verify(pullConsumer, times(1)).dropProcessQueue(any(MessageQueueImpl.class));
276+
verify(pullConsumer, times(1))
277+
.tryPullMessageByMessageQueueImmediately(any(MessageQueueImpl.class), any(FilterExpression.class),
278+
anyLong());
279+
}
280+
281+
@Test
282+
public void testSeekToBegin() throws ClientException {
283+
doReturn(true).when(pullConsumer).isRunning();
284+
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
285+
List<MessageQueue> mqs = new ArrayList<>();
286+
mqs.add(mq);
287+
pullConsumer.assign(mqs);
288+
doReturn(okQueryOffsetResponseFuture()).when(pullConsumer).queryOffset(any(MessageQueueImpl.class),
289+
any(OffsetPolicy.class));
290+
pullConsumer.seekToBegin(mq);
291+
verify(pullConsumer, times(1)).queryOffset(any(MessageQueueImpl.class),
292+
any(OffsetPolicy.class));
293+
}
294+
295+
@Test
296+
public void testSeekToEnd() throws ClientException {
297+
doReturn(true).when(pullConsumer).isRunning();
298+
final MessageQueueImpl mq = fakeMessageQueueImpl(FAKE_TOPIC_0);
299+
List<MessageQueue> mqs = new ArrayList<>();
300+
mqs.add(mq);
301+
pullConsumer.assign(mqs);
302+
doReturn(okQueryOffsetResponseFuture()).when(pullConsumer).queryOffset(any(MessageQueueImpl.class),
303+
any(OffsetPolicy.class));
304+
pullConsumer.seekToEnd(mq);
305+
verify(pullConsumer, times(1)).queryOffset(any(MessageQueueImpl.class),
306+
any(OffsetPolicy.class));
307+
}
308+
309+
@Test
310+
public void testCommit() throws ClientException {
311+
List<RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse>> futures = new ArrayList<>();
312+
final RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> future = okUpdateOffsetResponseFuture();
313+
futures.add(future);
314+
doReturn(futures).when(pullConsumer).commit0();
315+
pullConsumer.commit();
316+
}
255317
}

java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import apache.rocketmq.v2.PullMessageResponse;
4141
import apache.rocketmq.v2.QueryAssignmentRequest;
4242
import apache.rocketmq.v2.QueryAssignmentResponse;
43+
import apache.rocketmq.v2.QueryOffsetRequest;
44+
import apache.rocketmq.v2.QueryOffsetResponse;
4345
import apache.rocketmq.v2.ReceiveMessageRequest;
4446
import apache.rocketmq.v2.ReceiveMessageResponse;
4547
import apache.rocketmq.v2.Resource;
@@ -48,6 +50,8 @@
4850
import apache.rocketmq.v2.SendResultEntry;
4951
import apache.rocketmq.v2.Status;
5052
import apache.rocketmq.v2.SystemProperties;
53+
import apache.rocketmq.v2.UpdateOffsetRequest;
54+
import apache.rocketmq.v2.UpdateOffsetResponse;
5155
import com.google.common.util.concurrent.Futures;
5256
import com.google.common.util.concurrent.ListenableFuture;
5357
import com.google.common.util.concurrent.SettableFuture;
@@ -297,6 +301,12 @@ ForwardMessageToDeadLetterQueueResponse> okForwardMessageToDeadLetterQueueRespon
297301
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
298302
}
299303

304+
protected RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> okUpdateOffsetResponseFuture() {
305+
final Status status = Status.newBuilder().setCode(Code.OK).build();
306+
final UpdateOffsetResponse response = UpdateOffsetResponse.newBuilder().setStatus(status).build();
307+
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
308+
}
309+
300310
protected RpcFuture<ForwardMessageToDeadLetterQueueRequest,
301311
ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueueResponseFuture(Code code) {
302312
final Status status = Status.newBuilder().setCode(code).build();
@@ -383,6 +393,12 @@ protected RpcFuture<GetOffsetRequest, GetOffsetResponse> okGetOffsetResponseFutu
383393
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
384394
}
385395

396+
protected RpcFuture<QueryOffsetRequest, QueryOffsetResponse> okQueryOffsetResponseFuture() {
397+
final Status status = Status.newBuilder().setCode(Code.OK).build();
398+
QueryOffsetResponse response = QueryOffsetResponse.newBuilder().setOffset(0).setStatus(status).build();
399+
return new RpcFuture<>(fakeRpcContext(), null, Futures.immediateFuture(response));
400+
}
401+
386402
protected ListenableFuture<EndTransactionResponse> okEndTransactionResponseFuture() {
387403
SettableFuture<EndTransactionResponse> future = SettableFuture.create();
388404
final Status status = Status.newBuilder().setCode(Code.OK).build();

0 commit comments

Comments
 (0)