|
27 | 27 | import org.apache.kafka.clients.admin.AlterConfigOp; |
28 | 28 | import org.apache.kafka.clients.admin.Config; |
29 | 29 | import org.apache.kafka.clients.admin.ConfigEntry; |
| 30 | +import org.apache.kafka.clients.admin.ConsumerGroupDescription; |
| 31 | +import org.apache.kafka.clients.admin.MemberDescription; |
30 | 32 | import org.apache.kafka.clients.admin.NewPartitions; |
31 | 33 | import org.apache.kafka.clients.admin.NewTopic; |
32 | 34 | import org.apache.kafka.clients.admin.TopicDescription; |
@@ -95,6 +97,7 @@ class ClientAuthzIT extends AuthzIT { |
95 | 97 | public static final IntegerSerializer INTEGER_SERIALIZER = new IntegerSerializer(); |
96 | 98 | public static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); |
97 | 99 | public static final IntegerDeserializer INTEGER_DESERIALIZER = new IntegerDeserializer(); |
| 100 | + public static final int UUID_STRING_LENGTH = 36; |
98 | 101 |
|
99 | 102 | @Name("kafkaClusterWithAuthz") |
100 | 103 | static Admin kafkaClusterWithAuthzAdmin; |
@@ -349,6 +352,24 @@ public Outcome<Void> alterConfigs(ConfigResource.Type type, String resourceName, |
349 | 352 | } |
350 | 353 | return trace("alterConfigs", outcome); |
351 | 354 | } |
| 355 | + |
| 356 | + public Outcome<ConsumerGroupDescription> describeConsumerGroup(String groupId) { |
| 357 | + Outcome<ConsumerGroupDescription> outcome; |
| 358 | + try { |
| 359 | + outcome = new Success<>(admin |
| 360 | + .describeConsumerGroups(List.of(groupId)) |
| 361 | + .all() |
| 362 | + .get().get(groupId)); |
| 363 | + } |
| 364 | + catch (ExecutionException e) { |
| 365 | + outcome = Fail.of(e); |
| 366 | + } |
| 367 | + catch (InterruptedException e) { |
| 368 | + throw new RuntimeException(e); |
| 369 | + } |
| 370 | + trace("describeConsumerGroup", outcome.map(ClientAuthzIT::cleanConsumerGroupDescription)); |
| 371 | + return outcome; |
| 372 | + } |
352 | 373 | } |
353 | 374 |
|
354 | 375 | record ProducerContext<V>(String user, |
@@ -651,6 +672,7 @@ public void run(ClientFactory clientFactory) throws InterruptedException { |
651 | 672 | String setupUser = ALICE; |
652 | 673 | String producerUser = ALICE; |
653 | 674 | String consumerUser = ALICE; |
| 675 | + String adminUser = ALICE; |
654 | 676 |
|
655 | 677 | try (var setup = clientFactory.newAdmin(setupUser, Map.of(AdminClientConfig.CLIENT_ID_CONFIG, "setup"))) { |
656 | 678 | setup.createTopic(topicA); |
@@ -721,6 +743,12 @@ public void run(ClientFactory clientFactory) throws InterruptedException { |
721 | 743 | transactionalProducer.commitTransaction(); |
722 | 744 |
|
723 | 745 | } |
| 746 | + |
| 747 | + try (var admin = clientFactory.newAdmin(adminUser, Map.of(AdminClientConfig.CLIENT_ID_CONFIG, "admin"))) { |
| 748 | + ConsumerGroupDescription actual = admin.describeConsumerGroup(GROUP_2).value(); |
| 749 | + assertThat(actual.groupId()).isEqualTo(GROUP_2); |
| 750 | + assertThat(actual.members()).hasSize(1); |
| 751 | + } |
724 | 752 | } |
725 | 753 | setup.deleteTopic(topicA); |
726 | 754 | } |
@@ -921,6 +949,33 @@ static TopicPartitionInfo cleanTopicPartitionInfo(TopicPartitionInfo topicPartit |
921 | 949 | } |
922 | 950 | } |
923 | 951 |
|
| 952 | + static ConsumerGroupDescription cleanConsumerGroupDescription(ConsumerGroupDescription groupDescription) { |
| 953 | + return new ConsumerGroupDescription( |
| 954 | + groupDescription.groupId(), |
| 955 | + groupDescription.isSimpleConsumerGroup(), |
| 956 | + groupDescription.members().stream().map(ClientAuthzIT::cleanGroupMember).toList(), |
| 957 | + groupDescription.partitionAssignor(), |
| 958 | + groupDescription.type(), |
| 959 | + groupDescription.groupState(), |
| 960 | + cleanNode(groupDescription.coordinator()), |
| 961 | + groupDescription.authorizedOperations(), |
| 962 | + groupDescription.groupEpoch(), |
| 963 | + groupDescription.targetAssignmentEpoch()); |
| 964 | + } |
| 965 | + |
| 966 | + private static MemberDescription cleanGroupMember(MemberDescription memberDescription) { |
| 967 | + return new MemberDescription( |
| 968 | + memberDescription.consumerId().substring(0, |
| 969 | + memberDescription.consumerId().length() - UUID_STRING_LENGTH), |
| 970 | + memberDescription.groupInstanceId(), |
| 971 | + memberDescription.clientId(), |
| 972 | + memberDescription.host(), |
| 973 | + memberDescription.assignment(), |
| 974 | + memberDescription.targetAssignment(), |
| 975 | + memberDescription.memberEpoch(), |
| 976 | + memberDescription.upgraded()); |
| 977 | + } |
| 978 | + |
924 | 979 | static List<Node> cleanNodes(List<Node> nodes) { |
925 | 980 | return nodes.stream() |
926 | 981 | .map(ClientAuthzIT::cleanNode) |
|
0 commit comments