Skip to content

Commit 27f3359

Browse files
DRILL-8528: Hbase Limit Push Down (#3000)
1 parent 78a5208 commit 27f3359

7 files changed

Lines changed: 121 additions & 14 deletions

File tree

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.TreeMap;
3636
import java.util.concurrent.TimeUnit;
3737

38+
import org.apache.drill.common.PlanStringBuilder;
3839
import org.apache.drill.common.exceptions.DrillRuntimeException;
3940
import org.apache.drill.common.exceptions.ExecutionSetupException;
4041
import org.apache.drill.common.expression.SchemaPath;
@@ -99,22 +100,26 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
99100

100101
private long scanSizeInBytes = 0;
101102

103+
private final int maxRecords;
104+
102105
@JsonCreator
103106
public HBaseGroupScan(@JsonProperty("userName") String userName,
104107
@JsonProperty("hbaseScanSpec") HBaseScanSpec hbaseScanSpec,
105108
@JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
106109
@JsonProperty("columns") List<SchemaPath> columns,
110+
@JsonProperty("maxRecords") int maxRecords,
107111
@JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
108-
this (userName, pluginRegistry.resolve(storagePluginConfig, HBaseStoragePlugin.class), hbaseScanSpec, columns);
112+
this (userName, pluginRegistry.resolve(storagePluginConfig, HBaseStoragePlugin.class), hbaseScanSpec, columns, maxRecords);
109113
}
110114

111115
public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
112-
List<SchemaPath> columns) {
116+
List<SchemaPath> columns, int maxRecords) {
113117
super(userName);
114118
this.storagePlugin = storagePlugin;
115119
this.storagePluginConfig = storagePlugin.getConfig();
116120
this.hbaseScanSpec = scanSpec;
117121
this.columns = columns == null ? ALL_COLUMNS : columns;
122+
this.maxRecords = maxRecords;
118123
init();
119124
}
120125

@@ -134,6 +139,22 @@ private HBaseGroupScan(HBaseGroupScan that) {
134139
this.filterPushedDown = that.filterPushedDown;
135140
this.statsCalculator = that.statsCalculator;
136141
this.scanSizeInBytes = that.scanSizeInBytes;
142+
this.maxRecords = that.maxRecords;
143+
}
144+
145+
private HBaseGroupScan(HBaseGroupScan that, int maxRecords) {
146+
super(that);
147+
this.columns = that.columns == null ? ALL_COLUMNS : that.columns;
148+
this.hbaseScanSpec = that.hbaseScanSpec;
149+
this.endpointFragmentMapping = that.endpointFragmentMapping;
150+
this.regionsToScan = that.regionsToScan;
151+
this.storagePlugin = that.storagePlugin;
152+
this.storagePluginConfig = that.storagePluginConfig;
153+
this.hTableDesc = that.hTableDesc;
154+
this.filterPushedDown = that.filterPushedDown;
155+
this.statsCalculator = that.statsCalculator;
156+
this.scanSizeInBytes = that.scanSizeInBytes;
157+
this.maxRecords = maxRecords;
137158
}
138159

139160
@Override
@@ -329,7 +350,7 @@ public HBaseSubScan getSpecificScan(int minorFragmentId) {
329350
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
330351
"Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
331352
minorFragmentId);
332-
return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns);
353+
return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns, maxRecords);
333354
}
334355

