code = new HashSet<>();
- code.add(17);
- code.add(25);
- code.add(215);
- code.add(200);
- code.add(207);
-
- for (int i = 0; i < 400; i++) {
- boolean boo = Permission.needAdminPerm(i);
- if (boo) {
- Assert.assertTrue(code.contains(i));
- }
- }
- }
-
- @Test
- public void AclExceptionTest(){
- AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
- AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
- Assert.assertEquals(aclException.getCode(),10015);
- Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
- aclException.setCode(10016);
- Assert.assertEquals(aclException.getCode(),10016);
- aclException.setStatus("netaddress examine scope Exception netaddress");
- Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
- }
-
- @Test
- public void checkResourcePermsNormalTest() {
- Permission.checkResourcePerms(null);
- Permission.checkResourcePerms(new ArrayList<>());
- Permission.checkResourcePerms(Arrays.asList("topicA=PUB"));
- Permission.checkResourcePerms(Arrays.asList("topicA=PUB", "topicB=SUB", "topicC=PUB|SUB"));
- }
-
- @Test(expected = AclException.class)
- public void checkResourcePermsExceptionTest1() {
- Permission.checkResourcePerms(Arrays.asList("topicA"));
- }
-
- @Test(expected = AclException.class)
- public void checkResourcePermsExceptionTest2() {
- Permission.checkResourcePerms(Arrays.asList("topicA="));
- }
-
- @Test(expected = AclException.class)
- public void checkResourcePermsExceptionTest3() {
- Permission.checkResourcePerms(Arrays.asList("topicA=DENY1"));
- }
-}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
deleted file mode 100644
index a1a4bde4f87..00000000000
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.acl.common;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Properties;
-
-public class SessionCredentialsTest {
-
- @Test
- public void equalsTest(){
- SessionCredentials sessionCredentials=new SessionCredentials("RocketMQ","12345678");
- sessionCredentials.setSecurityToken("abcd");
- SessionCredentials other=new SessionCredentials("RocketMQ","12345678","abcd");
- Assert.assertTrue(sessionCredentials.equals(other));
- }
-
- @Test
- public void updateContentTest(){
- SessionCredentials sessionCredentials=new SessionCredentials();
- Properties properties=new Properties();
- properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
- properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
- properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
- sessionCredentials.updateContent(properties);
- }
-
- @Test
- public void SessionCredentialHashCodeTest(){
- SessionCredentials sessionCredentials=new SessionCredentials();
- Properties properties=new Properties();
- properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
- properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
- properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
- sessionCredentials.updateContent(properties);
- Assert.assertEquals(sessionCredentials.hashCode(),353652211);
- }
-
- @Test
- public void SessionCredentialEqualsTest(){
- SessionCredentials sessionCredential1 =new SessionCredentials();
- Properties properties1=new Properties();
- properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
- properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
- properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
- sessionCredential1.updateContent(properties1);
-
- SessionCredentials sessionCredential2 =new SessionCredentials();
- Properties properties2=new Properties();
- properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
- properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
- properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
- sessionCredential2.updateContent(properties2);
-
- Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
- sessionCredential2.setSecretKey("1234567899");
- sessionCredential2.setSignature("1234567899");
- Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
- }
-
- @Test
- public void SessionCredentialToStringTest(){
- SessionCredentials sessionCredential1 =new SessionCredentials();
- Properties properties1=new Properties();
- properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
- properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
- properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
- sessionCredential1.updateContent(properties1);
-
- Assert.assertEquals(sessionCredential1.toString(),
- "SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
- }
-
-
-}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
deleted file mode 100644
index eebc86d4258..00000000000
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessControlFlowTest.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.acl.plain;
-
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.AclConstants;
-import org.apache.rocketmq.acl.common.AclException;
-import org.apache.rocketmq.acl.common.AclUtils;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * In this class, we'll test the following scenarios, each containing several consecutive operations on ACL,
- *
like updating and deleting ACL, changing config files and checking validations.
- *
Case 1: Only conf/plain_acl.yml exists;
- *
Case 2: Only conf/acl/plain_acl.yml exists;
- *
Case 3: Both conf/plain_acl.yml and conf/acl/plain_acl.yml exists.
- */
-public class PlainAccessControlFlowTest {
- public static final String DEFAULT_TOPIC = "topic-acl";
-
- public static final String DEFAULT_GROUP = "GID_acl";
-
- public static final String DEFAULT_PRODUCER_AK = "ak11111";
- public static final String DEFAULT_PRODUCER_SK = "1234567";
-
- public static final String DEFAULT_CONSUMER_SK = "7654321";
- public static final String DEFAULT_CONSUMER_AK = "ak22222";
-
- public static final String DEFAULT_GLOBAL_WHITE_ADDR = "172.16.123.123";
- public static final List DEFAULT_GLOBAL_WHITE_ADDRS_LIST = Arrays.asList(DEFAULT_GLOBAL_WHITE_ADDR);
-
- public static final Path EMPTY_ACL_FOLDER_PLAIN_ACL_YML_PATH = Paths.get("src/test/resources/empty_acl_folder_conf/conf/plain_acl.yml");
- private static final Path EMPTY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH = Paths.get("src/test/resources/empty_acl_folder_conf/conf/plain_acl.yml.bak");
-
-
- public static final Path ONLY_ACL_FOLDER_DELETE_YML_PATH = Paths.get("src/test/resources/only_acl_folder_conf/conf/plain_acl.yml");
- private static final Path ONLY_ACL_FOLDER_PLAIN_ACL_YML_PATH = Paths.get("src/test/resources/only_acl_folder_conf/conf/acl/plain_acl.yml");
- private static final Path ONLY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH = Paths.get("src/test/resources/only_acl_folder_conf/conf/acl/plain_acl.yml.bak");
-
- private static final Path BOTH_ACL_FOLDER_PLAIN_ACL_YML_PATH = Paths.get("src/test/resources/both_acl_file_folder_conf/conf/acl/plain_acl.yml");
- private static final Path BOTH_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH = Paths.get("src/test/resources/both_acl_file_folder_conf/conf/acl/plain_acl.yml.bak");
- private static final Path BOTH_CONF_FOLDER_PLAIN_ACL_YML_PATH = Paths.get("src/test/resources/both_acl_file_folder_conf/conf/plain_acl.yml");
- private static final Path BOTH_CONF_FOLDER_PLAIN_ACL_YML_BAK_PATH = Paths.get("src/test/resources/both_acl_file_folder_conf/conf/plain_acl.yml.bak");
-
- private boolean isCheckCase1 = false;
- private boolean isCheckCase2 = false;
- private boolean isCheckCase3 = false;
-
-
-
- /**
- * backup ACL config files
- *
- * @throws IOException
- */
- @Before
- public void prepare() throws IOException {
-
- Files.copy(EMPTY_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- EMPTY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- StandardCopyOption.REPLACE_EXISTING);
-
-
- Files.copy(ONLY_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- ONLY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- StandardCopyOption.REPLACE_EXISTING);
-
-
- Files.copy(BOTH_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- BOTH_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- StandardCopyOption.REPLACE_EXISTING);
- Files.copy(BOTH_CONF_FOLDER_PLAIN_ACL_YML_PATH,
- BOTH_CONF_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- StandardCopyOption.REPLACE_EXISTING);
-
- }
-
- /**
- * restore ACL config files
- *
- * @throws IOException
- */
- @After
- public void restore() throws IOException {
- if (this.isCheckCase1) {
- Files.copy(EMPTY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- EMPTY_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- StandardCopyOption.REPLACE_EXISTING);
- }
-
- if (this.isCheckCase2) {
- Files.copy(ONLY_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- ONLY_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- StandardCopyOption.REPLACE_EXISTING);
- Files.deleteIfExists(ONLY_ACL_FOLDER_DELETE_YML_PATH);
- }
-
- if (this.isCheckCase3) {
- Files.copy(BOTH_ACL_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- BOTH_ACL_FOLDER_PLAIN_ACL_YML_PATH,
- StandardCopyOption.REPLACE_EXISTING);
- Files.copy(BOTH_CONF_FOLDER_PLAIN_ACL_YML_BAK_PATH,
- BOTH_CONF_FOLDER_PLAIN_ACL_YML_PATH,
- StandardCopyOption.REPLACE_EXISTING);
- }
-
- }
-
- @Test
- public void testEmptyAclFolderCase() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
- this.isCheckCase1 = true;
- System.setProperty("rocketmq.home.dir", Paths.get("src/test/resources/empty_acl_folder_conf").toString());
- PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
-
- checkDefaultAclFileExists(plainAccessValidator);
- testValidationAfterConsecutiveUpdates(plainAccessValidator);
- testValidationAfterConfigFileChanged(plainAccessValidator);
-
- }
-
- @Test
- public void testOnlyAclFolderCase() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
- this.isCheckCase2 = true;
- System.setProperty("rocketmq.home.dir", Paths.get("src/test/resources/only_acl_folder_conf").toString());
- PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
-
- checkDefaultAclFileExists(plainAccessValidator);
- testValidationAfterConsecutiveUpdates(plainAccessValidator);
- testValidationAfterConfigFileChanged(plainAccessValidator);
- }
-
-
- @Test
- public void testBothAclFileAndFolderCase() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
- this.isCheckCase3 = true;
- System.setProperty("rocketmq.home.dir", Paths.get("src/test/resources/both_acl_file_folder_conf").toString());
- PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
-
- checkDefaultAclFileExists(plainAccessValidator);
- testValidationAfterConsecutiveUpdates(plainAccessValidator);
- testValidationAfterConfigFileChanged(plainAccessValidator);
-
- }
-
- private void testValidationAfterConfigFileChanged(PlainAccessValidator plainAccessValidator) throws NoSuchFieldException, IllegalAccessException, InterruptedException {
- PlainAccessConfig producerAccessConfig = generateProducerAccessConfig();
- PlainAccessConfig consumerAccessConfig = generateConsumerAccessConfig();
- List plainAccessConfigList = new LinkedList<>();
- plainAccessConfigList.add(producerAccessConfig);
- plainAccessConfigList.add(consumerAccessConfig);
- Map ymlMap = new HashMap<>();
- ymlMap.put(AclConstants.CONFIG_ACCOUNTS, plainAccessConfigList);
-
- // write prepared PlainAccessConfigs to file
- final String aclConfigFile = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml";
- AclUtils.writeDataObject(aclConfigFile, ymlMap);
-
- loadConfigFile(plainAccessValidator, aclConfigFile);
-
- // check if added successfully
- final AclConfig allAclConfig = plainAccessValidator.getAllAclConfig();
- final List plainAccessConfigs = allAclConfig.getPlainAccessConfigs();
- checkPlainAccessConfig(producerAccessConfig, plainAccessConfigs);
- checkPlainAccessConfig(consumerAccessConfig, plainAccessConfigs);
-
- //delete consumer account
- plainAccessConfigList.remove(consumerAccessConfig);
- AclUtils.writeDataObject(aclConfigFile, ymlMap);
-
- loadConfigFile(plainAccessValidator, aclConfigFile);
-
- // sending messages will be successful using prepared credentials
- SessionCredentials producerCredential = new SessionCredentials(DEFAULT_PRODUCER_AK, DEFAULT_PRODUCER_SK);
- AclClientRPCHook producerHook = new AclClientRPCHook(producerCredential);
- validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
- validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
-
- // consuming messages will be failed for account has been deleted
- SessionCredentials consumerCredential = new SessionCredentials(DEFAULT_CONSUMER_AK, DEFAULT_CONSUMER_SK);
- AclClientRPCHook consumerHook = new AclClientRPCHook(consumerCredential);
- boolean isConsumeFailed = false;
- try {
- validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
- } catch (AclException e) {
- isConsumeFailed = true;
- }
- Assert.assertTrue("Message should not be consumed after account deleted", isConsumeFailed);
-
- }
-
-
- private void testValidationAfterConsecutiveUpdates(PlainAccessValidator plainAccessValidator) throws NoSuchFieldException, IllegalAccessException {
- PlainAccessConfig producerAccessConfig = generateProducerAccessConfig();
- plainAccessValidator.updateAccessConfig(producerAccessConfig);
-
- PlainAccessConfig consumerAccessConfig = generateConsumerAccessConfig();
- plainAccessValidator.updateAccessConfig(consumerAccessConfig);
-
- plainAccessValidator.updateGlobalWhiteAddrsConfig(DEFAULT_GLOBAL_WHITE_ADDRS_LIST, null);
-
- // check if the above config updated successfully
- final AclConfig allAclConfig = plainAccessValidator.getAllAclConfig();
- final List plainAccessConfigs = allAclConfig.getPlainAccessConfigs();
- checkPlainAccessConfig(producerAccessConfig, plainAccessConfigs);
- checkPlainAccessConfig(consumerAccessConfig, plainAccessConfigs);
-
- Assert.assertEquals(DEFAULT_GLOBAL_WHITE_ADDRS_LIST, allAclConfig.getGlobalWhiteAddrs());
-
- // check sending and consuming messages
- SessionCredentials producerCredential = new SessionCredentials(DEFAULT_PRODUCER_AK, DEFAULT_PRODUCER_SK);
- AclClientRPCHook producerHook = new AclClientRPCHook(producerCredential);
- validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
- validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
-
- SessionCredentials consumerCredential = new SessionCredentials(DEFAULT_CONSUMER_AK, DEFAULT_CONSUMER_SK);
- AclClientRPCHook consumerHook = new AclClientRPCHook(consumerCredential);
- validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
-
- // load from file
- loadConfigFile(plainAccessValidator,
- System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml");
- SessionCredentials unmatchedCredential = new SessionCredentials("non_exists_sk", "non_exists_sk");
- AclClientRPCHook dummyHook = new AclClientRPCHook(unmatchedCredential);
- validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
- validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
- validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, dummyHook, DEFAULT_GLOBAL_WHITE_ADDR, plainAccessValidator);
-
- //recheck after reloading
- validateSendMessage(RequestCode.SEND_MESSAGE, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
- validateSendMessage(RequestCode.SEND_MESSAGE_V2, DEFAULT_TOPIC, producerHook, "", plainAccessValidator);
- validatePullMessage(DEFAULT_TOPIC, DEFAULT_GROUP, consumerHook, "", plainAccessValidator);
-
- }
-
- private void loadConfigFile(PlainAccessValidator plainAccessValidator, String configFileName) throws NoSuchFieldException, IllegalAccessException {
- Class clazz = PlainAccessValidator.class;
- Field f = clazz.getDeclaredField("aclPlugEngine");
- f.setAccessible(true);
- PlainPermissionManager aclPlugEngine = (PlainPermissionManager) f.get(plainAccessValidator);
- aclPlugEngine.load(configFileName);
- }
-
- private PlainAccessConfig generateConsumerAccessConfig() {
- PlainAccessConfig plainAccessConfig2 = new PlainAccessConfig();
- String accessKey2 = DEFAULT_CONSUMER_AK;
- String secretKey2 = DEFAULT_CONSUMER_SK;
- plainAccessConfig2.setAccessKey(accessKey2);
- plainAccessConfig2.setSecretKey(secretKey2);
- plainAccessConfig2.setAdmin(false);
- plainAccessConfig2.setDefaultTopicPerm(AclConstants.DENY);
- plainAccessConfig2.setDefaultGroupPerm(AclConstants.DENY);
- plainAccessConfig2.setTopicPerms(Arrays.asList(DEFAULT_TOPIC + "=" + AclConstants.SUB));
- plainAccessConfig2.setGroupPerms(Arrays.asList(DEFAULT_GROUP + "=" + AclConstants.SUB));
- return plainAccessConfig2;
- }
-
- private PlainAccessConfig generateProducerAccessConfig() {
- PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
- String accessKey = DEFAULT_PRODUCER_AK;
- String secretKey = DEFAULT_PRODUCER_SK;
- plainAccessConfig.setAccessKey(accessKey);
- plainAccessConfig.setSecretKey(secretKey);
- plainAccessConfig.setAdmin(false);
- plainAccessConfig.setDefaultTopicPerm(AclConstants.DENY);
- plainAccessConfig.setDefaultGroupPerm(AclConstants.DENY);
- plainAccessConfig.setTopicPerms(Arrays.asList(DEFAULT_TOPIC + "=" + AclConstants.PUB));
- return plainAccessConfig;
- }
-
- public void validatePullMessage(String topic,
- String group,
- AclClientRPCHook aclClientRPCHook,
- String remoteAddr,
- PlainAccessValidator plainAccessValidator) {
- PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
- pullMessageRequestHeader.setTopic(topic);
- pullMessageRequestHeader.setConsumerGroup(group);
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE,
- pullMessageRequestHeader);
- aclClientRPCHook.doBeforeRequest(remoteAddr, remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
- RemotingCommand.decode(buf), remoteAddr);
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
- Assert.fail("Should not throw RemotingCommandException");
- }
- }
-
- public void validateSendMessage(int requestCode,
- String topic,
- AclClientRPCHook aclClientRPCHook,
- String remoteAddr,
- PlainAccessValidator plainAccessValidator) {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic(topic);
- RemotingCommand remotingCommand;
- if (RequestCode.SEND_MESSAGE == requestCode) {
- remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- } else {
- remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
- SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
- }
-
- aclClientRPCHook.doBeforeRequest(remoteAddr, remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
- RemotingCommand.decode(buf), remoteAddr);
- System.out.println(accessResource.getWhiteRemoteAddress());
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
- Assert.fail("Should not throw RemotingCommandException");
- }
- }
-
-
- private void checkPlainAccessConfig(final PlainAccessConfig plainAccessConfig, final List plainAccessConfigs) {
- for (PlainAccessConfig config : plainAccessConfigs) {
- if (config.getAccessKey().equals(plainAccessConfig.getAccessKey())) {
- Assert.assertEquals(plainAccessConfig.getSecretKey(), config.getSecretKey());
- Assert.assertEquals(plainAccessConfig.isAdmin(), config.isAdmin());
- Assert.assertEquals(plainAccessConfig.getDefaultGroupPerm(), config.getDefaultGroupPerm());
- Assert.assertEquals(plainAccessConfig.getDefaultGroupPerm(), config.getDefaultGroupPerm());
- Assert.assertEquals(plainAccessConfig.getWhiteRemoteAddress(), config.getWhiteRemoteAddress());
- if (null != plainAccessConfig.getTopicPerms()) {
- Assert.assertNotNull(config.getTopicPerms());
- Assert.assertTrue(config.getTopicPerms().containsAll(plainAccessConfig.getTopicPerms()));
- }
- if (null != plainAccessConfig.getGroupPerms()) {
- Assert.assertNotNull(config.getGroupPerms());
- Assert.assertTrue(config.getGroupPerms().containsAll(plainAccessConfig.getGroupPerms()));
- }
- }
- }
- }
-
- private void checkDefaultAclFileExists(PlainAccessValidator plainAccessValidator) {
- boolean isExists = Files.exists(Paths.get(System.getProperty("rocketmq.home.dir")
- + File.separator + "conf/plain_acl.yml"));
- Assert.assertTrue("default acl config file should exist", isExists);
-
- }
-
-}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java b/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
deleted file mode 100644
index 49558afa585..00000000000
--- a/acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessValidatorTest.java
+++ /dev/null
@@ -1,1098 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.acl.plain;
-
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.AclConstants;
-import org.apache.rocketmq.acl.common.AclException;
-import org.apache.rocketmq.acl.common.AclUtils;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.common.AclConfig;
-import org.apache.rocketmq.common.DataVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.PlainAccessConfig;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
-import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
-import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
-import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class PlainAccessValidatorTest {
-
- private PlainAccessValidator plainAccessValidator;
- private AclClientRPCHook aclClient;
- private SessionCredentials sessionCredentials;
-
- @Before
- public void init() {
- File file = new File("src/test/resources".replace("/", File.separator));
- System.setProperty("rocketmq.home.dir", file.getAbsolutePath());
- plainAccessValidator = new PlainAccessValidator();
- sessionCredentials = new SessionCredentials();
- sessionCredentials.setAccessKey("RocketMQ");
- sessionCredentials.setSecretKey("12345678");
- sessionCredentials.setSecurityToken("87654321");
- aclClient = new AclClientRPCHook(sessionCredentials);
- }
-
- @Test
- public void contentTest() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicA");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "127.0.0.1");
- String signature = AclUtils.calSignature(accessResource.getContent(), sessionCredentials.getSecretKey());
-
- Assert.assertEquals(accessResource.getSignature(), signature);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
-
- }
-
- @Test
- public void validateTest() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
-
- }
-
- @Test
- public void validateSendMessageTest() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateSendMessageToRetryTopicTest() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic(MixAll.getRetryTopic("groupB"));
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateSendMessageV2Test() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateSendMessageV2ToRetryTopicTest() {
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic(MixAll.getRetryTopic("groupC"));
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(messageRequestHeader));
- aclClient.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6:9876");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateForAdminCommandWithOutAclRPCHook() {
- RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
- plainAccessValidator.parse(consumerOffsetAdminRequest, "192.168.0.1:9876");
-
- RemotingCommand subscriptionGroupAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
- plainAccessValidator.parse(subscriptionGroupAdminRequest, "192.168.0.1:9876");
-
- RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
- plainAccessValidator.parse(delayOffsetAdminRequest, "192.168.0.1:9876");
-
- RemotingCommand allTopicConfigAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
- plainAccessValidator.parse(allTopicConfigAdminRequest, "192.168.0.1:9876");
-
- }
-
- @Test
- public void validatePullMessageTest() {
- PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
- pullMessageRequestHeader.setTopic("topicC");
- pullMessageRequestHeader.setConsumerGroup("groupC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateConsumeMessageBackTest() {
- ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = new ConsumerSendMsgBackRequestHeader();
- consumerSendMsgBackRequestHeader.setOriginTopic("topicC");
- consumerSendMsgBackRequestHeader.setGroup("groupC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, consumerSendMsgBackRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateQueryMessageTest() {
- QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader();
- queryMessageRequestHeader.setTopic("topicC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateQueryMessageByKeyTest() {
- QueryMessageRequestHeader queryMessageRequestHeader = new QueryMessageRequestHeader();
- queryMessageRequestHeader.setTopic("topicC");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, queryMessageRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- remotingCommand.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, "false");
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1:9876");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateHeartBeatTest() {
- HeartbeatData heartbeatData = new HeartbeatData();
- Set producerDataSet = new HashSet<>();
- Set consumerDataSet = new HashSet<>();
- Set subscriptionDataSet = new HashSet<>();
- ProducerData producerData = new ProducerData();
- producerData.setGroupName("groupB");
- ConsumerData consumerData = new ConsumerData();
- consumerData.setGroupName("groupC");
- SubscriptionData subscriptionData = new SubscriptionData();
- subscriptionData.setTopic("topicC");
- producerDataSet.add(producerData);
- consumerDataSet.add(consumerData);
- subscriptionDataSet.add(subscriptionData);
- consumerData.setSubscriptionDataSet(subscriptionDataSet);
- heartbeatData.setProducerDataSet(producerDataSet);
- heartbeatData.setConsumerDataSet(consumerDataSet);
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
- remotingCommand.setBody(heartbeatData.encode());
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encode();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateUnRegisterClientTest() {
- UnregisterClientRequestHeader unregisterClientRequestHeader = new UnregisterClientRequestHeader();
- unregisterClientRequestHeader.setConsumerGroup("groupB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, unregisterClientRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateGetConsumerListByGroupTest() {
- GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = new GetConsumerListByGroupRequestHeader();
- getConsumerListByGroupRequestHeader.setConsumerGroup("groupB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, getConsumerListByGroupRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateUpdateConsumerOffSetTest() {
- UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = new UpdateConsumerOffsetRequestHeader();
- updateConsumerOffsetRequestHeader.setConsumerGroup("groupB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader);
- aclClient.doBeforeRequest("", remotingCommand);
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test(expected = AclException.class)
- public void validateNullAccessKeyTest() {
- SessionCredentials sessionCredentials = new SessionCredentials();
- sessionCredentials.setAccessKey("RocketMQ1");
- sessionCredentials.setSecretKey("1234");
- AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClientRPCHook.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test(expected = AclException.class)
- public void validateErrorSecretKeyTest() {
- SessionCredentials sessionCredentials = new SessionCredentials();
- sessionCredentials.setAccessKey("RocketMQ");
- sessionCredentials.setSecretKey("1234");
- AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(sessionCredentials);
- SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
- messageRequestHeader.setTopic("topicB");
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
- aclClientRPCHook.doBeforeRequest("", remotingCommand);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "192.168.1.1");
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void validateGetAllTopicConfigTest() {
- String whiteRemoteAddress = "192.168.0.1";
- RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
-
- ByteBuffer buf = remotingCommand.encodeHeader();
- buf.getInt();
- buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
- buf.position(0);
- try {
- PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), whiteRemoteAddress);
- plainAccessValidator.validate(accessResource);
- } catch (RemotingCommandException e) {
- e.printStackTrace();
-
- Assert.fail("Should not throw IOException");
- }
- }
-
- @Test
- public void addAccessAclYamlConfigTest() throws InterruptedException {
- String backupFileName = System.getProperty("rocketmq.home.dir")
- + File.separator + "conf/plain_acl_bak.yml".replace("/", File.separator);
- String targetFileName = System.getProperty("rocketmq.home.dir")
- + File.separator + "conf/plain_acl.yml".replace("/", File.separator);
- Map backUpAclConfigMap = AclUtils.getYamlDataObject(backupFileName, Map.class);
- AclUtils.writeDataObject(targetFileName, backUpAclConfigMap);
-
- PlainAccessConfig plainAccessConfig = new PlainAccessConfig();
- plainAccessConfig.setAccessKey("rocketmq3");
- plainAccessConfig.setSecretKey("1234567890");
- plainAccessConfig.setWhiteRemoteAddress("192.168.0.*");
- plainAccessConfig.setDefaultGroupPerm("PUB");
- plainAccessConfig.setDefaultTopicPerm("SUB");
- List topicPerms = new ArrayList();
- topicPerms.add("topicC=PUB|SUB");
- topicPerms.add("topicB=PUB");
- plainAccessConfig.setTopicPerms(topicPerms);
- List groupPerms = new ArrayList();
- groupPerms.add("groupB=PUB|SUB");
- groupPerms.add("groupC=DENY");
- plainAccessConfig.setGroupPerms(groupPerms);
-
- PlainAccessValidator plainAccessValidator = new PlainAccessValidator();
- plainAccessValidator.updateAccessConfig(plainAccessConfig);
- Thread.sleep(10000);
-
- Map verifyMap = new HashMap<>();
- AclConfig aclConfig = plainAccessValidator.getAllAclConfig();
- List plainAccessConfigs = aclConfig.getPlainAccessConfigs();
- for (PlainAccessConfig plainAccessConfig1 : plainAccessConfigs) {
- if (plainAccessConfig1.getAccessKey().equals(plainAccessConfig.getAccessKey())) {
- verifyMap.put(AclConstants.CONFIG_SECRET_KEY, plainAccessConfig1.getSecretKey());
- verifyMap.put(AclConstants.CONFIG_DEFAULT_TOPIC_PERM, plainAccessConfig1.getDefaultTopicPerm());
- verifyMap.put(AclConstants.CONFIG_DEFAULT_GROUP_PERM, plainAccessConfig1.getDefaultGroupPerm());
- verifyMap.put(AclConstants.CONFIG_ADMIN_ROLE, plainAccessConfig1.isAdmin());
- verifyMap.put(AclConstants.CONFIG_WHITE_ADDR, plainAccessConfig1.getWhiteRemoteAddress());
- verifyMap.put(AclConstants.CONFIG_TOPIC_PERMS, plainAccessConfig1.getTopicPerms());
- verifyMap.put(AclConstants.CONFIG_GROUP_PERMS, plainAccessConfig1.getGroupPerms());
- }
- }
-
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_SECRET_KEY), "1234567890");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_TOPIC_PERM), "SUB");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_DEFAULT_GROUP_PERM), "PUB");
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_ADMIN_ROLE), false);
- Assert.assertEquals(verifyMap.get(AclConstants.CONFIG_WHITE_ADDR), "192.168.0.*");
- Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_TOPIC_PERMS)).size(), 2);
- Assert.assertEquals(((List) verifyMap.get(AclConstants.CONFIG_GROUP_PERMS)).size(), 2);
-
- String aclFileName = System.getProperty("rocketmq.home.dir") + File.separator + "conf/plain_acl.yml";
- Map readableMap = AclUtils.getYamlDataObject(aclFileName, Map.class);
- List