Skip to content

Commit 9ee9997

Browse files
authored
Merge pull request #50 from 0aix/parallel
Add option to run queries in parallel
2 parents 2a6cea0 + ab2f92e commit 9ee9997

5 files changed

Lines changed: 110 additions & 6 deletions

File tree

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@ You may want queries, asserts or query metadata to use a specific date column, o
183183

184184
Simply pass `--parameter KEY=VALUE` in the CLI and `${KEY}` will be replaced with `VALUE` in all queries, query metadata and test assertions. For example, to query June 23rd 2015, you could use `--parameter DATE=2015-06-23`. If a query uses `${DATE}`, it will be replaced with `2015-06-23` before execution.
185185

186+
### Query Parallelism
187+
188+
You may want to run queries in parallel rather than sequentially, especially if you have many time-consuming queries.
189+
190+
To enable this feature, pass in `--query-parallel-enable true` when launching Validatar. By default, this will run all queries at once. To limit the number of queries that run concurrently, pass in `--query-parallel-max VALUE`, where `VALUE` is the maximum number of queries that should run at the same time. If `VALUE` is non-positive or the option is unspecified, no limit is applied.
191+
186192
## Execution Engines
187193

188194
### Hive
@@ -330,6 +336,17 @@ Option Description
330336
fully qualified classes to plug in>
331337
332338
339+
Engine Options:
340+
Option Description
341+
------ -----------
342+
--query-parallel-enable <Boolean: Whether or not queries should run in
343+
Query parallelism option> parallel. (default: false)
344+
--query-parallel-max <Integer: Max The max number of queries that will
345+
query parallelism> run concurrently. If non-positive or
346+
unspecified, all queries will run at
347+
once. (default: 0)
348+
349+
333350
Hive engine options:
334351
Option (* = required) Description
335352
--------------------- -----------
@@ -488,10 +505,11 @@ Version | Notes
488505
0.5.4 | Added a flag ```--report-on-failure-only``` to only generate reports if there were failures in tests (including warnOnly) or queries
489506
0.5.5 | Shaded ```org.objectweb.asm``` to not clash with asm in Hadoop environments
490507
0.5.6 | Fixed a bug with pretty-printing results with nulls
491-
0.6.0 | Better reporting (show data with only the assertion columns with the assertion result column, columns now in sorted order, EMail Subject Prefix). Can now write multiple reports per invocation (specify more than one report formatter using --report-format)
508+
0.6.0 | Better reporting (show data with only the assertion columns with the assertion result column, columns now in sorted order, Email Subject Prefix). Can now write multiple reports per invocation (specify more than one report formatter using --report-format)
492509
0.6.1 | Added a flag to configure the Email reporter SMTP strategy. Use ```--email-smtp-strategy``` to pass in ```SMTP_PLAIN```, ```SMTP_TLS``` or ```SMTP_SSL```.
493510
0.6.2 | Bintray EOL. First rerelease of 0.6.1 on Maven Central instead
494511
0.6.3 | Screwdriver migration. First rerelease of 0.6.1 using Screwdriver instead of Travis.
512+
0.6.4 | Added support for running queries in parallel
495513

496514
## Members
497515

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@
401401
<dependency>
402402
<groupId>org.apache.httpcomponents</groupId>
403403
<artifactId>httpclient</artifactId>
404-
<version>4.5.1</version>
404+
<version>4.5.13</version>
405405
</dependency>
406406
<dependency>
407407
<groupId>com.google.guava</groupId>

src/main/java/com/yahoo/validatar/execution/EngineManager.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111
import com.yahoo.validatar.execution.hive.Apiary;
1212
import com.yahoo.validatar.execution.pig.Sty;
1313
import com.yahoo.validatar.execution.rest.JSON;
14+
import joptsimple.OptionParser;
15+
import joptsimple.OptionSet;
1416
import lombok.extern.slf4j.Slf4j;
1517

1618
import java.util.Arrays;
1719
import java.util.Collections;
1820
import java.util.HashMap;
1921
import java.util.List;
2022
import java.util.Map;
23+
import java.util.concurrent.ForkJoinPool;
2124
import java.util.stream.Collectors;
2225

