3333import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
3434import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
3535import org .apache .curator .framework .recipes .locks .InterProcessMutex ;
36+ import org .apache .curator .framework .state .ConnectionState ;
37+ import org .apache .curator .framework .state .ConnectionStateListener ;
3638import org .apache .curator .retry .ExponentialBackoffRetry ;
3739import org .apache .hadoop .hive .conf .HiveConf ;
3840import org .apache .hadoop .hive .conf .HiveConf .ConfVars ;
41+ import org .apache .hadoop .hive .ql .exec .tez .monitoring .TezJobMonitor ;
3942import org .apache .zookeeper .CreateMode ;
4043import org .apache .zookeeper .KeeperException ;
4144import org .slf4j .Logger ;
@@ -56,6 +59,7 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions
5659 private CuratorCache cache ;
5760 private CuratorCache claimsCache ;
5861 private InterProcessMutex globalQueue ;
62+ private String sessionsPath ;
5963 private String claimsPath ;
6064 private volatile boolean isInitialized ;
6165
@@ -71,18 +75,51 @@ private static String getApplicationId(final ChildData childData) {
7175
7276 private void init () {
7377 String zkServer = HiveConf .getVar (initConf , ConfVars .HIVE_ZOOKEEPER_QUORUM );
78+ int sessionTimeoutMs = (int ) HiveConf .getTimeVar (initConf , ConfVars .HIVE_ZOOKEEPER_SESSION_TIMEOUT ,
79+ TimeUnit .MILLISECONDS );
80+ int connectionTimeoutMs = (int ) HiveConf .getTimeVar (initConf , ConfVars .HIVE_ZOOKEEPER_CONNECTION_TIMEOUT ,
81+ TimeUnit .MILLISECONDS );
82+ int baseSleepTimeMs = (int ) HiveConf .getTimeVar (initConf , ConfVars .HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME ,
83+ TimeUnit .MILLISECONDS );
84+ int maxRetries = HiveConf .getIntVar (initConf , ConfVars .HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES );
7485 String zkNamespace = HiveConf .getVar (initConf , ConfVars .HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE );
75- String effectivePath = normalizeZkPath (zkNamespace );
76- String queuePath = effectivePath + "-queue" ;
77- this .claimsPath = effectivePath + "-claims" ;
78- this .client = CuratorFrameworkFactory .newClient (zkServer , new ExponentialBackoffRetry (1000 , 3 ));
79-
86+ this .sessionsPath = normalizeZkPath (zkNamespace );
87+ this .claimsPath = this .sessionsPath + "-claims" ;
88+ // By the time the connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated
89+ // session timeout. Declare LOST after a further 33% of that timeout (the remaining third) so that LOST aligns with
90+ // the ZK server expiring the session and dropping ephemeral claim nodes. See Curator Tech Note 14 (TN14).
91+ this .client = CuratorFrameworkFactory .builder ()
92+ .connectString (zkServer )
93+ .sessionTimeoutMs (sessionTimeoutMs )
94+ .connectionTimeoutMs (connectionTimeoutMs )
95+ .simulatedSessionExpirationPercent (33 )
96+ .retryPolicy (new ExponentialBackoffRetry (baseSleepTimeMs , maxRetries ))
97+ .build ();
98+
8099 synchronized (lock ) {
81100 client .start ();
82- this .globalQueue = new InterProcessMutex (client , queuePath );
83- this .cache = CuratorCache .build (client , effectivePath );
101+
102+ client .getConnectionStateListenable ().addListener (new ConnectionStateListener () {
103+ @ Override
104+ public void stateChanged (CuratorFramework client , ConnectionState newState ) {
105+ if (newState == ConnectionState .LOST ) {
106+ Set <String > sessionsToKill ;
107+ synchronized (lock ) {
108+ LOG .error ("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}" , taken );
109+ sessionsToKill = new HashSet <>(taken );
110+ taken .clear ();
111+ }
112+ for (String appId : sessionsToKill ) {
113+ TezJobMonitor .killRunningDAGsForApplication (appId );
114+ }
115+ }
116+ }
117+ });
118+
119+ this .globalQueue = new InterProcessMutex (client , sessionsPath + "-queue" );
120+ this .cache = CuratorCache .build (client , sessionsPath );
84121 CuratorCacheListener listener = CuratorCacheListener .builder ()
85- .forPathChildrenCache (effectivePath , client , new ExternalSessionsPathListener ())
122+ .forPathChildrenCache (sessionsPath , client , new ExternalSessionsPathListener ())
86123 .build ();
87124 cache .listenable ().addListener (listener );
88125 cache .start ();
@@ -105,8 +142,12 @@ private void init() {
105142 String applicationId = getApplicationId (childData );
106143 synchronized (lock ) {
107144 if (!taken .contains (applicationId )) {
108- available .add (applicationId );
109- lock .notifyAll ();
145+ if (cache .get (sessionsPath + "/" + applicationId ).isPresent ()) {
146+ available .add (applicationId );
147+ lock .notifyAll ();
148+ } else {
149+ LOG .info ("Ignoring AM claim removal for {} because the base AM node no longer exists." , applicationId );
150+ }
110151 }
111152 }
112153 }).build ();
@@ -115,7 +156,7 @@ private void init() {
115156
116157 cache .stream ()
117158 .filter (childData -> childData .getPath () != null
118- && childData .getPath ().startsWith (effectivePath + "/" ))
159+ && childData .getPath ().startsWith (sessionsPath + "/" ))
119160 .forEach (childData -> available .add (getApplicationId (childData )));
120161 LOG .info ("Initial external sessions: {}" , available );
121162 isInitialized = true ;
@@ -143,7 +184,6 @@ public String getSession() throws Exception {
143184 if (!globalQueue .acquire (queueWaitTimeMs , TimeUnit .MILLISECONDS )) {
144185 throw new IOException ("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds" );
145186 }
146-
147187 try {
148188 synchronized (lock ) {
149189 while (System .nanoTime () - startTimeNs < timeoutNs ) {
@@ -181,14 +221,14 @@ public void returnSession(String appId) {
181221 throw new IllegalStateException ("Not initialized" );
182222 }
183223 if (!taken .remove (appId )) {
184- return ; // Session has already been removed from ZK.
224+ return ; // Session has been removed from ZK.
185225 }
186226
187227 try {
188228 client .delete ().guaranteed ().forPath (claimsPath + "/" + appId );
189229 } catch (KeeperException .NoNodeException e ) {
190230 // If the claim Node has already been deleted, we can ignore it.
191- LOG .debug ("Claim Node has already been deleted for the session {}" , appId , e );
231+ LOG .warn ("Claim Node has already been deleted for the session {}" , appId , e );
192232 } catch (Exception e ) {
193233 LOG .warn ("Failed to delete claim node for session {}" , appId , e );
194234 }
@@ -230,6 +270,10 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
230270 if (available .contains (applicationId ) || taken .contains (applicationId )) {
231271 return ; // We do not expect updates to existing sessions; ignore them for now.
232272 }
273+ if (claimsCache .get (claimsPath + "/" + applicationId ).isPresent ()) {
274+ LOG .info ("Ignoring newly added AM {} because it is already claimed by another session." , applicationId );
275+ return ;
276+ }
233277 available .add (applicationId );
234278 lock .notifyAll ();
235279 break ;
0 commit comments