Skip to content

Commit ef872f3

Browse files
authored
Fix concurrent modification of non-thread-safe data structures caused by parallel dispatching
1 parent 3b9513e commit ef872f3

2 files changed

Lines changed: 12 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,8 @@
3434

3535
import java.time.ZoneId;
3636
import java.util.HashSet;
37-
import java.util.LinkedList;
38-
import java.util.List;
3937
import java.util.Set;
38+
import java.util.concurrent.ConcurrentHashMap;
4039
import java.util.function.LongConsumer;
4140

4241
/**
@@ -66,7 +65,8 @@ public class MPPQueryContext {
6665
// When some DataNode cannot be connected, its endPoint will be put
6766
// in this list. And the following retry will avoid planning fragment
6867
// onto this node.
69-
private final List<TEndPoint> endPointBlackList;
68+
// When dispatching a fragment instance (FI) fails, this structure may be modified concurrently
69+
private final Set<TEndPoint> endPointBlackList;
7070

7171
private final TypeProvider typeProvider = new TypeProvider();
7272

@@ -97,7 +97,7 @@ public class MPPQueryContext {
9797

9898
public MPPQueryContext(QueryId queryId) {
9999
this.queryId = queryId;
100-
this.endPointBlackList = new LinkedList<>();
100+
this.endPointBlackList = ConcurrentHashMap.newKeySet();
101101
this.memoryReservationManager =
102102
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
103103
}
@@ -222,7 +222,7 @@ public void addFailedEndPoint(TEndPoint endPoint) {
222222
this.endPointBlackList.add(endPoint);
223223
}
224224

225-
public List<TEndPoint> getEndPointBlackList() {
225+
public Set<TEndPoint> getEndPointBlackList() {
226226
return endPointBlackList;
227227
}
228228

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,17 @@ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
146146
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
147147
futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
148148
}
149+
FragInstanceDispatchResult failedResult = null;
149150
for (Future<FragInstanceDispatchResult> future : futures) {
151+
// Make sure all executing tasks are finished to avoid concurrency issues
150152
FragInstanceDispatchResult result = future.get();
151-
if (!result.isSuccessful()) {
152-
return immediateFuture(result);
153+
if (!result.isSuccessful() && failedResult == null) {
154+
failedResult = result;
153155
}
154156
}
157+
if (failedResult != null) {
158+
return immediateFuture(failedResult);
159+
}
155160
} catch (InterruptedException e) {
156161
Thread.currentThread().interrupt();
157162
LOGGER.warn("Interrupted when dispatching read async", e);

0 commit comments

Comments
 (0)