// Native.java (from a fork of apache/datafusion-comet)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.comet.parquet;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.comet.IcebergApi;
import org.apache.comet.NativeBase;
/**
 * Static entry points to the native Parquet readers: the per-column reader (created with {@link
 * #initColumnReader}) and the Arrow-based record batch reader (created with {@link
 * #initRecordBatchReader}). Apart from two convenience overloads, every method here is declared
 * {@code native}.
 */
public final class Native extends NativeBase {

  /**
   * Convenience overload of {@link #readBatch(long, int, int)} that pads no nulls before reading.
   *
   * @param handle the handle to the native Parquet column reader
   * @param batchSize the number of rows to be read
   * @return a tuple: (the actual number of rows read, the number of nulls read)
   */
  public static int[] readBatch(long handle, int batchSize) {
    final int noNullPadding = 0;
    return readBatch(handle, batchSize, noNullPadding);
  }

  /**
   * Convenience overload of {@link #skipBatch(long, int, boolean)} with {@code discard} set to
   * {@code false}.
   *
   * @param handle the handle to the native Parquet column reader
   * @param batchSize the number of rows to skip in the current page
   * @return the actual number of rows skipped
   */
  public static int skipBatch(long handle, int batchSize) {
    final boolean discardRows = false;
    return skipBatch(handle, batchSize, discardRows);
  }

  // ---------------------------------------------------------------------------------------------
  // Native APIs
  // ---------------------------------------------------------------------------------------------

  /**
   * Creates a reader for a primitive Parquet column.
   *
   * @param physicalTypeId id for Parquet physical type
   * @param logicalTypeId id for Parquet logical type
   * @param expectedPhysicalTypeId id for Parquet physical type, converted from Spark read type.
   *     This is used for type promotion.
   * @param path the path from the root schema to the column, derived from the method {@code
   *     ColumnDescriptor#getPath()}
   * @param maxDl the maximum definition level of the primitive column
   * @param maxRl the maximum repetition level of the primitive column
   * @param bitWidth (only set when logical type is INT) the bit width for the integer type (INT8,
   *     INT16, INT32, etc)
   * @param expectedBitWidth not described in the original documentation; presumably the bit
   *     width of the integer type from the Spark read schema, used for type promotion (TODO
   *     confirm)
   * @param isSigned (only set when logical type is INT) whether it is signed or unsigned int
   * @param typeLength number of bytes required to store a value of the type, only set when the
   *     physical type is FIXED_LEN_BYTE_ARRAY, otherwise it's 0
   * @param precision (only set when logical type is DECIMAL) precision of the decimal type
   * @param expectedPrecision (only set when logical type is DECIMAL) precision of the decimal
   *     type from Spark read schema. This is used for type promotion.
   * @param scale (only set when logical type is DECIMAL) scale of the decimal type
   * @param expectedScale not described in the original documentation; presumably the scale of
   *     the decimal type from the Spark read schema, used for type promotion (TODO confirm)
   * @param tu (only set when logical type is TIMESTAMP) unit for the timestamp
   * @param isAdjustedUtc (only set when logical type is TIMESTAMP) whether the timestamp is
   *     adjusted to UTC or not
   * @param batchSize the batch size for the columnar read
   * @param useDecimal128 whether to always return 128 bit decimal regardless of precision
   * @param useLegacyDateTimestampOrNTZ whether to read legacy dates/timestamps as it is
   * @return a pointer to a native Parquet column reader created
   */
  public static native long initColumnReader(
      int physicalTypeId,
      int logicalTypeId,
      int expectedPhysicalTypeId,
      String[] path,
      int maxDl,
      int maxRl,
      int bitWidth,
      int expectedBitWidth,
      boolean isSigned,
      int typeLength,
      int precision,
      int expectedPrecision,
      int scale,
      int expectedScale,
      int tu,
      boolean isAdjustedUtc,
      int batchSize,
      boolean useDecimal128,
      boolean useLegacyDateTimestampOrNTZ);

  /**
   * Hands a Parquet dictionary page to the native column reader. This must be called only once
   * per Parquet column chunk; otherwise it will panic.
   *
   * @param handle the handle to the native Parquet column reader
   * @param dictionaryValueCount the number of values in this dictionary
   * @param dictionaryData the actual dictionary page data, including repetition/definition levels
   *     as well as values
   * @param encoding the encoding used by the dictionary
   */
  public static native void setDictionaryPage(
      long handle, int dictionaryValueCount, byte[] dictionaryData, int encoding);

