Skip to content

Commit 097122e

Browse files
committed
DRILL-7164: KafkaFilterPushdownTest is sometimes failing to pattern match correctly
1 parent a148aac commit 097122e

2 files changed

Lines changed: 101 additions & 43 deletions

File tree

contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java

Lines changed: 69 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
*/
1818
package org.apache.drill.exec.store.kafka;
1919

20+
import org.apache.drill.PlanTestBase;
2021
import org.apache.drill.categories.KafkaStorageTest;
2122
import org.apache.drill.categories.SlowTest;
2223
import org.apache.kafka.common.serialization.StringSerializer;
23-
2424
import org.junit.BeforeClass;
2525
import org.junit.Test;
2626
import org.junit.experimental.categories.Category;
@@ -32,10 +32,8 @@
3232
@Category({KafkaStorageTest.class, SlowTest.class})
3333
public class KafkaFilterPushdownTest extends KafkaTestBase {
3434
private static final int NUM_PARTITIONS = 5;
35-
private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" +
36-
" \"topicName\" : \"drill-pushdown-topic\"\n" +
37-
" },\n" +
38-
" \"cost\"";
35+
private static final String expectedPattern = "kafkaScanSpec.*\\n.*\"topicName\" : \"drill-pushdown-topic\"\\n(" +
36+
".*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)";
3937

4038
@BeforeClass
4139
public static void setup() throws Exception {
@@ -63,7 +61,9 @@ public void testPushdownOnOffset() throws Exception {
6361
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
6462

6563
runKafkaSQLVerifyCount(queryString, expectedRowCount);
66-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
64+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
65+
new String[] {String.format(expectedPattern, expectedRowCount)},
66+
new String[]{});
6767
}
6868

6969
/**
@@ -79,7 +79,9 @@ public void testPushdownOnPartition() throws Exception {
7979
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
8080

8181
runKafkaSQLVerifyCount(queryString, expectedRowCount);
82-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
82+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
83+
new String[] {String.format(expectedPattern, expectedRowCount)},
84+
new String[]{});
8385
}
8486

8587
/**
@@ -95,7 +97,9 @@ public void testPushdownOnTimestamp() throws Exception {
9597
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
9698

9799
runKafkaSQLVerifyCount(queryString,expectedRowCount);
98-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
100+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
101+
new String[] {String.format(expectedPattern, expectedRowCount)},
102+
new String[]{});
99103
}
100104

101105
/**
@@ -112,7 +116,9 @@ public void testPushdownUnorderedTimestamp() throws Exception {
112116
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
113117

114118
runKafkaSQLVerifyCount(queryString,expectedRowCount);
115-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
119+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
120+
new String[] {String.format(expectedPattern, expectedRowInPlan)},
121+
new String[]{});
116122
}
117123

118124
/**
@@ -128,7 +134,9 @@ public void testPushdownWhenTimestampDoesNotExist() throws Exception {
128134
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
129135

130136
runKafkaSQLVerifyCount(queryString,expectedRowCount);
131-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
137+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
138+
new String[] {String.format(expectedPattern, expectedRowCount)},
139+
new String[]{});
132140
}
133141

134142
/**
@@ -144,7 +152,9 @@ public void testPushdownWhenPartitionDoesNotExist() throws Exception {
144152
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
145153

146154
runKafkaSQLVerifyCount(queryString,expectedRowCount);
147-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
155+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
156+
new String[] {String.format(expectedPattern, expectedRowCount)},
157+
new String[]{});
148158
}
149159

150160
/**
@@ -161,7 +171,9 @@ public void testPushdownForEmptyScanSpec() throws Exception {
161171
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
162172

163173
runKafkaSQLVerifyCount(queryString,expectedRowCount);
164-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
174+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
175+
new String[] {String.format(expectedPattern, expectedRowCount)},
176+
new String[]{});
165177
}
166178

167179
/**
@@ -178,42 +190,54 @@ public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws E
178190
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
179191

180192
runKafkaSQLVerifyCount(queryString,expectedRowCount);
181-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
193+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
194+
new String[] {String.format(expectedPattern, expectedRowCount)},
195+
new String[]{});
182196

183197
//"equal" such that value < startOffset
184198
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
185199
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
186200

187201
runKafkaSQLVerifyCount(queryString,expectedRowCount);
188-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
202+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
203+
new String[] {String.format(expectedPattern, expectedRowCount)},
204+
new String[]{});
189205

190206
//"greater_than" such that value = endOffset-1
191207
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
192208
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
193209

194210
runKafkaSQLVerifyCount(queryString,expectedRowCount);
195-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
211+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
212+
new String[] {String.format(expectedPattern, expectedRowCount)},
213+
new String[]{});
196214

197215
//"greater_than_or_equal" such that value = endOffset
198216
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
199217
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
200218

201219
runKafkaSQLVerifyCount(queryString,expectedRowCount);
202-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
220+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
221+
new String[] {String.format(expectedPattern, expectedRowCount)},
222+
new String[]{});
203223

204224
//"less_than" such that value = startOffset
205225
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
206226
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
207227

208228
runKafkaSQLVerifyCount(queryString,expectedRowCount);
209-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
229+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
230+
new String[] {String.format(expectedPattern, expectedRowCount)},
231+
new String[]{});
210232

211233
//"less_than_or_equal" such that value < startOffset
212234
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
213235
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
214236

215237
runKafkaSQLVerifyCount(queryString,expectedRowCount);
216-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
238+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
239+
new String[] {String.format(expectedPattern, expectedRowCount)},
240+
new String[]{});
217241
}
218242

219243
/**
@@ -230,21 +254,27 @@ public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws E
230254
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
231255

232256
runKafkaSQLVerifyCount(queryString, expectedRowCount);
233-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
257+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
258+
new String[] {String.format(expectedPattern, expectedRowCount)},
259+
new String[]{});
234260

235261
//"greater_than" such that value = endOffset-2
236262
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
237263
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
238264

239265
runKafkaSQLVerifyCount(queryString,expectedRowCount);
240-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
266+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
267+
new String[] {String.format(expectedPattern, expectedRowCount)},
268+
new String[]{});
241269

242270
//"greater_than_or_equal" such that value = endOffset-1
243271
queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
244272
TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
245273

246274
runKafkaSQLVerifyCount(queryString,expectedRowCount);
247-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
275+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
276+
new String[] {String.format(expectedPattern, expectedRowCount)},
277+
new String[]{});
248278
}
249279

250280
/**
@@ -262,7 +292,9 @@ public void testPushdownWithOr() throws Exception {
262292
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
263293

264294
runKafkaSQLVerifyCount(queryString,expectedRowCount);
265-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
295+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
296+
new String[] {String.format(expectedPattern, expectedRowCount)},
297+
new String[]{});
266298
}
267299

268300
/**
@@ -280,7 +312,9 @@ public void testPushdownWithOr1() throws Exception {
280312
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
281313

282314
runKafkaSQLVerifyCount(queryString,expectedRowCount);
283-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
315+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
316+
new String[] {String.format(expectedPattern, expectedRowInPlan)},
317+
new String[]{});
284318
}
285319

286320
/**
@@ -299,7 +333,9 @@ public void testPushdownWithAndOrCombo() throws Exception {
299333
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
300334

301335
runKafkaSQLVerifyCount(queryString,expectedRowCount);
302-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
336+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
337+
new String[] {String.format(expectedPattern, expectedRowCount)},
338+
new String[]{});
303339
}
304340

305341
/**
@@ -319,7 +355,9 @@ public void testPushdownWithAndOrCombo2() throws Exception {
319355
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
320356

321357
runKafkaSQLVerifyCount(queryString,expectedRowCount);
322-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
358+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
359+
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
360+
new String[]{});
323361
}
324362

325363
/**
@@ -338,7 +376,9 @@ public void testPushdownTimestampWithNonMetaField() throws Exception {
338376
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
339377

340378
runKafkaSQLVerifyCount(queryString,expectedRowCount);
341-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
379+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
380+
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
381+
new String[]{});
342382
}
343383

344384
/**
@@ -358,7 +398,9 @@ public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
358398
TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
359399

360400
runKafkaSQLVerifyCount(queryString,expectedRowCount);
361-
testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
401+
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
402+
new String[] {String.format(expectedPattern, expectedRowCountInPlan)},
403+
new String[]{});
362404
}
363405

364406
}

exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,8 @@
1717
*/
1818
package org.apache.drill;
1919

20-
import java.nio.file.Paths;
21-
import static org.junit.Assert.assertFalse;
22-
import static org.junit.Assert.assertTrue;
23-
24-
import java.io.File;
25-
import java.io.IOException;
26-
import java.nio.file.Files;
27-
import java.util.List;
28-
import java.util.Stack;
29-
import java.util.regex.Matcher;
30-
import java.util.regex.Pattern;
31-
20+
import org.apache.calcite.sql.SqlExplain.Depth;
21+
import org.apache.calcite.sql.SqlExplainLevel;
3222
import org.apache.commons.io.FileUtils;
3323
import org.apache.drill.common.expression.SchemaPath;
3424
import org.apache.drill.exec.record.RecordBatchLoader;
@@ -37,13 +27,23 @@
3727
import org.apache.drill.exec.store.parquet.metadata.Metadata;
3828
import org.apache.drill.exec.vector.NullableVarCharVector;
3929
import org.apache.drill.exec.vector.ValueVector;
40-
import org.apache.calcite.sql.SqlExplain.Depth;
41-
import org.apache.calcite.sql.SqlExplainLevel;
42-
30+
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
4331
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
4432
import org.apache.drill.test.BaseTestQuery;
4533
import org.apache.drill.test.QueryTestUtil;
4634

35+
import java.io.File;
36+
import java.io.IOException;
37+
import java.nio.file.Files;
38+
import java.nio.file.Paths;
39+
import java.util.List;
40+
import java.util.Stack;
41+
import java.util.regex.Matcher;
42+
import java.util.regex.Pattern;
43+
44+
import static org.junit.Assert.assertFalse;
45+
import static org.junit.Assert.assertTrue;
46+
4747
public class PlanTestBase extends BaseTestQuery {
4848
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlanTestBase.class);
4949

@@ -92,7 +92,23 @@ public static void testPlanMatchingPatterns(String query, String[] expectedPatte
9292

9393
public static void testPlanMatchingPatterns(String query, Pattern[] expectedPatterns, Pattern[] excludedPatterns)
9494
throws Exception {
95-
final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
95+
testPlanMatchingPatterns(query, OPTIQ_FORMAT, expectedPatterns, excludedPatterns);
96+
}
97+
98+
public static void testPlanMatchingPatterns(String query, String planFormat,
99+
String[] expectedPatterns, String[] excludedPatterns)
100+
throws Exception {
101+
Preconditions.checkArgument((planFormat.equals(OPTIQ_FORMAT) || planFormat.equals(JSON_FORMAT)), "Unsupported " +
102+
"plan format %s is provided for explain plan query. Supported formats are: %s, %s", planFormat, OPTIQ_FORMAT,
103+
JSON_FORMAT);
104+
testPlanMatchingPatterns(query, planFormat, stringsToPatterns(expectedPatterns),
105+
stringsToPatterns(excludedPatterns));
106+
}
107+
108+
private static void testPlanMatchingPatterns(String query, String planFormat,
109+
Pattern[] expectedPatterns, Pattern[] excludedPatterns)
110+
throws Exception {
111+
final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), planFormat);
96112

97113
// Check and make sure all expected patterns are in the plan
98114
if (expectedPatterns != null) {

0 commit comments

Comments
 (0)