Skip to content

Commit 49ec3e4

Browse files
committed
fast-path
1 parent 780e165 commit 49ec3e4

5 files changed

Lines changed: 315 additions & 18 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReader.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,32 @@ public interface ParquetDataColumnReader {
104104
*/
105105
byte[] readDecimal();
106106

107+
/**
108+
* True when this reader can supply DECIMAL_64 values as raw unscaled longs at the column's scale,
109+
* with no per-row HiveDecimal/byte[] conversion -- i.e. an INT32/INT64-backed decimal whose file
110+
* scale equals the requested Hive scale. When true, the long-backed reader may call
111+
* {@link #readDecimal64()} / {@link #readDecimal64(int)} instead of {@link #readDecimal()} /
112+
* {@link #readDecimal(int)}.
113+
*/
114+
default boolean isFastDecimal64() {
115+
return false;
116+
}
117+
118+
/**
119+
* @return the next value as a raw unscaled decimal64 long. Only valid when {@link #isFastDecimal64()}.
120+
* {@link #isValid()} is set false when the value does not fit the Hive precision (caller -> NULL).
121+
*/
122+
default long readDecimal64() {
123+
throw new UnsupportedOperationException();
124+
}
125+
126+
/**
127+
* @return the dictionary value at {@code id} as a raw unscaled decimal64 long. See {@link #readDecimal64()}.
128+
*/
129+
default long readDecimal64(int id) {
130+
throw new UnsupportedOperationException();
131+
}
132+
107133
/**
108134
* @return the next Double from the page
109135
*/

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetDataColumnReaderFactory.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,19 @@ byte[] validatedScaledDecimal(int inpScale) {
382382
}
383383
}
384384

