Skip to content

Commit 5805534

Browse files
committed
Fix NPE when retried query reuses stale FragmentInstanceContext
When QueryExecution.retry() re-plans a query, doDistributionPlan() creates fresh PlanFragmentId objects with nextFragmentInstanceId reset to 0. Because the queryId is unchanged, retry generates fragment instance IDs identical to the first execution (e.g. queryId_11.0). FragmentInstanceManager.instanceContext retains completed contexts for 5 minutes for statistics caching. When a retry dispatches the same FI ID, instanceContext.computeIfAbsent() returns the stale old context whose releaseResource() has already been called, setting dataRegion to null. New drivers then NPE at dataRegion.tryReadLock() inside FragmentInstanceContext.initQueryDataSource(). Fix: replace computeIfAbsent() with compute() in execDataQueryFragmentInstance() so that a released context (dataRegion == null) is atomically replaced with a fresh one carrying the new dataRegion reference. Defensive fix: add a null guard for dataRegion in getSharedQueryDataSource() that returns null (treated by DataDriver as an aborted FI) instead of propagating NPE.
1 parent 9d72f66 commit 5805534

2 files changed

Lines changed: 23 additions & 11 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,10 @@ public boolean initRegionScanQueryDataSource(List<IFullPath> pathList) {
781781

782782
public synchronized IQueryDataSource getSharedQueryDataSource() throws QueryProcessException {
783783
if (sharedQueryDataSource == null) {
784+
if (dataRegion == null) {
785+
// Context was released (releaseResource() already ran). Signal aborted to the driver.
786+
return null;
787+
}
784788
switch (queryDataSourceType) {
785789
case SERIES_SCAN:
786790
if (initQueryDataSource(sourcePaths)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,19 +151,27 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
151151
DataNodeQueryContext dataNodeQueryContext =
152152
getOrCreateDataNodeQueryContext(instanceId.getQueryId(), dataNodeFINum);
153153

154+
// Use compute() instead of computeIfAbsent() to handle the retry scenario:
155+
// QueryExecution.retry() re-creates the distribution plan with PlanFragmentId
156+
// counters reset to 0, generating fragment instance IDs identical to the first
157+
// execution. instanceContext retains released contexts (dataRegion == null) for
158+
// statistics caching. Without this check, a retried FI reuses the stale context
159+
// and NPEs at dataRegion.tryReadLock().
154160
FragmentInstanceContext context =
155-
instanceContext.computeIfAbsent(
161+
instanceContext.compute(
156162
instanceId,
157-
fragmentInstanceId ->
158-
createFragmentInstanceContext(
159-
fragmentInstanceId,
160-
stateMachine,
161-
instance.getSessionInfo(),
162-
dataRegion,
163-
instance.getGlobalTimePredicate(),
164-
dataNodeQueryContextMap,
165-
instance.isDebug(),
166-
instance.isVerbose()));
163+
(fiId, existingContext) ->
164+
(existingContext == null || existingContext.getDataRegion() == null)
165+
? createFragmentInstanceContext(
166+
fiId,
167+
stateMachine,
168+
instance.getSessionInfo(),
169+
dataRegion,
170+
instance.getGlobalTimePredicate(),
171+
dataNodeQueryContextMap,
172+
instance.isDebug(),
173+
instance.isVerbose())
174+
: existingContext);
167175
context.setHighestPriority(instance.isHighestPriority());
168176

169177
try {

0 commit comments

Comments
 (0)