forked from databricks/databricks-jdbc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLazyThriftResult.java
More file actions
246 lines (219 loc) · 8.01 KB
/
LazyThriftResult.java
File metadata and controls
246 lines (219 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package com.databricks.jdbc.api.impl;
import static com.databricks.jdbc.common.util.DatabricksThriftUtil.createColumnarView;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import java.sql.SQLException;
/**
 * An {@link IExecutionResult} over a Thrift columnar result that fetches row batches from the
 * server lazily, one batch at a time, as the cursor advances.
 *
 * <p>Only the current batch is held. When it is exhausted and the most recent server response
 * reported {@code hasMoreRows}, the next batch is requested via {@code
 * session.getDatabricksClient().getMoreResults(statement)}. Empty intermediate batches are skipped.
 *
 * <p>NOTE(review): this class contains no synchronization; presumably each instance is used from a
 * single thread — confirm.
 */
public class LazyThriftResult implements IExecutionResult {
  private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(LazyThriftResult.class);

  /** Most recent server response; {@code null} after {@link #close()}. */
  private TFetchResultsResp currentResponse;

  /** Columnar view over the rows of the most recent response; {@code null} after close. */
  private ColumnarRowView currentBatch;

  /** Cursor position inside {@link #currentBatch}; -1 before the batch's first row. */
  private int currentBatchIndex;

  /** Cursor position across all batches (0-based); -1 before the first row. */
  private long globalRowIndex;

  private final IDatabricksSession session;
  private final IDatabricksStatementInternal statement;

  /** Set once the result is known to be exhausted, or after a failed fetch. */
  private boolean hasReachedEnd;

  private boolean isClosed;

  /** Sum of the row counts of every batch loaded so far, including the initial one. */
  private long totalRowsFetched;

  /**
   * Copy of {@code hasMoreRows} from the latest server response. Cached so it stays readable after
   * {@link #close()} releases {@link #currentResponse}.
   */
  private boolean serverHasMoreRows;

  /**
   * Creates a new LazyThriftResult that lazily fetches data on demand.
   *
   * @param initialResponse the initial response from the server
   * @param statement the statement that generated this result
   * @param session the session to use for fetching additional data
   * @throws DatabricksSQLException if the initial response cannot be processed
   */
  public LazyThriftResult(
      TFetchResultsResp initialResponse,
      IDatabricksStatementInternal statement,
      IDatabricksSession session)
      throws DatabricksSQLException {
    this.currentResponse = initialResponse;
    this.statement = statement;
    this.session = session;
    this.globalRowIndex = -1;
    this.currentBatchIndex = -1;
    this.hasReachedEnd = false;
    this.isClosed = false;
    this.totalRowsFetched = 0;
    // Load initial batch (also initialises serverHasMoreRows).
    loadCurrentBatch();
    LOGGER.debug(
        "LazyThriftResult initialized with {} rows in first batch, hasMoreRows: {}",
        currentBatch.getRowCount(),
        serverHasMoreRows);
  }

  /**
   * Gets the value at the specified column index for the current row.
   *
   * @param columnIndex the zero-based column index
   * @return the value at the specified column
   * @throws DatabricksSQLException if the result is closed, cursor is invalid, or column index is
   *     out of bounds
   */
  @Override
  public Object getObject(int columnIndex) throws DatabricksSQLException {
    if (isClosed) {
      throw new DatabricksSQLException(
          "Result is already closed", DatabricksDriverErrorCode.STATEMENT_CLOSED);
    }
    if (globalRowIndex == -1) {
      throw new DatabricksSQLException(
          "Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE);
    }
    if (currentBatchIndex < 0 || currentBatchIndex >= currentBatch.getRowCount()) {
      throw new DatabricksSQLException(
          "Invalid cursor position", DatabricksDriverErrorCode.INVALID_STATE);
    }
    if (columnIndex < 0 || columnIndex >= currentBatch.getColumnCount()) {
      throw new DatabricksSQLException(
          "Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
    }
    return currentBatch.getValue(currentBatchIndex, columnIndex);
  }

  /**
   * Gets the current row index (0-based). Returns -1 if before the first row.
   *
   * @return the current row index
   */
  @Override
  public long getCurrentRow() {
    return globalRowIndex;
  }

  /**
   * Moves the cursor to the next row, fetching additional batches from the server if needed.
   *
   * <p>The cursor is only advanced once a row is actually available. If a fetch fails, the cursor
   * position is left unchanged and the exception is propagated.
   *
   * @return true if the cursor now points at a row, false if at the end
   * @throws SQLException if an error occurs while fetching data
   */
  @Override
  public boolean next() throws SQLException {
    if (isClosed || hasReachedEnd) {
      return false;
    }
    if (!hasNext()) {
      // Ideally the client code should call hasNext() before next(). However, callers such as
      // DatabricksResultSet#next call next() directly, so this guards against moving past the end.
      return false;
    }
    // Fast path: the next row is already in the loaded batch.
    if (currentBatchIndex + 1 < currentBatch.getRowCount()) {
      currentBatchIndex++;
      globalRowIndex++;
      return true;
    }
    // Current batch exhausted. Keep fetching until a non-empty batch arrives or the server
    // reports that no more rows remain.
    while (serverHasMoreRows) {
      fetchNextBatch();
      if (currentBatch.getRowCount() > 0) {
        currentBatchIndex = 0; // First row of the new batch
        globalRowIndex++;
        return true;
      }
    }
    // Only empty batches remained: the result is exhausted.
    hasReachedEnd = true;
    return false;
  }

  /**
   * Checks if there may be more rows available without advancing the cursor.
   *
   * <p>Returns true when the loaded batch has unread rows or the latest server response reported
   * more rows. Subsequent batches may still turn out to be empty.
   *
   * @return true if there are more rows, false otherwise
   */
  @Override
  public boolean hasNext() {
    if (isClosed || hasReachedEnd) {
      return false;
    }
    // Check if there are more rows in current batch
    if (currentBatchIndex + 1 < currentBatch.getRowCount()) {
      return true;
    }
    // Check if there are more batches to fetch
    return serverHasMoreRows;
  }

  /** Closes this result and releases the current batch and response. */
  @Override
  public void close() {
    this.isClosed = true;
    this.currentBatch = null;
    this.currentResponse = null;
    LOGGER.debug("LazyThriftResult closed after fetching {} total rows", totalRowsFetched);
  }

  /**
   * Gets the number of rows in the current batch.
   *
   * @return the number of rows in the current batch, or 0 after close
   */
  @Override
  public long getRowCount() {
    // Return the number of rows in the current batch
    return currentBatch != null ? currentBatch.getRowCount() : 0;
  }

  /**
   * Gets the chunk count. Always returns 0 for thrift columnar results.
   *
   * @return 0 (thrift results don't use chunks like Arrow)
   */
  @Override
  public long getChunkCount() {
    // For thrift columnar results, we don't have chunks in the same sense as Arrow
    return 0;
  }

  /**
   * Builds a columnar view over {@link #currentResponse}, resets the in-batch cursor, and updates
   * {@link #serverHasMoreRows} and {@link #totalRowsFetched}.
   *
   * @throws DatabricksSQLException if the columnar view cannot be created
   */
  private void loadCurrentBatch() throws DatabricksSQLException {
    serverHasMoreRows = currentResponse.hasMoreRows;
    currentBatch = createColumnarView(currentResponse.getResults());
    currentBatchIndex = -1; // Reset batch index
    totalRowsFetched += currentBatch.getRowCount();
    LOGGER.debug(
        "Loaded batch with {} rows, total fetched: {}",
        currentBatch.getRowCount(),
        totalRowsFetched);
  }

  /**
   * Fetches the next batch of data from the server and creates columnar views.
   *
   * <p>On failure the result is marked as ended, so later {@link #next()} and {@link #hasNext()}
   * calls return false.
   *
   * @throws SQLException if the fetch operation fails
   */
  private void fetchNextBatch() throws SQLException {
    try {
      LOGGER.debug("Fetching next batch, current total rows fetched: {}", totalRowsFetched);
      currentResponse = session.getDatabricksClient().getMoreResults(statement);
      loadCurrentBatch();
      LOGGER.debug(
          "Fetched batch with {} rows, hasMoreRows: {}",
          currentBatch.getRowCount(),
          serverHasMoreRows);
    } catch (DatabricksSQLException e) {
      LOGGER.error("Failed to fetch next batch: {}", e.getMessage());
      hasReachedEnd = true;
      throw e; // Propagate exception to fail fast
    }
  }

  /**
   * Gets the total number of rows fetched from the server so far. This is different from
   * getRowCount() which returns current batch size.
   *
   * @return the total number of rows fetched from the server
   */
  public long getTotalRowsFetched() {
    return totalRowsFetched;
  }

  /**
   * Checks if all data has been fetched from the server. Safe to call after {@link #close()}.
   *
   * @return true if the end of the result was reached (or a fetch failed), or if the latest server
   *     response reported no further rows
   */
  public boolean isCompletelyFetched() {
    // Uses the cached flag: currentResponse is null after close().
    return hasReachedEnd || !serverHasMoreRows;
  }
}