Skip to content

Commit 78a5208

Browse files
DRILL-8527: Hive Limit Push Down (#2997)
1 parent b4dd738 commit 78a5208

7 files changed

Lines changed: 103 additions & 18 deletions

File tree

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class HiveScan extends AbstractGroupScan {
6565
private final HiveReadEntry hiveReadEntry;
6666
private final HiveMetadataProvider metadataProvider;
6767
private final Map<String, String> confProperties;
68-
68+
private final int maxRecords;
6969
private List<List<LogicalInputSplit>> mappings;
7070
private List<LogicalInputSplit> inputSplits;
7171

@@ -77,21 +77,23 @@ public HiveScan(@JsonProperty("userName") final String userName,
7777
@JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig,
7878
@JsonProperty("columns") final List<SchemaPath> columns,
7979
@JsonProperty("confProperties") final Map<String, String> confProperties,
80+
@JsonProperty("maxRecords") final int maxRecords,
8081
@JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
8182
this(userName,
8283
hiveReadEntry,
8384
pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
8485
columns,
85-
null, confProperties);
86+
null, confProperties, maxRecords);
8687
}
8788

8889
public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin,
89-
final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
90+
final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties, int maxRecords) throws ExecutionSetupException {
9091
super(userName);
9192
this.hiveReadEntry = hiveReadEntry;
9293
this.columns = columns;
9394
this.hiveStoragePlugin = hiveStoragePlugin;
9495
this.confProperties = confProperties;
96+
this.maxRecords = maxRecords;
9597
if (metadataProvider == null) {
9698
this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
9799
} else {
@@ -106,10 +108,20 @@ public HiveScan(final HiveScan that) {
106108
this.hiveStoragePlugin = that.hiveStoragePlugin;
107109
this.metadataProvider = that.metadataProvider;
108110
this.confProperties = that.confProperties;
111+
this.maxRecords = that.maxRecords;
112+
}
113+
/**
 * Creates a copy of an existing scan that carries a different row limit.
 *
 * <p>The columns, read entry, storage plugin, metadata provider and
 * configuration properties are taken by reference from {@code that};
 * only the limit differs.
 *
 * @param that       scan to copy from
 * @param maxRecords row limit to store on the new scan
 */
public HiveScan(final HiveScan that, int maxRecords) {
  super(that);
  // The limit is the only value not inherited from the source scan.
  this.maxRecords = maxRecords;
  this.confProperties = that.confProperties;
  this.metadataProvider = that.metadataProvider;
  this.hiveStoragePlugin = that.hiveStoragePlugin;
  this.hiveReadEntry = that.hiveReadEntry;
  this.columns = that.columns;
}
110122

111123
public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
112-
return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
124+
return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties, maxRecords);
113125
}
114126

115127
@JsonProperty
@@ -133,6 +145,11 @@ public Map<String, String> getConfProperties() {
133145
return confProperties;
134146
}
135147

148+
@JsonProperty
149+
public int getMaxRecords() {
150+
return maxRecords;
151+
}
152+
136153
@JsonIgnore
137154
public HiveStoragePlugin getStoragePlugin() {
138155
return hiveStoragePlugin;
@@ -167,6 +184,19 @@ public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> end
167184
}
168185
}
169186

187+
@Override
188+
public GroupScan applyLimit(int maxRecords) {
189+
if (maxRecords == this.maxRecords){
190+
return null;
191+
}
192+
return new HiveScan(this, maxRecords);
193+
}
194+
195+
@Override
196+
public boolean supportsLimitPushdown() {
197+
return true;
198+
}
199+
170200
@Override
171201
public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
172202
try {
@@ -189,7 +219,7 @@ public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupE
189219
}
190220

191221
final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
192-
return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
222+
return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, maxRecords, confProperties);
193223
} catch (IOException | ReflectiveOperationException e) {
194224
throw new ExecutionSetupException(e);
195225
}
@@ -271,6 +301,7 @@ public String toString() {
271301
+ ", partitions= " + partitions
272302
+ ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
273303
+ ", confProperties=" + confProperties
304+
+ ", maxRecords=" + maxRecords
274305
+ "]";
275306
}
276307

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<Sch
113113
}
114114
}
115115

116-
return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
116+
return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties, -1);
117117
} catch (ExecutionSetupException e) {
118118
throw new IOException(e);
119119
}

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
5858
private final List<HivePartition> partitions;
5959
private final List<SchemaPath> columns;
6060
private final Map<String, String> confProperties;
61-
61+
private final int maxRecords;
6262
@JsonCreator
6363
public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
6464
@JsonProperty("userName") String userName,
@@ -67,6 +67,7 @@ public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
6767
@JsonProperty("splitClasses") List<String> splitClasses,
6868
@JsonProperty("columns") List<SchemaPath> columns,
6969
@JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig,
70+
@JsonProperty("maxRecords") int maxRecords,
7071
@JsonProperty("confProperties") Map<String, String> confProperties)
7172
throws IOException, ExecutionSetupException, ReflectiveOperationException {
7273
this(userName,
@@ -75,6 +76,7 @@ public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
7576
splitClasses,
7677
columns,
7778
registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
79+
maxRecords,
7880
confProperties);
7981
}
8082

