1919
2020import static org .apache .hadoop .hdds .protocol .proto .HddsProtos .ReplicationFactor .ONE ;
2121import static org .junit .jupiter .api .Assertions .assertEquals ;
22+ import static org .junit .jupiter .api .Assertions .assertFalse ;
23+ import static org .junit .jupiter .api .Assertions .assertTrue ;
2224
25+ import java .io .BufferedInputStream ;
26+ import java .io .File ;
27+ import java .io .InputStream ;
2328import java .io .OutputStream ;
29+ import java .nio .file .Files ;
2430import java .security .MessageDigest ;
31+ import java .util .Arrays ;
2532import java .util .Collections ;
33+ import java .util .List ;
2634import java .util .concurrent .ThreadLocalRandom ;
2735import org .apache .hadoop .hdds .StringUtils ;
36+ import org .apache .hadoop .hdds .client .BlockID ;
2837import org .apache .hadoop .hdds .client .RatisReplicationConfig ;
2938import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
3039import org .apache .hadoop .hdds .conf .StorageUnit ;
3342import org .apache .hadoop .hdds .scm .storage .StreamBlockInputStream ;
3443import org .apache .hadoop .hdds .utils .db .CodecBuffer ;
3544import org .apache .hadoop .ozone .ClientConfigForTesting ;
45+ import org .apache .hadoop .ozone .HddsDatanodeService ;
3646import org .apache .hadoop .ozone .MiniOzoneCluster ;
37- import org .apache .hadoop .ozone .OzoneConfigKeys ;
3847import org .apache .hadoop .ozone .client .OzoneBucket ;
3948import org .apache .hadoop .ozone .client .OzoneClient ;
4049import org .apache .hadoop .ozone .client .OzoneClientFactory ;
4150import org .apache .hadoop .ozone .client .io .KeyInputStream ;
51+ import org .apache .hadoop .ozone .client .protocol .ClientProtocol ;
52+ import org .apache .hadoop .ozone .container .common .impl .ContainerData ;
53+ import org .apache .hadoop .ozone .container .common .impl .ContainerLayoutVersion ;
4254import org .apache .hadoop .ozone .om .TestBucket ;
55+ import org .apache .hadoop .ozone .om .helpers .OmKeyInfo ;
56+ import org .apache .hadoop .ozone .om .helpers .OmKeyLocationInfo ;
4357import org .apache .ozone .test .GenericTestUtils ;
58+ import org .apache .ratis .util .JavaUtils ;
4459import org .apache .ratis .util .SizeInBytes ;
60+ import org .apache .ratis .util .function .CheckedBiConsumer ;
4561import org .junit .jupiter .api .Test ;
4662import org .slf4j .LoggerFactory ;
4763import org .slf4j .event .Level ;
5268public class TestStreamRead {
5369 {
5470 GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("com" ), Level .ERROR );
55- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ipc" ), Level .ERROR );
56- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.server.http" ), Level .ERROR );
57- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.container" ), Level .ERROR );
58- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.ha" ), Level .ERROR );
59- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.safemode" ), Level .ERROR );
60- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.utils" ), Level .ERROR );
61- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ozone.container.common" ), Level .ERROR );
62- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ozone.om" ), Level .ERROR );
63- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.ratis" ), Level .ERROR );
71+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org" ), Level .ERROR );
72+
73+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("BackgroundPipelineScrubber" ), Level .ERROR );
74+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("ExpiredContainerReplicaOpScrubber" ), Level .ERROR );
75+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("SCMHATransactionMonitor" ), Level .ERROR );
6476 GenericTestUtils .setLogLevel (LoggerFactory .getLogger (CodecBuffer .class ), Level .ERROR );
6577 }
6678
6779 static final int CHUNK_SIZE = 1 << 20 ; // 1MB
6880 static final int FLUSH_SIZE = 2 * CHUNK_SIZE ; // 2MB
6981 static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE ; // 4MB
7082
71- static final int BLOCK_SIZE = 64 << 20 ;
7283 static final SizeInBytes KEY_SIZE = SizeInBytes .valueOf ("128M" );
84+ static final int BLOCK_SIZE = KEY_SIZE .getSizeInt ();
85+
86+ static final String DUMMY_KEY = "dummyKey" ;
7387
7488 static MiniOzoneCluster newCluster (int bytesPerChecksum ) throws Exception {
7589 final OzoneConfiguration conf = new OzoneConfiguration ();
@@ -79,9 +93,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
7993 conf .setFromObject (config );
8094
8195 conf .setInt (ScmConfigKeys .OZONE_DATANODE_PIPELINE_LIMIT , 1 );
82- conf .setInt (ScmConfigKeys .OZONE_SCM_RATIS_PIPELINE_LIMIT , 5 );
96+ conf .setInt (ScmConfigKeys .OZONE_SCM_RATIS_PIPELINE_LIMIT , 1 );
8397 conf .setQuietMode (true );
84- conf .setStorageSize (OzoneConfigKeys .OZONE_SCM_BLOCK_SIZE , 64 , StorageUnit .MB );
8598
8699 ClientConfigForTesting .newBuilder (StorageUnit .BYTES )
87100 .setBlockSize (BLOCK_SIZE )
@@ -114,53 +127,128 @@ void testReadKey256k() throws Exception {
114127 }
115128
116129 void runTestReadKey (SizeInBytes keySize , SizeInBytes bytesPerChecksum ) throws Exception {
130+ System .out .println ("cluster starting ..." );
117131 try (MiniOzoneCluster cluster = newCluster (bytesPerChecksum .getSizeInt ())) {
118132 cluster .waitForClusterToBeReady ();
119-
120133 System .out .println ("cluster ready" );
121134
135+ final List <HddsDatanodeService > datanodes = cluster .getHddsDatanodes ();
136+ assertEquals (1 , datanodes .size ());
137+ final HddsDatanodeService datanode = datanodes .get (0 );
138+
122139 OzoneConfiguration conf = cluster .getConf ();
123140 OzoneClientConfig clientConfig = conf .getObject (OzoneClientConfig .class );
124141 clientConfig .setStreamReadBlock (true );
125- OzoneConfiguration copy = new OzoneConfiguration (conf );
126- copy .setFromObject (clientConfig );
142+ final OzoneConfiguration steamReadConf = new OzoneConfiguration (conf );
143+ steamReadConf .setFromObject (clientConfig );
144+
145+ clientConfig .setStreamReadBlock (false );
146+ final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration (conf );
147+ nonSteamReadConf .setFromObject (clientConfig );
127148
128- final int n = 5 ;
129- final SizeInBytes writeBufferSize = SizeInBytes .valueOf ("8MB" );
130- final SizeInBytes [] readBufferSizes = {
149+ final SizeInBytes [] bufferSizes = {
131150 SizeInBytes .valueOf ("32M" ),
132151 SizeInBytes .valueOf ("8M" ),
133152 SizeInBytes .valueOf ("1M" ),
134153 SizeInBytes .valueOf ("4k" ),
135154 };
136155
137- try (OzoneClient client = OzoneClientFactory .getRpcClient (copy )) {
138- final TestBucket bucket = TestBucket .newBuilder (client ).build ();
156+ try (OzoneClient streamReadClient = OzoneClientFactory .getRpcClient (steamReadConf );
157+ OzoneClient nonStreamReadClient = OzoneClientFactory .getRpcClient (nonSteamReadConf )) {
158+ final TestBucket testBucket = TestBucket .newBuilder (streamReadClient ).build ();
159+ final String volume = testBucket .delegate ().getVolumeName ();
160+ final String bucket = testBucket .delegate ().getName ();
161+ final String keyName = "key0" ;
162+
163+ // get the client ready by writing a dummy key
164+ createKey (testBucket .delegate (), DUMMY_KEY , SizeInBytes .ONE_KB , SizeInBytes .ONE_KB );
165+
166+
167+ for (SizeInBytes bufferSize : bufferSizes ) {
168+ // create key
169+ System .out .println ("---------------------------------------------------------" );
170+ createKey (testBucket .delegate (), keyName , keySize , bufferSize );
171+
172+ // get block file and generate md5
173+ final OmKeyInfo info = nonStreamReadClient .getProxy ().getKeyInfo (volume , bucket , keyName , false );
174+ final List <OmKeyLocationInfo > locations = info .getLatestVersionLocations ().getLocationList ();
175+ assertEquals (1 , locations .size ());
176+ final BlockID blockId = locations .get (0 ).getBlockID ();
177+ final ContainerData containerData = datanode .getDatanodeStateMachine ().getContainer ().getContainerSet ()
178+ .getContainer (blockId .getContainerID ()).getContainerData ();
179+ final File blockFile = ContainerLayoutVersion .FILE_PER_BLOCK .getChunkFile (containerData , blockId , null );
180+ assertTrue (blockFile .exists ());
181+ assertEquals (BLOCK_SIZE , blockFile .length ());
182+ final String expectedMd5 = generateMd5 (keySize , SizeInBytes .ONE_MB , blockFile );
139183
140- for (int i = 0 ; i < n ; i ++) {
141- final String keyName = "key" + i ;
184+ // run tests
142185 System .out .println ("---------------------------------------------------------" );
143186 System .out .printf ("%s with %s bytes and %s bytesPerChecksum%n" ,
144187 keyName , keySize , bytesPerChecksum );
145188
146- final String md5 = createKey (bucket .delegate (), keyName , keySize , writeBufferSize );
147- for (SizeInBytes readBufferSize : readBufferSizes ) {
148- runTestReadKey (keyName , keySize , readBufferSize , null , bucket );
149- runTestReadKey (keyName , keySize , readBufferSize , md5 , bucket );
189+ final CheckedBiConsumer <SizeInBytes , String , Exception > streamRead = (readBufferSize , md5 )
190+ -> streamRead (keySize , readBufferSize , md5 , testBucket , keyName );
191+ final CheckedBiConsumer <SizeInBytes , String , Exception > nonStreamRead = (readBufferSize , md5 )
192+ -> nonStreamRead (keySize , readBufferSize , md5 , nonStreamReadClient , volume , bucket , keyName );
193+ final CheckedBiConsumer <SizeInBytes , String , Exception > fileRead = (readBufferSize , md5 )
194+ -> fileRead (keySize , readBufferSize , md5 , blockFile );
195+ final List <CheckedBiConsumer <SizeInBytes , String , Exception >> operations
196+ = Arrays .asList (streamRead , nonStreamRead , fileRead );
197+ Collections .shuffle (operations );
198+
199+ for (CheckedBiConsumer <SizeInBytes , String , Exception > op : operations ) {
200+ for (int i = 0 ; i < 5 ; i ++) {
201+ op .accept (bufferSize , null );
202+ }
203+ op .accept (bufferSize , expectedMd5 );
150204 }
151205 }
152206 }
153207 }
154208 }
155209
210+ static void streamRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
211+ TestBucket bucket , String keyName ) throws Exception {
212+ try (KeyInputStream in = bucket .getKeyInputStream (keyName )) {
213+ assertTrue (in .isStreamBlockInputStream ());
214+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
215+ }
216+ }
217+
218+ static void nonStreamRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
219+ OzoneClient nonStreamReadClient , String volume , String bucket , String keyName ) throws Exception {
220+ final ClientProtocol proxy = nonStreamReadClient .getProxy ();
221+ try (KeyInputStream in = (KeyInputStream ) proxy .getKey (volume , bucket , keyName ).getInputStream ()) {
222+ assertFalse (in .isStreamBlockInputStream ());
223+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
224+ }
225+ }
226+
227+ static void fileRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
228+ File blockFile ) throws Exception {
229+ try (InputStream in = new BufferedInputStream (Files .newInputStream (blockFile .toPath ()), bufferSize .getSizeInt ())) {
230+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
231+ }
232+ }
233+
234+ static String generateMd5 (SizeInBytes keySize , SizeInBytes bufferSize , File blockFile ) throws Exception {
235+ try (InputStream in = new BufferedInputStream (Files .newInputStream (blockFile .toPath ()), bufferSize .getSizeInt ())) {
236+ return runTestReadKey ("generateMd5" , keySize , bufferSize , true , in );
237+ }
238+ }
239+
156240 static void print (String name , long keySizeByte , long elapsedNanos , SizeInBytes bufferSize , String computedMD5 ) {
157241 final double keySizeMb = keySizeByte * 1.0 / (1 << 20 );
158242 final double elapsedSeconds = elapsedNanos / 1_000_000_000.0 ;
159- System .out .printf ("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n" ,
160- name , keySizeMb / elapsedSeconds , elapsedSeconds , bufferSize , keySizeMb , computedMD5 );
243+ if (computedMD5 == null ) {
244+ System .out .printf ("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB)%n" ,
245+ name , keySizeMb / elapsedSeconds , elapsedSeconds , bufferSize , keySizeMb );
246+ } else {
247+ System .out .printf ("%16s md5=%s%n" , name , computedMD5 );
248+ }
161249 }
162250
163- static String createKey (OzoneBucket bucket , String keyName , SizeInBytes keySize , SizeInBytes bufferSize )
251+ static void createKey (OzoneBucket bucket , String keyName , SizeInBytes keySize , SizeInBytes bufferSize )
164252 throws Exception {
165253 final byte [] buffer = new byte [bufferSize .getSizeInt ()];
166254 ThreadLocalRandom .current ().nextBytes (buffer );
@@ -177,49 +265,47 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
177265 }
178266 final long elapsedNanos = System .nanoTime () - startTime ;
179267
180- final MessageDigest md5 = MessageDigest .getInstance ("MD5" );
181268 for (long pos = 0 ; pos < keySizeByte ;) {
182269 final int writeSize = Math .toIntExact (Math .min (buffer .length , keySizeByte - pos ));
183- md5 .update (buffer , 0 , writeSize );
184270 pos += writeSize ;
185271 }
186272
187- final String computedMD5 = StringUtils .bytes2Hex (md5 .digest ());
188- print ("createStreamKey" , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
189- return computedMD5 ;
273+ if (!keyName .startsWith (DUMMY_KEY )) {
274+ print ("createStreamKey" , keySizeByte , elapsedNanos , bufferSize , null );
275+ }
276+ }
277+
278+ static void runTestReadKey (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
279+ InputStream in ) throws Exception {
280+ final String method = JavaUtils .getCallerStackTraceElement ().getMethodName ();
281+ final String computedMD5 = runTestReadKey (method , keySize , bufferSize , expectedMD5 != null , in );
282+ assertEquals (expectedMD5 , computedMD5 );
190283 }
191284
192- private void runTestReadKey (String keyName , SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
193- TestBucket bucket ) throws Exception {
285+ static String runTestReadKey (String name , SizeInBytes keySize , SizeInBytes bufferSize , boolean generateMd5 ,
286+ InputStream in ) throws Exception {
194287 final long keySizeByte = keySize .getSize ();
195288 final MessageDigest md5 = MessageDigest .getInstance ("MD5" );
196289 // Read the data fully into a large enough byte array
197290 final byte [] buffer = new byte [bufferSize .getSizeInt ()];
198291 final long startTime = System .nanoTime ();
199- try (KeyInputStream keyInputStream = bucket .getKeyInputStream (keyName )) {
200- int pos = 0 ;
201- for (; pos < keySizeByte ;) {
202- final int read = keyInputStream .read (buffer , 0 , buffer .length );
203- if (read == -1 ) {
204- break ;
205- }
292+ int pos = 0 ;
293+ for (; pos < keySizeByte ;) {
294+ final int read = in .read (buffer , 0 , buffer .length );
295+ if (read == -1 ) {
296+ break ;
297+ }
206298
207- if (expectedMD5 != null ) {
208- md5 .update (buffer , 0 , read );
209- }
210- pos += read ;
299+ if (generateMd5 ) {
300+ md5 .update (buffer , 0 , read );
211301 }
212- assertEquals ( keySizeByte , pos ) ;
302+ pos += read ;
213303 }
304+ assertEquals (keySizeByte , pos );
214305 final long elapsedNanos = System .nanoTime () - startTime ;
215306
216- final String computedMD5 ;
217- if (expectedMD5 == null ) {
218- computedMD5 = null ;
219- } else {
220- computedMD5 = StringUtils .bytes2Hex (md5 .digest ());
221- assertEquals (expectedMD5 , computedMD5 );
222- }
223- print ("readStreamKey" , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
307+ final String computedMD5 = generateMd5 ? StringUtils .bytes2Hex (md5 .digest ()) : null ;
308+ print (name , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
309+ return computedMD5 ;
224310 }
225311}
0 commit comments