2121import java .util .HashSet ;
2222import java .util .Iterator ;
2323import java .util .Set ;
24+ import java .util .concurrent .TimeUnit ;
2425
2526import com .google .common .annotations .VisibleForTesting ;
2627import org .apache .curator .framework .CuratorFramework ;
3132import org .apache .curator .framework .recipes .cache .CuratorCacheListener ;
3233import org .apache .curator .framework .recipes .cache .PathChildrenCacheEvent ;
3334import org .apache .curator .framework .recipes .cache .PathChildrenCacheListener ;
35+ import org .apache .curator .framework .recipes .locks .InterProcessMutex ;
3436import org .apache .curator .retry .ExponentialBackoffRetry ;
3537import org .apache .hadoop .hive .conf .HiveConf ;
3638import org .apache .hadoop .hive .conf .HiveConf .ConfVars ;
39+ import org .apache .zookeeper .CreateMode ;
40+ import org .apache .zookeeper .KeeperException ;
3741import org .slf4j .Logger ;
3842import org .slf4j .LoggerFactory ;
3943
@@ -48,8 +52,11 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions
4852 private final Set <String > taken = new HashSet <>();
4953 private final Object lock = new Object ();
5054 private final int maxAttempts ;
51-
55+ private CuratorFramework client ;
5256 private CuratorCache cache ;
57+ private CuratorCache claimsCache ;
58+ private InterProcessMutex globalQueue ;
59+ private String claimsPath ;
5360 private volatile boolean isInitialized ;
5461
5562
@@ -66,15 +73,27 @@ private void init() {
6673 String zkServer = HiveConf .getVar (initConf , ConfVars .HIVE_ZOOKEEPER_QUORUM );
6774 String zkNamespace = HiveConf .getVar (initConf , ConfVars .HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE );
6875 String effectivePath = normalizeZkPath (zkNamespace );
69- CuratorFramework client = CuratorFrameworkFactory .newClient (zkServer , new ExponentialBackoffRetry (1000 , 3 ));
76+ String queuePath = effectivePath + "-queue" ;
77+ this .claimsPath = effectivePath + "-claims" ;
78+ this .client = CuratorFrameworkFactory .newClient (zkServer , new ExponentialBackoffRetry (1000 , 3 ));
79+
7080 synchronized (lock ) {
7181 client .start ();
82+ this .globalQueue = new InterProcessMutex (client , queuePath );
7283 this .cache = CuratorCache .build (client , effectivePath );
7384 CuratorCacheListener listener = CuratorCacheListener .builder ()
7485 .forPathChildrenCache (effectivePath , client , new ExternalSessionsPathListener ())
7586 .build ();
7687 cache .listenable ().addListener (listener );
7788 cache .start ();
89+
90+ this .claimsCache = CuratorCache .build (client , claimsPath );
91+ CuratorCacheListener claimsListener = CuratorCacheListener .builder ()
92+ .forPathChildrenCache (claimsPath , client , new ClaimsPathListener ())
93+ .build ();
94+ claimsCache .listenable ().addListener (claimsListener );
95+ claimsCache .start ();
96+
7897 cache .stream ()
7998 .filter (childData -> childData .getPath () != null
8099 && childData .getPath ().startsWith (effectivePath + "/" ))
@@ -91,22 +110,47 @@ static String normalizeZkPath(String zkNamespace) {
91110
92111 @ Override
93112 public String getSession () throws Exception {
94- synchronized (lock ) {
95- if (!isInitialized ) {
96- init ();
97- }
98- long endTimeNs = System .nanoTime () + (1000000000L * maxAttempts );
99- while (available .isEmpty () && ((endTimeNs - System .nanoTime ()) > 0 )) {
100- lock .wait (1000L );
113+ if (!isInitialized ) {
114+ synchronized (lock ) {
115+ if (!isInitialized ) {
116+ init ();
117+ }
101118 }
102- Iterator <String > iter = available .iterator ();
103- if (!iter .hasNext ()) {
104- throw new IOException ("Cannot get a session after " + maxAttempts + " attempts" );
119+ }
120+
121+ long endTimeNs = System .nanoTime () + (1000000000L * maxAttempts );
122+ long queueWaitTimeMs = Math .max (0 , (endTimeNs - System .nanoTime ()) / 1000000L );
123+ if (!globalQueue .acquire (queueWaitTimeMs , TimeUnit .MILLISECONDS )) {
124+ throw new IOException ("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds" );
125+ }
126+
127+ try {
128+ synchronized (lock ) {
129+ while (System .nanoTime () < endTimeNs ) {
130+ Iterator <String > iter = available .iterator ();
131+
132+ while (iter .hasNext ()) {
133+ String appId = iter .next ();
134+ try {
135+ String claimNodePath = claimsPath + "/" + appId ;
136+ client .create ().creatingParentsIfNeeded ().withMode (CreateMode .EPHEMERAL ).forPath (claimNodePath );
137+ iter .remove ();
138+ taken .add (appId );
139+ return appId ;
140+ } catch (KeeperException .NodeExistsException e ) {
141+ iter .remove ();
142+ }
143+ }
144+ long remainingTimeNs = endTimeNs - System .nanoTime ();
145+ if (remainingTimeNs > 0 ) {
146+ long waitTimeMs = Math .min (1000L , remainingTimeNs / 1_000_000L );
147+ lock .wait (waitTimeMs );
148+ }
149+ }
150+ throw new IOException ("Cannot get a session after waiting for " + maxAttempts + " seconds (timeout exhausted)" );
105151 }
106- String appId = iter .next ();
107- iter .remove ();
108- taken .add (appId );
109- return appId ;
152+ } finally {
153+ globalQueue .release ();
110154 }
111155 }
112156
@@ -117,18 +161,33 @@ public void returnSession(String appId) {
117161 throw new IllegalStateException ("Not initialized" );
118162 }
119163 if (!taken .remove (appId )) {
120- return ; // Session has been removed from ZK.
164+ return ; // Session has already been removed from ZK.
121165 }
166+
167+ try {
168+ client .delete ().guaranteed ().forPath (claimsPath + "/" + appId );
169+ } catch (KeeperException .NoNodeException e ) {
170+ // If the claim Node has already been deleted, we can ignore it.
171+ } catch (Exception e ) {
172+ LOG .warn ("Failed to delete claim node for session {}" , appId , e );
173+ }
174+
122175 available .add (appId );
123176 lock .notifyAll ();
124177 }
125178 }
126179
127180 @ Override
128181 public void close () {
182+ if (claimsCache != null ) {
183+ claimsCache .close ();
184+ }
129185 if (cache != null ) {
130186 cache .close ();
131187 }
188+ if (client != null ) {
189+ client .close ();
190+ }
132191 }
133192
134193 private final class ExternalSessionsPathListener implements PathChildrenCacheListener {
@@ -148,9 +207,10 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
148207 switch (event .getType ()) {
149208 case CHILD_UPDATED , CHILD_ADDED :
150209 if (available .contains (applicationId ) || taken .contains (applicationId )) {
151- return ; // We do not expect updates to existing sessions; ignore them for now.
210+ return ;
152211 }
153212 available .add (applicationId );
213+ lock .notifyAll ();
154214 break ;
155215 case CHILD_REMOVED :
156216 if (taken .remove (applicationId )) {
@@ -165,4 +225,34 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
165225 }
166226 }
167227 }
228+
229+ private final class ClaimsPathListener implements PathChildrenCacheListener {
230+ @ Override
231+ public void childEvent (CuratorFramework client , PathChildrenCacheEvent event ) {
232+ ChildData childData = event .getData ();
233+ if (childData == null ) {
234+ return ;
235+ }
236+
237+ String applicationId = getApplicationId (childData );
238+ synchronized (lock ) {
239+ switch (event .getType ()) {
240+ case CHILD_REMOVED :
241+ if (!taken .contains (applicationId )) {
242+ // if the claim node was released by this particular HS2 itself,
243+ // it will be added back to the available list & locks are notified as part of returnSession()
244+ available .add (applicationId );
245+ lock .notifyAll ();
246+ }
247+ break ;
248+ case CHILD_ADDED :
249+ // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2
250+ available .remove (applicationId );
251+ break ;
252+ default :
253+ break ;
254+ }
255+ }
256+ }
257+ }
168258}
0 commit comments