forked from databricks/databricks-jdbc
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathInlineChunkProvider.java
More file actions
103 lines (89 loc) · 3.18 KB
/
InlineChunkProvider.java
File metadata and controls
103 lines (89 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.databricks.jdbc.api.impl.arrow;
import static com.databricks.jdbc.common.util.DecompressionUtil.decompress;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.exception.DatabricksParsingException;
import com.databricks.jdbc.exception.DatabricksSQLException;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import com.databricks.jdbc.model.core.ResultData;
import com.databricks.jdbc.model.core.ResultManifest;
import com.databricks.jdbc.model.telemetry.enums.DatabricksDriverErrorCode;
import com.google.common.annotations.VisibleForTesting;
import java.io.ByteArrayInputStream;
/**
 * Chunk provider for inline Arrow results.
 *
 * <p>Inline results arrive as a single packet of data, so this provider holds exactly one {@link
 * ArrowResultChunk}. The chunk is built in the constructor from the attachment of the result data,
 * after passing it through the decompression utility with the codec named in the manifest.
 */
public class InlineChunkProvider implements ChunkProvider {
  private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(InlineChunkProvider.class);

  /** Cursor value meaning the provider has not yet been advanced onto the inline chunk. */
  private static final long BEFORE_FIRST_CHUNK = -1;

  /** Total row count as reported by the result manifest. */
  private final long totalRows;

  /** Cursor position; {@link #BEFORE_FIRST_CHUNK} until {@link #next()} succeeds once. */
  private long currentChunkIndex;

  /** Set once {@link #close()} has run. */
  private boolean isClosed;

  /** The only chunk: there is a single packet of data in case of inline arrow. */
  private final ArrowResultChunk arrowResultChunk;

  /**
   * Constructor for inline arrow chunk provider from {@link ResultData} and {@link ResultManifest}.
   *
   * @param resultData Data object containing the result data
   * @param resultManifest Manifest object containing the result metadata
   * @throws DatabricksSQLException if there is an error in processing the inline arrow data
   */
  InlineChunkProvider(ResultData resultData, ResultManifest resultManifest)
      throws DatabricksSQLException {
    this.currentChunkIndex = BEFORE_FIRST_CHUNK;
    this.totalRows = resultManifest.getTotalRowCount();

    // Decompress the inline data if applicable and create an ArrowResultChunk
    CompressionCodec compressionType = resultManifest.getResultCompression();
    byte[] decompressedBytes =
        decompress(
            resultData.getAttachment(),
            compressionType,
            "Data fetch for inline arrow batch with decompression algorithm : " + compressionType);

    this.arrowResultChunk =
        ArrowResultChunk.builder()
            .withInputStream(new ByteArrayInputStream(decompressedBytes), totalRows)
            .build();
  }

  /**
   * {@inheritDoc}
   *
   * <p>Returns {@code true} only while the cursor has not yet been advanced, because there is a
   * single inline chunk.
   */
  @Override
  public boolean hasNextChunk() {
    return this.currentChunkIndex == BEFORE_FIRST_CHUNK;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Succeeds exactly once: the first call advances onto the inline chunk and returns {@code
   * true}; every later call returns {@code false} without changing the cursor.
   */
  @Override
  public boolean next() {
    if (!hasNextChunk()) {
      return false;
    }
    this.currentChunkIndex++;
    return true;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Always returns the single inline chunk, regardless of the cursor position.
   */
  @Override
  public ArrowResultChunk getChunk() {
    return arrowResultChunk;
  }

  /**
   * {@inheritDoc}
   *
   * <p>Marks the provider as closed and releases the inline chunk. Calling this method again after
   * the provider is closed has no effect, so the chunk is released at most once.
   */
  @Override
  public void close() {
    if (isClosed) {
      // Already closed: do not release the chunk a second time.
      return;
    }
    isClosed = true;
    arrowResultChunk.releaseChunk();
  }

  /**
   * Returns the total row count taken from the result manifest at construction time.
   *
   * @return total number of rows
   */
  @Override
  public long getRowCount() {
    return totalRows;
  }

  /**
   * Returns the chunk count reported for inline results, which is always {@code 0}.
   *
   * <p>NOTE(review): the provider does hold one chunk, yet reports 0 here; presumably callers
   * treat this count as the number of separately fetched chunks. Confirm before changing.
   *
   * @return {@code 0}
   */
  @Override
  public long getChunkCount() {
    return 0;
  }

  /**
   * Tells whether {@link #close()} has been called on this provider.
   *
   * @return {@code true} once the provider has been closed
   */
  @Override
  public boolean isClosed() {
    return isClosed;
  }

  /**
   * Logs an inline-arrow processing failure and rethrows it as a parsing exception.
   *
   * <p>Not invoked by the other methods of this class; package-visible for tests.
   *
   * @param e the underlying failure, kept as the cause of the thrown exception
   * @throws DatabricksParsingException always, with error code {@link
   *     DatabricksDriverErrorCode#INLINE_CHUNK_PARSING_ERROR}
   */
  @VisibleForTesting
  void handleError(Exception e) throws DatabricksParsingException {
    String errorMessage =
        String.format("Cannot process inline arrow format. Error: %s", e.getMessage());
    LOGGER.error(errorMessage);
    throw new DatabricksParsingException(
        errorMessage, e, DatabricksDriverErrorCode.INLINE_CHUNK_PARSING_ERROR);
  }
}