|
30 | 30 | import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.verifyAllDataChecksumsMatch; |
31 | 31 | import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData; |
32 | 32 | import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet; |
| 33 | +import static org.apache.ozone.test.MetricsAsserts.assertCounter; |
| 34 | +import static org.apache.ozone.test.MetricsAsserts.getMetrics; |
33 | 35 | import static org.assertj.core.api.Assertions.assertThat; |
34 | 36 | import static org.junit.jupiter.api.Assertions.assertEquals; |
35 | 37 | import static org.junit.jupiter.api.Assertions.assertFalse; |
36 | 38 | import static org.junit.jupiter.api.Assertions.assertNotEquals; |
37 | 39 | import static org.junit.jupiter.api.Assertions.assertNotNull; |
38 | 40 | import static org.junit.jupiter.api.Assertions.assertNull; |
39 | 41 | import static org.junit.jupiter.api.Assertions.assertTrue; |
| 42 | +import static org.junit.jupiter.api.Assertions.fail; |
40 | 43 | import static org.mockito.ArgumentMatchers.any; |
41 | 44 | import static org.mockito.ArgumentMatchers.anyLong; |
42 | 45 | import static org.mockito.ArgumentMatchers.eq; |
|
53 | 56 | import com.google.common.collect.Sets; |
54 | 57 | import java.io.File; |
55 | 58 | import java.io.IOException; |
| 59 | +import java.nio.ByteBuffer; |
56 | 60 | import java.nio.file.Files; |
57 | 61 | import java.nio.file.Path; |
58 | 62 | import java.time.Clock; |
|
68 | 72 | import org.apache.commons.io.FileUtils; |
69 | 73 | import org.apache.hadoop.conf.StorageUnit; |
70 | 74 | import org.apache.hadoop.fs.FileUtil; |
| 75 | +import org.apache.hadoop.hdds.client.BlockID; |
71 | 76 | import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
72 | 77 | import org.apache.hadoop.hdds.protocol.DatanodeDetails; |
73 | 78 | import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; |
74 | 79 | import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; |
75 | 80 | import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; |
| 81 | +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto; |
76 | 82 | import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State; |
77 | 83 | import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType; |
78 | 84 | import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; |
79 | 85 | import org.apache.hadoop.hdds.scm.ScmConfigKeys; |
80 | 86 | import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; |
81 | 87 | import org.apache.hadoop.hdds.scm.pipeline.PipelineID; |
82 | 88 | import org.apache.hadoop.hdds.security.token.TokenVerifier; |
| 89 | +import org.apache.hadoop.hdds.utils.io.RandomAccessFileChannel; |
| 90 | +import org.apache.hadoop.metrics2.MetricsRecordBuilder; |
| 91 | +import org.apache.hadoop.ozone.common.ChunkBuffer; |
| 92 | +import org.apache.hadoop.ozone.container.ContainerTestHelper; |
83 | 93 | import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager; |
84 | 94 | import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeWriter; |
85 | 95 | import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; |
86 | 96 | import org.apache.hadoop.ozone.container.common.ContainerTestUtils; |
| 97 | +import org.apache.hadoop.ozone.container.common.helpers.BlockData; |
| 98 | +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; |
87 | 99 | import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; |
88 | 100 | import org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils; |
89 | 101 | import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; |
|
94 | 106 | import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender; |
95 | 107 | import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; |
96 | 108 | import org.apache.hadoop.ozone.container.common.statemachine.StateContext; |
| 109 | +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; |
97 | 110 | import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; |
98 | 111 | import org.apache.hadoop.ozone.container.common.volume.HddsVolume; |
99 | 112 | import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; |
|
106 | 119 | import org.apache.hadoop.util.Time; |
107 | 120 | import org.apache.ozone.test.GenericTestUtils; |
108 | 121 | import org.apache.ozone.test.GenericTestUtils.LogCapturer; |
| 122 | +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; |
109 | 123 | import org.junit.jupiter.api.Assertions; |
110 | 124 | import org.junit.jupiter.api.BeforeEach; |
111 | 125 | import org.junit.jupiter.api.Test; |
@@ -938,4 +952,138 @@ private KeyValueHandler createKeyValueHandler(Path path) throws IOException { |
938 | 952 |
|
939 | 953 | return kvHandler; |
940 | 954 | } |
| 955 | + |
| 956 | + private static class HandlerWithVolumeSet { |
| 957 | + private final KeyValueHandler handler; |
| 958 | + private final MutableVolumeSet volumeSet; |
| 959 | + private final ContainerSet containerSet; |
| 960 | + |
| 961 | + HandlerWithVolumeSet(KeyValueHandler handler, MutableVolumeSet volumeSet, ContainerSet containerSet) { |
| 962 | + this.handler = handler; |
| 963 | + this.volumeSet = volumeSet; |
| 964 | + this.containerSet = containerSet; |
| 965 | + } |
| 966 | + |
| 967 | + KeyValueHandler getHandler() { |
| 968 | + return handler; |
| 969 | + } |
| 970 | + |
| 971 | + MutableVolumeSet getVolumeSet() { |
| 972 | + return volumeSet; |
| 973 | + } |
| 974 | + |
| 975 | + ContainerSet getContainerSet() { |
| 976 | + return containerSet; |
| 977 | + } |
| 978 | + } |
| 979 | + |
| 980 | + private HandlerWithVolumeSet createKeyValueHandlerWithVolumeSet(Path path) throws IOException { |
| 981 | + ContainerMetrics.remove(); |
| 982 | + final ContainerSet containerSet = newContainerSet(); |
| 983 | + final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); |
| 984 | + |
| 985 | + HddsVolume hddsVolume = new HddsVolume.Builder(path.toString()).conf(conf) |
| 986 | + .clusterID(CLUSTER_ID).datanodeUuid(DATANODE_UUID) |
| 987 | + .volumeSet(volumeSet) |
| 988 | + .build(); |
| 989 | + hddsVolume.format(CLUSTER_ID); |
| 990 | + hddsVolume.createWorkingDir(CLUSTER_ID, null); |
| 991 | + hddsVolume.createTmpDirs(CLUSTER_ID); |
| 992 | + when(volumeSet.getVolumesList()).thenReturn(Collections.singletonList(hddsVolume)); |
| 993 | + |
| 994 | + final KeyValueHandler kvHandler = ContainerTestUtils.getKeyValueHandler(conf, |
| 995 | + DATANODE_UUID, containerSet, volumeSet); |
| 996 | + kvHandler.setClusterID(CLUSTER_ID); |
| 997 | + hddsVolume.getVolumeInfoStats().unregister(); |
| 998 | + hddsVolume.getVolumeIOStats().unregister(); |
| 999 | + |
| 1000 | + ContainerController controller = new ContainerController(containerSet, |
| 1001 | + Collections.singletonMap(ContainerType.KeyValueContainer, kvHandler)); |
| 1002 | + OnDemandContainerScanner onDemandScanner = new OnDemandContainerScanner( |
| 1003 | + conf.getObject(ContainerScannerConfiguration.class), controller); |
| 1004 | + containerSet.registerOnDemandScanner(onDemandScanner); |
| 1005 | + |
| 1006 | + return new HandlerWithVolumeSet(kvHandler, volumeSet, containerSet); |
| 1007 | + } |
| 1008 | + |
| 1009 | + @Test |
| 1010 | + public void testReadBlockMetrics() throws Exception { |
| 1011 | + Path testDir = Files.createTempDirectory("testReadBlockMetrics"); |
| 1012 | + RandomAccessFileChannel blockFile = null; |
| 1013 | + try { |
| 1014 | + conf.set(OZONE_SCM_CONTAINER_LAYOUT_KEY, ContainerLayoutVersion.FILE_PER_BLOCK.name()); |
| 1015 | + HandlerWithVolumeSet handlerWithVolume = createKeyValueHandlerWithVolumeSet(testDir); |
| 1016 | + KeyValueHandler kvHandler = handlerWithVolume.getHandler(); |
| 1017 | + MutableVolumeSet volumeSet = handlerWithVolume.getVolumeSet(); |
| 1018 | + ContainerSet containerSet = handlerWithVolume.getContainerSet(); |
| 1019 | + |
| 1020 | + long containerID = ContainerTestHelper.getTestContainerID(); |
| 1021 | + KeyValueContainerData containerData = new KeyValueContainerData( |
| 1022 | + containerID, ContainerLayoutVersion.FILE_PER_BLOCK, |
| 1023 | + (long) StorageUnit.GB.toBytes(1), UUID.randomUUID().toString(), |
| 1024 | + DATANODE_UUID); |
| 1025 | + KeyValueContainer container = new KeyValueContainer(containerData, conf); |
| 1026 | + container.create(volumeSet, new RoundRobinVolumeChoosingPolicy(), CLUSTER_ID); |
| 1027 | + containerSet.addContainer(container); |
| 1028 | + |
| 1029 | + BlockID blockID = ContainerTestHelper.getTestBlockID(containerID); |
| 1030 | + BlockData blockData = new BlockData(blockID); |
| 1031 | + ChunkInfo chunkInfo = new ChunkInfo("chunk1", 0, 1024); |
| 1032 | + blockData.addChunk(chunkInfo.getProtoBufMessage()); |
| 1033 | + kvHandler.getBlockManager().putBlock(container, blockData); |
| 1034 | + |
| 1035 | + ChunkBuffer data = ChunkBuffer.wrap(ByteBuffer.allocate(1024)); |
| 1036 | + kvHandler.getChunkManager().writeChunk(container, blockID, chunkInfo, data, |
| 1037 | + DispatcherContext.getHandleWriteChunk()); |
| 1038 | + |
| 1039 | + ContainerCommandRequestProto readBlockRequest = |
| 1040 | + ContainerCommandRequestProto.newBuilder() |
| 1041 | + .setCmdType(ContainerProtos.Type.ReadBlock) |
| 1042 | + .setContainerID(containerID) |
| 1043 | + .setDatanodeUuid(DATANODE_UUID) |
| 1044 | + .setReadBlock(ContainerProtos.ReadBlockRequestProto.newBuilder() |
| 1045 | + .setBlockID(blockID.getDatanodeBlockIDProtobuf()) |
| 1046 | + .setOffset(0) |
| 1047 | + .setLength(1024) |
| 1048 | + .build()) |
| 1049 | + .build(); |
| 1050 | + |
| 1051 | + final AtomicInteger responseCount = new AtomicInteger(0); |
| 1052 | + |
| 1053 | + StreamObserver<ContainerCommandResponseProto> streamObserver = |
| 1054 | + new StreamObserver<ContainerCommandResponseProto>() { |
| 1055 | + @Override |
| 1056 | + public void onNext(ContainerCommandResponseProto response) { |
| 1057 | + assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); |
| 1058 | + responseCount.incrementAndGet(); |
| 1059 | + } |
| 1060 | + |
| 1061 | + @Override |
| 1062 | + public void onError(Throwable t) { |
| 1063 | + fail("ReadBlock failed", t); |
| 1064 | + } |
| 1065 | + |
| 1066 | + @Override |
| 1067 | + public void onCompleted() { |
| 1068 | + } |
| 1069 | + }; |
| 1070 | + |
| 1071 | + blockFile = new RandomAccessFileChannel(); |
| 1072 | + ContainerCommandResponseProto response = kvHandler.readBlock( |
| 1073 | + readBlockRequest, container, blockFile, streamObserver); |
| 1074 | + |
| 1075 | + assertNull(response, "ReadBlock should return null on success"); |
| 1076 | + assertTrue(responseCount.get() > 0, "Should receive at least one response"); |
| 1077 | + |
| 1078 | + MetricsRecordBuilder containerMetrics = getMetrics( |
| 1079 | + ContainerMetrics.STORAGE_CONTAINER_METRICS); |
| 1080 | + assertCounter("bytesReadBlock", 1024L, containerMetrics); |
| 1081 | + } finally { |
| 1082 | + if (blockFile != null) { |
| 1083 | + blockFile.close(); |
| 1084 | + } |
| 1085 | + FileUtils.deleteDirectory(testDir.toFile()); |
| 1086 | + ContainerMetrics.remove(); |
| 1087 | + } |
| 1088 | + } |
941 | 1089 | } |
0 commit comments