3434import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
3535import org .apache .curator .framework .recipes .locks .InterProcessMutex ;
3636import org .apache .curator .framework .state .ConnectionState ;
37- import org .apache .curator .framework .state .ConnectionStateListener ;
3837import org .apache .curator .retry .ExponentialBackoffRetry ;
3938import org .apache .hadoop .hive .conf .HiveConf ;
4039import org .apache .hadoop .hive .conf .HiveConf .ConfVars ;
4948// TODO: tez should provide this registry
5049public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry {
5150 private static final Logger LOG = LoggerFactory .getLogger (ZookeeperExternalSessionsRegistryClient .class );
51+ private static final String PATH_SEPARATOR = "/" ;
5252
5353 private final HiveConf initConf ;
5454 private final Set <String > available = new HashSet <>();
@@ -59,7 +59,6 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions
5959 private CuratorCache cache ;
6060 private CuratorCache claimsCache ;
6161 private InterProcessMutex globalQueue ;
62- private String sessionsPath ;
6362 private String claimsPath ;
6463 private volatile boolean isInitialized ;
6564
@@ -70,7 +69,7 @@ public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) {
7069 }
7170
7271 private static String getApplicationId (final ChildData childData ) {
73- return childData .getPath ().substring (childData .getPath ().lastIndexOf ("/" ) + 1 );
72+ return childData .getPath ().substring (childData .getPath ().lastIndexOf (PATH_SEPARATOR ) + 1 );
7473 }
7574
7675 private void init () {
@@ -83,8 +82,8 @@ private void init() {
8382 TimeUnit .MILLISECONDS );
8483 int maxRetries = HiveConf .getIntVar (initConf , ConfVars .HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES );
8584 String zkNamespace = HiveConf .getVar (initConf , ConfVars .HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE );
86- this . sessionsPath = normalizeZkPath (zkNamespace );
87- this .claimsPath = this . sessionsPath + "-claims" ;
85+ String effectivePath = normalizeZkPath (zkNamespace );
86+ this .claimsPath = effectivePath + "-claims" ;
8887 // After connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated session
8988 // timeout. Use 33% of the remaining window so LOST aligns with when the ZK server expires the session and drops
9089 // ephemeral claim nodes. See Curator Tech Note 14 (TN14) for reference.
@@ -99,27 +98,25 @@ private void init() {
9998 synchronized (lock ) {
10099 client .start ();
101100
102- client .getConnectionStateListenable ().addListener (new ConnectionStateListener () {
103- @ Override
104- public void stateChanged (CuratorFramework client , ConnectionState newState ) {
105- if (newState == ConnectionState .LOST ) {
106- Set <String > sessionsToKill ;
107- synchronized (lock ) {
108- LOG .error ("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}" , taken );
109- sessionsToKill = new HashSet <>(taken );
110- taken .clear ();
111- }
112- for (String appId : sessionsToKill ) {
113- TezJobMonitor .killRunningDAGsForApplication (appId );
114- }
115- }
101+ client .getConnectionStateListenable ().addListener ((client , newState ) -> {
102+ if (newState != ConnectionState .LOST ) {
103+ return ;
104+ }
105+ Set <String > sessionsToKill ;
106+ synchronized (lock ) {
107+ LOG .error ("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}" , taken );
108+ sessionsToKill = new HashSet <>(taken );
109+ taken .clear ();
110+ }
111+ for (String appId : sessionsToKill ) {
112+ TezJobMonitor .killRunningDAGsForApplication (appId );
116113 }
117114 });
118115
119- this .globalQueue = new InterProcessMutex (client , sessionsPath + "-queue" );
120- this .cache = CuratorCache .build (client , sessionsPath );
116+ this .globalQueue = new InterProcessMutex (client , effectivePath + "-queue" );
117+ this .cache = CuratorCache .build (client , effectivePath );
121118 CuratorCacheListener listener = CuratorCacheListener .builder ()
122- .forPathChildrenCache (sessionsPath , client , new ExternalSessionsPathListener ())
119+ .forPathChildrenCache (effectivePath , client , new ExternalSessionsPathListener ())
123120 .build ();
124121 cache .listenable ().addListener (listener );
125122 cache .start ();
@@ -142,11 +139,12 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {
142139 String applicationId = getApplicationId (childData );
143140 synchronized (lock ) {
144141 if (!taken .contains (applicationId )) {
145- if (cache .get (sessionsPath + "/" + applicationId ).isPresent ()) {
142+ if (cache .get (effectivePath + PATH_SEPARATOR + applicationId ).isPresent ()) {
146143 available .add (applicationId );
147144 lock .notifyAll ();
148145 } else {
149- LOG .info ("Ignoring AM claim removal for {} because the base AM node no longer exists." , applicationId );
146+ LOG .info ("Ignoring AM claim removal for {} because the base AM node no longer exists." ,
147+ applicationId );
150148 }
151149 }
152150 }
@@ -156,7 +154,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {
156154
157155 cache .stream ()
158156 .filter (childData -> childData .getPath () != null
159- && childData .getPath ().startsWith (sessionsPath + "/" ))
157+ && childData .getPath ().startsWith (effectivePath + PATH_SEPARATOR ))
160158 .forEach (childData -> available .add (getApplicationId (childData )));
161159 LOG .info ("Initial external sessions: {}" , available );
162160 isInitialized = true ;
@@ -165,7 +163,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {
165163
166164 @ VisibleForTesting
167165 static String normalizeZkPath (String zkNamespace ) {
168- return (zkNamespace .startsWith ("/" ) ? zkNamespace : "/" + zkNamespace );
166+ return (zkNamespace .startsWith (PATH_SEPARATOR ) ? zkNamespace : PATH_SEPARATOR + zkNamespace );
169167 }
170168
171169 @ Override
@@ -192,7 +190,7 @@ public String getSession() throws Exception {
192190 while (iter .hasNext ()) {
193191 String appId = iter .next ();
194192 try {
195- String claimNodePath = claimsPath + "/" + appId ;
193+ String claimNodePath = claimsPath + PATH_SEPARATOR + appId ;
196194 client .create ().creatingParentsIfNeeded ().withMode (CreateMode .EPHEMERAL ).forPath (claimNodePath );
197195 iter .remove ();
198196 taken .add (appId );
@@ -225,7 +223,7 @@ public void returnSession(String appId) {
225223 }
226224
227225 try {
228- client .delete ().guaranteed ().forPath (claimsPath + "/" + appId );
226+ client .delete ().guaranteed ().forPath (claimsPath + PATH_SEPARATOR + appId );
229227 } catch (KeeperException .NoNodeException e ) {
230228 // If the claim Node has already been deleted, we can ignore it.
231229 LOG .debug ("Claim Node has already been deleted for the session {}" , appId , e );
@@ -270,7 +268,7 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
270268 if (available .contains (applicationId ) || taken .contains (applicationId )) {
271269 return ; // We do not expect updates to existing sessions; ignore them for now.
272270 }
273- if (claimsCache .get (claimsPath + "/" + applicationId ).isPresent ()) {
271+ if (claimsCache .get (claimsPath + PATH_SEPARATOR + applicationId ).isPresent ()) {
274272 LOG .info ("Ignoring newly added AM {} because it is already claimed by another session." , applicationId );
275273 return ;
276274 }
0 commit comments