Skip to content

Commit eaef770

Browse files
authored
[PECOBLR-384] Add implementation for fetching statement status for async execution (#824)
* Add implementation for fetching execution status for async execution
1 parent c0719a7 commit eaef770

8 files changed

Lines changed: 173 additions & 11 deletions

File tree

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Added
66
- Support for fetching tables and views across all catalogs using SHOW TABLES FROM/IN ALL CATALOGS in the SQL Exec API.
77
- Support for Token Exchange in OAuth flows where in third party tokens are exchanged for InHouse tokens.
8+
- Added support for polling of statementStatus and sqlState for async SQL execution.
89

910
### Updated
1011
-
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.databricks.jdbc.api;
2+
3+
/**
4+
* Represents the possible states of a SQL statement execution in Databricks. This enum is used to
5+
* track the progress and status of SQL statements executed through the Databricks JDBC API.
6+
*/
7+
public enum ExecutionState {
8+
// The statement is in a pending state and has not yet started executing.
9+
PENDING,
10+
// The statement is currently executing.
11+
RUNNING,
12+
// The statement has completed successfully.
13+
SUCCEEDED,
14+
// The statement has completed with an error.
15+
FAILED,
16+
// The statement has been closed and is no longer available.
17+
CLOSED,
18+
// The statement has been cancelled by the user.
19+
ABORTED;
20+
}

src/main/java/com/databricks/jdbc/api/IDatabricksResultSet.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,19 @@ public interface IDatabricksResultSet extends ResultSet {
4343
* to monitor the execution progress and state of the statement.
4444
*
4545
* @return The current {@link StatementStatus} of the statement
46+
* @deprecated Use {@link #getExecutionStatus()} instead.
4647
*/
48+
@Deprecated
4749
StatementStatus getStatementStatus();
4850

51+
/**
52+
* Retrieves the current status of the statement associated with this result set. This can be used
53+
* to monitor the execution progress and state of the statement.
54+
*
55+
* @return The current {@link StatementStatus} of the statement
56+
*/
57+
IExecutionStatus getExecutionStatus();
58+
4959
/**
5060
* Retrieves the number of rows affected by the SQL statement. For SELECT statements or statements
5161
* that don't modify data, this will return 0.
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.databricks.jdbc.api;
2+
3+
public interface IExecutionStatus {
4+
/**
5+
* Returns the error message if the statement execution failed.
6+
*
7+
* @return the error message, or null if there was no error
8+
*/
9+
String getErrorMessage();
10+
11+
/**
12+
* Returns the SQL state code if the statement execution failed.
13+
*
14+
* @return the SQL state code, or null if there was no error
15+
*/
16+
String getSqlState();
17+
18+
/**
19+
* Returns the current state of the statement execution.
20+
*
21+
* @return the current state of the statement execution
22+
*/
23+
ExecutionState getExecutionState();
24+
}

src/main/java/com/databricks/jdbc/api/impl/DatabricksResultSet.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static com.databricks.jdbc.common.util.DatabricksTypeUtil.STRUCT;
77

88
import com.databricks.jdbc.api.IDatabricksResultSet;
9+
import com.databricks.jdbc.api.IExecutionStatus;
910
import com.databricks.jdbc.api.impl.arrow.ArrowStreamResult;
1011
import com.databricks.jdbc.api.impl.converters.ConverterHelper;
1112
import com.databricks.jdbc.api.impl.converters.ObjectConverter;
@@ -56,7 +57,7 @@ enum ResultSetType {
5657

5758
private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(DatabricksResultSet.class);
5859
protected static final String AFFECTED_ROWS_COUNT = "num_affected_rows";
59-
private final StatementStatus statementStatus;
60+
private final ExecutionStatus executionStatus;
6061
private final StatementId statementId;
6162
private final IExecutionResult executionResult;
6263
private final DatabricksResultSetMetaData resultSetMetaData;
@@ -81,7 +82,7 @@ public DatabricksResultSet(
8182
IDatabricksSession session,
8283
IDatabricksStatementInternal parentStatement)
8384
throws DatabricksSQLException {
84-
this.statementStatus = statementStatus;
85+
this.executionStatus = new ExecutionStatus(statementStatus);
8586
this.statementId = statementId;
8687
if (resultData != null) {
8788
this.executionResult =
@@ -122,7 +123,7 @@ public DatabricksResultSet(
122123
IExecutionResult executionResult,
123124
DatabricksResultSetMetaData resultSetMetaData,
124125
boolean complexDatatypeSupport) {
125-
this.statementStatus = statementStatus;
126+
this.executionStatus = new ExecutionStatus(statementStatus);
126127
this.statementId = statementId;
127128
this.executionResult = executionResult;
128129
this.resultSetMetaData = resultSetMetaData;
@@ -143,7 +144,7 @@ public DatabricksResultSet(
143144
IDatabricksStatementInternal parentStatement,
144145
IDatabricksSession session)
145146
throws SQLException {
146-
this.statementStatus = statementStatus;
147+
this.executionStatus = new ExecutionStatus(statementStatus);
147148
this.statementId = statementId;
148149
if (resultsResp != null) {
149150
this.executionResult =
@@ -193,7 +194,7 @@ public DatabricksResultSet(
193194
int[] isNullables,
194195
Object[][] rows,
195196
StatementType statementType) {
196-
this.statementStatus = statementStatus;
197+
this.executionStatus = new ExecutionStatus(statementStatus);
197198
this.statementId = statementId;
198199
this.executionResult = ExecutionResultFactory.getResultSet(rows);
199200
this.resultSetMetaData =
@@ -223,7 +224,7 @@ public DatabricksResultSet(
223224
List<Nullable> columnNullables,
224225
List<List<Object>> rows,
225226
StatementType statementType) {
226-
this.statementStatus = statementStatus;
227+
this.executionStatus = new ExecutionStatus(statementStatus);
227228
this.statementId = statementId;
228229
this.executionResult = ExecutionResultFactory.getResultSet(rows);
229230
this.resultSetMetaData =
@@ -249,7 +250,7 @@ public DatabricksResultSet(
249250
List<ColumnMetadata> columnMetadataList,
250251
List<List<Object>> rows,
251252
StatementType statementType) {
252-
this.statementStatus = statementStatus;
253+
this.executionStatus = new ExecutionStatus(statementStatus);
253254
this.statementId = statementId;
254255
this.executionResult = ExecutionResultFactory.getResultSet(rows);
255256
this.resultSetMetaData =
@@ -1734,7 +1735,12 @@ public String getStatementId() {
17341735

17351736
@Override
17361737
public StatementStatus getStatementStatus() {
1737-
return statementStatus;
1738+
return executionStatus.getSdkStatus();
1739+
}
1740+
1741+
@Override
1742+
public IExecutionStatus getExecutionStatus() {
1743+
return executionStatus;
17381744
}
17391745

17401746
@Override
@@ -1837,7 +1843,7 @@ private BigDecimal applyScaleToBigDecimal(BigDecimal bigDecimal, int columnIndex
18371843
@Override
18381844
public String toString() {
18391845
return (new ToStringer(DatabricksResultSet.class))
1840-
.add("statementStatus", this.statementStatus)
1846+
.add("statementStatus", this.executionStatus)
18411847
.add("statementId", this.statementId)
18421848
.add("statementType", this.statementType)
18431849
.add("updateCount", this.updateCount)

src/main/java/com/databricks/jdbc/api/impl/EmptyResultSet.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.databricks.jdbc.api.impl;
22

33
import com.databricks.jdbc.api.IDatabricksResultSet;
4+
import com.databricks.jdbc.api.IExecutionStatus;
45
import com.databricks.jdbc.api.internal.IDatabricksResultSetInternal;
56
import com.databricks.jdbc.exception.DatabricksSQLException;
67
import com.databricks.jdbc.model.core.StatementStatus;
@@ -1127,6 +1128,11 @@ public StatementStatus getStatementStatus() {
11271128
return new StatementStatus().setState(StatementState.SUCCEEDED);
11281129
}
11291130

1131+
@Override
1132+
public IExecutionStatus getExecutionStatus() {
1133+
return new ExecutionStatus(new StatementStatus().setState(StatementState.SUCCEEDED));
1134+
}
1135+
11301136
@Override
11311137
public long getUpdateCount() throws SQLException {
11321138
return 0;
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.databricks.jdbc.api.impl;
2+
3+
import com.databricks.jdbc.api.ExecutionState;
4+
import com.databricks.jdbc.api.IExecutionStatus;
5+
import com.databricks.jdbc.model.core.StatementStatus;
6+
import com.databricks.sdk.service.sql.StatementState;
7+
8+
/**
9+
* This class implements the IStatementStatus interface and provides a default implementation for
10+
* the methods defined in the interface. It is used to represent the status of a SQL statement
11+
* execution in Databricks.
12+
*/
13+
class ExecutionStatus implements IExecutionStatus {
14+
private final ExecutionState state;
15+
private final String errorMessage;
16+
private final String sqlState;
17+
private final StatementStatus sdkStatus;
18+
19+
public ExecutionStatus(StatementStatus status) {
20+
this.state = getStateFromSdkState(status.getState());
21+
this.errorMessage = status.getError() != null ? status.getError().getMessage() : null;
22+
this.sqlState = status.getSqlState();
23+
this.sdkStatus = status;
24+
}
25+
26+
@Override
27+
public String getErrorMessage() {
28+
return errorMessage;
29+
}
30+
31+
@Override
32+
public String getSqlState() {
33+
return sqlState;
34+
}
35+
36+
@Override
37+
public ExecutionState getExecutionState() {
38+
return state;
39+
}
40+
41+
StatementStatus getSdkStatus() {
42+
return sdkStatus;
43+
}
44+
45+
private ExecutionState getStateFromSdkState(StatementState state) {
46+
if (state == null) {
47+
return ExecutionState.PENDING;
48+
}
49+
// Map the SDK statement state to the JDBC statement state
50+
switch (state) {
51+
case PENDING:
52+
return ExecutionState.PENDING;
53+
case RUNNING:
54+
return ExecutionState.RUNNING;
55+
case SUCCEEDED:
56+
return ExecutionState.SUCCEEDED;
57+
case FAILED:
58+
return ExecutionState.FAILED;
59+
case CANCELED:
60+
return ExecutionState.ABORTED;
61+
case CLOSED:
62+
return ExecutionState.CLOSED;
63+
// should never reach here
64+
default:
65+
throw new IllegalArgumentException("Unknown statement execution state: " + state);
66+
}
67+
}
68+
}

src/test/java/com/databricks/jdbc/api/impl/DatabricksResultSetTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import static org.mockito.Mockito.mock;
99
import static org.mockito.Mockito.when;
1010

11+
import com.databricks.jdbc.api.ExecutionState;
1112
import com.databricks.jdbc.api.IDatabricksResultSet;
13+
import com.databricks.jdbc.api.IExecutionStatus;
1214
import com.databricks.jdbc.api.impl.volume.VolumeOperationResult;
1315
import com.databricks.jdbc.api.internal.IDatabricksResultSetInternal;
1416
import com.databricks.jdbc.api.internal.IDatabricksSession;
@@ -20,6 +22,7 @@
2022
import com.databricks.jdbc.exception.DatabricksSQLFeatureNotSupportedException;
2123
import com.databricks.jdbc.model.client.thrift.generated.*;
2224
import com.databricks.jdbc.model.core.StatementStatus;
25+
import com.databricks.sdk.service.sql.ServiceError;
2326
import com.databricks.sdk.service.sql.StatementState;
2427
import java.io.*;
2528
import java.math.BigDecimal;
@@ -63,6 +66,18 @@ private DatabricksResultSet getResultSet(
6366
false);
6467
}
6568

69+
private DatabricksResultSet getResultSet(
70+
StatementStatus statementState, IDatabricksStatementInternal statement) {
71+
return new DatabricksResultSet(
72+
statementState,
73+
STATEMENT_ID,
74+
StatementType.METADATA,
75+
statement,
76+
mockedExecutionResult,
77+
mockedResultSetMetadata,
78+
false);
79+
}
80+
6681
private DatabricksResultSet getThriftResultSetMetadata() throws SQLException {
6782
TColumnValue columnValue = new TColumnValue();
6883
columnValue.setStringVal(new TStringValue().setValue("testString"));
@@ -118,8 +133,20 @@ void testThriftResultSet() throws SQLException {
118133

119134
@Test
120135
void testGetStatementStatus() {
121-
DatabricksResultSet resultSet = getResultSet(StatementState.PENDING, null);
122-
assertEquals(StatementState.PENDING, resultSet.getStatementStatus().getState());
136+
StatementStatus statementStatus =
137+
new StatementStatus()
138+
.setState(StatementState.FAILED)
139+
.setError(new ServiceError().setMessage("error"))
140+
.setSqlState("sqlState");
141+
DatabricksResultSet resultSet = getResultSet(statementStatus, null);
142+
assertEquals(STATEMENT_ID.toString(), resultSet.getStatementId());
143+
assertEquals(StatementState.FAILED, resultSet.getStatementStatus().getState());
144+
assertEquals(statementStatus, resultSet.getStatementStatus());
145+
146+
IExecutionStatus executionStatus = resultSet.getExecutionStatus();
147+
assertEquals(ExecutionState.FAILED, executionStatus.getExecutionState());
148+
assertEquals("error", executionStatus.getErrorMessage());
149+
assertEquals("sqlState", executionStatus.getSqlState());
123150
}
124151

125152
@Test

0 commit comments

Comments
 (0)