forked from databricks/databricks-jdbc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLazyThriftInlineArrowResult.java
More file actions
336 lines (299 loc) · 11.3 KB
/
LazyThriftInlineArrowResult.java
File metadata and controls
336 lines (299 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
package com.databricks.jdbc.api.impl.arrow;
import static com.databricks.jdbc.common.util.ArrowUtil.createArrowByteStream;
import static com.databricks.jdbc.common.util.ArrowUtil.getColumnInfoList;
import static com.databricks.jdbc.common.util.ArrowUtil.getSerializedSchema;
import static com.databricks.jdbc.common.util.ArrowUtil.getTotalRowsInResponse;
import com.databricks.jdbc.api.impl.IExecutionResult;
import com.databricks.jdbc.api.internal.IDatabricksSession;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.exception.DatabricksParsingException;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.client.thrift.generated.TFetchResultsResp;
import com.databricks.jdbc.model.core.ColumnInfo;
import com.databricks.jdbc.model.core.ColumnInfoTypeName;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import java.io.ByteArrayInputStream;
import java.sql.SQLException;
import java.util.List;
/**
 * Lazy implementation for thrift-based inline Arrow results that fetches arrow batches on demand.
 * Similar to LazyThriftResult but processes Arrow data instead of columnar thrift data.
 *
 * <p>Only one Arrow chunk is held at a time: the previously loaded chunk is released before the
 * chunk built from the next server response is loaded.
 */
public class LazyThriftInlineArrowResult implements IExecutionResult {
  private static final JdbcLogger LOGGER =
      JdbcLoggerFactory.getLogger(LazyThriftInlineArrowResult.class);

  /** Most recent fetch response; set to {@code null} once the result is closed. */
  private TFetchResultsResp currentResponse;

  /** Arrow chunk built from {@link #currentResponse}; {@code null} when none is loaded. */
  private ArrowResultChunk currentChunk;

  /** Row iterator over {@link #currentChunk}; {@code null} when no chunk is loaded. */
  private ArrowResultChunkIterator currentChunkIterator;

  /** Zero-based index of the current row across all chunks; -1 before the first row. */
  private long globalRowIndex;

  private final IDatabricksSession session;
  private final IDatabricksStatementInternal statement;

  /** Set once the data is exhausted or a fetch/parse failure made further reads impossible. */
  private boolean hasReachedEnd;

  private boolean isClosed;

  /** Sum of the row counts of every chunk loaded so far. */
  private long totalRowsFetched;

  private final List<ColumnInfo> columnInfos;

  /** Schema serialized from the first response, reused when decoding subsequent batches. */
  private final byte[] cachedSchema;

  /**
   * Creates a new LazyThriftInlineArrowResult that lazily fetches arrow data on demand.
   *
   * @param initialResponse the initial response from the server
   * @param statement the statement that generated this result
   * @param session the session to use for fetching additional data
   * @throws DatabricksSQLException if the initial response cannot be processed
   */
  public LazyThriftInlineArrowResult(
      TFetchResultsResp initialResponse,
      IDatabricksStatementInternal statement,
      IDatabricksSession session)
      throws DatabricksSQLException {
    this.currentResponse = initialResponse;
    this.statement = statement;
    this.session = session;
    this.globalRowIndex = -1;
    this.hasReachedEnd = false;
    this.isClosed = false;
    this.totalRowsFetched = 0;

    // Initialize column info from metadata
    this.columnInfos = getColumnInfoList(initialResponse.getResultSetMetadata());

    // Cache the schema from the first response for use in subsequent batches
    byte[] serializedSchema;
    try {
      serializedSchema = getSerializedSchema(initialResponse.getResultSetMetadata());
    } catch (DatabricksParsingException e) {
      LOGGER.error("Failed to cache Arrow schema: {}", e.getMessage(), e);
      throw new DatabricksSQLException(
          "Failed to cache Arrow schema", e, DatabricksDriverErrorCode.INLINE_CHUNK_PARSING_ERROR);
    }
    this.cachedSchema = serializedSchema;

    // Load initial chunk
    loadCurrentChunk();
    LOGGER.debug(
        "LazyThriftInlineArrowResult initialized with {} rows in first chunk, hasMoreRows: {}",
        currentChunk.numRows,
        currentResponse.hasMoreRows);
  }

  /**
   * Gets the value at the specified column index for the current row.
   *
   * @param columnIndex the zero-based column index
   * @return the value at the specified column
   * @throws DatabricksSQLException if the result is closed, cursor is invalid, or column index is
   *     out of bounds
   */
  @Override
  public Object getObject(int columnIndex) throws DatabricksSQLException {
    validateGetObjectState(columnIndex);
    ColumnInfo columnInfo = columnInfos.get(columnIndex);
    ColumnInfoTypeName requiredType = columnInfo.getTypeName();
    // Prefer the type reported by the Arrow chunk; fall back to the column's type text.
    String arrowMetadata = currentChunkIterator.getType(columnIndex);
    if (arrowMetadata == null) {
      arrowMetadata = columnInfo.getTypeText();
    }
    return ArrowStreamResult.getObjectWithComplexTypeHandling(
        session, currentChunkIterator, columnIndex, requiredType, arrowMetadata, columnInfo);
  }

  /**
   * Validates the state before getting an object at the specified column index.
   *
   * @param columnIndex the zero-based column index to validate
   * @throws DatabricksSQLException if the result is closed, cursor is invalid, or column index is
   *     out of bounds
   */
  private void validateGetObjectState(int columnIndex) throws DatabricksSQLException {
    if (isClosed) {
      LOGGER.warn("Attempted to get object from closed result");
      throw new DatabricksSQLException(
          "Result is already closed", DatabricksDriverErrorCode.STATEMENT_CLOSED);
    }
    if (globalRowIndex == -1) {
      LOGGER.warn("Attempted to get object before calling next()");
      throw new DatabricksSQLException(
          "Cursor is before first row", DatabricksDriverErrorCode.INVALID_STATE);
    }
    if (currentChunkIterator == null) {
      LOGGER.warn("No current chunk available when getting object");
      throw new DatabricksSQLException(
          "No current chunk available", DatabricksDriverErrorCode.INVALID_STATE);
    }
    if (columnIndex < 0 || columnIndex >= columnInfos.size()) {
      LOGGER.warn("Column index {} out of bounds (size: {})", columnIndex, columnInfos.size());
      throw new DatabricksSQLException(
          "Column index out of bounds " + columnIndex, DatabricksDriverErrorCode.INVALID_STATE);
    }
  }

  /**
   * Gets the current row index (0-based). Returns -1 if before the first row.
   *
   * @return the current row index
   */
  @Override
  public long getCurrentRow() {
    return globalRowIndex;
  }

  /**
   * Moves the cursor to the next row. Fetches additional data from server if needed.
   *
   * @return true if there is a next row, false if at the end
   * @throws SQLException if an error occurs while fetching data
   */
  @Override
  public boolean next() throws SQLException {
    // hasNext() already returns false for a closed or exhausted result.
    if (!hasNext()) {
      return false;
    }

    // Try to advance in current chunk
    if (advanceWithinCurrentChunk()) {
      return true;
    }

    // Current chunk is exhausted: keep fetching until a chunk yields a row. A response may carry
    // zero rows while still reporting hasMoreRows, hence the loop.
    while (currentResponse.hasMoreRows) {
      fetchNextChunk();
      if (advanceWithinCurrentChunk()) {
        return true;
      }
    }

    // No more data available
    hasReachedEnd = true;
    return false;
  }

  /**
   * Advances the current chunk iterator by one row if it has one, updating the global row index.
   *
   * @return true if the cursor moved to a new row, false if no chunk is loaded or it is exhausted
   * @throws SQLException if the chunk iterator fails to advance
   */
  private boolean advanceWithinCurrentChunk() throws SQLException {
    if (currentChunkIterator != null
        && currentChunkIterator.hasNextRow()
        && currentChunkIterator.nextRow()) {
      globalRowIndex++;
      return true;
    }
    return false;
  }

  /**
   * Checks if there are more rows available without advancing the cursor.
   *
   * @return true if there are more rows, false otherwise
   */
  @Override
  public boolean hasNext() {
    if (isClosed || hasReachedEnd) {
      return false;
    }

    // Check if there are more rows in current chunk
    if (currentChunkIterator != null && currentChunkIterator.hasNextRow()) {
      return true;
    }

    // Check if there are more chunks to fetch
    return currentResponse.hasMoreRows;
  }

  /**
   * Closes this result and releases associated resources. Calling this method more than once has
   * no further effect.
   */
  @Override
  public void close() {
    if (isClosed) {
      return;
    }
    this.isClosed = true;
    releaseCurrentChunk();
    this.currentResponse = null;
    LOGGER.debug(
        "LazyThriftInlineArrowResult closed after fetching {} total rows", totalRowsFetched);
  }

  /**
   * Gets the number of rows in the current chunk.
   *
   * @return the number of rows in the current chunk, or 0 if no chunk is loaded
   */
  @Override
  public long getRowCount() {
    return currentChunk != null ? currentChunk.numRows : 0;
  }

  /**
   * Gets the chunk count. Always returns 0 for lazy thrift inline arrow results.
   *
   * @return 0 (lazy results don't use chunks in the same sense as buffered results)
   */
  @Override
  public long getChunkCount() {
    return 0;
  }

  /**
   * Gets the Arrow metadata for the current chunk.
   *
   * @return list of arrow metadata strings, or null if no chunk is loaded
   * @throws DatabricksSQLException if an error occurs
   */
  public List<String> getArrowMetadata() throws DatabricksSQLException {
    if (currentChunk == null) {
      return null;
    }
    return currentChunk.getArrowMetadata();
  }

  /**
   * Builds an Arrow chunk and its row iterator from {@link #currentResponse} using the cached
   * schema, and adds the response's row count to the running total.
   *
   * <p>On a parsing failure any chunk still referenced is released, the result is marked as
   * having reached its end, and the failure is rethrown with the original exception as cause.
   *
   * @throws DatabricksSQLException if the arrow data in the current response cannot be processed
   */
  private void loadCurrentChunk() throws DatabricksSQLException {
    try {
      ByteArrayInputStream byteStream =
          createArrowByteStream(cachedSchema, currentResponse, getClass());
      long rowCount = getTotalRowsInResponse(currentResponse);
      ArrowResultChunk.Builder builder =
          ArrowResultChunk.builder().withInputStream(byteStream, rowCount);
      if (statement != null) {
        builder.withStatementId(statement.getStatementId());
      }
      currentChunk = builder.build();
      currentChunkIterator = currentChunk.getChunkIterator();
      totalRowsFetched += rowCount;
      LOGGER.debug(
          "Loaded arrow chunk with {} rows, total fetched: {}", rowCount, totalRowsFetched);
    } catch (DatabricksParsingException e) {
      LOGGER.error("Failed to load current chunk: {}", e.getMessage(), e);
      // Clean up any partially loaded chunk to prevent memory leaks
      releaseCurrentChunk();
      hasReachedEnd = true;
      // Keep the parsing exception as the cause so the root failure is not lost.
      throw new DatabricksSQLException(
          "Failed to process arrow data", e, DatabricksDriverErrorCode.INLINE_CHUNK_PARSING_ERROR);
    }
  }

  /**
   * Fetches the next chunk of data from the server and creates arrow chunks.
   *
   * @throws SQLException if the fetch operation fails
   */
  private void fetchNextChunk() throws SQLException {
    try {
      LOGGER.debug("Fetching next arrow chunk, current total rows fetched: {}", totalRowsFetched);
      currentResponse = session.getDatabricksClient().getMoreResults(statement);

      // Release previous chunk to free memory. The references are cleared as well so that a
      // failure while loading the next chunk cannot release the same chunk a second time.
      releaseCurrentChunk();

      loadCurrentChunk();
      LOGGER.debug(
          "Fetched arrow chunk with {} rows, hasMoreRows: {}",
          currentChunk.numRows,
          currentResponse.hasMoreRows);
    } catch (DatabricksSQLException e) {
      LOGGER.error("Failed to fetch next arrow chunk: {}", e.getMessage(), e);
      hasReachedEnd = true;
      throw e;
    }
  }

  /** Releases the current chunk, if any, and clears the chunk and iterator references. */
  private void releaseCurrentChunk() {
    if (currentChunk != null) {
      currentChunk.releaseChunk();
      currentChunk = null;
    }
    currentChunkIterator = null;
  }

  /**
   * Gets the total number of rows fetched from the server so far.
   *
   * @return the total number of rows fetched from the server
   */
  long getTotalRowsFetched() {
    return totalRowsFetched;
  }

  /**
   * Checks if all data has been fetched from the server.
   *
   * <p>A closed result no longer holds a response and performs no further fetches, so it is
   * reported as completely fetched instead of dereferencing the cleared response.
   *
   * @return true if no further data will be fetched (end reached, result closed, or the last
   *     response reported no more rows)
   */
  boolean isCompletelyFetched() {
    return hasReachedEnd || currentResponse == null || !currentResponse.hasMoreRows;
  }
}