Skip to content

Commit ccc7035

Browse files
HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle multiple HiveServer2 instances submitting DAGs concurrently to available Tez External Sessions (apache#6528)
1 parent 4c1b518 commit ccc7035

11 files changed

Lines changed: 461 additions & 44 deletions

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,11 @@ public interface ExternalSessionsRegistry {
4040
* Closes the external session registry.
4141
*/
4242
void close();
43+
44+
/**
45+
* Returns true if this registry instance currently holds a claim on the given AM.
46+
*/
47+
default boolean isClaimed(String appId) {
48+
return true; // Non-ZK registries case is always true
49+
}
4350
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,26 @@
1919
package org.apache.hadoop.hive.ql.exec.tez;
2020

2121
import java.io.IOException;
22+
import java.util.concurrent.TimeUnit;
2223

24+
import com.google.protobuf.ServiceException;
2325
import org.apache.hadoop.conf.Configuration;
2426
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
2528
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
2629
import org.apache.hadoop.hive.ql.metadata.HiveException;
30+
import org.apache.hadoop.hive.ql.session.SessionState;
2731
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
2832
import org.apache.hadoop.security.Credentials;
2933
import org.apache.hadoop.yarn.api.records.ApplicationId;
3034
import org.apache.tez.client.TezClient;
35+
import org.apache.tez.dag.api.DAG;
3136
import org.apache.tez.dag.api.TezConfiguration;
3237
import org.apache.tez.dag.api.TezException;
38+
import org.apache.tez.dag.api.client.DAGClient;
39+
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
40+
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
41+
import org.apache.tez.dag.api.records.DAGProtos;
3342
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
3443

3544
/**
@@ -139,6 +148,11 @@ public void close(boolean keepDagFilesDir) throws Exception {
139148
// We never close external sessions that don't have errors.
140149
try {
141150
if (externalAppId != null) {
151+
LOG.debug("Returning external session with appID: {}", externalAppId);
152+
SessionState sessionState = SessionState.get();
153+
if (sessionState != null) {
154+
sessionState.setTezSession(null);
155+
}
142156
registry.returnSession(externalAppId);
143157
}
144158
} catch (Exception e) {
@@ -187,4 +201,73 @@ public boolean killQuery(String reason) throws HiveException {
187201
killQuery.killQuery(queryId, reason, conf, false);
188202
return true;
189203
}
204+
205+
@Override
206+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
207+
if (!registry.isClaimed(externalAppId)) {
208+
throw new TezException("Cannot submit DAG as the Tez Session no-longer owns the AM: " + externalAppId);
209+
}
210+
try {
211+
return getTezClient().submitDAG(dag);
212+
} catch (TezException e) {
213+
if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
214+
throw e;
215+
}
216+
tryKillRunningDAGs(getTezClient());
217+
return getTezClient().submitDAG(dag);
218+
}
219+
}
220+
221+
private void tryKillRunningDAGs(TezClient session) throws TezException {
222+
if (!registry.isClaimed(externalAppId)) {
223+
throw new TezException("Cannot kill running DAG as the Tez Session no-longer owns the AM: " + externalAppId);
224+
}
225+
LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);
226+
DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null);
227+
if (proxy == null) {
228+
throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
229+
}
230+
long killTimeoutMs = TimeUnit.SECONDS.toMillis(
231+
HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS));
232+
try {
233+
DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse =
234+
proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build());
235+
for (String dagId : allDAGSResponse.getDagIdList()) {
236+
LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId);
237+
proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build());
238+
waitForDagTerminal(proxy, dagId, killTimeoutMs);
239+
}
240+
} catch (Exception e) {
241+
throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
242+
}
243+
}
244+
245+
private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs)
246+
throws TezException, ServiceException {
247+
long startTimeMs = System.currentTimeMillis();
248+
long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
249+
while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
250+
long remainingMs = timeoutMs - (System.currentTimeMillis() - startTimeMs);
251+
DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null,
252+
DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
253+
.setDagId(dagId)
254+
.setTimeout(Math.min(pollIntervalMs, remainingMs))
255+
.build());
256+
if (response.hasDagStatus() && response.getDagStatus().hasState()
257+
&& isTerminalDagState(response.getDagStatus().getState())) {
258+
LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId,
259+
response.getDagStatus().getState());
260+
return;
261+
}
262+
}
263+
throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId
264+
+ " on app ID " + externalAppId + " to reach terminal state after kill");
265+
}
266+
267+
private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) {
268+
return switch (state) {
269+
case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true;
270+
default -> false;
271+
};
272+
}
190273
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.hadoop.hive.ql.wm.WmContext;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
3636
import org.apache.tez.client.TezClient;
37+
import org.apache.tez.dag.api.DAG;
3738
import org.apache.tez.dag.api.TezException;
39+
import org.apache.tez.dag.api.client.DAGClient;
3840
import org.apache.tez.dag.api.client.DAGStatus;
3941

4042
/**
@@ -86,6 +88,7 @@ public String toString() {
8688

8789
HiveConf getConf();
8890
TezClient getTezClient();
91+
DAGClient submitDAG(DAG dag) throws TezException, IOException;
8992
boolean isOpen();
9093
boolean isOpening();
9194
boolean getDoAsEnabled();

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) {
353353
+ " belongs to the pool. Put it back in");
354354
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
355355
}
356+
357+
if (useExternalSessions && !tezSessionState.isDefault()) {
358+
if (tezSessionState.getTezClient() != null
359+
&& tezSessionState.getTezClient().getAppMasterApplicationId() != null) {
360+
try {
361+
tezSessionState.close(false);
362+
} catch (Exception ex) {
363+
LOG.warn("Failed to return external Tez session {}", tezSessionState.getSessionId(), ex);
364+
}
365+
} else {
366+
LOG.warn("Not returning session '{}' as tez client or app id is null", tezSessionState.getSessionId());
367+
}
368+
}
369+
356370
// non default session nothing changes. The user can continue to use the existing
357371
// session in the SessionState
358372
} finally {

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
3535
import org.apache.hadoop.yarn.api.records.LocalResource;
3636
import org.apache.tez.client.TezClient;
37+
import org.apache.tez.dag.api.DAG;
3738
import org.apache.tez.dag.api.TezException;
39+
import org.apache.tez.dag.api.client.DAGClient;
3840
import org.apache.tez.dag.api.client.DAGStatus;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
@@ -337,6 +339,11 @@ public TezClient getTezClient() {
337339
return baseSession.getTezClient();
338340
}
339341

342+
@Override
343+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
344+
return baseSession.submitDAG(dag);
345+
}
346+
340347
@Override
341348
public boolean isOpening() {
342349
return baseSession.isOpening();

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@
7676
import org.apache.hadoop.yarn.conf.YarnConfiguration;
7777
import org.apache.tez.client.TezClient;
7878
import org.apache.tez.common.TezUtils;
79+
import org.apache.tez.dag.api.DAG;
7980
import org.apache.tez.dag.api.PreWarmVertex;
8081
import org.apache.tez.dag.api.SessionNotRunning;
8182
import org.apache.tez.dag.api.TezConfiguration;
8283
import org.apache.tez.dag.api.TezException;
8384
import org.apache.tez.dag.api.UserPayload;
85+
import org.apache.tez.dag.api.client.DAGClient;
8486
import org.apache.tez.dag.api.client.DAGStatus;
8587
import org.apache.tez.dag.api.client.Progress;
8688
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
@@ -820,6 +822,11 @@ public TezClient getTezClient() {
820822
return session;
821823
}
822824

825+
@Override
826+
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
827+
return getTezClient().submitDAG(dag);
828+
}
829+
823830
@Override
824831
public LocalResource getAppJarLr() {
825832
return appJarLr;

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,8 @@ void ensureSessionHasResources(
472472
TezSession session, String[] nonConfResources) throws Exception {
473473
TezClient client = session.getTezClient();
474474
// TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
475-
if (client == null) {
476-
// Note: the only sane case where this can happen is the non-pool one. We should get rid
477-
// of it, in non-pool case perf doesn't matter so we might as well open at get time
475+
if (client == null || !session.isOpen()) {
476+
// Note: We should get rid of it, in non-pool case perf doesn't matter so we might as well open at get time
478477
// and then call update like we do in the else.
479478
// Can happen if the user sets the tez flag after the session was established.
480479
LOG.info("Tez session hasn't been created yet. Opening session");
@@ -692,7 +691,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef) throws Exception {
692691

693692
private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException {
694693
runtimeContext.init(sessionState);
695-
return sessionState.getTezClient().submitDAG(dag);
694+
return sessionState.submitDAG(dag);
696695
}
697696

698697
private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,

0 commit comments

Comments
 (0)