Skip to content

Commit f1c632f

Browse files
committed
DRILL-8526: Hive Predicate Push Down for ORC and Parquet
1 parent 4663992 commit f1c632f

8 files changed

Lines changed: 898 additions & 8 deletions

File tree

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525

26+
import org.apache.drill.common.PlanStringBuilder;
2627
import org.apache.drill.common.exceptions.DrillRuntimeException;
2728
import org.apache.drill.common.exceptions.ExecutionSetupException;
2829
import org.apache.drill.common.expression.SchemaPath;
@@ -39,11 +40,13 @@
3940
import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
4041
import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
4142
import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
43+
import org.apache.drill.exec.store.hive.readers.filter.HiveFilter;
4244
import org.apache.drill.exec.util.Utilities;
4345
import org.apache.hadoop.hive.conf.HiveConf;
4446
import org.apache.hadoop.hive.metastore.api.FieldSchema;
4547
import org.apache.hadoop.hive.metastore.api.Partition;
4648
import org.apache.hadoop.hive.metastore.api.Table;
49+
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
4750
import org.slf4j.Logger;
4851
import org.slf4j.LoggerFactory;
4952

@@ -54,6 +57,7 @@
5457
import com.fasterxml.jackson.annotation.JsonTypeName;
5558

5659
import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWithSpecColumns;
60+
import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;
5761

