Skip to content

Commit 8c6f824

Browse files
authored
HIVE-29647: Parallelize Parquet split generation directory listing (#6526)
1 parent 9f387af commit 8c6f824

1 file changed

Lines changed: 20 additions & 17 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,44 +15,34 @@
1515

1616
import java.io.IOException;
1717
import java.util.List;
18-
import java.util.Map;
1918

2019
import org.apache.hadoop.conf.Configuration;
2120
import org.apache.hadoop.fs.FileStatus;
2221
import org.apache.hadoop.fs.FileSystem;
23-
import org.apache.hadoop.fs.Path;
2422
import org.apache.hadoop.hive.common.io.DataCache;
2523
import org.apache.hadoop.hive.common.io.FileMetadataCache;
2624
import org.apache.hadoop.hive.conf.HiveConf;
25+
import org.apache.hadoop.hive.ql.exec.Utilities;
2726
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
2827
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
29-
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
3028
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
3129
import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
32-
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils;
33-
import org.apache.hadoop.hive.ql.plan.MapWork;
34-
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
35-
import org.apache.hadoop.mapred.FileSplit;
36-
import org.apache.hadoop.mapred.JobConf;
37-
import org.slf4j.Logger;
38-
import org.slf4j.LoggerFactory;
39-
import org.apache.hadoop.hive.ql.exec.Utilities;
4030
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
4131
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
4232
import org.apache.hadoop.io.ArrayWritable;
4333
import org.apache.hadoop.io.NullWritable;
4434
import org.apache.hadoop.mapred.FileInputFormat;
35+
import org.apache.hadoop.mapred.JobConf;
4536
import org.apache.hadoop.mapred.RecordReader;
46-
4737
import org.apache.parquet.hadoop.ParquetInputFormat;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
4842

4943

5044
/**
51-
*
52-
* A Parquet InputFormat for Hive (with the deprecated package mapred)
53-
*
54-
* NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types
55-
* are not currently supported. Removing the interface turns off vectorization.
45+
* A Parquet InputFormat for Hive (with the deprecated package mapred).
5646
*/
5747
public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
5848
implements InputFormatChecker, VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface {
@@ -72,6 +62,19 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
7262
vectorizedSelf = new VectorizedParquetInputFormat();
7363
}
7464

65+
/**
66+
* Parallelize split-generation file listing by sizing Hadoop's {@code LocatedFileStatusFetcher}
67+
* from {@code hive.compute.splits.num.threads}. We set it on the job conf here because that
68+
* property's cluster default does not reach the conf split generation uses; a value of 1 stays
69+
* serial.
70+
*/
71+
@Override
72+
protected FileStatus[] listStatus(JobConf job) throws IOException {
73+
job.setInt(LIST_STATUS_NUM_THREADS,
74+
HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
75+
return super.listStatus(job);
76+
}
77+
7578
@SuppressWarnings({ "unchecked", "rawtypes" })
7679
@Override
7780
public org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(

0 commit comments

Comments
 (0)