@@ -84,6 +86,7 @@ public HiveSubScan(final String userName,
8486
final List<String> splitClasses,
8587
final List<SchemaPath> columns,
8688
final HiveStoragePlugin hiveStoragePlugin,
89+
final Integer maxRecords,
8790
final Map<String, String> confProperties)
8891
throws IOException, ReflectiveOperationException {
8992
super(userName);
@@ -94,6 +97,7 @@ public HiveSubScan(final String userName,
9497
this.splitClasses = splitClasses;
9598
this.columns = columns;
9699
this.hiveStoragePlugin = hiveStoragePlugin;
100+
this.maxRecords = maxRecords;
97101
this.confProperties = confProperties;
98102

99103
for (int i = 0; i < splits.size(); i++) {
@@ -121,6 +125,10 @@ public List<SchemaPath> getColumns() {
121125
return columns;
122126
}
123127

128+
public int getMaxRecords() {
129+
return maxRecords;
130+
}
131+
124132
@JsonProperty
125133
public HiveStoragePluginConfig getHiveStoragePluginConfig() {
126134
return hiveStoragePlugin.getConfig();
@@ -164,7 +172,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
164172
@Override
165173
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
166174
try {
167-
return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
175+
return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, maxRecords, confProperties);
168176
} catch (IOException | ReflectiveOperationException e) {
169177
throw new ExecutionSetupException(e);
170178
}

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
217217
*/
218218
private StructField[] selectedStructFieldRefs;
219219

220-
220+
private final int maxRecords;
221221
/**
222222
* Readers constructor called by initializer.
223223
*
@@ -231,7 +231,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
231231
*/
232232
public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
233233
Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
234-
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
234+
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
235235
this.hiveTable = table;
236236
this.partition = partition;
237237
this.hiveConf = hiveConf;
@@ -243,6 +243,7 @@ public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition par
243243
this.partitionValues = new Object[0];
244244
setColumns(projectedColumns);
245245
this.fragmentContext = context;
246+
this.maxRecords = maxRecords;
246247
}
247248

248249
@Override
@@ -396,7 +397,8 @@ public int next() {
396397

397398
try {
398399
int recordCount;
399-
for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
400+
int record = maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT;
401+
for (recordCount = 0; (recordCount < record && hasNextValue(valueHolder)); recordCount++) {
400402
Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
401403
outputWriter.setPosition(recordCount);
402404
readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public class HiveTextRecordReader extends HiveDefaultRecordReader {
5858
*/
5959
public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
6060
Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
61-
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
62-
super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
61+
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
62+
super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi, maxRecords);
6363
}
6464

6565
@Override

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,10 @@ public static List<RecordReader> init(ExecutorFragmentContext ctx, HiveSubScan c
5656
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
5757
final List<List<InputSplit>> inputSplits = config.getInputSplits();
5858
final HiveConf hiveConf = config.getHiveConf();
59-
59+
final int maxRecords = config.getMaxRecords();
6060
if (inputSplits.isEmpty()) {
6161
return Collections.singletonList(
62-
readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
62+
readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi, maxRecords)
6363
);
6464
} else {
6565
IndexedPartitions partitions = getPartitions(config);
@@ -70,7 +70,7 @@ public static List<RecordReader> init(ExecutorFragmentContext ctx, HiveSubScan c
7070
partitions.get(idx),
7171
inputSplits.get(idx),
7272
config.getColumns(),
73-
ctx, hiveConf, proxyUgi))
73+
ctx, hiveConf, proxyUgi, maxRecords))
7474
.collect(Collectors.toList());
7575
}
7676
}
@@ -109,8 +109,7 @@ private interface HiveReaderFactory {
109109

110110
RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
111111
Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
112-
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
113-
112+
FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords);
114113
}
115114

116115
/**
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.hive;
19+
20+
import org.apache.drill.categories.HiveStorageTest;
21+
import org.apache.drill.categories.SlowTest;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.experimental.categories.Category;
25+
import org.junit.rules.ExpectedException;
26+
27+
import static org.junit.Assert.assertEquals;
28+
29+
30+
@Category({SlowTest.class, HiveStorageTest.class})
31+
public class TestHivePushDown extends HiveTestBase {
32+
33+
@Rule
34+
public ExpectedException thrown = ExpectedException.none();
35+
36+
@Test
37+
public void testLimitPushDown() throws Exception {
38+
String query = "SELECT * FROM hive.`default`.kv LIMIT 1";
39+
40+
int actualRowCount = testSql(query);
41+
assertEquals("Expected and actual row count should match", 1, actualRowCount);
42+
43+
testPlanMatchingPatterns(query, new String[]{"LIMIT"}, new String[]{"maxRecords=1"});
44+
}
45+
}

0 commit comments

Comments
 (0)