Skip to content

Commit 019ce0d

Browse files
committed
DRILL-8526: Hive Predicate Push Down for ORC and Parquet
1 parent 4663992 commit 019ce0d

6 files changed

Lines changed: 720 additions & 1 deletion

File tree

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class HiveScan extends AbstractGroupScan {
7070
private List<LogicalInputSplit> inputSplits;
7171

7272
protected List<SchemaPath> columns;
73-
73+
private boolean filterPushedDown = false;
7474
@JsonCreator
7575
public HiveScan(@JsonProperty("userName") final String userName,
7676
@JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
@@ -154,6 +154,16 @@ public boolean supportsPartitionFilterPushdown() {
154154
return !(partitionKeys == null || partitionKeys.size() == 0);
155155
}
156156

157+
@JsonIgnore
158+
public void setFilterPushedDown(boolean isPushedDown) {
159+
this.filterPushedDown = isPushedDown;
160+
}
161+
162+
@JsonIgnore
163+
public boolean isFilterPushedDown() {
164+
return filterPushedDown;
165+
}
166+
157167
@Override
158168
public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
159169
mappings = new ArrayList<>();

contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.drill.exec.store.AbstractStoragePlugin;
4949
import org.apache.drill.exec.store.SchemaConfig;
5050
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
51+
import org.apache.drill.exec.store.hive.readers.filter.HivePushFilterIntoScan;
5152
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
5253
import com.google.common.collect.ImmutableSet;
5354

@@ -200,6 +201,8 @@ public Set<StoragePluginOptimizerRule> getOptimizerRules(OptimizerRulesContext o
200201
options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
201202
ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
202203
}
204+
ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_PROJECT);
205+
ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_SCAN);
203206
return ruleBuilder.build();
204207
}
205208
default:
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.store.hive.readers.filter;
19+
20+
import com.google.common.collect.ImmutableMap;
21+
import com.google.common.collect.ImmutableSet;
22+
import org.apache.drill.common.FunctionNames;
23+
import org.apache.drill.common.expression.CastExpression;
24+
import org.apache.drill.common.expression.ConvertExpression;
25+
import org.apache.drill.common.expression.FunctionCall;
26+
import org.apache.drill.common.expression.LogicalExpression;
27+
import org.apache.drill.common.expression.SchemaPath;
28+
import org.apache.drill.common.expression.ValueExpressions;
29+
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
30+
import org.apache.drill.common.expression.ValueExpressions.DateExpression;
31+
import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
32+
import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
33+
import org.apache.drill.common.expression.ValueExpressions.IntExpression;
34+
import org.apache.drill.common.expression.ValueExpressions.LongExpression;
35+
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
36+
import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
37+
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
38+
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
39+
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
40+
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
41+
42+
import java.sql.Timestamp;
43+
import java.util.regex.Pattern;
44+
45+
public class HiveCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
46+
47+
private static final ImmutableSet<String> IS_FUNCTIONS_SET;
48+
static {
49+
ImmutableSet.Builder<String> builder = ImmutableSet.builder();
50+
IS_FUNCTIONS_SET = builder
51+
.add(FunctionNames.IS_NOT_NULL)
52+
.add("isNotNull")
53+
.add("is not null")
54+
.add(FunctionNames.IS_NULL)
55+
.add("isNull")
56+
.add("is null")
57+
.add(FunctionNames.IS_TRUE)
58+
.add(FunctionNames.IS_NOT_TRUE)
59+
.add(FunctionNames.IS_FALSE)
60+
.add(FunctionNames.IS_NOT_FALSE)
61+
.build();
62+
63+
}
64+
65+
private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
66+
static {
67+
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
68+
COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
69+
// binary functions
70+
.put(FunctionNames.LIKE, FunctionNames.LIKE)
71+
.put(FunctionNames.EQ, FunctionNames.EQ)
72+
.put(FunctionNames.NE, FunctionNames.NE)
73+
.put(FunctionNames.GE, FunctionNames.LE)
74+
.put(FunctionNames.GT, FunctionNames.LT)
75+
.put(FunctionNames.LE, FunctionNames.GE)
76+
.put(FunctionNames.LT, FunctionNames.GT)
77+
.build();
78+
}
79+
80+
private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
81+
static {
82+
ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
83+
VALUE_EXPRESSION_CLASSES = builder
84+
.add(BooleanExpression.class)
85+
.add(DateExpression.class)
86+
.add(DoubleExpression.class)
87+
.add(FloatExpression.class)
88+
.add(IntExpression.class)
89+
.add(LongExpression.class)
90+
.add(QuotedString.class)
91+
.add(TimeExpression.class)
92+
.build();
93+
}
94+
95+
private Object value;
96+
private PredicateLeaf.Type valueType;
97+
private boolean success;
98+
private SchemaPath path;
99+
private String functionName;
100+
101+
public static boolean isCompareFunction(String functionName) {
102+
return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
103+
}
104+
105+
public static boolean isIsFunction(String funcName) {
106+
return IS_FUNCTIONS_SET.contains(funcName);
107+
}
108+
109+
// shows whether function is simplified IS FALSE
110+
public static boolean isNot(FunctionCall call, String funcName) {
111+
return !call.args().isEmpty()
112+
&& FunctionNames.NOT.equals(funcName);
113+
}
114+
115+
public static HiveCompareFunctionsProcessor createFunctionsProcessorInstance(FunctionCall call) {
116+
String functionName = call.getName();
117+
HiveCompareFunctionsProcessor evaluator = new HiveCompareFunctionsProcessor(functionName);
118+
119+
return createFunctionsProcessorInstanceInternal(call, evaluator);
120+
}
121+
122+
protected static <T extends HiveCompareFunctionsProcessor> T createFunctionsProcessorInstanceInternal(FunctionCall call, T evaluator) {
123+
LogicalExpression nameArg = call.arg(0);
124+
LogicalExpression valueArg = call.argCount() >= 2 ? call.arg(1) : null;
125+
if (valueArg != null) { // binary function
126+
if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
127+
LogicalExpression swapArg = valueArg;
128+
valueArg = nameArg;
129+
nameArg = swapArg;
130+
evaluator.setFunctionName(COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(evaluator.getFunctionName()));
131+
}
132+
evaluator.setSuccess(nameArg.accept(evaluator, valueArg));
133+
} else if (call.arg(0) instanceof SchemaPath) {
134+
evaluator.setPath((SchemaPath) nameArg);
135+
}
136+
evaluator.setSuccess(true);
137+
return evaluator;
138+
}
139+
140+
public HiveCompareFunctionsProcessor(String functionName) {
141+
this.success = false;
142+
this.functionName = functionName;
143+
}
144+
145+
public boolean isSuccess() {
146+
return success;
147+
}
148+
149+
protected void setSuccess(boolean success) {
150+
this.success = success;
151+
}
152+
153+
public SchemaPath getPath() {
154+
return path;
155+
}
156+
157+
protected void setPath(SchemaPath path) {
158+
this.path = path;
159+
}
160+
161+
public String getFunctionName() {
162+
return functionName;
163+
}
164+
165+
protected void setFunctionName(String functionName) {
166+
this.functionName = functionName;
167+
}
168+
169+
public Object getValue() {
170+
return value;
171+
}
172+
173+
public void setValue(Object value) {
174+
this.value = value;
175+
}
176+
177+
public PredicateLeaf.Type getValueType() {
178+
return valueType;
179+
}
180+
181+
public void setValueType(PredicateLeaf.Type valueType) {
182+
this.valueType = valueType;
183+
}
184+
185+
@Override
186+
public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
187+
if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
188+
return e.getInput().accept(this, valueArg);
189+
}
190+
return false;
191+
}
192+
@Override
193+
public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
194+
return false;
195+
}
196+
197+
@Override
198+
public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
199+
if (valueArg instanceof QuotedString) {
200+
this.value = ((QuotedString) valueArg).getString();
201+
this.path = path;
202+
this.valueType = PredicateLeaf.Type.STRING;
203+
return true;
204+
}
205+
206+
if (valueArg instanceof IntExpression) {
207+
int expValue = ((IntExpression) valueArg).getInt();
208+
this.value = ((Integer) expValue).longValue();
209+
this.path = path;
210+
this.valueType = PredicateLeaf.Type.LONG;
211+
return true;
212+
}
213+
214+
if (valueArg instanceof LongExpression) {
215+
this.value = ((LongExpression) valueArg).getLong();
216+
this.path = path;
217+
this.valueType = PredicateLeaf.Type.LONG;
218+
return true;
219+
}
220+
221+
if (valueArg instanceof FloatExpression) {
222+
this.value = ((FloatExpression) valueArg).getFloat();
223+
this.path = path;
224+
this.valueType = PredicateLeaf.Type.FLOAT;
225+
return true;
226+
}
227+
228+
if (valueArg instanceof DoubleExpression) {
229+
this.value = ((DoubleExpression) valueArg).getDouble();
230+
this.path = path;
231+
this.valueType = PredicateLeaf.Type.FLOAT;
232+
return true;
233+
}
234+
235+
if (valueArg instanceof BooleanExpression) {
236+
this.value = ((BooleanExpression) valueArg).getBoolean();
237+
this.path = path;
238+
this.valueType = PredicateLeaf.Type.BOOLEAN;
239+
return true;
240+
}
241+
242+
if (valueArg instanceof DateExpression) {
243+
this.value = ((DateExpression) valueArg).getDate();
244+
this.path = path;
245+
this.valueType = PredicateLeaf.Type.LONG;
246+
return true;
247+
}
248+
249+
if (valueArg instanceof TimeStampExpression) {
250+
long timeStamp = ((TimeStampExpression) valueArg).getTimeStamp();
251+
this.value = new Timestamp(timeStamp);
252+
this.path = path;
253+
this.valueType = PredicateLeaf.Type.TIMESTAMP;
254+
return true;
255+
}
256+
257+
if (valueArg instanceof ValueExpressions.VarDecimalExpression) {
258+
double v = ((ValueExpressions.VarDecimalExpression) valueArg).getBigDecimal().doubleValue();
259+
this.value = new HiveDecimalWritable(String.valueOf(v));
260+
this.path = path;
261+
this.valueType = PredicateLeaf.Type.DECIMAL;
262+
return true;
263+
}
264+
return false;
265+
}
266+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.apache.drill.exec.store.hive.readers.filter;
2+
/*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import org.apache.commons.codec.binary.Base64;
20+
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
21+
import org.apache.hive.com.esotericsoftware.kryo.Kryo;
22+
import org.apache.hive.com.esotericsoftware.kryo.io.Output;
23+
/**
24+
* Primary interface for <a href="http://en.wikipedia.org/wiki/Sargable">
25+
* SearchArgument</a>, which are the subset of predicates
26+
* that can be pushed down to the RecordReader. Each SearchArgument consists
27+
* of a series of SearchClauses that must each be true for the row to be
28+
* accepted by the filter.
29+
*
30+
* This requires that the filter be normalized into conjunctive normal form
31+
* (<a href="http://en.wikipedia.org/wiki/Conjunctive_normal_form">CNF</a>).
32+
*/
33+
public class HiveFilter {
34+
35+
private final SearchArgument searchArgument;
36+
37+
private static String toKryo(SearchArgument sarg) {
38+
Output out = new Output(4 * 1024, 10 * 1024 * 1024);
39+
new Kryo().writeObject(out, sarg);
40+
out.close();
41+
return Base64.encodeBase64String(out.toBytes());
42+
}
43+
44+
public HiveFilter(SearchArgument searchArgument) {
45+
this.searchArgument = searchArgument;
46+
}
47+
48+
public SearchArgument getSearchArgument() {
49+
return searchArgument;
50+
}
51+
52+
public String getSearchArgumentString() {
53+
return toKryo(searchArgument);
54+
}
55+
}

0 commit comments

Comments
 (0)