  /**
   * Hands a Parquet data page V1, given as a byte array, to the native column reader.
   *
   * @param handle the handle to the native Parquet column reader
   * @param pageValueCount the number of values in this data page
   * @param pageData the actual page data, which should only contain PLAIN-encoded values
   * @param valueEncoding the encoding used by the values
   */
  public static native void setPageV1(
      long handle, int pageValueCount, byte[] pageData, int valueEncoding);

  /**
   * Hands a Parquet data page V1, given as a buffer, to the native column reader.
   *
   * @param handle the handle to the native Parquet column reader
   * @param pageValueCount the number of values in this data page
   * @param buffer the actual page data, represented by a DirectByteBuffer
   * @param valueEncoding the encoding used by the values
   */
  public static native void setPageBufferV1(
      long handle, int pageValueCount, ByteBuffer buffer, int valueEncoding);

  /**
   * Hands a Parquet data page V2 to the native column reader.
   *
   * @param handle the handle to the native Parquet column reader
   * @param pageValueCount the number of values in this data page
   * @param defLevelData the data for definition levels
   * @param repLevelData the data for repetition levels
   * @param valueData the data for values
   * @param valueEncoding the encoding used by the values
   */
  public static native void setPageV2(
      long handle,
      int pageValueCount,
      byte[] defLevelData,
      byte[] repLevelData,
      byte[] valueData,
      int valueEncoding);

  /**
   * Resets the current columnar batch, clearing all of its content as well as any internal state
   * such as the current offset.
   *
   * @param handle the handle to the native Parquet column reader
   */
  @IcebergApi
  public static native void resetBatch(long handle);

  /**
   * Reads at most {@code batchSize} rows from the native Parquet column reader. The result is a
   * tuple whose first element is the actual number of rows read (including both nulls and
   * non-nulls) and whose second element is the number of nulls read.
   *
   * <p>When fewer than {@code batchSize} rows are returned, the current page has been completely
   * drained. The caller should then call {@link #setPageV1} or {@link #setPageV2} before the next
   * {@code readBatch} call.
   *
   * <p>The current page may also be drained when exactly {@code batchSize} rows are returned,
   * i.e. the number of rows remaining in the page was exactly {@code batchSize}. In that case the
   * next {@code readBatch} call returns 0 and the caller should call {@link #setPageV1} or {@link
   * #setPageV2} next.
   *
   * <p>When {@code nullPadSize} is greater than 0, nulls are padded into the underlying vector
   * before the values are read into it.
   *
   * @param handle the handle to the native Parquet column reader
   * @param batchSize the number of rows to be read
   * @param nullPadSize the number of nulls to pad before reading
   * @return a tuple: (the actual number of rows read, the number of nulls read)
   */
  public static native int[] readBatch(long handle, int batchSize, int nullPadSize);

  /**
   * Skips at most {@code batchSize} rows from the native Parquet column reader and returns the
   * actual number of rows skipped.
   *
   * <p>When fewer than {@code batchSize} rows are skipped, the current page has been completely
   * drained. The caller should then call {@link #setPageV1} or {@link #setPageV2} before the next
   * {@code skipBatch} call.
   *
   * <p>The current page may also be drained when exactly {@code batchSize} rows are skipped, i.e.
   * the number of rows remaining in the page was exactly {@code batchSize}. In that case the next
   * {@code skipBatch} call returns 0 and the caller should call {@link #setPageV1} or {@link
   * #setPageV2} next.
   *
   * @param handle the handle to the native Parquet column reader
   * @param batchSize the number of rows to skip in the current page
   * @param discard if true, discard read rows without padding nulls into the underlying vector
   * @return the actual number of rows skipped
   */
  public static native int skipBatch(long handle, int batchSize, boolean discard);

