Skip to content

Commit 7e71cd0

Browse files
committed
DRILL-7193: Integration changes of the Distributed RM queue configuration with Simple Parallelizer.
Refactor existing ZK based queue to accommodate new Distributed queue for RM. Refactor and rename the existing memory allocation utilities to ZKQueueMemoryAllocationUtilities and DefaultMemoryAllocationUtilities. Parallelizer code is changed to accommodate the memory adjustment for the operators during parallelization phase. With this change, there are 3 different implementation of SimpleParallelizer; they are ZKQueueParallelizer, DistributedQueueParallelizer and DefaultParallelizer which will be used by ZK based RM, Distributed RM and Non RM configuration.
1 parent 097122e commit 7e71cd0

21 files changed

Lines changed: 522 additions & 404 deletions

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java renamed to exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultParallelizer.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.drill.exec.ops.QueryContext;
2121
import org.apache.drill.exec.physical.base.PhysicalOperator;
2222
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
23-
import org.apache.drill.exec.util.MemoryAllocationUtilities;
23+
import org.apache.drill.exec.util.memory.DefaultMemoryAllocationUtilities;
2424

2525
import java.util.Collection;
2626
import java.util.List;
@@ -32,18 +32,18 @@
3232
* The memory computation of the operators is based on the earlier logic to assign memory for the buffered
3333
* operators.
3434
*/
35-
public class DefaultQueryParallelizer extends SimpleParallelizer {
35+
public class DefaultParallelizer extends SimpleParallelizer {
3636
private final boolean planHasMemory;
3737
private final QueryContext queryContext;
3838

39-
public DefaultQueryParallelizer(boolean memoryAvailableInPlan, QueryContext queryContext) {
39+
public DefaultParallelizer(boolean memoryAvailableInPlan, QueryContext queryContext) {
4040
super(queryContext);
4141
this.planHasMemory = memoryAvailableInPlan;
4242
this.queryContext = queryContext;
4343
}
4444

45-
public DefaultQueryParallelizer(boolean memoryPlanning, long parallelizationThreshold, int maxWidthPerNode,
46-
int maxGlobalWidth, double affinityFactor) {
45+
public DefaultParallelizer(boolean memoryPlanning, long parallelizationThreshold, int maxWidthPerNode,
46+
int maxGlobalWidth, double affinityFactor) {
4747
super(parallelizationThreshold, maxWidthPerNode, maxGlobalWidth, affinityFactor);
4848
this.planHasMemory = memoryPlanning;
4949
this.queryContext = null;
@@ -56,7 +56,7 @@ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
5656
return;
5757
}
5858
List<PhysicalOperator> bufferedOpers = planningSet.getRootWrapper().getNode().getBufferedOperators(queryContext);
59-
MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(planHasMemory, bufferedOpers, queryContext);
59+
DefaultMemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(planHasMemory, bufferedOpers, queryContext);
6060
}
6161

6262
@Override

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java renamed to exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DistributedQueueParallelizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@
4040
* cluster state. However, the memory assignment for each operator, minor fragment and major
4141
* fragment is based on the cluster state and provided queue configuration.
4242
*/
43-
public class QueueQueryParallelizer extends SimpleParallelizer {
43+
public class DistributedQueueParallelizer extends SimpleParallelizer {
4444
private final boolean planHasMemory;
4545
private final QueryContext queryContext;
4646
private final Map<DrillbitEndpoint, Map<PhysicalOperator, Long>> operators;
4747

48-
public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) {
48+
public DistributedQueueParallelizer(boolean memoryPlanning, QueryContext queryContext) {
4949
super(queryContext);
5050
this.planHasMemory = memoryPlanning;
5151
this.queryContext = queryContext;

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
2626
import org.apache.drill.exec.physical.base.Exchange;
2727
import org.apache.drill.exec.physical.base.PhysicalOperator;
28+
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
2829
import org.apache.drill.exec.work.foreman.ForemanSetupException;
2930

3031
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.planner.fragment;
19+
20+
import org.apache.drill.common.util.function.CheckedConsumer;
21+
import org.apache.drill.exec.ops.QueryContext;
22+
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
23+
import org.apache.drill.exec.physical.base.Exchange;
24+
import org.apache.drill.exec.physical.base.PhysicalOperator;
25+
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
26+
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
27+
import org.apache.drill.exec.util.memory.ZKQueueMemoryAllocationUtilities;
28+
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
29+
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
30+
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
31+
import java.util.Collection;
32+
import java.util.Map;
33+
import java.util.HashMap;
34+
import java.util.Set;
35+
import java.util.function.BiFunction;
36+
37+
public class ZKQueueParallelizer extends SimpleParallelizer {
38+
39+
private final boolean planHasMemory;
40+
private final QueryContext queryContext;
41+
private Map<String, Collection<PhysicalOperator>> endpointMap;
42+
private final QueryResourceManager resourceManager;
43+
44+
public ZKQueueParallelizer(boolean memoryAvailableInPlan, QueryResourceManager rm, QueryContext queryContext) {
45+
super(queryContext);
46+
this.planHasMemory = memoryAvailableInPlan;
47+
this.queryContext = queryContext;
48+
this.resourceManager = rm;
49+
}
50+
51+
@Override
52+
public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
53+
Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
54+
if (planHasMemory) {
55+
return;
56+
}
57+
58+
Collector collector = new Collector();
59+
60+
for (Wrapper wrapper : roots) {
61+
traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
62+
fragment.getNode().getRoot().accept(collector, fragment);
63+
}));
64+
}
65+
66+
endpointMap = collector.getNodeMap();
67+
68+
ZKQueueMemoryAllocationUtilities.planMemory(queryContext, this.resourceManager, endpointMap);
69+
}
70+
71+
72+
public class Collector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
73+
private final Multimap<DrillbitEndpoint, PhysicalOperator> bufferedOperators;
74+
75+
public Collector() {
76+
this.bufferedOperators = ArrayListMultimap.create();
77+
}
78+
79+
private void getMinorFragCountPerDrillbit(Wrapper currFragment, PhysicalOperator operator) {
80+
for (DrillbitEndpoint endpoint : currFragment.getAssignedEndpoints()) {
81+
bufferedOperators.put(endpoint, operator);
82+
}
83+
}
84+
85+
@Override
86+
public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
87+
return visitOp(exchange, fragment);
88+
}
89+
90+
@Override
91+
public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
92+
return null;
93+
}
94+
95+
@Override
96+
public Void visitOp(PhysicalOperator op, Wrapper fragment) {
97+
if (op.isBufferedOperator(queryContext)) {
98+
getMinorFragCountPerDrillbit(fragment, op);
99+
}
100+
for (PhysicalOperator child : op) {
101+
child.accept(this, fragment);
102+
}
103+
return null;
104+
}
105+
106+
public Map<String, Collection<PhysicalOperator>> getNodeMap() {
107+
Map<DrillbitEndpoint, Collection<PhysicalOperator>> endpointCollectionMap = bufferedOperators.asMap();
108+
Map<String, Collection<PhysicalOperator>> nodeMap = new HashMap<>();
109+
for (Map.Entry<DrillbitEndpoint, Collection<PhysicalOperator>> entry : endpointCollectionMap.entrySet()) {
110+
nodeMap.put(entry.getKey().getAddress(), entry.getValue());
111+
}
112+
113+
return nodeMap;
114+
}
115+
}
116+
117+
@Override
118+
protected BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
119+
return (endpoint, operator) -> operator.getMaxAllocation();
120+
}
121+
}

exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.drill.exec.physical.base.FragmentRoot;
3030
import org.apache.drill.exec.physical.base.PhysicalOperator;
3131
import org.apache.drill.exec.planner.PhysicalPlanReader;
32-
import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
32+
import org.apache.drill.exec.planner.fragment.DefaultParallelizer;
3333
import org.apache.drill.exec.planner.fragment.Fragment;
3434
import org.apache.drill.exec.planner.fragment.PlanningSet;
3535
import org.apache.drill.exec.planner.fragment.Wrapper;
@@ -57,7 +57,7 @@
5757
* allows not to pollute parent class with non-authentic functionality
5858
*
5959
*/
60-
public class SplittingParallelizer extends DefaultQueryParallelizer {
60+
public class SplittingParallelizer extends DefaultParallelizer {
6161

6262
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class);
6363

exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java renamed to exec/java-exec/src/main/java/org/apache/drill/exec/util/memory/DefaultMemoryAllocationUtilities.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package org.apache.drill.exec.util;
18+
package org.apache.drill.exec.util.memory;
1919

2020
import java.util.ArrayList;
2121
import java.util.List;
@@ -31,9 +31,9 @@
3131

3232
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
3333

34-
public class MemoryAllocationUtilities {
34+
public class DefaultMemoryAllocationUtilities {
3535

36-
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);
36+
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultMemoryAllocationUtilities.class);
3737

3838

3939
public static void setupBufferedMemoryAllocations(PhysicalPlan plan, final QueryContext queryContext) {

0 commit comments

Comments
 (0)