1717 */
1818package org .apache .drill .exec .coord .zk ;
1919
20- import static org .apache .drill .shaded .guava .com .google .common .collect .Collections2 .transform ;
21- import java .io .IOException ;
22- import java .util .Collection ;
23- import java .util .Collections ;
24- import java .util .ArrayList ;
25- import java .util .Set ;
26- import java .util .HashSet ;
27- import java .util .concurrent .ConcurrentHashMap ;
28- import java .util .concurrent .CountDownLatch ;
29- import java .util .concurrent .TimeUnit ;
30- import java .util .regex .Matcher ;
31- import java .util .regex .Pattern ;
32-
33- import org .apache .curator .framework .imps .DefaultACLProvider ;
34- import org .apache .drill .shaded .guava .com .google .common .base .Throwables ;
35- import org .apache .commons .collections .keyvalue .MultiKey ;
3620import org .apache .curator .RetryPolicy ;
3721import org .apache .curator .framework .CuratorFramework ;
3822import org .apache .curator .framework .CuratorFrameworkFactory ;
3923import org .apache .curator .framework .api .ACLProvider ;
24+ import org .apache .curator .framework .imps .DefaultACLProvider ;
4025import org .apache .curator .framework .state .ConnectionState ;
4126import org .apache .curator .framework .state .ConnectionStateListener ;
4227import org .apache .curator .retry .RetryNTimes ;
5742import org .apache .drill .exec .coord .store .TransientStoreFactory ;
5843import org .apache .drill .exec .proto .CoordinationProtos .DrillbitEndpoint ;
5944import org .apache .drill .exec .proto .CoordinationProtos .DrillbitEndpoint .State ;
60- import org .apache .drill .shaded .guava .com .google .common .base .Function ;
45+ import org .apache .drill .shaded .guava .com .google .common .base .Throwables ;
46+
47+ import java .io .IOException ;
48+ import java .util .Collection ;
49+ import java .util .Collections ;
50+ import java .util .HashMap ;
51+ import java .util .HashSet ;
52+ import java .util .Map ;
53+ import java .util .Set ;
54+ import java .util .concurrent .ConcurrentHashMap ;
55+ import java .util .concurrent .CountDownLatch ;
56+ import java .util .concurrent .TimeUnit ;
57+ import java .util .regex .Matcher ;
58+ import java .util .regex .Pattern ;
59+ import java .util .stream .Collectors ;
6160
6261/**
6362 * Manages cluster coordination utilizing zookeeper. *
@@ -74,8 +73,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
7473 private ServiceCache <DrillbitEndpoint > serviceCache ;
7574 private DrillbitEndpoint endpoint ;
7675
77- // endpointsMap maps Multikey( comprises of endoint address and port) to Drillbit endpoints
78- private ConcurrentHashMap <MultiKey , DrillbitEndpoint > endpointsMap = new ConcurrentHashMap <MultiKey , DrillbitEndpoint >();
76+ // endpointsMap maps String UUID to Drillbit endpoints
77+ private ConcurrentHashMap <String , DrillbitEndpoint > endpointsMap = new ConcurrentHashMap <>();
7978 private static final Pattern ZK_COMPLEX_STRING = Pattern .compile ("(^.*?)/(.*)/([^/]*)$" );
8079
8180 public ZKClusterCoordinator (DrillConfig config , String connect ) {
@@ -237,7 +236,12 @@ public RegistrationHandle update(RegistrationHandle handle, State state) {
237236
238237 @ Override
239238 public Collection <DrillbitEndpoint > getAvailableEndpoints () {
240- return this .endpoints ;
239+ return getAvailableEndpointsUUID ().values ();
240+ }
241+
242+ @ Override
243+ public Map <String , DrillbitEndpoint > getAvailableEndpointsUUID () {
244+ return this .endpointsMap ;
241245 }
242246
243247 /*
@@ -249,14 +253,19 @@ public Collection<DrillbitEndpoint> getAvailableEndpoints() {
249253 */
250254 @ Override
251255 public Collection <DrillbitEndpoint > getOnlineEndPoints () {
252- Collection <DrillbitEndpoint > runningEndPoints = new ArrayList <>();
253- for (DrillbitEndpoint endpoint : endpoints ){
254- if (isDrillbitInState (endpoint , State .ONLINE )) {
255- runningEndPoints .add (endpoint );
256+ return getOnlineEndpointsUUID ().keySet ();
257+ }
258+
259+ @ Override
260+ public Map <DrillbitEndpoint , String > getOnlineEndpointsUUID () {
261+ Map <DrillbitEndpoint , String > onlineEndpointsUUID = new HashMap <>();
262+ for (Map .Entry <String , DrillbitEndpoint > endpointEntry : endpointsMap .entrySet ()) {
263+ if (isDrillbitInState (endpointEntry .getValue (), State .ONLINE )) {
264+ onlineEndpointsUUID .put (endpointEntry .getValue (), endpointEntry .getKey ());
256265 }
257266 }
258- logger .debug ("Online endpoints in ZK are" + runningEndPoints .toString ());
259- return runningEndPoints ;
267+ logger .debug ("Online endpoints in ZK are" + onlineEndpointsUUID . keySet () .toString ());
268+ return onlineEndpointsUUID ;
260269 }
261270
262271 @ Override
@@ -273,14 +282,11 @@ public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfi
273282 private synchronized void updateEndpoints () {
274283 try {
275284 // All active bits in the Zookeeper
276- Collection <DrillbitEndpoint > newDrillbitSet =
277- transform (discovery .queryForInstances (serviceName ),
278- new Function <ServiceInstance <DrillbitEndpoint >, DrillbitEndpoint >() {
279- @ Override
280- public DrillbitEndpoint apply (ServiceInstance <DrillbitEndpoint > input ) {
281- return input .getPayload ();
282- }
283- });
285+ final Map <String , DrillbitEndpoint > activeEndpointsUUID = discovery .queryForInstances (serviceName ).stream ()
286+ .collect (Collectors .toMap (ServiceInstance ::getId , ServiceInstance ::getPayload ));
287+
288+ final Map <DrillbitEndpoint , String > UUIDtoEndpoints = activeEndpointsUUID .entrySet ().stream ()
289+ .collect (Collectors .toMap (Map .Entry ::getValue , Map .Entry ::getKey ));
284290
285291 // set of newly dead bits : original bits - new set of active bits.
286292 Set <DrillbitEndpoint > unregisteredBits = new HashSet <>();
@@ -290,29 +296,32 @@ public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
290296
291297 // Updates the endpoints map if there is a change in state of the endpoint or with the addition
292298 // of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
293- for ( DrillbitEndpoint endpoint : newDrillbitSet ) {
294- String endpointAddress = endpoint .getAddress ();
295- int endpointPort = endpoint .getUserPort ();
296- if (! endpointsMap .containsKey (new MultiKey (endpointAddress , endpointPort ))) {
297- registeredBits .add (endpoint );
298- }
299- endpointsMap .put (new MultiKey (endpointAddress , endpointPort ),endpoint );
299+ for (Map .Entry <String , DrillbitEndpoint > endpointToUUID : activeEndpointsUUID .entrySet ()) {
300+ endpointsMap .put (endpointToUUID .getKey (), endpointToUUID .getValue ());
300301 }
302+
301303 // Remove all the endpoints that are newly dead
302- for ( MultiKey key : endpointsMap .keySet ()) {
303- if (!newDrillbitSet .contains (endpointsMap .get (key ))) {
304- unregisteredBits .add (endpointsMap .get (key ));
305- endpointsMap .remove (key );
304+ for ( String bitUUID : endpointsMap .keySet ()) {
305+ if (!activeEndpointsUUID .containsKey (bitUUID )) {
306+ final DrillbitEndpoint unregisteredBit = endpointsMap .get (bitUUID );
307+ unregisteredBits .add (unregisteredBit );
308+
309+ if (UUIDtoEndpoints .containsKey (unregisteredBit )) {
310+ logger .info ("Drillbit registered again with different UUID. [Details: Address: {}, UserPort: {}," +
311+ " PreviousUUID: {}, CurrentUUID: {}" , unregisteredBit .getAddress (), unregisteredBit .getUserPort (),
312+ bitUUID , UUIDtoEndpoints .get (unregisteredBit ));
313+ }
314+ endpointsMap .remove (bitUUID );
306315 }
307316 }
308317 endpoints = endpointsMap .values ();
309318 if (logger .isDebugEnabled ()) {
310319 StringBuilder builder = new StringBuilder ();
311320 builder .append ("Active drillbit set changed. Now includes " );
312- builder .append (newDrillbitSet .size ());
321+ builder .append (activeEndpointsUUID .size ());
313322 builder .append (" total bits. New active drillbits:\n " );
314323 builder .append ("Address | User Port | Control Port | Data Port | Version | State\n " );
315- for (DrillbitEndpoint bit : newDrillbitSet ) {
324+ for (DrillbitEndpoint bit : activeEndpointsUUID . values () ) {
316325 builder .append (bit .getAddress ()).append (" | " );
317326 builder .append (bit .getUserPort ()).append (" | " );
318327 builder .append (bit .getControlPort ()).append (" | " );
0 commit comments