Skip to content

Commit da8314e

Browse files
committed
Address Co-pilot comments and fix the cases for default Sessions
1 parent ff84296 commit da8314e

6 files changed

Lines changed: 42 additions & 17 deletions

File tree

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ExternalSessionsRegistry.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,11 @@ public interface ExternalSessionsRegistry {
4040
* Closes the external session registry.
4141
*/
4242
void close();
43+
44+
/**
45+
* Returns true if this registry instance currently holds a claim on the given AM.
46+
*/
47+
default boolean isClaimed(String appId) {
48+
return true; // Non-ZK registries case is always true
49+
}
4350
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ public boolean killQuery(String reason) throws HiveException {
198198

199199
@Override
200200
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
201+
if (!registry.isClaimed(externalAppId)) {
202+
throw new TezException("Cannot submit DAG as the Tez Session no-longer owns the AM: " + externalAppId);
203+
}
201204
try {
202205
return getTezClient().submitDAG(dag);
203206
} catch (TezException e) {
@@ -210,6 +213,9 @@ public DAGClient submitDAG(DAG dag) throws TezException, IOException {
210213
}
211214

212215
private void tryKillRunningDAGs(TezClient session) throws TezException {
216+
if (!registry.isClaimed(externalAppId)) {
217+
throw new TezException("Cannot kill running DAG as the Tez Session no-longer owns the AM: " + externalAppId);
218+
}
213219
LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);
214220
DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null);
215221
if (proxy == null) {

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ void returnSession(TezSession tezSessionState) {
354354
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
355355
}
356356

357-
if (useExternalSessions) {
357+
if (useExternalSessions && !tezSessionState.isDefault()) {
358358
if (tezSessionState.getTezClient() != null
359359
&& tezSessionState.getTezClient().getAppMasterApplicationId() != null) {
360360
try {

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -472,9 +472,8 @@ void ensureSessionHasResources(
472472
TezSession session, String[] nonConfResources) throws Exception {
473473
TezClient client = session.getTezClient();
474474
// TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ?
475-
if (client == null) {
476-
// Note: the only sane case where this can happen is the non-pool one. We should get rid
477-
// of it, in non-pool case perf doesn't matter so we might as well open at get time
475+
if (client == null || !session.isOpen()) {
476+
// Note: We should get rid of it, in non-pool case perf doesn't matter so we might as well open at get time
478477
// and then call update like we do in the else.
479478
// Can happen if the user sets the tez flag after the session was established.
480479
LOG.info("Tez session hasn't been created yet. Opening session");

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions
6161
private InterProcessMutex globalQueue;
6262
private String claimsPath;
6363
private volatile boolean isInitialized;
64+
private volatile boolean zkConnectionHealthy = false;
6465

6566

6667
public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
@@ -99,17 +100,17 @@ private void init() {
99100
client.start();
100101

101102
client.getConnectionStateListenable().addListener((curatorClient, newState) -> {
102-
if (newState != ConnectionState.LOST) {
103-
return;
104-
}
105-
Set<String> sessionsToKill;
106-
synchronized (lock) {
107-
LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken);
108-
sessionsToKill = new HashSet<>(taken);
109-
taken.clear();
110-
}
111-
for (String appId : sessionsToKill) {
112-
TezJobMonitor.killRunningDAGsForApplication(appId);
103+
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED) {
104+
zkConnectionHealthy = true;
105+
} else if (newState == ConnectionState.LOST) {
106+
zkConnectionHealthy = false;
107+
Set<String> sessionsToKill;
108+
synchronized (lock) {
109+
LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken);
110+
sessionsToKill = new HashSet<>(taken);
111+
taken.clear();
112+
}
113+
sessionsToKill.forEach(TezJobMonitor::killRunningDAGsForApplication);
113114
}
114115
});
115116

@@ -201,7 +202,9 @@ public String getSession() throws Exception {
201202
}
202203
long remainingTimeNs = timeoutNs - (System.nanoTime() - startTimeNs);
203204
if (remainingTimeNs > 0) {
204-
long waitTimeMs = Math.min(1000L, remainingTimeNs / 1_000_000L);
205+
// Add one to remainingTime milliseconds computation to prevent the case where
206+
// (remainingTimeNs / 1000000L) can return 0 causing the lock to be held indefinitely.
207+
long waitTimeMs = Math.min(1000L, (remainingTimeNs / 1000000L) + 1);
205208
lock.wait(waitTimeMs);
206209
}
207210
}
@@ -288,4 +291,14 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
288291
}
289292
}
290293
}
294+
295+
@Override
296+
public boolean isClaimed(String appId) {
297+
if (!zkConnectionHealthy) {
298+
return false;
299+
}
300+
synchronized (lock) {
301+
return taken.contains(appId);
302+
}
303+
}
291304
}

ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ public static void killRunningDAGsForApplication(String applicationId) {
507507
c.tryKillDAG();
508508
}
509509
} catch (Exception e) {
510-
LOG.error("Error while trying to kill running DAG on tez session {}", applicationId);
510+
LOG.error("Error while trying to kill running DAG on tez session {}", applicationId, e);
511511
}
512512
}
513513
}

0 commit comments

Comments
 (0)