Skip to content

Commit 1eddf6a

Browse files
committed
Address kill-DAG race condition in TezExternalSessionState
1 parent 203e136 commit 1eddf6a

1 file changed

Lines changed: 35 additions & 0 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
package org.apache.hadoop.hive.ql.exec.tez;
2020

2121
import java.io.IOException;
22+
import java.util.concurrent.TimeUnit;
2223

2324
import org.apache.hadoop.conf.Configuration;
2425
import org.apache.hadoop.hive.conf.HiveConf;
26+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
2527
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
2628
import org.apache.hadoop.hive.ql.metadata.HiveException;
2729
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -35,6 +37,7 @@
3537
import org.apache.tez.dag.api.client.DAGClient;
3638
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
3739
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
40+
import org.apache.tez.dag.api.records.DAGProtos;
3841
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
3942

4043
/**
@@ -211,15 +214,47 @@ private void tryKillRunningDAGs(TezClient session) throws TezException {
211214
if (proxy == null) {
212215
throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
213216
}
217+
long killTimeoutMs = TimeUnit.SECONDS.toMillis(
218+
HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS));
214219
try {
215220
DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse =
216221
proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build());
217222
for (String dagId : allDAGSResponse.getDagIdList()) {
218223
LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId);
219224
proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build());
225+
waitForDagTerminal(proxy, dagId, killTimeoutMs);
220226
}
221227
} catch (Exception e) {
222228
throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
223229
}
224230
}
231+
232+
private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs)
233+
throws Exception {
234+
long startTimeMs = System.currentTimeMillis();
235+
long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
236+
while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
237+
long remainingMs = timeoutMs - (System.currentTimeMillis() - startTimeMs);
238+
DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null,
239+
DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
240+
.setDagId(dagId)
241+
.setTimeout(Math.min(pollIntervalMs, remainingMs))
242+
.build());
243+
if (response.hasDagStatus() && response.getDagStatus().hasState()
244+
&& isTerminalDagState(response.getDagStatus().getState())) {
245+
LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId,
246+
response.getDagStatus().getState());
247+
return;
248+
}
249+
}
250+
throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId
251+
+ " on app ID " + externalAppId + " to reach terminal state after kill");
252+
}
253+
254+
private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) {
255+
return switch (state) {
256+
case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true;
257+
default -> false;
258+
};
259+
}
225260
}

0 commit comments

Comments
 (0)