Skip to content

Commit c3e546f

Browse files
authored
Refactor CDCClient (#37372)
* Refactor CDCClient * Refactor CDCClient
1 parent ce81563 commit c3e546f

2 files changed

Lines changed: 22 additions & 34 deletions

File tree

kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/CDCClient.java

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.cdc.client;
1919

20+
import com.google.common.base.Preconditions;
2021
import com.google.common.hash.Hashing;
2122
import io.netty.bootstrap.Bootstrap;
2223
import io.netty.buffer.PooledByteBufAllocator;
@@ -29,6 +30,7 @@
2930
import io.netty.handler.codec.protobuf.ProtobufEncoder;
3031
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
3132
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
33+
import lombok.RequiredArgsConstructor;
3234
import lombok.SneakyThrows;
3335
import lombok.extern.slf4j.Slf4j;
3436
import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
@@ -59,6 +61,7 @@
5961
/**
6062
* CDC client.
6163
*/
64+
@RequiredArgsConstructor
6265
@Slf4j
6366
public final class CDCClient implements AutoCloseable {
6467

@@ -68,20 +71,6 @@ public final class CDCClient implements AutoCloseable {
6871

6972
private Channel channel;
7073

71-
public CDCClient(final CDCClientConfiguration config) {
72-
validateParameter(config);
73-
this.config = config;
74-
}
75-
76-
private void validateParameter(final CDCClientConfiguration parameter) {
77-
if (null == parameter.getAddress() || parameter.getAddress().isEmpty()) {
78-
throw new IllegalArgumentException("The address parameter can't be null");
79-
}
80-
if (parameter.getPort() <= 0) {
81-
throw new IllegalArgumentException("The port must be greater than 0");
82-
}
83-
}
84-
8574
/**
8675
* Connect.
8776
*
@@ -127,13 +116,11 @@ public void await() throws InterruptedException {
127116
* @throws IllegalStateException the channel is not active
128117
*/
129118
public synchronized void login(final CDCLoginParameter parameter) {
130-
checkChannelActive();
119+
Preconditions.checkState(null != channel && channel.isActive(), "The channel is not active, call the `connect` method first.");
131120
ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
132-
if (ClientConnectionStatus.LOGGED_IN == connectionContext.getStatus().get()) {
133-
throw new IllegalStateException("The client is already logged in");
134-
}
135-
LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder().setUsername(parameter.getUsername())
136-
.setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
121+
Preconditions.checkState(ClientConnectionStatus.LOGGED_IN != connectionContext.getStatus().get(), "The client is already logged in.");
122+
LoginRequestBody loginRequestBody = LoginRequestBody.newBuilder().setType(LoginType.BASIC).setBasicBody(BasicBody.newBuilder()
123+
.setUsername(parameter.getUsername()).setPassword(Hashing.sha256().hashBytes(parameter.getPassword().getBytes()).toString().toUpperCase()).build()).build();
137124
String requestId = RequestIdUtils.generateRequestId();
138125
CDCRequest data = CDCRequest.newBuilder().setType(Type.LOGIN).setVersion(1).setRequestId(requestId).setLoginRequestBody(loginRequestBody).build();
139126
ResponseFuture responseFuture = new ResponseFuture(requestId, Type.LOGIN);
@@ -143,21 +130,15 @@ public synchronized void login(final CDCLoginParameter parameter) {
143130
log.info("Login success, username: {}", parameter.getUsername());
144131
}
145132

146-
private void checkChannelActive() {
147-
if (null == channel || !channel.isActive()) {
148-
throw new IllegalStateException("The channel is not active, call the `connect` method first");
149-
}
150-
}
151-
152133
/**
153134
* Start streaming.
154135
*
155136
* @param parameter parameter
156-
* @return streaming id
137+
* @return streaming ID
157138
*/
158139
public String startStreaming(final StartStreamingParameter parameter) {
159-
StreamDataRequestBody streamDataRequestBody = StreamDataRequestBody.newBuilder().setDatabase(parameter.getDatabase()).setFull(parameter.isFull())
160-
.addAllSourceSchemaTable(parameter.getSchemaTables()).build();
140+
StreamDataRequestBody streamDataRequestBody = StreamDataRequestBody.newBuilder()
141+
.setDatabase(parameter.getDatabase()).setFull(parameter.isFull()).addAllSourceSchemaTable(parameter.getSchemaTables()).build();
161142
String requestId = RequestIdUtils.generateRequestId();
162143
CDCRequest request = CDCRequest.newBuilder().setRequestId(requestId).setType(Type.STREAM_DATA).setStreamDataRequestBody(streamDataRequestBody).build();
163144
ClientConnectionContext connectionContext = channel.attr(ClientConnectionContext.CONTEXT_KEY).get();
@@ -172,7 +153,7 @@ public String startStreaming(final StartStreamingParameter parameter) {
172153
/**
173154
* Restart streaming.
174155
*
175-
* @param streamingId streaming id
156+
* @param streamingId streaming ID
176157
*/
177158
public void restartStreaming(final String streamingId) {
178159
String requestId = RequestIdUtils.generateRequestId();
@@ -189,7 +170,7 @@ public void restartStreaming(final String streamingId) {
189170
/**
190171
* Stop streaming.
191172
*
192-
* @param streamingId streaming id
173+
* @param streamingId streaming ID
193174
*/
194175
public void stopStreaming(final String streamingId) {
195176
String requestId = RequestIdUtils.generateRequestId();
@@ -207,7 +188,7 @@ public void stopStreaming(final String streamingId) {
207188
/**
208189
* Drop streaming.
209190
*
210-
* @param streamingId streaming id
191+
* @param streamingId streaming ID
211192
*/
212193
public void dropStreaming(final String streamingId) {
213194
String requestId = RequestIdUtils.generateRequestId();

kernel/data-pipeline/scenario/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/config/CDCClientConfiguration.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717

1818
package org.apache.shardingsphere.data.pipeline.cdc.client.config;
1919

20+
import com.google.common.base.Preconditions;
2021
import lombok.Getter;
21-
import lombok.RequiredArgsConstructor;
2222

2323
/**
2424
* CDC client configuration.
2525
*/
26-
@RequiredArgsConstructor
2726
@Getter
2827
public final class CDCClientConfiguration {
2928

@@ -32,4 +31,12 @@ public final class CDCClientConfiguration {
3231
private final int port;
3332

3433
private final int timeoutMillis;
34+
35+
public CDCClientConfiguration(final String address, final int port, final int timeoutMillis) {
36+
Preconditions.checkArgument(null != address && !address.isEmpty(), "The address parameter can't be null.");
37+
Preconditions.checkArgument(port > 0, "The port must be greater than 0.");
38+
this.address = address;
39+
this.port = port;
40+
this.timeoutMillis = timeoutMillis;
41+
}
3542
}

0 commit comments

Comments
 (0)