1919
2020import static org .apache .hadoop .hdds .protocol .proto .HddsProtos .ReplicationFactor .ONE ;
2121import static org .junit .jupiter .api .Assertions .assertEquals ;
22+ import static org .junit .jupiter .api .Assertions .assertFalse ;
23+ import static org .junit .jupiter .api .Assertions .assertTrue ;
2224
25+ import java .io .BufferedInputStream ;
26+ import java .io .File ;
27+ import java .io .InputStream ;
2328import java .io .OutputStream ;
29+ import java .nio .file .Files ;
2430import java .security .MessageDigest ;
31+ import java .util .Arrays ;
2532import java .util .Collections ;
33+ import java .util .List ;
2634import java .util .concurrent .ThreadLocalRandom ;
35+
2736import org .apache .hadoop .hdds .StringUtils ;
37+ import org .apache .hadoop .hdds .client .BlockID ;
2838import org .apache .hadoop .hdds .client .RatisReplicationConfig ;
2939import org .apache .hadoop .hdds .conf .OzoneConfiguration ;
3040import org .apache .hadoop .hdds .conf .StorageUnit ;
3343import org .apache .hadoop .hdds .scm .storage .StreamBlockInputStream ;
3444import org .apache .hadoop .hdds .utils .db .CodecBuffer ;
3545import org .apache .hadoop .ozone .ClientConfigForTesting ;
46+ import org .apache .hadoop .ozone .HddsDatanodeService ;
3647import org .apache .hadoop .ozone .MiniOzoneCluster ;
37- import org .apache .hadoop .ozone .OzoneConfigKeys ;
3848import org .apache .hadoop .ozone .client .OzoneBucket ;
3949import org .apache .hadoop .ozone .client .OzoneClient ;
4050import org .apache .hadoop .ozone .client .OzoneClientFactory ;
4151import org .apache .hadoop .ozone .client .io .KeyInputStream ;
52+ import org .apache .hadoop .ozone .client .protocol .ClientProtocol ;
53+ import org .apache .hadoop .ozone .container .common .impl .ContainerData ;
54+ import org .apache .hadoop .ozone .container .common .impl .ContainerLayoutVersion ;
4255import org .apache .hadoop .ozone .om .TestBucket ;
56+ import org .apache .hadoop .ozone .om .helpers .OmKeyInfo ;
57+ import org .apache .hadoop .ozone .om .helpers .OmKeyLocationInfo ;
4358import org .apache .ozone .test .GenericTestUtils ;
59+ import org .apache .ratis .util .JavaUtils ;
4460import org .apache .ratis .util .SizeInBytes ;
61+ import org .apache .ratis .util .function .CheckedBiConsumer ;
4562import org .junit .jupiter .api .Test ;
4663import org .slf4j .LoggerFactory ;
4764import org .slf4j .event .Level ;
5269public class TestStreamRead {
5370 {
5471 GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("com" ), Level .ERROR );
55- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ipc" ), Level .ERROR );
56- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.server.http" ), Level .ERROR );
57- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.container" ), Level .ERROR );
58- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.ha" ), Level .ERROR );
59- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.scm.safemode" ), Level .ERROR );
60- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.hdds.utils" ), Level .ERROR );
61- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ozone.container.common" ), Level .ERROR );
62- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.hadoop.ozone.om" ), Level .ERROR );
63- GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org.apache.ratis" ), Level .ERROR );
72+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("org" ), Level .ERROR );
73+
74+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("BackgroundPipelineScrubber" ), Level .ERROR );
75+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("ExpiredContainerReplicaOpScrubber" ), Level .ERROR );
76+ GenericTestUtils .setLogLevel (LoggerFactory .getLogger ("SCMHATransactionMonitor" ), Level .ERROR );
6477 GenericTestUtils .setLogLevel (LoggerFactory .getLogger (CodecBuffer .class ), Level .ERROR );
6578 }
6679
6780 static final int CHUNK_SIZE = 1 << 20 ; // 1MB
6881 static final int FLUSH_SIZE = 2 * CHUNK_SIZE ; // 2MB
6982 static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE ; // 4MB
7083
71- static final int BLOCK_SIZE = 64 << 20 ;
7284 static final SizeInBytes KEY_SIZE = SizeInBytes .valueOf ("128M" );
85+ static final int BLOCK_SIZE = KEY_SIZE .getSizeInt ();
7386
7487 static MiniOzoneCluster newCluster (int bytesPerChecksum ) throws Exception {
7588 final OzoneConfiguration conf = new OzoneConfiguration ();
@@ -79,9 +92,8 @@ static MiniOzoneCluster newCluster(int bytesPerChecksum) throws Exception {
7992 conf .setFromObject (config );
8093
8194 conf .setInt (ScmConfigKeys .OZONE_DATANODE_PIPELINE_LIMIT , 1 );
82- conf .setInt (ScmConfigKeys .OZONE_SCM_RATIS_PIPELINE_LIMIT , 5 );
95+ conf .setInt (ScmConfigKeys .OZONE_SCM_RATIS_PIPELINE_LIMIT , 1 );
8396 conf .setQuietMode (true );
84- conf .setStorageSize (OzoneConfigKeys .OZONE_SCM_BLOCK_SIZE , 64 , StorageUnit .MB );
8597
8698 ClientConfigForTesting .newBuilder (StorageUnit .BYTES )
8799 .setBlockSize (BLOCK_SIZE )
@@ -114,53 +126,132 @@ void testReadKey256k() throws Exception {
114126 }
115127
116128 void runTestReadKey (SizeInBytes keySize , SizeInBytes bytesPerChecksum ) throws Exception {
129+ System .out .println ("cluster starting ..." );
117130 try (MiniOzoneCluster cluster = newCluster (bytesPerChecksum .getSizeInt ())) {
118131 cluster .waitForClusterToBeReady ();
119-
120132 System .out .println ("cluster ready" );
121133
134+ final List <HddsDatanodeService > datanodes = cluster .getHddsDatanodes ();
135+ assertEquals (1 , datanodes .size ());
136+ final HddsDatanodeService datanode = datanodes .get (0 );
137+
122138 OzoneConfiguration conf = cluster .getConf ();
123139 OzoneClientConfig clientConfig = conf .getObject (OzoneClientConfig .class );
124140 clientConfig .setStreamReadBlock (true );
125- OzoneConfiguration copy = new OzoneConfiguration (conf );
126- copy .setFromObject (clientConfig );
141+ final OzoneConfiguration steamReadConf = new OzoneConfiguration (conf );
142+ steamReadConf .setFromObject (clientConfig );
143+
144+ clientConfig .setStreamReadBlock (false );
145+ final OzoneConfiguration nonSteamReadConf = new OzoneConfiguration (conf );
146+ nonSteamReadConf .setFromObject (clientConfig );
127147
128- final int n = 5 ;
129- final SizeInBytes writeBufferSize = SizeInBytes .valueOf ("8MB" );
148+ final SizeInBytes writeBufferSize = SizeInBytes .valueOf ("32MB" );
130149 final SizeInBytes [] readBufferSizes = {
131150 SizeInBytes .valueOf ("32M" ),
132151 SizeInBytes .valueOf ("8M" ),
133152 SizeInBytes .valueOf ("1M" ),
134153 SizeInBytes .valueOf ("4k" ),
135154 };
155+ final int [][] combinations = {
156+ {0 , 1 , 2 },
157+ {0 , 2 , 1 },
158+ {1 , 0 , 2 },
159+ {1 , 2 , 0 },
160+ {2 , 0 , 1 },
161+ {2 , 1 , 0 },
162+ };
136163
137- try (OzoneClient client = OzoneClientFactory .getRpcClient (copy )) {
138- final TestBucket bucket = TestBucket .newBuilder (client ).build ();
164+ try (OzoneClient streamReadClient = OzoneClientFactory .getRpcClient (steamReadConf );
165+ OzoneClient nonStreamReadClient = OzoneClientFactory .getRpcClient (nonSteamReadConf )) {
166+ final TestBucket streamReadBucket = TestBucket .newBuilder (streamReadClient ).build ();
167+ final String bucket = streamReadBucket .delegate ().getName ();
168+ final String volume = streamReadBucket .delegate ().getVolumeName ();
139169
140- for (int i = 0 ; i < n ; i ++) {
170+ for (int i = 0 ; i < combinations . length ; i ++) {
141171 final String keyName = "key" + i ;
142172 System .out .println ("---------------------------------------------------------" );
143173 System .out .printf ("%s with %s bytes and %s bytesPerChecksum%n" ,
144174 keyName , keySize , bytesPerChecksum );
145175
146- final String md5 = createKey (bucket .delegate (), keyName , keySize , writeBufferSize );
176+ // create key
177+ createKey (streamReadBucket .delegate (), keyName , keySize , writeBufferSize );
178+
179+ // get block file and generate md5
180+ final OmKeyInfo info = nonStreamReadClient .getProxy ().getKeyInfo (volume , bucket , keyName , false );
181+ final List <OmKeyLocationInfo > locations = info .getLatestVersionLocations ().getLocationList ();
182+ assertEquals (1 , locations .size ());
183+ final BlockID blockId = locations .get (0 ).getBlockID ();
184+ final ContainerData containerData = datanode .getDatanodeStateMachine ().getContainer ().getContainerSet ()
185+ .getContainer (blockId .getContainerID ()).getContainerData ();
186+ final File blockFile = ContainerLayoutVersion .FILE_PER_BLOCK .getChunkFile (containerData , blockId , null );
187+ assertTrue (blockFile .exists ());
188+ assertEquals (BLOCK_SIZE , blockFile .length ());
189+ final String expectedMd5 = generateMd5 (keySize , writeBufferSize , blockFile );
190+
191+ final CheckedBiConsumer <SizeInBytes , String , Exception > streamRead = (readBufferSize , md5 )
192+ -> streamRead (keySize , readBufferSize , md5 , streamReadBucket , keyName );
193+ final CheckedBiConsumer <SizeInBytes , String , Exception > nonStreamRead = (readBufferSize , md5 )
194+ -> nonStreamRead (keySize , readBufferSize , md5 , nonStreamReadClient , volume , bucket , keyName );
195+ final CheckedBiConsumer <SizeInBytes , String , Exception > fileRead = (readBufferSize , md5 )
196+ -> fileRead (keySize , readBufferSize , md5 , blockFile );
197+ final List <CheckedBiConsumer <SizeInBytes , String , Exception >> operations
198+ = Arrays .asList (streamRead , nonStreamRead , fileRead );
199+
200+ final int [] ordering = combinations [i ];
147201 for (SizeInBytes readBufferSize : readBufferSizes ) {
148- runTestReadKey (keyName , keySize , readBufferSize , null , bucket );
149- runTestReadKey (keyName , keySize , readBufferSize , md5 , bucket );
202+ for (int j : ordering ) {
203+ final CheckedBiConsumer <SizeInBytes , String , Exception > op = operations .get (j );
204+ op .accept (readBufferSize , null );
205+ op .accept (readBufferSize , expectedMd5 );
206+ }
150207 }
151208 }
152209 }
153210 }
154211 }
155212
213+ static void streamRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
214+ TestBucket bucket , String keyName ) throws Exception {
215+ try (KeyInputStream in = bucket .getKeyInputStream (keyName )) {
216+ assertTrue (in .isStreamBlockInputStream ());
217+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
218+ }
219+ }
220+
221+ static void nonStreamRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
222+ OzoneClient nonStreamReadClient , String volume , String bucket , String keyName ) throws Exception {
223+ final ClientProtocol proxy = nonStreamReadClient .getProxy ();
224+ try (KeyInputStream in = (KeyInputStream ) proxy .getKey (volume , bucket , keyName ).getInputStream ()) {
225+ assertFalse (in .isStreamBlockInputStream ());
226+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
227+ }
228+ }
229+
230+ static void fileRead (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
231+ File blockFile ) throws Exception {
232+ try (InputStream in = new BufferedInputStream (Files .newInputStream (blockFile .toPath ()), bufferSize .getSizeInt ())) {
233+ runTestReadKey (keySize , bufferSize , expectedMD5 , in );
234+ }
235+ }
236+
237+ static String generateMd5 (SizeInBytes keySize , SizeInBytes bufferSize , File blockFile ) throws Exception {
238+ try (InputStream in = new BufferedInputStream (Files .newInputStream (blockFile .toPath ()), bufferSize .getSizeInt ())) {
239+ return runTestReadKey ("generateMd5" , keySize , bufferSize , true , in );
240+ }
241+ }
242+
156243 static void print (String name , long keySizeByte , long elapsedNanos , SizeInBytes bufferSize , String computedMD5 ) {
157244 final double keySizeMb = keySizeByte * 1.0 / (1 << 20 );
158245 final double elapsedSeconds = elapsedNanos / 1_000_000_000.0 ;
159- System .out .printf ("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB, md5=%s)%n" ,
160- name , keySizeMb / elapsedSeconds , elapsedSeconds , bufferSize , keySizeMb , computedMD5 );
246+ if (computedMD5 == null ) {
247+ System .out .printf ("%16s: %8.2f MB/s (%7.3f s, buffer %16s, keySize %8.2f MB)" ,
248+ name , keySizeMb / elapsedSeconds , elapsedSeconds , bufferSize , keySizeMb );
249+ } else {
250+ System .out .printf (" md5=%s%n" , computedMD5 );
251+ }
161252 }
162253
163- static String createKey (OzoneBucket bucket , String keyName , SizeInBytes keySize , SizeInBytes bufferSize )
254+ static void createKey (OzoneBucket bucket , String keyName , SizeInBytes keySize , SizeInBytes bufferSize )
164255 throws Exception {
165256 final byte [] buffer = new byte [bufferSize .getSizeInt ()];
166257 ThreadLocalRandom .current ().nextBytes (buffer );
@@ -177,49 +268,45 @@ static String createKey(OzoneBucket bucket, String keyName, SizeInBytes keySize,
177268 }
178269 final long elapsedNanos = System .nanoTime () - startTime ;
179270
180- final MessageDigest md5 = MessageDigest .getInstance ("MD5" );
181271 for (long pos = 0 ; pos < keySizeByte ;) {
182272 final int writeSize = Math .toIntExact (Math .min (buffer .length , keySizeByte - pos ));
183- md5 .update (buffer , 0 , writeSize );
184273 pos += writeSize ;
185274 }
186275
187- final String computedMD5 = StringUtils .bytes2Hex (md5 .digest ());
188- print ("createStreamKey" , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
189- return computedMD5 ;
276+ print ("createStreamKey" , keySizeByte , elapsedNanos , bufferSize , null );
190277 }
191278
192- private void runTestReadKey (String keyName , SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
193- TestBucket bucket ) throws Exception {
279+ static void runTestReadKey (SizeInBytes keySize , SizeInBytes bufferSize , String expectedMD5 ,
280+ InputStream in ) throws Exception {
281+ final String method = JavaUtils .getCallerStackTraceElement ().getMethodName ();
282+ final String computedMD5 = runTestReadKey (method , keySize , bufferSize , expectedMD5 != null , in );
283+ assertEquals (expectedMD5 , computedMD5 );
284+ }
285+
286+ static String runTestReadKey (String name , SizeInBytes keySize , SizeInBytes bufferSize , boolean generateMd5 ,
287+ InputStream in ) throws Exception {
194288 final long keySizeByte = keySize .getSize ();
195289 final MessageDigest md5 = MessageDigest .getInstance ("MD5" );
196290 // Read the data fully into a large enough byte array
197291 final byte [] buffer = new byte [bufferSize .getSizeInt ()];
198292 final long startTime = System .nanoTime ();
199- try (KeyInputStream keyInputStream = bucket .getKeyInputStream (keyName )) {
200- int pos = 0 ;
201- for (; pos < keySizeByte ;) {
202- final int read = keyInputStream .read (buffer , 0 , buffer .length );
203- if (read == -1 ) {
204- break ;
205- }
293+ int pos = 0 ;
294+ for (; pos < keySizeByte ;) {
295+ final int read = in .read (buffer , 0 , buffer .length );
296+ if (read == -1 ) {
297+ break ;
298+ }
206299
207- if (expectedMD5 != null ) {
208- md5 .update (buffer , 0 , read );
209- }
210- pos += read ;
300+ if (generateMd5 ) {
301+ md5 .update (buffer , 0 , read );
211302 }
212- assertEquals ( keySizeByte , pos ) ;
303+ pos += read ;
213304 }
305+ assertEquals (keySizeByte , pos );
214306 final long elapsedNanos = System .nanoTime () - startTime ;
215307
216- final String computedMD5 ;
217- if (expectedMD5 == null ) {
218- computedMD5 = null ;
219- } else {
220- computedMD5 = StringUtils .bytes2Hex (md5 .digest ());
221- assertEquals (expectedMD5 , computedMD5 );
222- }
223- print ("readStreamKey" , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
308+ final String computedMD5 = generateMd5 ? StringUtils .bytes2Hex (md5 .digest ()) : null ;
309+ print (name , keySizeByte , elapsedNanos , bufferSize , computedMD5 );
310+ return computedMD5 ;
224311 }
225312}
0 commit comments