Skip to content

Commit e344f64

Browse files
authored
HDFS-17893. Close block readers in DFSStripedInputStream on EC read exception (#8348)
1 parent eb8f005 commit e344f64

2 files changed

Lines changed: 191 additions & 4 deletions

File tree

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private void readDataForDecoding() throws IOException {
198198
checkMissingBlocks();
199199
}
200200

201-
void readParityChunks(int num) throws IOException {
201+
private void readParityChunks(int num) throws IOException {
202202
for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
203203
i++) {
204204
if (alignedStripe.chunks[i] == null) {
@@ -298,7 +298,7 @@ private Callable<BlockReadStats> readCells(final BlockReader reader,
298298
};
299299
}
300300

301-
boolean readChunk(final LocatedBlock block, int chunkIndex)
301+
private boolean readChunk(final LocatedBlock block, int chunkIndex)
302302
throws IOException {
303303
final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
304304
if (block == null) {
@@ -353,7 +353,7 @@ void readStripe() throws IOException {
353353
readParityChunks(alignedStripe.missingChunksNum);
354354
}
355355
} catch (IOException e) {
356-
dfsStripedInputStream.close();
356+
dfsStripedInputStream.closeCurrentBlockReaders();
357357
throw e;
358358
}
359359
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
@@ -397,7 +397,7 @@ void readStripe() throws IOException {
397397
} catch (InterruptedException ie) {
398398
String err = "Read request interrupted";
399399
DFSClient.LOG.error(err, ie);
400-
dfsStripedInputStream.close();
400+
dfsStripedInputStream.closeCurrentBlockReaders();
401401
clearFutures();
402402
// Don't decode if read interrupted
403403
throw new InterruptedIOException(err);
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs;
19+
20+
import static org.assertj.core.api.Assertions.fail;
21+
22+
import java.io.IOException;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.CyclicBarrier;
27+
28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.Path;
30+
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
31+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
32+
import org.apache.hadoop.hdfs.server.datanode.DataNode;
33+
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
34+
import org.apache.hadoop.io.erasurecode.CodecUtil;
35+
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
36+
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
37+
import org.junit.jupiter.api.AfterEach;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.io.TempDir;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
public class TestDFSStripedInputStreamReadFailures {
45+
46+
public static final Logger LOG =
47+
LoggerFactory.getLogger(TestDFSStripedInputStreamReadFailures.class);
48+
49+
private MiniDFSCluster cluster;
50+
private Configuration conf = new Configuration();
51+
private DistributedFileSystem fs;
52+
private ErasureCodingPolicy ecPolicy;
53+
private short dataBlocks;
54+
private short parityBlocks;
55+
private int cellSize;
56+
private final int stripesPerBlock = 2;
57+
private int blockSize;
58+
59+
@TempDir
60+
private java.nio.file.Path baseDir;
61+
62+
@BeforeEach
63+
public void setup() throws IOException {
64+
ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
65+
dataBlocks = (short) ecPolicy.getNumDataUnits();
66+
parityBlocks = (short) ecPolicy.getNumParityUnits();
67+
cellSize = ecPolicy.getCellSize();
68+
blockSize = stripesPerBlock * cellSize;
69+
70+
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
71+
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
72+
if (ErasureCodeNative.isNativeCodeLoaded()) {
73+
conf.set(
74+
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
75+
NativeRSRawErasureCoderFactory.CODER_NAME);
76+
}
77+
78+
cluster = new MiniDFSCluster.Builder(conf, baseDir.toFile()).numDataNodes(
79+
dataBlocks + parityBlocks).build();
80+
cluster.waitActive();
81+
for (DataNode dn : cluster.getDataNodes()) {
82+
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
83+
}
84+
fs = cluster.getFileSystem();
85+
DFSTestUtil.enableAllECPolicies(fs);
86+
fs.getClient()
87+
.setErasureCodingPolicy("/", ecPolicy.getName());
88+
}
89+
90+
@AfterEach
91+
public void tearDown() {
92+
if (cluster != null) {
93+
cluster.shutdown();
94+
cluster = null;
95+
}
96+
}
97+
98+
private Path writeFile(String name, byte[] bytes) throws Exception {
99+
Path path = new Path(name);
100+
101+
DFSTestUtil.writeFile(fs, path, new String(bytes));
102+
StripedFileTestUtil.waitBlockGroupsReported(fs, name);
103+
104+
StripedFileTestUtil.checkData(fs, path, bytes.length,
105+
new ArrayList<DatanodeInfo>(), null, blockSize * dataBlocks);
106+
107+
return path;
108+
}
109+
110+
@Test
111+
public void testReadWithXceiverExhaustion() throws Exception {
112+
113+
// Write a little more than 1 stripe size
114+
// worth of data to 10 files
115+
int numBytes = cellSize * dataBlocks + 123;
116+
int numFiles = 10;
117+
118+
byte[] content = StripedFileTestUtil.generateBytes(numBytes);
119+
Path[] files = new Path[numFiles];
120+
for (int i = 0; i < numFiles; i++) {
121+
files[i] = writeFile("/file_"+ i, content);
122+
}
123+
124+
// reconfigure DNs with xceivers set to 2
125+
for (DataNode dn : cluster.getDataNodes()) {
126+
dn.reconfigureProperty(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, "2");
127+
}
128+
boolean reconfigurationComplete = false;
129+
while (!reconfigurationComplete) {
130+
Thread.sleep(100);
131+
for (DataNode dn : cluster.getDataNodes()) {
132+
if (dn.getXceiverCount() != 2) {
133+
break;
134+
}
135+
}
136+
reconfigurationComplete = true;
137+
}
138+
139+
// Start a thread for each file that we created
140+
// and use StripedFileTestUtil.verifyStatefulRead
141+
// to read from the file.
142+
final List<Throwable> exceptions = new ArrayList<>();
143+
final List<Thread> threads = new ArrayList<>(numFiles);
144+
final CyclicBarrier barrier = new CyclicBarrier(numFiles);
145+
final CountDownLatch completed = new CountDownLatch(numFiles);
146+
ThreadGroup testGroup = new ThreadGroup("xceiverTestThreads") {
147+
@Override
148+
public void uncaughtException(Thread t, Throwable e) {
149+
exceptions.add(e);
150+
super.uncaughtException(t, e);
151+
}
152+
};
153+
for (int i = 0; i < numFiles; i++) {
154+
int fileNum = i;
155+
threads.add(new Thread(testGroup, () -> {
156+
byte[] buffer = new byte[numBytes];
157+
try {
158+
barrier.await();
159+
StripedFileTestUtil.verifyStatefulRead(fs, files[fileNum], numBytes, content, buffer);
160+
} catch (Exception e1) {
161+
exceptions.add(e1);
162+
} finally {
163+
completed.countDown();
164+
}
165+
}));
166+
}
167+
threads.forEach(t -> t.start());
168+
completed.await();
169+
threads.forEach(t -> {
170+
try {
171+
t.join();
172+
} catch (InterruptedException e1) {
173+
throw new RuntimeException("Interrupted while trying to join thread");
174+
}
175+
});
176+
if (exceptions.size() > 0) {
177+
LOG.info("{} exceptions occurred", exceptions.size());
178+
exceptions.forEach(t -> {
179+
LOG.error("Exception details:", t);
180+
if (!(t instanceof IOException &&
181+
t.getMessage().contains("missing blocks, the stripe is"))) {
182+
fail("Unexpected exceptions occurred during test", t);
183+
}
184+
});
185+
}
186+
}
187+
}

0 commit comments

Comments
 (0)