Skip to content

Commit e8f219b

Browse files
committed
Logic to kill orphan DAGs left behind by crashed HS2
1 parent 4a5ec5d commit e8f219b

5 files changed

Lines changed: 53 additions & 1 deletion

File tree

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
import org.apache.hadoop.security.Credentials;
3030
import org.apache.hadoop.yarn.api.records.ApplicationId;
3131
import org.apache.tez.client.TezClient;
32+
import org.apache.tez.dag.api.DAG;
3233
import org.apache.tez.dag.api.TezConfiguration;
3334
import org.apache.tez.dag.api.TezException;
35+
import org.apache.tez.dag.api.client.DAGClient;
36+
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
37+
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
3438
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
3539

3640
/**
@@ -187,4 +191,35 @@ public boolean killQuery(String reason) throws HiveException {
187191
killQuery.killQuery(queryId, reason, conf, false);
188192
return true;
189193
}
194+
195+
@Override
196+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
197+
try {
198+
return getTezClient().submitDAG(dag);
199+
} catch (TezException e) {
200+
if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
201+
throw e;
202+
}
203+
tryKillRunningDAGs(getTezClient());
204+
return getTezClient().submitDAG(dag);
205+
}
206+
}
207+
208+
private void tryKillRunningDAGs(TezClient session) throws TezException {
209+
LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);
210+
DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null);
211+
if (proxy == null) {
212+
throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
213+
}
214+
try {
215+
DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse =
216+
proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build());
217+
for (String dagId : allDAGSResponse.getDagIdList()) {
218+
LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId);
219+
proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build());
220+
}
221+
} catch (Exception e) {
222+
throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
223+
}
224+
}
190225
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.hadoop.hive.ql.wm.WmContext;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
3636
import org.apache.tez.client.TezClient;
37+
import org.apache.tez.dag.api.DAG;
3738
import org.apache.tez.dag.api.TezException;
39+
import org.apache.tez.dag.api.client.DAGClient;
3840
import org.apache.tez.dag.api.client.DAGStatus;
3941

4042
/**
@@ -86,6 +88,7 @@ public String toString() {
8688

8789
HiveConf getConf();
8890
TezClient getTezClient();
91+
DAGClient submitDAG(DAG dag) throws TezException, IOException;
8992
boolean isOpen();
9093
boolean isOpening();
9194
boolean getDoAsEnabled();

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
3636
import org.apache.tez.client.TezClient;
37+
import org.apache.tez.dag.api.DAG;
3738
import org.apache.tez.dag.api.TezException;
39+
import org.apache.tez.dag.api.client.DAGClient;
3840
import org.apache.tez.dag.api.client.DAGStatus;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
@@ -337,6 +339,11 @@ public TezClient getTezClient() {
337339
return baseSession.getTezClient();
338340
}
339341

342+
@Override
343+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
344+
return baseSession.submitDAG(dag);
345+
}
346+
340347
@Override
341348
public boolean isOpening() {
342349
return baseSession.isOpening();

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@
7676
import org.apache.hadoop.yarn.conf.YarnConfiguration;
7777
import org.apache.tez.client.TezClient;
7878
import org.apache.tez.common.TezUtils;
79+
import org.apache.tez.dag.api.DAG;
7980
import org.apache.tez.dag.api.PreWarmVertex;
8081
import org.apache.tez.dag.api.SessionNotRunning;
8182
import org.apache.tez.dag.api.TezConfiguration;
8283
import org.apache.tez.dag.api.TezException;
8384
import org.apache.tez.dag.api.UserPayload;
85+
import org.apache.tez.dag.api.client.DAGClient;
8486
import org.apache.tez.dag.api.client.DAGStatus;
8587
import org.apache.tez.dag.api.client.Progress;
8688
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
@@ -820,6 +822,11 @@ public TezClient getTezClient() {
820822
return session;
821823
}
822824

825+
@Override
826+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
827+
return getTezClient().submitDAG(dag);
828+
}
829+
823830
@Override
824831
public LocalResource getAppJarLr() {
825832
return appJarLr;

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef) throws Exception {
692692

693693
private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException {
694694
runtimeContext.init(sessionState);
695-
return sessionState.getTezClient().submitDAG(dag);
695+
return sessionState.submitDAG(dag);
696696
}
697697

698698
private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,

0 commit comments

Comments
 (0)