335356
@Override
@@ -380,9 +401,11 @@ public String getDigest() {
380401

381402
@Override
382403
public String toString() {
383-
return "HBaseGroupScan [HBaseScanSpec="
384-
+ hbaseScanSpec + ", columns="
385-
+ columns + "]";
404+
return new PlanStringBuilder(this)
405+
.field("hbaseScanSpec", hbaseScanSpec)
406+
.field("columns", columns)
407+
.field("maxRecords", maxRecords)
408+
.toString();
386409
}
387410

388411
@JsonProperty("storage")
@@ -396,6 +419,31 @@ public List<SchemaPath> getColumns() {
396419
return columns;
397420
}
398421

422+
@JsonProperty("maxRecords")
423+
public int getMaxRecords() {
424+
return maxRecords;
425+
}
426+
427+
/**
428+
 * HBase group scan supports limit pushdown: the limit is carried as maxRecords.
429+
*/
430+
@Override
431+
public boolean supportsLimitPushdown() {
432+
return true;
433+
}
434+
435+
/**
436+
 * Applies the given limit to this group scan: returns a copy with maxRecords set,
437+
 * or null when the requested limit equals the current one (no new scan needed).
438+
*/
439+
@Override
440+
public GroupScan applyLimit(int maxRecords) {
441+
if (maxRecords == this.maxRecords){
442+
return null;
443+
}
444+
return new HBaseGroupScan(this, maxRecords);
445+
}
446+
399447
@JsonProperty
400448
public HBaseScanSpec getHBaseScanSpec() {
401449
return hbaseScanSpec;
@@ -423,6 +471,7 @@ public boolean isFilterPushedDown() {
423471
@VisibleForTesting
424472
public HBaseGroupScan() {
425473
super((String)null);
474+
maxRecords = -1;
426475
}
427476

428477
/**

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel fi
122122
}
123123

124124
final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
125-
newScanSpec, groupScan.getColumns());
125+
newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
126126
newGroupsScan.setFilterPushedDown(true);
127127

128128
final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,15 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
8484

8585
private final Connection connection;
8686

87-
public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
87+
public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, int maxRecords) {
8888
this.connection = connection;
8989
hbaseTableName = TableName.valueOf(
9090
Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());
9191
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
9292
hbaseScanColumnsOnly = new Scan();
93+
// Set the limit of rows for this scan. We will terminate the scan if the number of returned rows reaches this value.
9394
hbaseScan
95+
.setLimit(maxRecords)
9496
.setFilter(subScanSpec.getScanFilter())
9597
.setCaching(TARGET_RECORD_COUNT);
9698

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public ScanBatch getBatch(ExecutorFragmentContext context, HBaseSubScan subScan,
4444
if ((columns = subScan.getColumns())==null) {
4545
columns = GroupScan.ALL_COLUMNS;
4646
}
47-
readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns));
47+
readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, subScan.getMaxRecords()));
4848
} catch (Exception e1) {
4949
throw new ExecutionSetupException(e1);
5050
}

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public boolean supportsRead() {
6161
@Override
6262
public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
6363
HBaseScanSpec scanSpec = selection.getListWith(new TypeReference<HBaseScanSpec>() {});
64-
return new HBaseGroupScan(userName, this, scanSpec, null);
64+
return new HBaseGroupScan(userName, this, scanSpec, null, -1);
6565
}
6666

6767
@Override

contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,27 +50,30 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
5050
private final HBaseStoragePlugin hbaseStoragePlugin;
5151
private final List<HBaseSubScanSpec> regionScanSpecList;
5252
private final List<SchemaPath> columns;
53+
private final int maxRecords;
5354

5455
@JsonCreator
5556
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
5657
@JsonProperty("userName") String userName,
5758
@JsonProperty("hbaseStoragePluginConfig") HBaseStoragePluginConfig hbaseStoragePluginConfig,
5859
@JsonProperty("regionScanSpecList") LinkedList<HBaseSubScanSpec> regionScanSpecList,
59-
@JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
60+
@JsonProperty("columns") List<SchemaPath> columns,
61+
@JsonProperty("maxRecords") int maxRecords) throws ExecutionSetupException {
6062
this(userName,
6163
registry.resolve(hbaseStoragePluginConfig, HBaseStoragePlugin.class),
6264
regionScanSpecList,
63-
columns);
65+
columns, maxRecords);
6466
}
6567

6668
public HBaseSubScan(String userName,
6769
HBaseStoragePlugin hbaseStoragePlugin,
6870
List<HBaseSubScanSpec> regionInfoList,
69-
List<SchemaPath> columns) {
71+
List<SchemaPath> columns, int maxRecords) {
7072
super(userName);
7173
this.hbaseStoragePlugin = hbaseStoragePlugin;
7274
this.regionScanSpecList = regionInfoList;
7375
this.columns = columns;
76+
this.maxRecords = maxRecords;
7477
}
7578

7679
@JsonProperty
@@ -98,6 +101,11 @@ public HBaseStoragePlugin getStorageEngine(){
98101
return hbaseStoragePlugin;
99102
}
100103

104+
@JsonIgnore
105+
public int getMaxRecords() {
106+
return maxRecords;
107+
}
108+
101109
@Override
102110
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
103111
return physicalVisitor.visitSubScan(this, value);
@@ -106,7 +114,7 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
106114
@Override
107115
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
108116
Preconditions.checkArgument(children.isEmpty());
109-
return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns);
117+
return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns, maxRecords);
110118
}
111119

112120
@Override

contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,54 @@ public void testFilterPushDownRowKeyEqual() throws Exception {
4444
PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
4545
}
4646

47+
@Test
48+
public void testLimitPushDown() throws Exception {
49+
final String sql = "SELECT\n"
50+
+ " *\n"
51+
+ "FROM\n"
52+
+ " hbase.`[TABLE_NAME]` tableName\n"
53+
+ "LIMIT 3\n";
54+
55+
runHBaseSQLVerifyCount(sql, 3);
56+
57+
final String[] expectedPlan = {"Limit\\(fetch\\=\\[3\\]\\)", "maxRecords\\=3"};
58+
final String[] excludedPlan ={};
59+
final String sqlHBase = canonizeHBaseSQL(sql);
60+
PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
61+
}
62+
63+
@Test
64+
public void testLimitPushDownWithSpecial() throws Exception {
65+
final String sql = "SELECT\n"
66+
+ " *\n"
67+
+ "FROM\n"
68+
+ " hbase.`[TABLE_NAME]` tableName\n"
69+
+ "LIMIT 0\n";
70+
71+
runHBaseSQLVerifyCount(sql, 0);
72+
73+
final String[] expectedPlan = {"Limit\\(fetch\\=\\[0\\]\\)", "Limit\\(offset\\=\\[0\\]\\, fetch\\=\\[0\\]\\)", "maxRecords\\=0"};
74+
final String[] excludedPlan ={};
75+
final String sqlHBase = canonizeHBaseSQL(sql);
76+
PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
77+
}
78+
79+
@Test
80+
public void testLimitPushDownWithOffset() throws Exception {
81+
final String sql = "SELECT\n"
82+
+ " *\n"
83+
+ "FROM\n"
84+
+ " hbase.`[TABLE_NAME]` tableName\n"
85+
+ "LIMIT 2 offset 2\n";
86+
87+
runHBaseSQLVerifyCount(sql, 2);
88+
89+
final String[] expectedPlan = {"Limit\\(offset\\=\\[2\\]\\, fetch\\=\\[2\\]\\)", "maxRecords\\=4"};
90+
final String[] excludedPlan ={};
91+
final String sqlHBase = canonizeHBaseSQL(sql);
92+
PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
93+
}
94+
4795
@Test
4896
public void testFilterPushDownRowKeyNotEqual() throws Exception {
4997
setColumnWidths(new int[] {8, 38, 38});

0 commit comments

Comments
 (0)