Skip to content

Commit fb4f5ba

Browse files
authored
Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest (#37374)
* Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest * Add CDCRequestHandlerTest and RetryStreamingExceptionHandlerTest
1 parent dcf5f59 commit fb4f5ba

3 files changed

Lines changed: 274 additions & 32 deletions

File tree

kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClientTest.java

Lines changed: 0 additions & 32 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
19+
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelFuture;
22+
import io.netty.channel.ChannelHandlerContext;
23+
import io.netty.util.AttributeMap;
24+
import io.netty.util.DefaultAttributeMap;
25+
import org.apache.shardingsphere.data.pipeline.cdc.client.constant.ClientConnectionStatus;
26+
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
27+
import org.apache.shardingsphere.data.pipeline.cdc.client.exception.ServerResultException;
28+
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ResponseFuture;
29+
import org.apache.shardingsphere.data.pipeline.cdc.client.util.ServerErrorResult;
30+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
31+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
32+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
33+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
34+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult;
35+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
36+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
37+
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.extension.ExtendWith;
41+
import org.mockito.ArgumentCaptor;
42+
import org.mockito.Mock;
43+
import org.mockito.junit.jupiter.MockitoExtension;
44+
import org.mockito.junit.jupiter.MockitoSettings;
45+
import org.mockito.quality.Strictness;
46+
47+
import java.util.List;
48+
import java.util.function.Consumer;
49+
50+
import static org.hamcrest.MatcherAssert.assertThat;
51+
import static org.hamcrest.Matchers.is;
52+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
53+
import static org.junit.jupiter.api.Assertions.assertThrows;
54+
import static org.junit.jupiter.api.Assertions.assertTrue;
55+
import static org.mockito.ArgumentMatchers.any;
56+
import static org.mockito.ArgumentMatchers.eq;
57+
import static org.mockito.Mockito.mock;
58+
import static org.mockito.Mockito.verify;
59+
import static org.mockito.Mockito.verifyNoInteractions;
60+
import static org.mockito.Mockito.when;
61+
62+
@ExtendWith(MockitoExtension.class)
63+
@MockitoSettings(strictness = Strictness.LENIENT)
64+
class CDCRequestHandlerTest {
65+
66+
@Mock
67+
private Consumer<List<Record>> consumer;
68+
69+
@Mock
70+
private ExceptionHandler exceptionHandler;
71+
72+
@Mock
73+
private ServerErrorResultHandler errorResultHandler;
74+
75+
private CDCRequestHandler handler;
76+
77+
@BeforeEach
78+
void setUp() {
79+
handler = new CDCRequestHandler(consumer, exceptionHandler, errorResultHandler);
80+
}
81+
82+
@Test
83+
void assertChannelRegisteredAndInactive() {
84+
Channel channel = mockChannel(null);
85+
ChannelHandlerContext ctx = mockChannelHandlerContext(channel);
86+
handler.channelRegistered(ctx);
87+
assertThat(channel.attr(ClientConnectionContext.CONTEXT_KEY).get().getStatus().get(), is(ClientConnectionStatus.NOT_LOGGED_IN));
88+
handler.channelInactive(ctx);
89+
verify(ctx).fireChannelInactive();
90+
}
91+
92+
@Test
93+
void assertHandleNonSucceedResponseWithFuture() {
94+
ClientConnectionContext connectionContext = new ClientConnectionContext();
95+
ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.START_STREAMING);
96+
connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
97+
ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(connectionContext));
98+
CDCResponse response = CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.FAILED).setErrorCode("500").setErrorMessage("mock error").build();
99+
handler.channelRead(ctx, response);
100+
ArgumentCaptor<ServerErrorResult> resultCaptor = ArgumentCaptor.forClass(ServerErrorResult.class);
101+
verify(errorResultHandler).handleServerError(eq(ctx), resultCaptor.capture());
102+
ServerErrorResult actualResult = resultCaptor.getValue();
103+
assertThat(actualResult.getErrorCode(), is("500"));
104+
assertThat(actualResult.getErrorMessage(), is("mock error"));
105+
assertThat(actualResult.getRequestType(), is(Type.START_STREAMING));
106+
assertThat(responseFuture.getErrorCode(), is("500"));
107+
assertThat(responseFuture.getErrorMessage(), is("mock error"));
108+
ServerResultException ex = assertThrows(ServerResultException.class, () -> responseFuture.waitResponseResult(100L, connectionContext));
109+
assertThat(ex.getMessage(), is("Get START_STREAMING response failed, code:500, reason: mock error"));
110+
assertTrue(connectionContext.getResponseFutureMap().isEmpty());
111+
}
112+
113+
@Test
114+
void assertHandleNonSucceedResponseWithoutFuture() {
115+
ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(new ClientConnectionContext()));
116+
handler.channelRead(ctx, CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.FAILED).setErrorCode("404").setErrorMessage("not found").build());
117+
ArgumentCaptor<ServerErrorResult> resultCaptor = ArgumentCaptor.forClass(ServerErrorResult.class);
118+
verify(errorResultHandler).handleServerError(eq(ctx), resultCaptor.capture());
119+
assertThat(resultCaptor.getValue().getRequestType(), is(Type.UNKNOWN));
120+
}
121+
122+
@Test
123+
void assertHandleServerGreeting() {
124+
ClientConnectionContext connectionContext = new ClientConnectionContext();
125+
ChannelHandlerContext ctx = mockChannelHandlerContext(mockChannel(connectionContext));
126+
CDCResponse response = CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED)
127+
.setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion("1.0").setProtocolVersion("1").build()).build();
128+
handler.channelRead(ctx, response);
129+
assertTrue(connectionContext.getStreamingIds().isEmpty());
130+
verifyNoInteractions(errorResultHandler, consumer, exceptionHandler);
131+
}
132+
133+
@Test
134+
void assertHandleLoginResponse() {
135+
Channel channel = mockChannel(null);
136+
ChannelHandlerContext ctx = mockChannelHandlerContext(channel);
137+
handler.channelRegistered(ctx);
138+
ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
139+
ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.LOGIN);
140+
connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
141+
handler.channelRead(ctx, CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).build());
142+
assertThat(connectionContext.getStatus().get(), is(ClientConnectionStatus.LOGGED_IN));
143+
assertDoesNotThrow(() -> responseFuture.waitResponseResult(500L, connectionContext));
144+
}
145+
146+
@Test
147+
void assertHandleStreamDataResult() {
148+
ClientConnectionContext connectionContext = new ClientConnectionContext();
149+
connectionContext.getStatus().set(ClientConnectionStatus.NOT_LOGGED_IN);
150+
ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.STREAM_DATA);
151+
connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
152+
CDCResponse response = CDCResponse.newBuilder()
153+
.setRequestId("foo_req").setStatus(Status.SUCCEED).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId("stream_1").build()).build();
154+
handler.channelRead(mockChannelHandlerContext(mockChannel(connectionContext)), response);
155+
assertTrue(connectionContext.getStreamingIds().contains("stream_1"));
156+
assertThat(responseFuture.getResult(), is("stream_1"));
157+
assertThat(responseFuture.waitResponseResult(500L, connectionContext).toString(), is("stream_1"));
158+
}
159+
160+
@Test
161+
void assertHandleDataRecordResult() {
162+
Channel channel = mockChannel(new ClientConnectionContext());
163+
DataRecordResult recordResult = DataRecordResult.newBuilder().setAckId("ack_1").addRecord(Record.newBuilder().build()).build();
164+
List<Record> expectedRecords = recordResult.getRecordList();
165+
handler.channelRead(mockChannelHandlerContext(channel), CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).setDataRecordResult(recordResult).build());
166+
verify(consumer).accept(expectedRecords);
167+
assertThat(expectedRecords.size(), is(1));
168+
ArgumentCaptor<CDCRequest> requestCaptor = ArgumentCaptor.forClass(CDCRequest.class);
169+
verify(channel).writeAndFlush(requestCaptor.capture());
170+
CDCRequest ackRequest = requestCaptor.getValue();
171+
assertThat(ackRequest.getType(), is(Type.ACK_STREAMING));
172+
assertThat(ackRequest.getAckStreamingRequestBody().getAckId(), is("ack_1"));
173+
}
174+
175+
@Test
176+
void assertHandleSucceedWithoutPayload() {
177+
ClientConnectionContext connectionContext = new ClientConnectionContext();
178+
ResponseFuture responseFuture = new ResponseFuture("foo_req", Type.DROP_STREAMING);
179+
connectionContext.getResponseFutureMap().put("foo_req", responseFuture);
180+
handler.channelRead(mockChannelHandlerContext(mockChannel(connectionContext)), CDCResponse.newBuilder().setRequestId("foo_req").setStatus(Status.SUCCEED).build());
181+
assertDoesNotThrow(() -> responseFuture.waitResponseResult(500L, connectionContext));
182+
}
183+
184+
@Test
185+
void assertExceptionCaughtDelegates() {
186+
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
187+
RuntimeException expectedCause = new RuntimeException("mock");
188+
handler.exceptionCaught(ctx, expectedCause);
189+
verify(exceptionHandler).handleException(ctx, expectedCause);
190+
}
191+
192+
private Channel mockChannel(final ClientConnectionContext context) {
193+
Channel result = mock(Channel.class);
194+
AttributeMap attributeMap = new DefaultAttributeMap();
195+
when(result.attr(ClientConnectionContext.CONTEXT_KEY)).thenAnswer(invocation -> attributeMap.attr(invocation.getArgument(0)));
196+
result.attr(ClientConnectionContext.CONTEXT_KEY).set(context);
197+
when(result.writeAndFlush(any())).thenReturn(mock(ChannelFuture.class));
198+
return result;
199+
}
200+
201+
private ChannelHandlerContext mockChannelHandlerContext(final Channel channel) {
202+
ChannelHandlerContext result = mock(ChannelHandlerContext.class);
203+
when(result.channel()).thenReturn(channel);
204+
when(result.fireChannelInactive()).thenReturn(result);
205+
return result;
206+
}
207+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shardingsphere.data.pipeline.cdc.client.handler;
19+
20+
import io.netty.channel.ChannelHandlerContext;
21+
import io.netty.channel.embedded.EmbeddedChannel;
22+
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
23+
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.extension.ExtendWith;
27+
import org.mockito.Mock;
28+
import org.mockito.junit.jupiter.MockitoExtension;
29+
30+
import static org.mockito.ArgumentMatchers.any;
31+
import static org.mockito.Mockito.never;
32+
import static org.mockito.Mockito.timeout;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
35+
36+
@ExtendWith(MockitoExtension.class)
37+
class RetryStreamingExceptionHandlerTest {
38+
39+
@Mock
40+
private CDCClient cdcClient;
41+
42+
@Mock
43+
private ChannelHandlerContext channelHandlerContext;
44+
45+
@BeforeEach
46+
void setUp() {
47+
ClientConnectionContext connectionContext = new ClientConnectionContext();
48+
connectionContext.getStreamingIds().add("foo_stream_id");
49+
EmbeddedChannel channel = new EmbeddedChannel();
50+
channel.attr(ClientConnectionContext.CONTEXT_KEY).set(connectionContext);
51+
when(channelHandlerContext.channel()).thenReturn(channel);
52+
}
53+
54+
@Test
55+
void assertRestartStreamingWhenRetryTimesNotExceed() {
56+
new RetryStreamingExceptionHandler(cdcClient, 2, 10).handleException(channelHandlerContext, new RuntimeException(""));
57+
verify(cdcClient, timeout(3000L)).restartStreaming("foo_stream_id");
58+
verify(cdcClient, never()).stopStreaming(any());
59+
}
60+
61+
@Test
62+
void assertStopStreamingWhenRetryTimesExceed() {
63+
new RetryStreamingExceptionHandler(cdcClient, 0, 10).handleException(channelHandlerContext, new RuntimeException(""));
64+
verify(cdcClient, timeout(3000L)).stopStreaming("foo_stream_id");
65+
verify(cdcClient, never()).restartStreaming(any());
66+
}
67+
}

0 commit comments

Comments
 (0)