66
77package io .kroxylicious .systemtests ;
88
9+ import java .nio .charset .StandardCharsets ;
910import java .time .Duration ;
1011import java .util .List ;
12+ import java .util .Map ;
1113
14+ import org .apache .kafka .common .protocol .ApiKeys ;
1215import org .junit .jupiter .api .AfterAll ;
1316import org .junit .jupiter .api .BeforeAll ;
1417import org .junit .jupiter .api .BeforeEach ;
2932import io .kroxylicious .systemtests .templates .metrics .ScraperTemplates ;
3033import io .kroxylicious .systemtests .templates .strimzi .KafkaNodePoolTemplates ;
3134import io .kroxylicious .systemtests .templates .strimzi .KafkaTemplates ;
35+ import io .kroxylicious .test .tester .SimpleMetric ;
36+
37+ import edu .umd .cs .findbugs .annotations .NonNull ;
3238
3339import static io .kroxylicious .systemtests .k8s .KubeClusterResource .kubeClient ;
34- import static io .kroxylicious .systemtests .utils .MetricsUtils .assertMetricValue ;
35- import static io .kroxylicious .systemtests .utils .MetricsUtils .assertMetricValueCount ;
36- import static io .kroxylicious .systemtests .utils .MetricsUtils .assertMetricValueHigherThan ;
40+ import static io .kroxylicious .test .tester .SimpleMetricAssert .assertThat ;
3741import static org .junit .jupiter .api .Assertions .assertAll ;
3842
3943/**
@@ -43,61 +47,138 @@ class MetricsST extends AbstractST {
4347 private static final Logger LOGGER = LoggerFactory .getLogger (MetricsST .class );
4448 private final String clusterName = "my-cluster" ;
4549 protected static final String BROKER_NODE_NAME = "kafka" ;
46- private static final String MESSAGE = "Hello-world" ;
50+ private static final String RECORD_VALUE = "Hello-world" ;
4751 private MetricsCollector kroxyliciousCollector ;
4852 private String bootstrap ;
4953 private KroxyliciousOperator kroxyliciousOperator ;
5054
5155 @ Test
52- void kroxyliciousMetricsBeforeSendingMessages () {
53- LOGGER .atInfo ().setMessage ("Metrics: {}" ).addArgument (kroxyliciousCollector .getCollectedData ().values ()).log ();
54- assertAll ("Checking the presence of the metrics" ,
55- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_inbound_downstream_messages_total" , 1 ),
56- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_inbound_downstream_decoded_messages_total" , 1 ));
57- assertAll ("Checking the value of the metrics" ,
58- () -> assertMetricValue (kroxyliciousCollector , "kroxylicious_inbound_downstream_messages_total" , 0 ),
59- () -> assertMetricValue (kroxyliciousCollector , "kroxylicious_inbound_downstream_decoded_messages_total" , 0 ));
60- }
61-
62- @ Test
63- void kroxyliciousDownstreamMessages (String namespace ) {
64- int numberOfMessages = 1 ;
56+ void kroxyliciousMessageTotals (String namespace ) {
57+ int numberOfRecords = 1 ;
6558
6659 LOGGER .atInfo ().setMessage ("And a kafka Topic named {}" ).addArgument (topicName ).log ();
6760 KafkaSteps .createTopic (namespace , topicName , bootstrap , 1 , 1 );
6861
69- LOGGER .atInfo ().setMessage ("When {} messages '{}' are sent to the topic '{}'" ).addArgument (numberOfMessages ).addArgument (MESSAGE ).addArgument (topicName ).log ();
70- KroxyliciousSteps .produceMessages (namespace , topicName , bootstrap , MESSAGE , numberOfMessages );
62+ LOGGER .atInfo ().setMessage ("When {} messages '{}' are sent to the topic '{}'" ).addArgument (numberOfRecords ).addArgument (RECORD_VALUE ).addArgument (topicName )
63+ .log ();
64+ KroxyliciousSteps .produceMessages (namespace , topicName , bootstrap , RECORD_VALUE , numberOfRecords );
7165 LOGGER .atInfo ().setMessage ("Then the messages are consumed" ).log ();
72- List <ConsumerRecord > result = KroxyliciousSteps .consumeMessages (namespace , topicName , bootstrap , numberOfMessages , Duration .ofMinutes (2 ));
66+ List <ConsumerRecord > result = KroxyliciousSteps .consumeMessages (namespace , topicName , bootstrap , numberOfRecords , Duration .ofMinutes (2 ));
7367 LOGGER .atInfo ().setMessage ("Received: {}" ).addArgument (result ).log ();
7468 kroxyliciousCollector .collectMetricsFromPods ();
7569 LOGGER .atInfo ().setMessage ("Metrics: {}" ).addArgument (kroxyliciousCollector .getCollectedData ().values ()).log ();
70+
71+ var parsedMetrics = convertToSimpleMetrics (kroxyliciousCollector );
72+
73+ var produceLabels = Map .of ("api_key" , ApiKeys .PRODUCE .name (), "node_id" , "0" );
74+ var fetchLabels = Map .of ("api_key" , ApiKeys .FETCH .name (), "node_id" , "0" );
75+
7676 assertAll (
77- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_inbound_downstream_messages_total" , 1 ),
78- () -> assertMetricValueHigherThan (kroxyliciousCollector , "kroxylicious_inbound_downstream_messages_total" , 0 ),
79- () -> assertMetricValueHigherThan (kroxyliciousCollector , "kroxylicious_inbound_downstream_decoded_messages_total" , 0 ));
77+ () -> assertThat (parsedMetrics )
78+ .withUniqueMetric ("kroxylicious_client_to_proxy_request_total" , produceLabels )
79+ .value ()
80+ .isOne () /* We send one record, so there has to be exactly one produce message */ ,
81+
82+ () -> assertThat (parsedMetrics )
83+ .withUniqueMetric ("kroxylicious_proxy_to_server_request_total" , produceLabels )
84+ .value ()
85+ .isOne (),
86+
87+ () -> assertThat (parsedMetrics )
88+ .withUniqueMetric ("kroxylicious_server_to_proxy_response_total" , produceLabels )
89+ .value ()
90+ .isOne (),
91+
92+ () -> assertThat (parsedMetrics )
93+ .withUniqueMetric ("kroxylicious_proxy_to_client_response_total" , produceLabels )
94+ .value ()
95+ .isOne (),
96+
97+ () -> assertThat (parsedMetrics )
98+ .withUniqueMetric ("kroxylicious_client_to_proxy_request_total" , fetchLabels )
99+ .value ()
100+ .isGreaterThanOrEqualTo (1 ), /* We expect to consume 1 record, but the client may call fetch more than once */
101+
102+ () -> assertThat (parsedMetrics )
103+ .withUniqueMetric ("kroxylicious_proxy_to_server_request_total" , fetchLabels )
104+ .value ()
105+ .isGreaterThanOrEqualTo (1 ),
106+
107+ () -> assertThat (parsedMetrics )
108+ .withUniqueMetric ("kroxylicious_server_to_proxy_response_total" , fetchLabels )
109+ .value ()
110+ .isGreaterThanOrEqualTo (1 ),
111+
112+ () -> assertThat (parsedMetrics )
113+ .withUniqueMetric ("kroxylicious_proxy_to_client_response_total" , fetchLabels )
114+ .value ()
115+ .isGreaterThanOrEqualTo (1 ));
80116 }
81117
82118 @ Test
83- void kroxyliciousPayloadSize (String namespace ) {
119+ void kroxyliciousMessageSize (String namespace ) {
84120 int numberOfMessages = 1 ;
85121
86122 LOGGER .atInfo ().setMessage ("And a kafka Topic named {}" ).addArgument (topicName ).log ();
87123 KafkaSteps .createTopic (namespace , topicName , bootstrap , 1 , 1 );
88124
89- LOGGER .atInfo ().setMessage ("When {} messages '{}' are sent to the topic '{}'" ).addArgument (numberOfMessages ).addArgument (MESSAGE ).addArgument (topicName ).log ();
90- KroxyliciousSteps .produceMessages (namespace , topicName , bootstrap , MESSAGE , numberOfMessages );
125+ LOGGER .atInfo ().setMessage ("When {} messages '{}' are sent to the topic '{}'" ).addArgument (numberOfMessages ).addArgument (RECORD_VALUE ).addArgument (topicName )
126+ .log ();
127+ KroxyliciousSteps .produceMessages (namespace , topicName , bootstrap , RECORD_VALUE , numberOfMessages );
91128 LOGGER .atInfo ().setMessage ("Then the messages are consumed" ).log ();
92129 List <ConsumerRecord > result = KroxyliciousSteps .consumeMessages (namespace , topicName , bootstrap , numberOfMessages , Duration .ofMinutes (2 ));
93130 LOGGER .atInfo ().setMessage ("Received: {}" ).addArgument (result ).log ();
94131 kroxyliciousCollector .collectMetricsFromPods ();
95132 LOGGER .atInfo ().setMessage ("Metrics: {}" ).addArgument (kroxyliciousCollector .getCollectedData ().values ()).log ();
96- assertAll (
97- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_payload_size_bytes_count" , 1 ),
98- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_payload_size_bytes_sum" , 1 ),
99- () -> assertMetricValueCount (kroxyliciousCollector , "kroxylicious_payload_size_bytes_max" , 1 ));
100133
134+ int recordValueSize = RECORD_VALUE .getBytes (StandardCharsets .UTF_8 ).length ;
135+ var parsedMetrics = convertToSimpleMetrics (kroxyliciousCollector );
136+
137+ var produceLabels = Map .of ("api_key" , ApiKeys .PRODUCE .name (), "node_id" , "0" );
138+ var fetchLabels = Map .of ("api_key" , ApiKeys .FETCH .name (), "node_id" , "0" );
139+
140+ assertAll (
141+ () -> assertThat (parsedMetrics )
142+ .withUniqueMetric ("kroxylicious_client_to_proxy_request_size_bytes_count" , produceLabels )
143+ .value ()
144+ .isOne (), /* We send one record, so there has to be exactly one produce message */
145+
146+ () -> assertThat (parsedMetrics )
147+ .withUniqueMetric ("kroxylicious_client_to_proxy_request_size_bytes_sum" , produceLabels )
148+ .value ()
149+ .isGreaterThan (recordValueSize ),
150+
151+ () -> assertThat (parsedMetrics )
152+ .withUniqueMetric ("kroxylicious_proxy_to_server_request_size_bytes_count" , produceLabels )
153+ .value ()
154+ .isOne (),
155+
156+ () -> assertThat (parsedMetrics )
157+ .withUniqueMetric ("kroxylicious_proxy_to_server_request_size_bytes_sum" , produceLabels )
158+ .value ()
159+ .isGreaterThan (recordValueSize ),
160+
161+ () -> assertThat (parsedMetrics )
162+ .withUniqueMetric ("kroxylicious_server_to_proxy_response_size_bytes_count" , fetchLabels )
163+ .value ()
164+ .isGreaterThanOrEqualTo (1 ), /* We expect to consume 1 record, but the client may call fetch more than once */
165+
166+ () -> assertThat (parsedMetrics )
167+ .withUniqueMetric ("kroxylicious_server_to_proxy_response_size_bytes_sum" , fetchLabels )
168+ .value ()
169+ .isGreaterThan (recordValueSize ),
170+
171+ () -> assertThat (parsedMetrics )
172+ .withUniqueMetric ("kroxylicious_proxy_to_client_response_size_bytes_count" , fetchLabels )
173+ .value ()
174+ .isGreaterThanOrEqualTo (1 ),
175+
176+ () -> assertThat (parsedMetrics )
177+ .withUniqueMetric ("kroxylicious_proxy_to_client_response_size_bytes_sum" , fetchLabels )
178+ .value ()
179+ .isGreaterThan (recordValueSize )
180+
181+ );
101182 }
102183
103184 @ BeforeAll
@@ -151,4 +232,11 @@ void beforeEach(String namespace) throws InterruptedException {
151232 .build ();
152233 kroxyliciousCollector .collectMetricsFromPods ();
153234 }
235+
236+ @ NonNull
237+ private static List <SimpleMetric > convertToSimpleMetrics (MetricsCollector collector ) {
238+ return collector .getCollectedData ().values ().stream ()
239+ .map (SimpleMetric ::parse )
240+ .flatMap (List ::stream ).toList ();
241+ }
154242}
0 commit comments