diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel index 77d456bc16a..f00d01e8cc3 100644 --- a/broker/BUILD.bazel +++ b/broker/BUILD.bazel @@ -32,7 +32,6 @@ java_library( "//tieredstore", "@maven//:org_slf4j_slf4j_api", "@maven//:ch_qos_logback_logback_classic", - "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_github_luben_zstd_jni", "@maven//:com_google_guava_guava", @@ -84,7 +83,6 @@ java_library( "//remoting", "//store", "//tieredstore", - "@maven//:com_alibaba_fastjson", "@maven//:com_alibaba_fastjson2_fastjson2", "@maven//:com_google_guava_guava", "@maven//:io_netty_netty_all", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java index ee2d4e54a6a..c59c00c040c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.broker; -import com.alibaba.fastjson.JSON; -import java.nio.charset.StandardCharsets; -import java.util.function.BiConsumer; +import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.config.ConfigRocksDBStorage; import org.apache.rocketmq.common.constant.LoggerName; @@ -31,6 +29,9 @@ import org.rocksdb.Statistics; import org.rocksdb.WriteBatch; +import java.nio.charset.StandardCharsets; +import java.util.function.BiConsumer; + public class RocksDBConfigManager { protected static final Logger BROKER_LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); public volatile boolean isStop = false; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 120f5b104c7..9f173daf469 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -16,17 +16,9 @@ */ package org.apache.rocketmq.broker.offset; -import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson2.annotation.JSONField; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -37,6 +29,15 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + public class ConsumerOrderInfoManager extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java index 1ee01fea1c8..661ace9bcb0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.broker.pop; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.annotation.JSONField; + import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -119,7 +119,7 @@ public byte[] getValueBytes() { } public static PopConsumerRecord decode(byte[] body) { - return JSONObject.parseObject(body, PopConsumerRecord.class); + return JSON.parseObject(body, PopConsumerRecord.class); } public long getPopTime() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index dde13a5ed73..45f7bc62a4f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -16,25 +16,9 @@ */ package org.apache.rocketmq.broker.pop; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import java.nio.ByteBuffer; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; @@ -65,6 +49,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + public class PopConsumerService extends ServiceThread { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 23a4f6167c6..06a531552a5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -16,11 +16,9 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.util.BitSet; -import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; @@ -52,6 +50,9 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; +import java.nio.charset.StandardCharsets; +import java.util.BitSet; + public class AckMessageProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4200f34bdee..c747fa15af0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -16,33 +16,12 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.opentelemetry.api.common.Attributes; -import java.io.UnsupportedEncodingException; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.AccessValidator; @@ -236,6 +215,28 @@ import org.apache.rocketmq.store.timer.TimerMessageStore; import org.apache.rocketmq.store.util.LibC; +import java.io.UnsupportedEncodingException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; @@ -2891,7 +2892,7 @@ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, } else { ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager() .get(requestHeader.getTopic(), requestHeader.getConsumerGroup()); - body.setFilterData(JSON.toJSONString(filterData, true)); + body.setFilterData(JSON.toJSONString(filterData)); messageFilter = new ExpressionMessageFilter(subscriptionData, filterData, this.brokerController.getConsumerFilterManager()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index de72ee7baff..f288c001b83 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -16,12 +16,9 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.nio.charset.StandardCharsets; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; @@ -50,6 +47,10 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 820388b18d2..7c309ec5c4d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -16,15 +16,7 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; +import com.alibaba.fastjson2.JSON; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.metrics.PopMetricsManager; import org.apache.rocketmq.common.KeyBuilder; @@ -44,6 +36,15 @@ import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; + public class PopBufferMergeService extends ServiceThread { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); ConcurrentHashMap diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index dd8314b7e0d..1ce643334f5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -16,27 +16,13 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; import io.opentelemetry.api.common.Attributes; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Random; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.filter.ConsumerFilterData; import org.apache.rocketmq.broker.filter.ConsumerFilterManager; @@ -91,6 +77,21 @@ import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index e1ead86169b..dcffaf50cc6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -16,18 +16,8 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; import io.opentelemetry.api.common.Attributes; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; @@ -37,12 +27,12 @@ import org.apache.rocketmq.client.consumer.PullStatus; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -61,6 +51,17 @@ import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; + import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java index 6b9cf159383..dfbe5d347ad 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java @@ -16,13 +16,8 @@ */ package org.apache.rocketmq.broker.topic; -import com.alibaba.fastjson.JSON; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerPathConfigHelper; import org.apache.rocketmq.common.ConfigManager; @@ -40,6 +35,13 @@ import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader; import org.apache.rocketmq.remoting.rpc.TopicRequestHeader; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse; public class TopicQueueMappingManager extends ConfigManager { @@ -149,7 +151,10 @@ public String encode(boolean pretty) { TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper(); wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable); wrapper.setDataVersion(this.dataVersion); - return JSON.toJSONString(wrapper, pretty); + if (pretty) { + return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat); + } + return JSON.toJSONString(wrapper); } @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java index ad30c73c608..d8dd811db28 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java @@ -16,16 +16,21 @@ */ package org.apache.rocketmq.broker.transaction; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; import com.google.common.io.Files; -import java.io.BufferedWriter; +import org.apache.rocketmq.common.ConfigManager; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.nio.charset.StandardCharsets; +import java.io.OutputStream; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -33,14 +38,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; -import org.apache.rocketmq.common.ConfigManager; -import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.org.slf4j.Logger; -import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import org.apache.rocketmq.remoting.protocol.DataVersion; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - public class TransactionMetrics extends ConfigManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -90,11 +87,11 @@ public void setTransactionCounts(ConcurrentMap transactionCounts this.transactionCounts = transactionCounts; } - protected void write0(Writer writer) { + protected void write0(OutputStream out) { TransactionMetricsSerializeWrapper wrapper = new TransactionMetricsSerializeWrapper(); wrapper.setTransactionCount(transactionCounts); wrapper.setDataVersion(dataVersion); - JSON.writeJSONString(writer, wrapper, SerializerFeature.BrowserCompatible); + JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible); } @Override @@ -182,7 +179,7 @@ public synchronized void persist() { String config = configFilePath(); String temp = config + ".tmp"; String backup = config + ".bak"; - BufferedWriter bufferedWriter = null; + FileOutputStream outputStream = null; try { File tmpFile = new File(temp); File parentDirectory = tmpFile.getParentFile(); @@ -199,11 +196,10 @@ public synchronized void persist() { return; } } - bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(tmpFile, false), - StandardCharsets.UTF_8)); - write0(bufferedWriter); - bufferedWriter.flush(); - bufferedWriter.close(); + outputStream = new FileOutputStream(tmpFile, false); + write0(outputStream); + outputStream.flush(); + outputStream.close(); log.debug("Finished writing tmp file: {}", temp); File configFile = new File(config); @@ -216,9 +212,9 @@ public synchronized void persist() { } catch (IOException e) { log.error("Failed to persist {}", temp, e); } finally { - if (null != bufferedWriter) { + if (null != outputStream) { try { - bufferedWriter.close(); + outputStream.close(); } catch (IOException ignore) { } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java new file mode 100644 index 00000000000..d9feb6a782a --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker; + +import com.alibaba.fastjson2.JSON; +import org.apache.rocketmq.common.config.ConfigRocksDBStorage; +import org.apache.rocketmq.remoting.protocol.DataVersion; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mock; + +public class RocksDBConfigManagerTest { + + private ConfigRocksDBStorage configRocksDBStorage; + + private RocksDBConfigManager rocksDBConfigManager; + + @Before + public void setUp() throws IllegalAccessException { + configRocksDBStorage = mock(ConfigRocksDBStorage.class); + rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L, null)); + rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage; + } + + @Test + public void testLoadDataVersion() throws Exception { + DataVersion expected = new DataVersion(); + expected.nextVersion(); + String jsonData = JSON.toJSONString(expected); + byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8); + + when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion); + + boolean result = rocksDBConfigManager.loadDataVersion(); + + assertTrue(result); + assertEquals(expected.getCounter().get(), rocksDBConfigManager.getKvDataVersion().getCounter().get()); + assertEquals(expected.getTimestamp(), rocksDBConfigManager.getKvDataVersion().getTimestamp()); + } + + @Test + public void testUpdateKvDataVersion() throws Exception { + rocksDBConfigManager.updateKvDataVersion(); + + DataVersion expectedDataVersion = rocksDBConfigManager.getKvDataVersion(); + verify(rocksDBConfigManager.configRocksDBStorage, times(1)).updateKvDataVersion( + eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8)) + ); + } +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 959b147d9d3..90c333b7703 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -16,7 +16,8 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -34,10 +35,10 @@ import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerManager; import org.apache.rocketmq.broker.client.net.Broker2Client; -import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; -import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager; import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; @@ -73,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; +import org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; @@ -87,11 +89,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader; import org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader; import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; @@ -104,6 +108,7 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; @@ -111,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; +import org.apache.rocketmq.store.queue.ConsumeQueueInterface; import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.timer.TimerCheckpoint; import org.apache.rocketmq.store.timer.TimerMessageStore; @@ -145,6 +151,8 @@ import java.util.concurrent.atomic.LongAdder; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -717,7 +725,7 @@ public void testGetAllConsumerOffset() throws RemotingCommandException { consumerOffsetManager = mock(ConsumerOffsetManager.class); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager(); - when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset, false)); + when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset)); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); @@ -1328,6 +1336,69 @@ public void testResetMasterFlushOffset() throws RemotingCommandException { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testGetSubscriptionGroup() throws RemotingCommandException { + brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group", new SubscriptionGroupConfig()); + GetSubscriptionGroupConfigRequestHeader requestHeader = new GetSubscriptionGroupConfigRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG, requestHeader); + requestHeader.setGroup("group"); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + } + + @Test + public void testCheckRocksdbCqWriteProgress() throws RemotingCommandException { + CheckRocksdbCqWriteProgressRequestHeader requestHeader = new CheckRocksdbCqWriteProgressRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS, requestHeader); + requestHeader.setTopic("topic"); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + } + + @Test + public void testQueryConsumeQueue() throws RemotingCommandException { + messageStore = mock(MessageStore.class); + ConsumeQueueInterface consumeQueue = mock(ConsumeQueueInterface.class); + when(consumeQueue.getMinOffsetInQueue()).thenReturn(0L); + when(consumeQueue.getMaxOffsetInQueue()).thenReturn(1L); + when(messageStore.getConsumeQueue(anyString(), anyInt())).thenReturn(consumeQueue); + when(brokerController.getMessageStore()).thenReturn(messageStore); + QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, requestHeader); + requestHeader.setTopic("topic"); + requestHeader.setQueueId(0); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + } + + @Test + public void testProcessRequest_GetTopicConfig() throws Exception { + GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader(); + requestHeader.setTopic("testTopic"); + + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader); + request.makeCustomHeaderToNet(); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName("testTopic"); + TopicConfigManager topicConfigManager = mock(TopicConfigManager.class); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + when(topicConfigManager.selectTopicConfig("testTopic")) + .thenReturn(topicConfig); + + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + + assertNotNull(response); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + + String responseBody = new String(response.getBody(), StandardCharsets.UTF_8); + TopicConfigAndQueueMapping result = JSONObject.parseObject(responseBody, TopicConfigAndQueueMapping.class); + assertEquals("testTopic", result.getTopicName()); + } + private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java index e15d51b4a87..77490dbd69a 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java @@ -18,12 +18,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.lang.reflect.Field; -import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.failover.EscapeBridge; +import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.MessageConst; @@ -41,10 +40,12 @@ import org.apache.rocketmq.store.AppendMessageResult; import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,8 +53,13 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; + import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -162,4 +168,51 @@ public void testProcessRequest_NoMessage() throws RemotingCommandException, Cons assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE); assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque()); } + + @Test + public void testProcessRequestAsync_JsonParsing() throws Exception { + Channel mockChannel = mock(Channel.class); + RemotingCommand mockRequest = mock(RemotingCommand.class); + BrokerController mockBrokerController = mock(BrokerController.class); + TopicConfigManager mockTopicConfigManager = mock(TopicConfigManager.class); + MessageStore mockMessageStore = mock(MessageStore.class); + BrokerConfig mockBrokerConfig = mock(BrokerConfig.class); + BrokerStatsManager mockBrokerStatsManager = mock(BrokerStatsManager.class); + PopMessageProcessor mockPopMessageProcessor = mock(PopMessageProcessor.class); + PopBufferMergeService mockPopBufferMergeService = mock(PopBufferMergeService.class); + + when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager); + when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore); + when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig); + when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager); + when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor); + when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService); + when(mockPopBufferMergeService.addAk(anyInt(), any())).thenReturn(false); + when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge); + PutMessageResult mockPutMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, null, true); + when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(mockPutMessageResult)); + + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setReadQueueNums(4); + when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig); + when(mockMessageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(0L); + when(mockMessageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(10L); + when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false); + + ChangeInvisibleTimeRequestHeader requestHeader = new ChangeInvisibleTimeRequestHeader(); + requestHeader.setTopic("TestTopic"); + requestHeader.setQueueId(1); + requestHeader.setOffset(5L); + requestHeader.setConsumerGroup("TestGroup"); + requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1"); + requestHeader.setInvisibleTime(60000L); + when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader); + + ChangeInvisibleTimeProcessor processor = new ChangeInvisibleTimeProcessor(mockBrokerController); + CompletableFuture futureResponse = processor.processRequestAsync(mockChannel, mockRequest, true); + + RemotingCommand response = futureResponse.get(); + assertNotNull(response); + assertEquals(ResponseCode.SUCCESS, response.getCode()); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java index acc7a3da74a..33d6820a7ef 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java @@ -16,19 +16,20 @@ */ package org.apache.rocketmq.broker.processor; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; +import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.broker.client.ClientChannelInfo; +import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.broker.failover.EscapeBridge; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; +import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyServerConfig; -import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.PutMessageResult; +import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; @@ -37,56 +38,82 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; -import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; +import java.lang.reflect.Method; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.Silent.class) public class PopBufferMergeServiceTest { - @Spy - private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig()); + @Mock + private BrokerController brokerController; + private PopMessageProcessor popMessageProcessor; + + @Mock + private ScheduleMessageService scheduleMessageService; + @Mock - private ChannelHandlerContext handlerContext; + private TopicConfigManager topicConfigManager; + + @Mock + private ConsumerManager consumerManager; + @Mock private DefaultMessageStore messageStore; - private ScheduleMessageService scheduleMessageService; - private ClientChannelInfo clientChannelInfo; - private String group = "FooBarGroup"; - private String topic = "FooBar"; + + @Mock + private MessageStoreConfig messageStoreConfig; + + private String defaultGroup = "defaultGroup"; + + private String defaultTopic = "defaultTopic"; + + private PopBufferMergeService popBufferMergeService; + + @Mock + private BrokerConfig brokerConfig; + + @Mock + private EscapeBridge escapeBridge; @Before public void init() throws Exception { - FieldUtils.writeField(brokerController.getBrokerConfig(), "enablePopBufferMerge", true, true); - brokerController.setMessageStore(messageStore); + when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1"); + when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true); + when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000); + when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); + when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); + when(brokerController.getMessageStore()).thenReturn(messageStore); + when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); + when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService); + when(brokerController.getConsumerManager()).thenReturn(consumerManager); + when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig); popMessageProcessor = new PopMessageProcessor(brokerController); - scheduleMessageService = new ScheduleMessageService(brokerController); - scheduleMessageService.parseDelayLevel(); - Channel mockChannel = mock(Channel.class); - brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new TopicConfig()); - clientChannelInfo = new ClientChannelInfo(mockChannel); - ConsumerData consumerData = createConsumerData(group, topic); - brokerController.getConsumerManager().registerConsumer( - consumerData.getGroupName(), - clientChannelInfo, - consumerData.getConsumeType(), - consumerData.getMessageModel(), - consumerData.getConsumeFromWhere(), - consumerData.getSubscriptionDataSet(), - false); + popBufferMergeService = new PopBufferMergeService(brokerController, popMessageProcessor); + FieldUtils.writeDeclaredField(popBufferMergeService, "brokerController", brokerController, true); + ConcurrentMap topicConfigTable = new ConcurrentHashMap<>(); + topicConfigTable.put(defaultTopic, new TopicConfig()); + when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable); } - @Test(timeout = 10_000) + @Test(timeout = 15_000) public void testBasic() throws Exception { // This test case fails on Windows in CI pipeline // Disable it for later fix Assume.assumeFalse(MixAll.isWindows()); - PopBufferMergeService popBufferMergeService = new PopBufferMergeService(brokerController, popMessageProcessor); - popBufferMergeService.start(); PopCheckPoint ck = new PopCheckPoint(); ck.setBitMap(0); int msgCnt = 1; @@ -97,8 +124,8 @@ public void testBasic() throws Exception { ck.setInvisibleTime(invisibleTime); int offset = 100; ck.setStartOffset(offset); - ck.setCId(group); - ck.setTopic(topic); + ck.setCId(defaultGroup); + ck.setTopic(defaultTopic); int queueId = 0; ck.setQueueId(queueId); @@ -108,18 +135,93 @@ public void testBasic() throws Exception { AckMsg ackMsg = new AckMsg(); ackMsg.setAckOffset(ackOffset); ackMsg.setStartOffset(offset); - ackMsg.setConsumerGroup(group); - ackMsg.setTopic(topic); + ackMsg.setConsumerGroup(defaultGroup); + ackMsg.setTopic(defaultTopic); ackMsg.setQueueId(queueId); ackMsg.setPopTime(popTime); try { assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset, nextBeginOffset)).isTrue(); - assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); + assertThat(popBufferMergeService.getLatestOffset(defaultTopic, defaultGroup, queueId)).isEqualTo(nextBeginOffset); Thread.sleep(1000); // wait background threads of PopBufferMergeService run for some time assertThat(popBufferMergeService.addAk(reviveQid, ackMsg)).isTrue(); - assertThat(popBufferMergeService.getLatestOffset(topic, group, queueId)).isEqualTo(nextBeginOffset); + assertThat(popBufferMergeService.getLatestOffset(defaultTopic, defaultGroup, queueId)).isEqualTo(nextBeginOffset); } finally { popBufferMergeService.shutdown(true); } } + + @Test + public void testAddCkJustOffset_MergeKeyConflict() { + PopCheckPoint point = mock(PopCheckPoint.class); + String mergeKey = "testMergeKey"; + when(point.getTopic()).thenReturn(mergeKey); + when(point.getCId()).thenReturn(""); + when(point.getQueueId()).thenReturn(0); + when(point.getStartOffset()).thenReturn(0L); + when(point.getPopTime()).thenReturn(0L); + when(point.getBrokerName()).thenReturn(""); + popBufferMergeService.buffer.put(mergeKey + "000", mock(PopBufferMergeService.PopCheckPointWrapper.class)); + + assertFalse(popBufferMergeService.addCkJustOffset(point, 0, 0, 0)); + } + + @Test + public void testAddCkMock() { + int queueId = 0; + long startOffset = 100L; + long invisibleTime = 30_000L; + long popTime = System.currentTimeMillis(); + int reviveQueueId = 0; + long nextBeginOffset = 101L; + String brokerName = "brokerName"; + popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId, startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset, brokerName); + verify(brokerConfig, times(1)).isEnablePopLog(); + } + + @Test + public void testPutAckToStore() throws Exception { + PopCheckPoint point = new PopCheckPoint(); + point.setStartOffset(100L); + point.setCId("testGroup"); + point.setTopic("testTopic"); + point.setQueueId(1); + point.setPopTime(System.currentTimeMillis()); + point.setBrokerName("testBroker"); + + PopBufferMergeService.PopCheckPointWrapper pointWrapper = mock(PopBufferMergeService.PopCheckPointWrapper.class); + when(pointWrapper.getCk()).thenReturn(point); + when(pointWrapper.getReviveQueueId()).thenReturn(0); + + AtomicInteger toStoreBits = new AtomicInteger(0); + when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits); + + byte msgIndex = 0; + AtomicInteger count = new AtomicInteger(0); + + EscapeBridge escapeBridge = mock(EscapeBridge.class); + when(brokerController.getEscapeBridge()).thenReturn(escapeBridge); + when(brokerController.getBrokerConfig().isAppendAckAsync()).thenReturn(false); + + when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> { + MessageExtBrokerInner capturedMessage = invocation.getArgument(0); + AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(), AckMsg.class); + + assertEquals(point.ackOffsetByIndex(msgIndex), ackMsg.getAckOffset()); + assertEquals(point.getStartOffset(), ackMsg.getStartOffset()); + assertEquals(point.getCId(), ackMsg.getConsumerGroup()); + assertEquals(point.getTopic(), ackMsg.getTopic()); + assertEquals(point.getQueueId(), ackMsg.getQueueId()); + assertEquals(point.getPopTime(), ackMsg.getPopTime()); + assertEquals(point.getBrokerName(), ackMsg.getBrokerName()); + + PutMessageResult result = mock(PutMessageResult.class); + when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK); + return result; + }); + + Method method = PopBufferMergeService.class.getDeclaredMethod("putAckToStore", PopBufferMergeService.PopCheckPointWrapper.class, byte.class, AtomicInteger.class); + method.setAccessible(true); + method.invoke(popBufferMergeService, pointWrapper, msgIndex, count); + verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java index fdb0690e5dc..28476149ab4 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java @@ -16,10 +16,9 @@ */ package org.apache.rocketmq.broker.processor; +import com.alibaba.fastjson2.JSON; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.common.BrokerConfig; @@ -27,6 +26,7 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.constant.ConsumeInitMode; import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; @@ -42,7 +42,7 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.exception.ConsumeQueueException; import org.apache.rocketmq.store.logfile.DefaultMappedFile; -import org.junit.Assert; +import org.apache.rocketmq.store.pop.PopCheckPoint; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -50,8 +50,13 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; + import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -168,17 +173,17 @@ public void testGetInitOffset_retryTopic() throws RemotingCommandException { .thenReturn(CompletableFuture.completedFuture(getMessageResult)); long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - Assert.assertEquals(-1, offset); + assertEquals(-1, offset); RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - Assert.assertEquals(minOffset, offset); + assertEquals(minOffset, offset); when(messageStore.getMinOffsetInQueue(retryTopic, 0)).thenReturn(minOffset * 2); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic, 0); - Assert.assertEquals(minOffset, offset); // will not entry getInitOffset() again + assertEquals(minOffset, offset); // will not entry getInitOffset() again messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent UnnecessaryStubbingException } @@ -193,17 +198,17 @@ public void testGetInitOffset_normalTopic() throws RemotingCommandException, Con .thenReturn(CompletableFuture.completedFuture(getMessageResult)); long offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - Assert.assertEquals(-1, offset); + assertEquals(-1, offset); RemotingCommand request = createPopMsgCommand(newGroup, topic, 0, ConsumeInitMode.MAX); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false + assertEquals(maxOffset - 1, offset); // checkInMem return false when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset * 2); popMessageProcessor.processRequest(handlerContext, request); offset = brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0); - Assert.assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() again + assertEquals(maxOffset - 1, offset); // will not entry getInitOffset() again messageStore.getMaxOffsetInQueue(topic, 0); // prevent UnnecessaryStubbingException } @@ -240,4 +245,31 @@ private GetMessageResult createGetMessageResult(int msgCnt) { } return getMessageResult; } + + @Test + public void testBuildCkMsgJsonParsing() { + PopCheckPoint ck = new PopCheckPoint(); + ck.setTopic("TestTopic"); + ck.setQueueId(1); + ck.setStartOffset(100L); + ck.setCId("TestConsumer"); + ck.setPopTime(System.currentTimeMillis()); + ck.setBrokerName("TestBroker"); + + int reviveQid = 0; + PopMessageProcessor processor = new PopMessageProcessor(brokerController); + + MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid); + + String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8); + PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class); + + assertEquals(ck.getTopic(), actual.getTopic()); + assertEquals(ck.getQueueId(), actual.getQueueId()); + assertEquals(ck.getStartOffset(), actual.getStartOffset()); + assertEquals(ck.getCId(), actual.getCId()); + assertEquals(ck.getPopTime(), actual.getPopTime()); + assertEquals(ck.getBrokerName(), actual.getBrokerName()); + assertEquals(ck.getReviveTime(), actual.getReviveTime()); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index 3010e836101..e6a2cdb6cdc 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -16,13 +16,7 @@ */ package org.apache.rocketmq.broker.processor; -import com.alibaba.fastjson.JSON; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import com.alibaba.fastjson2.JSON; import org.apache.commons.lang3.tuple.Triple; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.failover.EscapeBridge; @@ -40,12 +34,13 @@ import org.apache.rocketmq.common.utils.DataConverter; import org.apache.rocketmq.common.utils.NetworkUtil; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.AppendMessageResult; +import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; -import org.apache.rocketmq.store.AppendMessageResult; -import org.apache.rocketmq.store.AppendMessageStatus; import org.apache.rocketmq.store.pop.AckMsg; +import org.apache.rocketmq.store.pop.BatchAckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; import org.apache.rocketmq.store.timer.TimerMessageStore; import org.junit.Assert; @@ -56,18 +51,27 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.Silent.class) public class PopReviveServiceTest { @@ -405,6 +409,59 @@ public void testReviveMsgFromCk_messageNotFound_needRetry_noEnd() throws Throwab verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK } + @Test + public void testReviveMsgFromBatchAck() throws Throwable { + brokerConfig.setEnableSkipLongAwaitingAck(true); + when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L); + List reviveMessageExtList = new ArrayList<>(); + long basePopTime = System.currentTimeMillis(); + reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L, 2L, 3L), basePopTime), 1, 1, basePopTime)); + doReturn(reviveMessageExtList, new ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt()); + + PopReviveService.ConsumeReviveObj consumeReviveObj = new PopReviveService.ConsumeReviveObj(); + popReviveService.consumeReviveMessage(consumeReviveObj); + assertEquals(1, consumeReviveObj.map.size()); + + ArgumentCaptor commitOffsetCaptor = ArgumentCaptor.forClass(Long.class); + doNothing().when(consumerOffsetManager).commitOffset(anyString(), anyString(), anyString(), anyInt(), commitOffsetCaptor.capture()); + popReviveService.mergeAndRevive(consumeReviveObj); + assertEquals(1, commitOffsetCaptor.getValue().longValue()); + } + + public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) { + MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC, batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs, PopMessageProcessor.genAckUniqueId(batchAckMsg)); + result.setQueueOffset(reviveOffset); + result.setDeliverTimeMs(deliverMs); + result.setStoreTimestamp(deliverTime); + return result; + } + + public static BatchAckMsg buildBatchAckMsg(Collection offsets, long popTime) { + BatchAckMsg result = new BatchAckMsg(); + result.setConsumerGroup(GROUP); + result.setTopic(TOPIC); + result.setQueueId(0); + result.setPopTime(popTime); + result.setBrokerName("broker-a"); + result.getAckOffsetList().addAll(offsets); + return result; + } + + public static MessageExtBrokerInner buildBatchAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) { + MessageExtBrokerInner result = new MessageExtBrokerInner(); + result.setTopic(reviveTopic); + result.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8)); + result.setQueueId(reviveQid); + result.setTags(PopAckConstants.BATCH_ACK_TAG); + result.setBornTimestamp(System.currentTimeMillis()); + result.setBornHost(host); + result.setStoreHost(host); + result.setDeliverTimeMs(deliverMs); + result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, ackUniqueId); + result.setPropertiesString(MessageDecoder.messageProperties2String(result.getProperties())); + return result; + } + public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) { PopCheckPoint ck = new PopCheckPoint(); ck.setStartOffset(startOffset); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java index b74e57ab936..9b25e0134c2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java @@ -17,15 +17,11 @@ package org.apache.rocketmq.broker.topic; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail; import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils; import org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper; @@ -37,6 +33,16 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -79,9 +85,9 @@ public void testEncodeDecode() throws Exception { String topic = UUID.randomUUID().toString(); int queueNum = 10; TopicRemappingDetailWrapper topicRemappingDetailWrapper = TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new HashMap<>()); - Assert.assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size()); + assertEquals(1, topicRemappingDetailWrapper.getBrokerConfigMap().size()); TopicQueueMappingDetail topicQueueMappingDetail = topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail(); - Assert.assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size()); + assertEquals(queueNum, topicQueueMappingDetail.getHostedQueues().size()); mappingDetailMap.put(topic, topicQueueMappingDetail); } } @@ -89,7 +95,7 @@ public void testEncodeDecode() throws Exception { { topicQueueMappingManager = new TopicQueueMappingManager(brokerController); Assert.assertTrue(topicQueueMappingManager.load()); - Assert.assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); + assertEquals(0, topicQueueMappingManager.getTopicQueueMappingTable().size()); for (TopicQueueMappingDetail mappingDetail : mappingDetailMap.values()) { for (int i = 0; i < 10; i++) { topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false, true); @@ -101,11 +107,49 @@ public void testEncodeDecode() throws Exception { { topicQueueMappingManager = new TopicQueueMappingManager(brokerController); Assert.assertTrue(topicQueueMappingManager.load()); - Assert.assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size()); + assertEquals(mappingDetailMap.size(), topicQueueMappingManager.getTopicQueueMappingTable().size()); for (TopicQueueMappingDetail topicQueueMappingDetail: topicQueueMappingManager.getTopicQueueMappingTable().values()) { - Assert.assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic())); + assertEquals(topicQueueMappingDetail, mappingDetailMap.get(topicQueueMappingDetail.getTopic())); } } delete(topicQueueMappingManager); } + + @Test + public void testEncodePretty() { + TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null); + TopicQueueMappingDetail detail = new TopicQueueMappingDetail(); + detail.setTopic("testTopic"); + detail.setBname("testBroker"); + + topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail); + topicQueueMappingManager.getDataVersion().nextVersion(); + + String actual = topicQueueMappingManager.encode(true); + TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper(); + expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable())); + expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion()); + String expected = JSON.toJSONString(expectedWrapper, JSONWriter.Feature.PrettyFormat); + + assertEquals(expected, actual); + } + + @Test + public void testEncodeNonPretty() { + TopicQueueMappingManager topicQueueMappingManager = new TopicQueueMappingManager(null); + TopicQueueMappingDetail detail = new TopicQueueMappingDetail(); + detail.setTopic("testTopic"); + detail.setBname("testBroker"); + + topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic", detail); + topicQueueMappingManager.getDataVersion().nextVersion(); + + String actual = topicQueueMappingManager.encode(false); + TopicQueueMappingSerializeWrapper expectedWrapper = new TopicQueueMappingSerializeWrapper(); + expectedWrapper.setTopicQueueMappingInfoMap(new ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable())); + expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion()); + String expected = JSON.toJSONString(expectedWrapper); + + assertEquals(expected, actual); + } } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java index 690b4eabb57..62a6ad8b5b9 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java @@ -19,23 +19,40 @@ import org.apache.rocketmq.broker.transaction.TransactionMetrics; import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collections; +import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class TransactionMetricsTest { private TransactionMetrics transactionMetrics; private String configPath; + private Path path; @Before - public void setUp() throws Exception { - configPath = "configPath"; - transactionMetrics = new TransactionMetrics(configPath); + public void before() throws Exception { + configPath = createBaseDir(); + path = Paths.get(configPath); + transactionMetrics = spy(new TransactionMetrics(configPath)); + } + + @After + public void after() throws Exception { + deleteFile(configPath); + assertFalse(path.toFile().exists()); } /** @@ -80,4 +97,40 @@ public void testCleanMetrics() { transactionMetrics.cleanMetrics(Collections.singleton(topic)); assert transactionMetrics.getTransactionCount(topic) == 0; } + + @Test + public void testPersist() { + assertFalse(path.toFile().exists()); + transactionMetrics.persist(); + assertTrue(path.toFile().exists()); + verify(transactionMetrics).persist(); + } + + private String createBaseDir() { + String baseDir = System.getProperty("java.io.tmpdir") + File.separator + "unitteststore-" + UUID.randomUUID(); + final File file = new File(baseDir); + if (file.exists()) { + System.exit(1); + } + return baseDir; + } + + private void deleteFile(String fileName) { + deleteFile(new File(fileName)); + } + + private void deleteFile(File file) { + if (!file.exists()) { + return; + } + if (file.isFile()) { + file.delete(); + } else if (file.isDirectory()) { + File[] files = file.listFiles(); + for (File file1 : files) { + deleteFile(file1); + } + file.delete(); + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java index 38e0a207528..67cf045cfb8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -16,7 +16,7 @@ */ package org.apache.rocketmq.store.pop; -import com.alibaba.fastjson.annotation.JSONField; +import com.alibaba.fastjson2.annotation.JSONField; import java.util.ArrayList; import java.util.List; @@ -35,7 +35,6 @@ public class PopCheckPoint implements Comparable { private int queueId; @JSONField(name = "t") private String topic; - @JSONField(name = "c") private String cid; @JSONField(name = "ro") private long reviveOffset; @@ -114,10 +113,12 @@ public void setTopic(String topic) { this.topic = topic; } + @JSONField(name = "c") public String getCId() { return cid; } + @JSONField(name = "c") public void setCId(String cid) { this.cid = cid; }