5862
@JsonTypeName("hive-scan")
5963
public class HiveScan extends AbstractGroupScan {
@@ -70,7 +74,7 @@ public class HiveScan extends AbstractGroupScan {
7074
private List<LogicalInputSplit> inputSplits;
7175

7276
protected List<SchemaPath> columns;
73-
77+
private boolean filterPushedDown = false;
7478
@JsonCreator
7579
public HiveScan(@JsonProperty("userName") final String userName,
7680
@JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
@@ -154,6 +158,16 @@ public boolean supportsPartitionFilterPushdown() {
154158
return !(partitionKeys == null || partitionKeys.size() == 0);
155159
}
156160

161+
@JsonIgnore
162+
public void setFilterPushedDown(boolean isPushedDown) {
163+
this.filterPushedDown = isPushedDown;
164+
}
165+
166+
@JsonIgnore
167+
public boolean isFilterPushedDown() {
168+
return filterPushedDown;
169+
}
170+
157171
@Override
158172
public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
159173
mappings = new ArrayList<>();
@@ -265,13 +279,17 @@ public String getDigest() {
265279
public String toString() {
266280
List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
267281
int numPartitions = partitions == null ? 0 : partitions.size();
268-
return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
269-
+ ", columns=" + columns
270-
+ ", numPartitions=" + numPartitions
271-
+ ", partitions= " + partitions
272-
+ ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry)
273-
+ ", confProperties=" + confProperties
274-
+ "]";
282+
String SearchArgumentString = confProperties.get(SARG_PUSHDOWN);
283+
SearchArgument searchArgument = SearchArgumentString == null ? null : HiveFilter.create(SearchArgumentString);
284+
285+
return new PlanStringBuilder(this)
286+
.field("table", hiveReadEntry.getHiveTableWrapper())
287+
.field("columns", columns)
288+
.field("numPartitions", numPartitions)
289+
.field("inputDirectories", metadataProvider.getInputDirectories(hiveReadEntry))
290+
.field("confProperties", confProperties)
291+
.field("SearchArgument", searchArgument)
292+
.toString();
275293
}
276294

277295
@Override

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.drill.exec.store.AbstractStoragePlugin;
4949
import org.apache.drill.exec.store.SchemaConfig;
5050
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
51+
import org.apache.drill.exec.store.hive.readers.filter.HivePushFilterIntoScan;
5152
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
5253
import com.google.common.collect.ImmutableSet;
5354

@@ -200,6 +201,8 @@ public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext o
200201
options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
201202
ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
202203
}
204+
ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_PROJECT);
205+
ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_SCAN);
203206
return ruleBuilder.build();
204207
}
205208
default:
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.store.hive.readers.filter;
19+
20+
import com.google.common.collect.ImmutableMap;
21+
import com.google.common.collect.ImmutableSet;
22+
import org.apache.drill.common.FunctionNames;
23+
import org.apache.drill.common.expression.CastExpression;
24+
import org.apache.drill.common.expression.FunctionCall;
25+
import org.apache.drill.common.expression.LogicalExpression;
26+
import org.apache.drill.common.expression.SchemaPath;
27+
import org.apache.drill.common.expression.ValueExpressions;
28+
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
29+
import org.apache.drill.common.expression.ValueExpressions.DateExpression;
30+
import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
31+
import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
32+
import org.apache.drill.common.expression.ValueExpressions.IntExpression;
33+
import org.apache.drill.common.expression.ValueExpressions.LongExpression;
34+
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
35+
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
36+
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
37+
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
38+
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
39+
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
40+
41+
import java.sql.Timestamp;
42+
43+
public class HiveCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
44+
45+
private static final ImmutableSet<String> IS_FUNCTIONS_SET;
46+
47+
private Object value;
48+
private PredicateLeaf.Type valueType;
49+
private boolean success;
50+
private SchemaPath path;
51+
private String functionName;
52+
53+
public HiveCompareFunctionsProcessor(String functionName) {
54+
this.success = false;
55+
this.functionName = functionName;
56+
}
57+
58+
static {
59+
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
60+
IS_FUNCTIONS_SET = builder
61+
.add(FunctionNames.IS_NOT_NULL)
62+
.add("isNotNull")
63+
.add("is not null")
64+
.add(FunctionNames.IS_NULL)
65+
.add("isNull")
66+
.add("is null")
67+
.add(FunctionNames.IS_TRUE)
68+
.add(FunctionNames.IS_NOT_TRUE)
69+
.add(FunctionNames.IS_FALSE)
70+
.add(FunctionNames.IS_NOT_FALSE)
71+
.build();
72+
73+
}
74+
75+
private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
76+
static {
77+
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
78+
COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
79+
// binary functions
80+
.put(FunctionNames.LIKE, FunctionNames.LIKE)
81+
.put(FunctionNames.EQ, FunctionNames.EQ)
82+
.put(FunctionNames.NE, FunctionNames.NE)
83+
.put(FunctionNames.GE, FunctionNames.LE)
84+
.put(FunctionNames.GT, FunctionNames.LT)
85+
.put(FunctionNames.LE, FunctionNames.GE)
86+
.put(FunctionNames.LT, FunctionNames.GT)
87+
.build();
88+
}
89+
90+
private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
91+
static {
92+
ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
93+
VALUE_EXPRESSION_CLASSES = builder
94+
.add(BooleanExpression.class)
95+
.add(DateExpression.class)
96+
.add(DoubleExpression.class)
97+
.add(FloatExpression.class)
98+
.add(IntExpression.class)
99+
.add(LongExpression.class)
100+
.add(QuotedString.class)
101+
.add(TimeExpression.class)
102+
.build();
103+
}
104+
105+
public static boolean isCompareFunction(final String functionName) {
106+
return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
107+
}
108+
109+
public static boolean isIsFunction(final String funcName) {
110+
return IS_FUNCTIONS_SET.contains(funcName);
111+
}
112+
113+
// shows whether function is simplified IS FALSE
114+
public static boolean isNot(final FunctionCall call, final String funcName) {
115+
return !call.args().isEmpty()
116+
&& FunctionNames.NOT.equals(funcName);
117+
}
118+
119+
public static HiveCompareFunctionsProcessor createFunctionsProcessorInstance(final FunctionCall call) {
120+
String functionName = call.getName();
121+
HiveCompareFunctionsProcessor evaluator = new HiveCompareFunctionsProcessor(functionName);
122+
123+
return createFunctionsProcessorInstanceInternal(call, evaluator);
124+
}
125+
126+
protected static <T extends HiveCompareFunctionsProcessor> T createFunctionsProcessorInstanceInternal(FunctionCall call, T evaluator) {
127+
LogicalExpression nameArg = call.arg(0);
128+
LogicalExpression valueArg = call.argCount() >= 2 ? call.arg(1) : null;
129+
if (valueArg != null) { // binary function
130+
if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
131+
LogicalExpression swapArg = valueArg;
132+
valueArg = nameArg;
133+
nameArg = swapArg;
134+
evaluator.setFunctionName(COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(evaluator.getFunctionName()));
135+
}
136+
evaluator.setSuccess(nameArg.accept(evaluator, valueArg));
137+
} else if (call.arg(0) instanceof SchemaPath) {
138+
evaluator.setPath((SchemaPath) nameArg);
139+
}
140+
evaluator.setSuccess(true);
141+
return evaluator;
142+
}
143+
144+
public boolean isSuccess() {
145+
return success;
146+
}
147+
148+
protected void setSuccess(boolean success) {
149+
this.success = success;
150+
}
151+
152+
public SchemaPath getPath() {
153+
return path;
154+
}
155+
156+
protected void setPath(SchemaPath path) {
157+
this.path = path;
158+
}
159+
160+
public String getFunctionName() {
161+
return functionName;
162+
}
163+
164+
protected void setFunctionName(String functionName) {
165+
this.functionName = functionName;
166+
}
167+
168+
public Object getValue() {
169+
return value;
170+
}
171+
172+
public void setValue(Object value) {
173+
this.value = value;
174+
}
175+
176+
public PredicateLeaf.Type getValueType() {
177+
return valueType;
178+
}
179+
180+
public void setValueType(PredicateLeaf.Type valueType) {
181+
this.valueType = valueType;
182+
}
183+
184+
@Override
185+
public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
186+
if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
187+
return e.getInput().accept(this, valueArg);
188+
}
189+
return false;
190+
}
191+
192+
@Override
193+
public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
194+
return false;
195+
}
196+
197+
@Override
198+
public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
199+
if (valueArg instanceof QuotedString) {
200+
this.value = ((QuotedString) valueArg).getString();
201+
this.path = path;
202+
this.valueType = PredicateLeaf.Type.STRING;
203+
return true;
204+
}
205+
206+
if (valueArg instanceof IntExpression) {
207+
int expValue = ((IntExpression) valueArg).getInt();
208+
this.value = ((Integer) expValue).longValue();
209+
this.path = path;
210+
this.valueType = PredicateLeaf.Type.LONG;
211+
return true;
212+
}
213+
214+
if (valueArg instanceof LongExpression) {
215+
this.value = ((LongExpression) valueArg).getLong();
216+
this.path = path;
217+
this.valueType = PredicateLeaf.Type.LONG;
218+
return true;
219+
}
220+
221+
if (valueArg instanceof FloatExpression) {
222+
this.value = ((FloatExpression) valueArg).getFloat();
223+
this.path = path;
224+
this.valueType = PredicateLeaf.Type.FLOAT;
225+
return true;
226+
}
227+
228+
if (valueArg instanceof DoubleExpression) {
229+
this.value = ((DoubleExpression) valueArg).getDouble();
230+
this.path = path;
231+
this.valueType = PredicateLeaf.Type.FLOAT;
232+
return true;
233+
}
234+
235+
if (valueArg instanceof BooleanExpression) {
236+
this.value = ((BooleanExpression) valueArg).getBoolean();
237+
this.path = path;
238+
this.valueType = PredicateLeaf.Type.BOOLEAN;
239+
return true;
240+
}
241+
242+
if (valueArg instanceof DateExpression) {
243+
this.value = ((DateExpression) valueArg).getDate();
244+
this.path = path;
245+
this.valueType = PredicateLeaf.Type.LONG;
246+
return true;
247+
}
248+
249+
if (valueArg instanceof TimeStampExpression) {
250+
long timeStamp = ((TimeStampExpression) valueArg).getTimeStamp();
251+
this.value = new Timestamp(timeStamp);
252+
this.path = path;
253+
this.valueType = PredicateLeaf.Type.TIMESTAMP;
254+
return true;
255+
}
256+
257+
if (valueArg instanceof ValueExpressions.VarDecimalExpression) {
258+
double v = ((ValueExpressions.VarDecimalExpression) valueArg).getBigDecimal().doubleValue();
259+
this.value = new HiveDecimalWritable(String.valueOf(v));
260+
this.path = path;
261+
this.valueType = PredicateLeaf.Type.DECIMAL;
262+
return true;
263+
}
264+
return false;
265+
}
266+
}

0 commit comments

Comments
 (0)