2326
/**
@@ -27,6 +30,29 @@
2730
public class EngineManager extends Pluggable<Engine> implements Helpable {
2831
public static final String CUSTOM_ENGINE = "custom-engine";
2932
public static final String CUSTOM_ENGINE_DESCRIPTION = "Additional custom engine to load.";
33+
public static final String QUERY_PARALLEL_ENABLE = "query-parallel-enable";
34+
public static final String QUERY_PARALLEL_MAX = "query-parallel-max";
35+
private static final int QUERY_PARALLEL_MIN = 1;
36+
37+
protected boolean queryParallelEnable;
38+
protected int queryParallelMax;
39+
40+
private static final OptionParser PARSER = new OptionParser() {
41+
{
42+
accepts(QUERY_PARALLEL_ENABLE, "Whether or not queries should run in parallel.")
43+
.withRequiredArg()
44+
.describedAs("Query parallelism option")
45+
.ofType(Boolean.class)
46+
.defaultsTo(false);
47+
accepts(QUERY_PARALLEL_MAX, "The max number of queries that will run concurrently. If non-positive or " +
48+
"unspecified, all queries will run at once.")
49+
.withRequiredArg()
50+
.describedAs("Max query parallelism")
51+
.ofType(Integer.class)
52+
.defaultsTo(0);
53+
allowsUnrecognizedOptions();
54+
}
55+
};
3056

3157
/**
3258
* The Engine classes to manage.
@@ -85,6 +111,10 @@ public EngineManager(String[] arguments) {
85111
engines.put(engine.getName(), new WorkingEngine(engine));
86112
log.info("Added engine {} to list of engines.", engine.getName());
87113
}
114+
115+
OptionSet parser = PARSER.parse(arguments);
116+
queryParallelEnable = (Boolean) parser.valueOf(QUERY_PARALLEL_ENABLE);
117+
queryParallelMax = (Integer) parser.valueOf(QUERY_PARALLEL_MAX);
88118
}
89119

90120
/**
@@ -107,8 +137,7 @@ protected boolean startEngines(List<Query> queries) {
107137
List<Query> all = queries == null ? Collections.emptyList() : queries;
108138
// Queries -> engine name Set -> start engine -> verify all started
109139
return all.stream().map(q -> q.engine).collect(Collectors.toSet())
110-
.stream().map(this::startEngine)
111-
.allMatch(b -> b);
140+
.stream().allMatch(this::startEngine);
112141
}
113142

114143
private boolean startEngine(String engine) {
@@ -132,6 +161,7 @@ private boolean startEngine(String engine) {
132161

133162
@Override
134163
public void printHelp() {
164+
Helpable.printHelp("Engine Options", PARSER);
135165
engines.values().stream().map(WorkingEngine::getEngine).forEach(Engine::printHelp);
136166
Helpable.printHelp("Advanced Engine Options", getPluginOptionsParser());
137167
}
@@ -155,7 +185,19 @@ public boolean run(List<Query> queries) {
155185
return false;
156186
}
157187
// Run each query.
158-
queries.stream().forEach(this::run);
188+
if (!queryParallelEnable) {
189+
queries.forEach(this::run);
190+
} else {
191+
int poolSize = Math.max(queryParallelMax > 0 ? queryParallelMax : queries.size(), QUERY_PARALLEL_MIN);
192+
log.info("Creating a ForkJoinPool with size {}", poolSize);
193+
ForkJoinPool forkJoinPool = new ForkJoinPool(poolSize);
194+
try {
195+
forkJoinPool.submit(() -> queries.parallelStream().forEach(this::run)).get();
196+
} catch (Exception e) {
197+
log.error("Caught exception", e);
198+
}
199+
forkJoinPool.shutdown();
200+
}
159201
return true;
160202
}
161203
}

src/main/java/com/yahoo/validatar/execution/rest/JSON.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.List;
3636
import java.util.Map;
3737
import java.util.Objects;
38+
import java.util.concurrent.TimeUnit;
3839

3940
import static java.util.Arrays.asList;
4041

@@ -283,9 +284,11 @@ private HttpClient createClient(Map<String, String> metadata) {
283284
int retries = Integer.valueOf(metadata.getOrDefault(METADATA_RETRY_KEY, String.valueOf(defaultRetries)));
284285
RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout)
285286
.setConnectionRequestTimeout(timeout)
286-
.setSocketTimeout(timeout).build();
287+
.setSocketTimeout(timeout)
288+
.build();
287289
return HttpClientBuilder.create()
288290
.setDefaultRequestConfig(config)
291+
.setConnectionTimeToLive(timeout, TimeUnit.MILLISECONDS)
289292
.setRetryHandler(new DefaultHttpRequestRetryHandler(retries, false))
290293
.build();
291294
}

src/test/java/com/yahoo/validatar/execution/EngineManagerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,4 +255,45 @@ public void testNormalRun() {
255255
Assert.assertEquals((String) actual.get("Foo.b").get(0).data, (String) expected.get("Foo.b").get(0).data);
256256
Assert.assertEquals((String) actual.get("Foo.b").get(1).data, (String) expected.get("Foo.b").get(1).data);
257257
}
258+
259+
@Test
260+
public void testParallelRun() {
261+
query.engine = MockRunningEngine.ENGINE_NAME;
262+
query.name = "Foo";
263+
264+
String[] args = {"--query-parallel-enable", "true"};
265+
manager = new EngineManager(args);
266+
manager.setEngines(engines);
267+
268+
Assert.assertTrue(manager.run(queries));
269+
270+
Map<String, List<TypedObject>> expected = new HashMap<>();
271+
List<TypedObject> columns = new ArrayList<>();
272+
columns.add(new TypedObject("42", TypeSystem.Type.STRING));
273+
expected.put("Foo.a", columns);
274+
columns = new ArrayList<>();
275+
columns.add(new TypedObject("42", TypeSystem.Type.STRING));
276+
columns.add(new TypedObject("52", TypeSystem.Type.STRING));
277+
expected.put("Foo.b", columns);
278+
279+
Map<String, Column> actual = query.getResult().getColumns();
280+
281+
Assert.assertEquals(actual.size(), 2);
282+
Assert.assertEquals(expected.size(), 2);
283+
Assert.assertEquals(actual.get("Foo.a").size(), 1);
284+
Assert.assertEquals(expected.get("Foo.a").size(), 1);
285+
Assert.assertEquals((String) actual.get("Foo.a").get(0).data, (String) expected.get("Foo.a").get(0).data);
286+
Assert.assertEquals(actual.get("Foo.b").size(), 2);
287+
Assert.assertEquals(expected.get("Foo.b").size(), 2);
288+
Assert.assertEquals((String) actual.get("Foo.b").get(0).data, (String) expected.get("Foo.b").get(0).data);
289+
Assert.assertEquals((String) actual.get("Foo.b").get(1).data, (String) expected.get("Foo.b").get(1).data);
290+
}
291+
292+
@Test
293+
public void testConstructor() {
294+
String[] args = {"--query-parallel-enable", "true", "--query-parallel-max", "10"};
295+
manager = new EngineManager(args);
296+
Assert.assertTrue(manager.queryParallelEnable);
297+
Assert.assertEquals(manager.queryParallelMax, 10);
298+
}
258299
}

0 commit comments

Comments
 (0)