2222import com .google .common .annotations .VisibleForTesting ;
2323import java .io .IOException ;
2424import java .io .InterruptedIOException ;
25+ import java .io .UncheckedIOException ;
2526import java .util .ArrayList ;
2627import java .util .Collections ;
2728import java .util .Comparator ;
3132import java .util .Objects ;
3233import java .util .concurrent .CompletableFuture ;
3334import java .util .concurrent .ConcurrentHashMap ;
35+ import java .util .concurrent .ConcurrentMap ;
3436import java .util .concurrent .ExecutionException ;
3537import java .util .concurrent .Semaphore ;
3638import java .util .concurrent .TimeUnit ;
@@ -91,14 +93,13 @@ public class XceiverClientGrpc extends XceiverClientSpi {
9193 private static final int SHUTDOWN_WAIT_MAX_SECONDS = 5 ;
9294 private final Pipeline pipeline ;
9395 private final ConfigurationSource config ;
94- private final Map <DatanodeID , XceiverClientProtocolServiceStub > asyncStubs ;
9596 private final XceiverClientMetrics metrics ;
96- private final Map <DatanodeID , ManagedChannel > channels ;
9797 private final Semaphore semaphore ;
9898 private long timeout ;
9999 private final SecurityConfig secConfig ;
100100 private final boolean topologyAwareRead ;
101101 private final ClientTrustManager trustManager ;
102+ private final ConcurrentMap <DatanodeID , ChannelInfo > dnChannelInfoMap ;
102103 // Cache the DN which returned the GetBlock command so that the ReadChunk
103104 // command can be sent to the same DN.
104105 private final Map <DatanodeBlockID , DatanodeDetails > getBlockDNcache ;
@@ -126,8 +127,7 @@ public XceiverClientGrpc(Pipeline pipeline, ConfigurationSource config,
126127 this .semaphore =
127128 new Semaphore (HddsClientUtils .getMaxOutstandingRequests (config ));
128129 this .metrics = XceiverClientManager .getXceiverClientMetrics ();
129- this .channels = new ConcurrentHashMap <>();
130- this .asyncStubs = new ConcurrentHashMap <>();
130+ this .dnChannelInfoMap = new ConcurrentHashMap <>();
131131 this .topologyAwareRead = config .getBoolean (
132132 OzoneConfigKeys .OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY ,
133133 OzoneConfigKeys .OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT );
@@ -161,49 +161,42 @@ public void connect() throws Exception {
161161 connectToDatanode (dn );
162162 }
163163
164- private void connectToDatanode (DatanodeDetails dn )
165- throws IOException {
164+ private void connectToDatanode (DatanodeDetails dn ) throws IOException {
166165 if (isClosed .get ()) {
167166 throw new IOException ("Client is closed." );
168167 }
169168
170- if (isConnected (dn )) {
171- return ;
172- }
173- // read port from the data node, on failure use default configured port
174- int port = dn .getStandalonePort ().getValue ();
175- if (port == 0 ) {
176- port = config .getInt (OzoneConfigKeys .HDDS_CONTAINER_IPC_PORT ,
177- OzoneConfigKeys .HDDS_CONTAINER_IPC_PORT_DEFAULT );
178- }
179- final int finalPort = port ;
180-
181- LOG .debug ("Connecting to server : {}; nodes in pipeline : {}, " , dn , pipeline .getNodes ());
182-
183- channels .computeIfPresent (dn .getID (), (dnId , channel ) -> {
184- if (channel .isTerminated () || channel .isShutdown ()) {
185- asyncStubs .remove (dnId );
186- return null ; // removes from channels map
187- }
188-
189- return channel ;
190- });
191-
192- ManagedChannel channel ;
193169 try {
194- channel = channels .computeIfAbsent (dn .getID (), dnId -> {
195- try {
196- return createChannel (dn , finalPort ).build ();
197- } catch (IOException e ) {
198- throw new RuntimeException (e );
170+ dnChannelInfoMap .compute (dn .getID (), (dnId , channelInfo ) -> {
171+ // channel is absent or stale
172+ if (channelInfo == null || channelInfo .isChannelInactive ()) {
173+ LOG .debug ("Connecting to server: {}; nodes in pipeline: {}" , dn , pipeline .getNodes ());
174+ try {
175+ return generateNewChannel (dn );
176+ } catch (IOException e ) {
177+ throw new UncheckedIOException (e );
178+ }
199179 }
180+
181+ // channel is present and active
182+ return channelInfo ;
200183 });
201- } catch (RuntimeException e ) {
184+ } catch (UncheckedIOException e ) {
202185 LOG .error ("Failed to create channel to datanode {}" , dn , e );
203- throw new IOException (e .getCause ());
186+ throw e .getCause ();
187+ }
188+ }
189+
190+ private ChannelInfo generateNewChannel (DatanodeDetails dn ) throws IOException {
191+ // read port from the data node, on failure use default configured port
192+ int port = dn .getStandalonePort ().getValue ();
193+ if (port == 0 ) {
194+ port = config .getInt (OzoneConfigKeys .HDDS_CONTAINER_IPC_PORT , OzoneConfigKeys .HDDS_CONTAINER_IPC_PORT_DEFAULT );
204195 }
205196
206- asyncStubs .computeIfAbsent (dn .getID (), dnId -> XceiverClientProtocolServiceGrpc .newStub (channel ));
197+ ManagedChannel channel = createChannel (dn , port ).build ();
198+ XceiverClientProtocolServiceStub stub = XceiverClientProtocolServiceGrpc .newStub (channel );
199+ return new ChannelInfo (channel , stub );
207200 }
208201
209202 protected NettyChannelBuilder createChannel (DatanodeDetails dn , int port )
@@ -248,11 +241,12 @@ private boolean datanodeUseHostName() {
248241 */
249242 @ VisibleForTesting
250243 public boolean isConnected (DatanodeDetails details ) {
251- return isConnected (channels .get (details .getID ()));
252- }
244+ if (details == null ) {
245+ return false ;
246+ }
253247
254- private boolean isConnected ( ManagedChannel channel ) {
255- return channel != null && !channel . isTerminated () && ! channel . isShutdown ();
248+ ChannelInfo channelInfo = dnChannelInfoMap . get ( details . getID ());
249+ return channelInfo != null && !channelInfo . isChannelInactive ();
256250 }
257251
258252 /**
@@ -267,13 +261,17 @@ public void close() {
267261 return ;
268262 }
269263
270- for (ManagedChannel channel : channels .values ()) {
271- channel .shutdown ();
264+ for (ChannelInfo channelInfo : dnChannelInfoMap .values ()) {
265+ channelInfo . getChannel () .shutdown ();
272266 }
273267
274268 final long maxWaitNanos = TimeUnit .SECONDS .toNanos (SHUTDOWN_WAIT_MAX_SECONDS );
275269 long deadline = System .nanoTime () + maxWaitNanos ;
276- List <ManagedChannel > nonTerminatedChannels = new ArrayList <>(channels .values ());
270+ List <ManagedChannel > nonTerminatedChannels = dnChannelInfoMap .values ()
271+ .stream ()
272+ .map (ChannelInfo ::getChannel )
273+ .filter (Objects ::nonNull )
274+ .collect (Collectors .toList ());
277275
278276 while (!nonTerminatedChannels .isEmpty () && System .nanoTime () < deadline ) {
279277 nonTerminatedChannels .removeIf (ManagedChannel ::isTerminated );
@@ -286,16 +284,17 @@ public void close() {
286284 }
287285 }
288286
289- List <DatanodeID > failedChannels = channels .entrySet ().stream ()
290- .filter (e -> !e .getValue ().isTerminated ())
287+ List <DatanodeID > failedChannels = dnChannelInfoMap .entrySet ()
288+ .stream ()
289+ .filter (e -> !e .getValue ().getChannel ().isTerminated ())
291290 .map (Map .Entry ::getKey )
292291 .collect (Collectors .toList ());
292+
293293 if (!failedChannels .isEmpty ()) {
294294 LOG .warn ("Channels {} did not terminate within timeout." , failedChannels );
295295 }
296296
297- channels .clear ();
298- asyncStubs .clear ();
297+ dnChannelInfoMap .clear ();
299298 }
300299
301300 @ Override
@@ -581,7 +580,7 @@ public void initStreamRead(BlockID blockID, StreamingReaderSpi streamObserver) t
581580 try {
582581 checkOpen (dn );
583582 semaphore .acquire ();
584- XceiverClientProtocolServiceStub stub = asyncStubs .get (dn .getID ());
583+ XceiverClientProtocolServiceStub stub = dnChannelInfoMap .get (dn .getID ()). getStub ( );
585584 if (stub == null ) {
586585 throw new IOException ("Failed to get gRPC stub for DataNode: " + dn );
587586 }
@@ -698,7 +697,7 @@ public XceiverClientReply sendCommandAsync(
698697
699698 // create a new grpc message stream pair for each call.
700699 final StreamObserver <ContainerCommandRequestProto > requestObserver =
701- asyncStubs .get (dnId ).withDeadlineAfter (timeout , TimeUnit .SECONDS )
700+ dnChannelInfoMap .get (dnId ). getStub ( ).withDeadlineAfter (timeout , TimeUnit .SECONDS )
702701 .send (new StreamObserver <ContainerCommandResponseProto >() {
703702 @ Override
704703 public void onNext (ContainerCommandResponseProto value ) {
@@ -739,30 +738,9 @@ private void decreasePendingMetricsAndReleaseSemaphore() {
739738
740739 private void checkOpen (DatanodeDetails dn )
741740 throws IOException {
742- if (isClosed .get ()) {
743- throw new IOException ("This channel is not connected." );
744- }
745-
746- ManagedChannel channel = channels .get (dn .getID ());
747- // If the channel doesn't exist for this specific datanode or the channel
748- // is closed, just reconnect
749- if (!isConnected (channel )) {
750- reconnect (dn );
751- }
752-
753- }
754-
755- private void reconnect (DatanodeDetails dn )
756- throws IOException {
757- ManagedChannel channel ;
758- try {
759- connectToDatanode (dn );
760- channel = channels .get (dn .getID ());
761- } catch (Exception e ) {
762- throw new IOException ("Error while connecting" , e );
763- }
741+ connectToDatanode (dn );
764742
765- if (!isConnected (channel )) {
743+ if (!isConnected (dn )) {
766744 throw new IOException ("This channel is not connected." );
767745 }
768746 }
@@ -784,4 +762,31 @@ public ConfigurationSource getConfig() {
784762 public void setTimeout (long timeout ) {
785763 this .timeout = timeout ;
786764 }
765+
766+ /**
767+ * Group the channel and stub so that they are published together.
768+ */
769+ private static class ChannelInfo {
770+ private final ManagedChannel channel ;
771+ private final XceiverClientProtocolServiceStub stub ;
772+
773+ ChannelInfo (ManagedChannel channel , XceiverClientProtocolServiceStub stub ) {
774+ this .channel = channel ;
775+ this .stub = stub ;
776+ }
777+
778+ public ManagedChannel getChannel () {
779+ return channel ;
780+ }
781+
782+ public XceiverClientProtocolServiceStub getStub () {
783+ return stub ;
784+ }
785+
786+ public boolean isChannelInactive () {
787+ return channel == null
788+ || channel .isTerminated ()
789+ || channel .isShutdown ();
790+ }
791+ }
787792}
0 commit comments