385+
// Validate a raw unscaled decimal64 long (already at the Hive scale) against the Hive precision.
386+
// Sets isValid; returns the value unchanged when in range, else 0 (caller marks the entry NULL).
387+
// Used by the Decimal64 identity fast path; bounds via HiveDecimalWritable to avoid hand-rolling.
388+
long validatedDecimal64(long unscaledValue) {
389+
long absMax = HiveDecimalWritable.getDecimal64AbsMax(hivePrecision);
390+
if (unscaledValue >= -absMax && unscaledValue <= absMax) {
391+
this.isValid = true;
392+
return unscaledValue;
393+
}
394+
this.isValid = false;
395+
return 0;
396+
}
397+
385398
/**
386399
* Helper function to validate double data. Sets the isValid to true if the data is valid
387400
* for the type it will be read in, otherwise false.
@@ -1622,6 +1635,23 @@ public byte[] readDecimal(int id) {
16221635
hiveDecimalWritable.set(hiveDecimal);
16231636
return super.validatedScaledDecimal(scale);
16241637
}
1638+
1639+
@Override
1640+
public boolean isFastDecimal64() {
1641+
// Identity fast path: the file scale equals the Hive scale, so the stored unscaled value IS the
1642+
// Decimal64 value -- no rescale/rounding, only a precision bounds check.
1643+
return scale == hiveScale;
1644+
}
1645+
1646+
@Override
1647+
public long readDecimal64() {
1648+
return validatedDecimal64(valuesReader.readInteger());
1649+
}
1650+
1651+
@Override
1652+
public long readDecimal64(int id) {
1653+
return validatedDecimal64(dict.decodeToInt(id));
1654+
}
16251655
}
16261656

16271657
/**
@@ -1784,6 +1814,23 @@ public byte[] readDecimal(int id) {
17841814
hiveDecimalWritable.set(hiveDecimal);
17851815
return super.validatedScaledDecimal(scale);
17861816
}
1817+
1818+
@Override
1819+
public boolean isFastDecimal64() {
1820+
// Identity fast path: the file scale equals the Hive scale, so the stored unscaled long IS the
1821+
// Decimal64 value -- no rescale/rounding, only a precision bounds check.
1822+
return scale == hiveScale;
1823+
}
1824+
1825+
@Override
1826+
public long readDecimal64() {
1827+
return validatedDecimal64(valuesReader.readLong());
1828+
}
1829+
1830+
@Override
1831+
public long readDecimal64(int id) {
1832+
return validatedDecimal64(dict.decodeToLong(id));
1833+
}
17871834
}
17881835

17891836
/**

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -611,17 +611,30 @@ private void decodeDictionaryIds(
611611
case DECIMAL:
612612
if (column instanceof Decimal64ColumnVector dec64) {
613613
fillDecimal64PrecisionScale(dec64);
614+
boolean fast = dictionary.isFastDecimal64();
614615
short valueScale = getEncodedDecimalScale();
615616
for (int i = rowId; i < rowId + num; ++i) {
616617
if (!column.isNull[i]) {
617-
byte[] bytes = dictionary.readDecimal((int) dictionaryIds.vector[i]);
618-
if (dictionary.isValid()) {
619-
// set() enforces the column precision/scale and marks the entry NULL on overflow.
620-
dec64.set(i, bytes, valueScale);
618+
int id = (int) dictionaryIds.vector[i];
619+
boolean stored;
620+
if (fast) {
621+
// Identity fast path: store the raw unscaled long directly.
622+
long v = dictionary.readDecimal64(id);
623+
stored = dictionary.isValid();
624+
if (stored) {
625+
dec64.vector[i] = v;
626+
}
621627
} else {
622-
setNullValue(column, i);
628+
// set() enforces the column precision/scale and marks the entry NULL on overflow.
629+
byte[] bytes = dictionary.readDecimal(id);
630+
stored = dictionary.isValid();
631+
if (stored) {
632+
dec64.set(i, bytes, valueScale);
633+
stored = !dec64.isNull[i];
634+
}
623635
}
624-
if (dec64.isNull[i]) {
636+
if (!stored) {
637+
setNullValue(column, i);
625638
dec64.vector[i] = 0;
626639
}
627640
}
@@ -716,22 +729,34 @@ private short[] getDecimalPrecisionScale() {
716729
*/
717730
private void readDecimal64(int total, Decimal64ColumnVector c, int rowId) {
718731
fillDecimal64PrecisionScale(c);
732+
boolean fast = dataColumn.isFastDecimal64();
719733
short valueScale = getEncodedDecimalScale();
720734
int left = total;
721735
while (left > 0) {
722736
readRepetitionAndDefinitionLevels();
723737
if (definitionLevel >= maxDefLevel) {
724-
byte[] bytes = dataColumn.readDecimal();
725-
if (dataColumn.isValid()) {
726-
c.isNull[rowId] = false;
727-
// set() enforces the column precision/scale and marks the entry NULL on overflow.
728-
c.set(rowId, bytes, valueScale);
729-
if (c.isNull[rowId]) {
730-
c.vector[rowId] = 0;
731-
c.isRepeating = false;
732-
} else {
733-
c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
738+
boolean stored;
739+
if (fast) {
740+
// Identity fast path: store the raw unscaled long directly (no HiveDecimal/byte[] per row).
741+
long v = dataColumn.readDecimal64();
742+
stored = dataColumn.isValid();
743+
if (stored) {
744+
c.vector[rowId] = v;
745+
}
746+
} else {
747+
// set() enforces the column precision/scale and marks the entry NULL if the value does not
748+
// fit (e.g. schema-evolved data whose larger file scale can't be held at the column scale).
749+
byte[] bytes = dataColumn.readDecimal();
750+
stored = dataColumn.isValid();
751+
if (stored) {
752+
c.isNull[rowId] = false;
753+
c.set(rowId, bytes, valueScale);
754+
stored = !c.isNull[rowId];
734755
}
756+
}
757+
if (stored) {
758+
c.isNull[rowId] = false;
759+
c.isRepeating = c.isRepeating && (c.vector[0] == c.vector[rowId]);
735760
} else {
736761
c.vector[rowId] = 0;
737762
setNullValue(c, rowId);

ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,16 @@ public void testDecimal64ReadInt64() throws Exception {
134134
decimal64ReadInt64();
135135
}
136136

137+
@Test
138+
public void testDecimal64ReadScaleEvolution() throws Exception {
139+
decimal64ReadScaleEvolution();
140+
}
141+
142+
@Test
143+
public void testDecimal64ReadPrecisionNarrowing() throws Exception {
144+
decimal64ReadPrecisionNarrowing();
145+
}
146+
137147
@Test
138148
public void verifyBatchOffsets() throws Exception {
139149
super.verifyBatchOffsets();

0 commit comments

Comments
 (0)