  /**
   * Returns the current batch constructed via {@code readBatch}.
   *
   * @param handle the handle to the native Parquet column reader
   * @param arrayAddr the memory address to the ArrowArray struct
   * @param schemaAddr the memory address to the ArrowSchema struct
   */
  public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);

  // The set* methods below give the reader a constant value, so it will return constant vectors.

  /** Sets the reader's constant value to null. */
  public static native void setNull(long handle);

  /** Sets a constant boolean value for the reader. */
  public static native void setBoolean(long handle, boolean value);

  /** Sets a constant byte value for the reader. */
  public static native void setByte(long handle, byte value);

  /** Sets a constant short value for the reader. */
  public static native void setShort(long handle, short value);

  /** Sets a constant int value for the reader. */
  public static native void setInt(long handle, int value);

  /** Sets a constant long value for the reader. */
  public static native void setLong(long handle, long value);

  /** Sets a constant float value for the reader. */
  public static native void setFloat(long handle, float value);

  /** Sets a constant double value for the reader. */
  public static native void setDouble(long handle, double value);

  /** Sets a constant binary value for the reader. */
  public static native void setBinary(long handle, byte[] value);

  /** Sets a constant decimal value, backed by FixedLengthByteArray, for the reader. */
  public static native void setDecimal(long handle, byte[] value);

  /** Sets the position of the row index vector for the Iceberg metadata column. */
  @IcebergApi
  public static native void setPosition(long handle, long value, int size);

  /**
   * Sets the row index vector for the Spark row index metadata column.
   *
   * @return the vector size
   */
  public static native int setIndices(long handle, long offset, int size, long[] indices);

  /** Sets the deleted info for the Iceberg metadata column. */
  @IcebergApi
  public static native void setIsDeleted(long handle, boolean[] isDeleted);

  /**
   * Closes the native Parquet column reader and releases all resources associated with it.
   *
   * @param handle the handle to the native Parquet column reader
   */
  public static native void closeColumnReader(long handle);

  // ---------------------------------------------------------------------------------------------
  // Arrow Native Parquet Reader APIs
  // ---------------------------------------------------------------------------------------------

  // TODO: Add partitionValues(?), improve requiredColumns to use a projection mask that
  // corresponds to arrow.
  // Add batch size, datetimeRebaseModeSpec, metrics(how?)...

  /**
   * Verifies that the object store options are valid. An exception will be thrown if the provided
   * options are not valid.
   *
   * @param filePath presumably the path of the file the options will be used to access (TODO
   *     confirm)
   * @param objectStoreOptions the object store options to validate
   */
  public static native void validateObjectStoreConfig(
      String filePath, Map<String, String> objectStoreOptions);

  /**
   * Initializes a record batch reader for a PartitionedFile.
   *
   * <p>The original documentation left every parameter undescribed. The descriptions below are
   * inferred from the parameter names only and must be confirmed against the native
   * implementation.
   *
   * @param filePath presumably the path of the file to read (TODO confirm)
   * @param fileSize presumably the size of the file (TODO confirm)
   * @param starts presumably the start offsets of the ranges to read (TODO confirm)
   * @param lengths presumably the lengths of the ranges to read, parallel to {@code starts} (TODO
   *     confirm)
   * @param filter presumably a serialized filter to apply while reading (TODO confirm)
   * @param requiredSchema presumably a serialized schema of the columns to return (TODO confirm)
   * @param dataSchema presumably a serialized schema of the data in the file (TODO confirm)
   * @param sessionTimezone presumably the session time zone identifier (TODO confirm)
   * @param batchSize presumably the number of rows per record batch (TODO confirm)
   * @param caseSensitive presumably whether column name matching is case sensitive (TODO confirm)
   * @param objectStoreOptions presumably the same kind of options accepted by {@link
   *     #validateObjectStoreConfig} (TODO confirm)
   * @param keyUnwrapper presumably used to unwrap file keys for encrypted files (TODO confirm)
   * @param metricsNode presumably the node that receives native metrics (TODO confirm)
   * @return a handle to the record batch reader, used in subsequent calls.
   */
  public static native long initRecordBatchReader(
      String filePath,
      long fileSize,
      long[] starts,
      long[] lengths,
      byte[] filter,
      byte[] requiredSchema,
      byte[] dataSchema,
      String sessionTimezone,
      int batchSize,
      boolean caseSensitive,
      Map<String, String> objectStoreOptions,
      CometFileKeyUnwrapper keyUnwrapper,
      Object metricsNode);

  /**
   * Reads the next batch of data into memory on the native side. This is the Arrow native version
   * of read batch.
   *
   * @param handle the handle to the record batch reader
   * @return the number of rows read
   */
  public static native int readNextRecordBatch(long handle);

  /**
   * Loads the column corresponding to {@code columnNum} in the currently loaded record batch into
   * the JVM. This is the Arrow native equivalent of {@link #currentBatch}.
   *
   * @param handle the handle to the record batch reader
   * @param columnNum the number of the column in the record batch
   * @param arrayAddr presumably the memory address to the ArrowArray struct, as for {@link
   *     #currentBatch} (TODO confirm)
   * @param schemaAddr presumably the memory address to the ArrowSchema struct, as for {@link
   *     #currentBatch} (TODO confirm)
   */
  public static native void currentColumnBatch(
      long handle, int columnNum, long arrayAddr, long schemaAddr);

  /**
   * Closes the record batch reader and frees its resources. This is the Arrow native version of
   * closing a reader.
   *
   * @param handle the handle to the record batch reader
   */
  public static native void closeRecordBatchReader(long handle);
}