Skip to content

Commit 050c6c5

Browse files
authored
Improve BigInteger handling in Parquet files (Closes #3125) (#3126)
* Allow unrecognized values in metadata JSON to be ignored. Allow BigIntegers to be encoded as Decimal types with a precision/scale of 1, instead of requiring a serializable codec * Improve BigInteger handling * Applied spotless * Allow overriding default codecs of BigInteger and BigDecimal by ones set by users in ParquetInstructions. Fix unit test * add comment * Missed a file * Comments * Spotless * Do a table walk to determine precision, use a default valid value if everything is null * spotless * Renames * moar spotless
1 parent 7032e19 commit 050c6c5

10 files changed

Lines changed: 242 additions & 55 deletions

File tree

engine/table/src/main/java/io/deephaven/engine/table/impl/CodecLookup.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.io.Externalizable;
1919
import java.math.BigDecimal;
20+
import java.math.BigInteger;
2021

2122
/**
2223
* Utility class to concentrate {@link ObjectCodec} lookups.
@@ -76,7 +77,11 @@ private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
7677
// appropriate precision and scale calculated from column data,
7778
// unless the user explicitly requested something else
7879
// via instructions.
79-
dataType == BigDecimal.class;
80+
dataType == BigDecimal.class ||
81+
82+
// BigIntegers can be encoded as a DecimalLogicalType using a precision of 1 and scale of 0, which lets
83+
// them be read by other parquet tools.
84+
dataType == BigInteger.class;
8085
}
8186

8287
/**

engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
* refreshing tables, we need the user to tell us.
2525
*/
2626
public class BigDecimalUtils {
27+
private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
28+
private static final int TARGET_CHUNK_SIZE = 4096;
2729
public static final int INVALID_PRECISION_OR_SCALE = -1;
2830

2931
/**
@@ -53,7 +55,8 @@ public static PrecisionAndScale computePrecisionAndScale(
5355
}
5456

5557
/**
56-
* Compute an overall precision and scale that would fit all existing values in a column source.
58+
* Compute an overall precision and scale that would fit all existing values in a column source. Note that this
59+
* requires a full table scan to ensure the correct values are determined.
5760
*
5861
* @param rowSet The rowset for the provided column
5962
* @param source a {@code ColumnSource} of {@code BigDecimal} type
@@ -62,29 +65,45 @@ public static PrecisionAndScale computePrecisionAndScale(
6265
public static PrecisionAndScale computePrecisionAndScale(
6366
final RowSet rowSet,
6467
final ColumnSource<BigDecimal> source) {
65-
final int sz = 4096;
66-
// we first compute max(precision - scale) and max(scale), which corresponds to
67-
// max(digits left of the decimal point), max(digits right of the decimal point).
68-
// Then we convert to (precision, scale) before returning.
69-
int maxPrecisionMinusScale = 0;
70-
int maxScale = 0;
71-
try (final ChunkSource.GetContext context = source.makeGetContext(sz);
68+
if (rowSet.isEmpty()) {
69+
return EMPTY_TABLE_PRECISION_AND_SCALE;
70+
}
71+
72+
// We will walk the entire table to determine the max(precision - scale) and
73+
// max(scale), which corresponds to max(digits left of the decimal point), max(digits right of the decimal
74+
// point). Then we convert to (precision, scale) before returning.
75+
int maxPrecisionMinusScale = -1;
76+
int maxScale = -1;
77+
try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE);
7278
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
73-
final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
74-
final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, rowSeq).asObjectChunk();
75-
for (int i = 0; i < chunk.size(); ++i) {
76-
final BigDecimal x = chunk.get(i);
77-
final int precision = x.precision();
78-
final int scale = x.scale();
79-
final int precisionMinusScale = precision - scale;
80-
if (precisionMinusScale > maxPrecisionMinusScale) {
81-
maxPrecisionMinusScale = precisionMinusScale;
82-
}
83-
if (scale > maxScale) {
84-
maxScale = scale;
79+
while (it.hasMore()) {
80+
final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE);
81+
final ObjectChunk<BigDecimal, ? extends Values> chunk =
82+
source.getChunk(context, rowSeq).asObjectChunk();
83+
for (int i = 0; i < chunk.size(); ++i) {
84+
final BigDecimal x = chunk.get(i);
85+
if (x == null) {
86+
continue;
87+
}
88+
89+
final int precision = x.precision();
90+
final int scale = x.scale();
91+
final int precisionMinusScale = precision - scale;
92+
if (precisionMinusScale > maxPrecisionMinusScale) {
93+
maxPrecisionMinusScale = precisionMinusScale;
94+
}
95+
if (scale > maxScale) {
96+
maxScale = scale;
97+
}
8598
}
8699
}
87100
}
101+
102+
// If these are < 0, then every value we visited was null
103+
if (maxPrecisionMinusScale < 0 && maxScale < 0) {
104+
return EMPTY_TABLE_PRECISION_AND_SCALE;
105+
}
106+
88107
return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
89108
}
90109

engine/table/src/test/java/io/deephaven/engine/table/impl/TestCodecColumns.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.deephaven.engine.table.Table;
99
import io.deephaven.engine.table.TableDefinition;
1010
import io.deephaven.engine.testutil.TstUtils;
11+
import io.deephaven.parquet.table.BigIntegerParquetBytesCodec;
1112
import io.deephaven.parquet.table.ParquetTools;
1213
import io.deephaven.engine.util.TableTools;
1314
import io.deephaven.parquet.table.ParquetInstructions;
@@ -51,16 +52,18 @@ public class TestCodecColumns {
5152
ColumnDefinition.fromGenericType("VWBA", byte[].class, byte.class);
5253
writeBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());
5354
readBuilder.addColumnCodec("VWBA", SimpleByteArrayCodec.class.getName());
55+
5456
VARIABLE_WIDTH_COLUMN_DEFINITION_2 = ColumnDefinition.fromGenericType("VWCD", ArrayTuple.class);
5557
readBuilder.addColumnCodec("VWCD", ExternalizableCodec.class.getName(), ArrayTuple.class.getName());
5658
FIXED_WIDTH_BYTE_ARRAY_COLUMN_DEFINITION = ColumnDefinition.fromGenericType("FWBA", byte[].class, byte.class);
5759
writeBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");
5860
readBuilder.addColumnCodec("FWBA", SimpleByteArrayCodec.class.getName(), "9");
61+
5962
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION = ColumnDefinition.fromGenericType("VWBI", BigInteger.class);
60-
writeBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName());
61-
readBuilder.addColumnCodec("VWBI", BigIntegerCodec.class.getName());
63+
writeBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName());
64+
readBuilder.addColumnCodec("VWBI", SerializableCodec.class.getName());
65+
6266
VARIABLE_WIDTH_BIG_INTEGER_COLUMN_DEFINITION_S = ColumnDefinition.fromGenericType("VWBIS", BigInteger.class);
63-
readBuilder.addColumnCodec("VWBIS", SerializableCodec.class.getName());
6467
expectedReadInstructions = readBuilder.build();
6568
writeInstructions = writeBuilder.build();
6669
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package io.deephaven.parquet.table;
2+
3+
import io.deephaven.datastructures.util.CollectionUtil;
4+
import io.deephaven.util.codec.ObjectCodec;
5+
import org.jetbrains.annotations.NotNull;
6+
import org.jetbrains.annotations.Nullable;
7+
8+
import java.math.BigInteger;
9+
import java.nio.ByteBuffer;
10+
11+
public class BigIntegerParquetBytesCodec implements ObjectCodec<BigInteger> {
12+
private final int encodedSizeInBytes;
13+
private final byte[] nullBytes;
14+
15+
/**
16+
*
17+
* @param encodedSizeInBytes encoded size in bytes, if fixed size, or -1 if variable size. note that according to
18+
* the parquet spec, the minimum number of bytes required to represent the unscaled value should be used for
19+
* a variable sized (binary) encoding; in any case, the maximum encoded bytes is implicitly limited by
20+
* precision.
21+
*/
22+
public BigIntegerParquetBytesCodec(final int encodedSizeInBytes) {
23+
this.encodedSizeInBytes = encodedSizeInBytes;
24+
if (encodedSizeInBytes > 0) {
25+
nullBytes = new byte[encodedSizeInBytes];
26+
for (int i = 0; i < encodedSizeInBytes; ++i) {
27+
nullBytes[i] = (byte) 0xff;
28+
}
29+
} else {
30+
nullBytes = CollectionUtil.ZERO_LENGTH_BYTE_ARRAY;
31+
}
32+
}
33+
34+
// Given how parquet encoding works for nulls, the actual value provided for a null is irrelevant.
35+
@Override
36+
public boolean isNullable() {
37+
return true;
38+
}
39+
40+
@Override
41+
public int getPrecision() {
42+
return 0;
43+
}
44+
45+
@Override
46+
public int getScale() {
47+
return 1;
48+
}
49+
50+
@Override
51+
public int expectedObjectWidth() {
52+
return encodedSizeInBytes <= 0 ? VARIABLE_WIDTH_SENTINEL : encodedSizeInBytes;
53+
}
54+
55+
@NotNull
56+
@Override
57+
public byte[] encode(@Nullable final BigInteger input) {
58+
if (input == null) {
59+
return nullBytes;
60+
}
61+
62+
return input.toByteArray();
63+
}
64+
65+
@Nullable
66+
@Override
67+
public BigInteger decode(@NotNull final byte[] input, final int offset, final int length) {
68+
if (length <= 0) {
69+
return null;
70+
}
71+
72+
if (length == encodedSizeInBytes) {
73+
boolean allPreviousBitsSet = true;
74+
for (int i = 0; i < encodedSizeInBytes; ++i) {
75+
if (input[offset + i] != (byte) 0xff) {
76+
allPreviousBitsSet = false;
77+
break;
78+
}
79+
}
80+
if (allPreviousBitsSet) {
81+
return null;
82+
}
83+
}
84+
85+
final ByteBuffer buffer = ByteBuffer.wrap(input, offset, length);
86+
final byte[] unscaledValueBytes = new byte[length];
87+
buffer.get(unscaledValueBytes);
88+
return new BigInteger(unscaledValueBytes);
89+
}
90+
}

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetSchemaReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.io.File;
2727
import java.io.IOException;
28+
import java.math.BigInteger;
2829
import java.util.*;
2930
import java.util.function.BiFunction;
3031
import java.util.function.Supplier;
@@ -327,6 +328,11 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.EnumLogicalTypeAnnot
327328
@Override
328329
public Optional<Class<?>> visit(
329330
final LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
331+
// This pair of values (precision=1, scale=0) is set at write tiem as a marker so that we can recover
332+
// the fact that the type is a BigInteger, not a BigDecimal when the fies are read.
333+
if (decimalLogicalType.getPrecision() == 1 && decimalLogicalType.getScale() == 0) {
334+
return Optional.of(BigInteger.class);
335+
}
330336
return Optional.of(java.math.BigDecimal.class);
331337
}
332338

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.io.IOException;
5454
import java.lang.reflect.Field;
5555
import java.math.BigDecimal;
56+
import java.math.BigInteger;
5657
import java.nio.*;
5758
import java.nio.file.Path;
5859
import java.nio.file.Paths;
@@ -775,14 +776,23 @@ private static <DATA_TYPE> TransferObject<?> getDestinationBuffer(
775776
return new ByteTransfer(columnSource, maxValuesPerPage);
776777
} else if (String.class.equals(columnType)) {
777778
return new StringTransfer(columnSource, maxValuesPerPage);
778-
} else if (BigDecimal.class.equals(columnType)) {
779-
// noinspection unchecked
780-
final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
781-
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
782-
computedCache, columnDefinition.getName(), tableRowSet, () -> bigDecimalColumnSource);
783-
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
784-
precisionAndScale.precision, precisionAndScale.scale, -1);
785-
return new CodecTransfer<>(bigDecimalColumnSource, codec, maxValuesPerPage);
779+
}
780+
781+
// If there's an explicit codec, we should disregard the defaults for these; CodecLookup#lookup() will properly
782+
// select the codec assigned by the instructions so we only need to check and redirect once.
783+
if (!CodecLookup.explicitCodecPresent(instructions.getCodecName(columnDefinition.getName()))) {
784+
if (BigDecimal.class.equals(columnType)) {
785+
// noinspection unchecked
786+
final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
787+
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
788+
computedCache, columnDefinition.getName(), tableRowSet, () -> bigDecimalColumnSource);
789+
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
790+
precisionAndScale.precision, precisionAndScale.scale, -1);
791+
return new CodecTransfer<>(bigDecimalColumnSource, codec, maxValuesPerPage);
792+
} else if (BigInteger.class.equals(columnType)) {
793+
return new CodecTransfer<>((ColumnSource<BigInteger>) columnSource, new BigIntegerParquetBytesCodec(-1),
794+
maxValuesPerPage);
795+
}
786796
}
787797

