Skip to content

Commit 02b2bc4

Browse files
authored
Merge pull request #9 from weiguoz/polish_code
Polish code
2 parents 7fbf795 + 9469299 commit 02b2bc4

File tree

5 files changed

+22
-77
lines changed

5 files changed

+22
-77
lines changed

pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,6 @@
4040
<artifactId>commons-lang3</artifactId>
4141
<version>3.9</version>
4242
</dependency>
43-
<!-- coding tools -->
44-
<dependency>
45-
<groupId>org.projectlombok</groupId>
46-
<artifactId>lombok</artifactId>
47-
<version>1.18.10</version>
48-
</dependency>
4943
<!-- unit test tools -->
5044
<dependency>
5145
<groupId>junit</groupId>

src/main/java/org/sqlflow/client/SQLFlow.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package org.sqlflow.client;
1717

1818
import io.grpc.StatusRuntimeException;
19-
import org.sqlflow.client.model.RequestHeader;
2019
import proto.Sqlflow.JobStatus;
20+
import proto.Sqlflow.Session;
2121

2222
public interface SQLFlow {
2323
/**
@@ -31,15 +31,21 @@ public interface SQLFlow {
3131
/**
3232
* Submit a task to SQLFlow server. This method return immediately.
3333
*
34-
* @param header: specify datasource, user ...
34+
* @param session: specify dbConnStr(datasource), user Id ...
35+
* <p>datasource == maxcomputer
36+
* maxcompute://{accesskey_id}:{accesskey_secret}@{endpoint}?curr_project={curr_project}&scheme={scheme}
37+
* <p>datasource == mysql
38+
* mysql://{username}:{password}@tcp({address})/{dbname}[?param1=value1&...&paramN=valueN]
39+
* <p>datasource == hive
40+
* hive://user:password@ip:port/dbname[?auth=<auth_mechanism>&session.<cfg_key1>=<cfg_value1>...&session<cfg_keyN>=valueN]
3541
* @param sql: sql program.
3642
* <p>Example: "SELECT * FROM iris.test; SELECT * FROM iris.iris TO TRAIN DNNClassifier
3743
* COLUMN..." *
3844
* @return return a job id for tracking.
3945
* @throws IllegalArgumentException header or sql error
4046
* @throws StatusRuntimeException
4147
*/
42-
String submit(RequestHeader header, String sql)
48+
String submit(Session session, String sql)
4349
throws IllegalArgumentException, StatusRuntimeException;
4450

4551
/**
@@ -58,5 +64,5 @@ String submit(RequestHeader header, String sql)
5864
*
5965
* @throws InterruptedException thrown by awaitTermination
6066
*/
61-
void shutdown() throws InterruptedException;
67+
void release() throws InterruptedException;
6268
}

src/main/java/org/sqlflow/client/impl/SQLFlowImpl.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515

1616
package org.sqlflow.client.impl;
1717

18+
import com.google.common.annotations.VisibleForTesting;
1819
import io.grpc.ManagedChannel;
1920
import io.grpc.ManagedChannelBuilder;
2021
import io.grpc.StatusRuntimeException;
2122
import java.util.concurrent.TimeUnit;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.sqlflow.client.SQLFlow;
24-
import org.sqlflow.client.model.RequestHeader;
2525
import proto.SQLFlowGrpc;
2626
import proto.Sqlflow.Job;
2727
import proto.Sqlflow.JobStatus;
@@ -37,30 +37,21 @@ public void init(String serverUrl) {
3737
blockingStub = SQLFlowGrpc.newBlockingStub(channel);
3838
}
3939

40+
@VisibleForTesting
4041
public SQLFlowImpl(ManagedChannel channel) {
4142
this.channel = channel;
4243
blockingStub = SQLFlowGrpc.newBlockingStub(channel);
4344
}
4445

45-
public String submit(RequestHeader header, String sql)
46+
public String submit(Session session, String sql)
4647
throws IllegalArgumentException, StatusRuntimeException {
47-
if (header == null || StringUtils.isAnyBlank(header.getDataSource(), header.getUserId())) {
48+
if (session == null || StringUtils.isAnyBlank(session.getDbConnStr(), session.getUserId())) {
4849
throw new IllegalArgumentException("data source and userId are not allowed to be empty");
4950
}
5051
if (StringUtils.isBlank(sql)) {
5152
throw new IllegalArgumentException("sql is empty");
5253
}
5354

54-
Session session =
55-
Session.newBuilder()
56-
.setDbConnStr(header.getDataSource())
57-
.setUserId(header.getUserId())
58-
.setExitOnSubmit(header.isExitOnSubmit())
59-
.setHiveLocation(StringUtils.defaultString(header.getHiveLocation()))
60-
.setHdfsNamenodeAddr(StringUtils.defaultString(header.getHdfsNameNode()))
61-
.setHdfsUser(StringUtils.defaultString(header.getHdfsUser()))
62-
.setHdfsPass(StringUtils.defaultString(header.getHdfsPassword()))
63-
.build();
6455
Request req = Request.newBuilder().setSession(session).setSql(sql).build();
6556
try {
6657
Job job = blockingStub.submit(req);
@@ -81,7 +72,7 @@ public JobStatus fetch(String jobId) throws StatusRuntimeException {
8172
}
8273
}
8374

84-
public void shutdown() throws InterruptedException {
75+
public void release() throws InterruptedException {
8576
try {
8677
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
8778
} catch (InterruptedException e) {

src/main/java/org/sqlflow/client/model/RequestHeader.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

src/test/java/org/sqlflow/client/SQLFlowTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.mockito.ArgumentCaptor;
3535
import org.mockito.ArgumentMatchers;
3636
import org.sqlflow.client.impl.SQLFlowImpl;
37-
import org.sqlflow.client.model.RequestHeader;
3837
import proto.SQLFlowGrpc;
3938
import proto.Sqlflow.Job;
4039
import proto.Sqlflow.JobStatus;
@@ -96,10 +95,12 @@ public void testSubmit() {
9695
String sql = "SELECT * TO TRAIN DNNClassify WITH ... COLUMN ... INTO ..";
9796

9897
ArgumentCaptor<Request> requestCaptor = ArgumentCaptor.forClass(Request.class);
99-
RequestHeader header = new RequestHeader();
100-
header.setUserId(userId);
101-
header.setDataSource("mysql://root@root@127.0.0.1:3306/iris");
102-
String jobId = client.submit(header, sql);
98+
Session session =
99+
Session.newBuilder()
100+
.setUserId(userId)
101+
.setDbConnStr("mysql://root@root@127.0.0.1:3306/iris")
102+
.build();
103+
String jobId = client.submit(session, sql);
103104
assertEquals(mockJobId(userId, sql), jobId);
104105
verify(grpcService)
105106
.submit(requestCaptor.capture(), ArgumentMatchers.<StreamObserver<Job>>any());
@@ -125,6 +126,6 @@ public void testFetch() {
125126

126127
@After
127128
public void tearDown() throws Exception {
128-
client.shutdown();
129+
client.release();
129130
}
130131
}

0 commit comments

Comments
 (0)