788798
final ObjectCodec<? super DATA_TYPE> codec = CodecLookup.lookup(columnDefinition, instructions);

extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.Externalizable;
2525
import java.math.BigDecimal;
26+
import java.math.BigInteger;
2627
import java.util.*;
2728
import java.util.function.Supplier;
2829

@@ -34,7 +35,6 @@
3435
* and the data translation.
3536
*/
3637
class TypeInfos {
37-
3838
private static final TypeInfo[] TYPE_INFOS = new TypeInfo[] {
3939
IntType.INSTANCE,
4040
LongType.INSTANCE,
@@ -45,15 +45,16 @@ class TypeInfos {
4545
CharType.INSTANCE,
4646
ByteType.INSTANCE,
4747
StringType.INSTANCE,
48-
DateTimeType.INSTANCE
48+
DateTimeType.INSTANCE,
49+
BigIntegerType.INSTANCE
4950
};
5051

5152
private static final Map<Class<?>, TypeInfo> BY_CLASS;
5253

5354
static {
5455
final Map<Class<?>, TypeInfo> fa = new HashMap<>();
55-
for (TypeInfo typeInfo : TYPE_INFOS) {
56-
for (Class<?> type : typeInfo.getTypes()) {
56+
for (final TypeInfo typeInfo : TYPE_INFOS) {
57+
for (final Class<?> type : typeInfo.getTypes()) {
5758
fa.put(type, typeInfo);
5859
}
5960
}
@@ -95,6 +96,7 @@ static Pair<String, String> getCodecAndArgs(
9596
if (!CodecLookup.codecRequired(columnDefinition)) {
9697
return null;
9798
}
99+
98100
// Impute an appropriate codec for the data type
99101
final Class<?> dataType = columnDefinition.getDataType();
100102
if (Externalizable.class.isAssignableFrom(dataType)) {
@@ -365,6 +367,28 @@ public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repe
365367
}
366368
}
367369

370+
/**
371+
* We will encode BigIntegers as Decimal types. Parquet has no special type for BigIntegers, but we can maintain
372+
* external compatibility by encoding them as fixed length decimals of scale 1. Internally, we'll record that we
373+
* wrote this as a decimal, so we can properly decode it back to BigInteger.
374+
*/
375+
private enum BigIntegerType implements TypeInfo {
376+
INSTANCE;
377+
378+
private static final Set<Class<?>> clazzes = Collections.singleton(BigInteger.class);
379+
380+
@Override
381+
public Set<Class<?>> getTypes() {
382+
return clazzes;
383+
}
384+
385+
@Override
386+
public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repeating, Class<?> dataType) {
387+
return type(PrimitiveTypeName.BINARY, required, repeating)
388+
.as(LogicalTypeAnnotation.decimalType(0, 1));
389+
}
390+
}
391+
368392
interface TypeInfo {
369393

370394
Set<Class<?>> getTypes();

0 commit comments

Comments
 (0)