Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
@RunWith(MockitoJUnitRunner.Silent.class)
public class LiteLifecycleManagerTest {

private final static BrokerConfig BROKER_CONFIG = new BrokerConfig();
Expand Down Expand Up @@ -177,6 +177,7 @@ public void testCleanExpiredLiteTopic() {
}
}

@Ignore("Flaky: fails 2/100 runs (2.0%)")
@Test
public void testCleanByParentTopic() {
int num = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,15 @@ public void testConsumeTheChangeInvisibleShorter() {

@Test
public void testRecover() {
long recoverPopTime = System.currentTimeMillis();
QueueLevelConsumerManager savedConsumerOrderInfoManager = new QueueLevelConsumerManager();
savedConsumerOrderInfoManager.update(
null,
false,
TOPIC,
GROUP,
QUEUE_ID_0,
popTime,
recoverPopTime,
3000,
Lists.newArrayList(1L),
new StringBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Ignore;

@RunWith(MockitoJUnitRunner.class)
public class TransactionalMessageServiceImplTest {
Expand Down Expand Up @@ -162,6 +163,7 @@ public void testDeletePrepareMessage_queueFull() throws InterruptedException {
assertThat(res).isFalse();
}

@Ignore("Flaky: fails 1/100 runs (1.0%)")
@Test
public void testDeletePrepareMessage_maxSize() throws InterruptedException {
brokerController.getBrokerConfig().setTransactionOpMsgMaxSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.junit.Ignore;

@RunWith(MockitoJUnitRunner.Silent.class)
public class DefaultMQConsumerWithTraceTest {
Expand Down Expand Up @@ -216,6 +217,7 @@ public void terminate() {
pushConsumer.shutdown();
}

@Ignore("Flaky: fails 1/100 runs (1.0%)")
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.junit.Ignore;

@RunWith(MockitoJUnitRunner.class)
public class DefaultMQLitePullConsumerWithTraceTest {
Expand Down Expand Up @@ -153,6 +154,7 @@ public void destroy() {
}
}

@Ignore("Flaky: fails 6/100 runs (6.0%)")
@Test
public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithDefaultTraceTopic();
Expand All @@ -169,6 +171,7 @@ public void testSubscribe_PollMessageSuccess_WithDefaultTraceTopic() throws Exce
}
}

@Ignore("Flaky: fails 5/100 runs (5.0%)")
@Test
public void testSubscribe_PollMessageSuccess_WithCustomizedTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = createLitePullConsumerWithCustomizedTraceTopic();
Expand Down
56 changes: 56 additions & 0 deletions docs/cn/Flaky_Test_Detector_Plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# RocketMQ Flaky Test 检测方案

## 背景与目标

RocketMQ 主干 CI 经常出现间歇性测试失败,导致开发者对红色构建产生信任疲劳,真正的回归问题容易被掩盖。本方案通过大规模重复执行统计失败率,对不稳定方法(≥1%)标记 `@Ignore` 恢复 CI 可靠性,同时保留数据为后续修复提供优先级依据。

## 方法论来源

- **Google** — [Flaky Tests at Google and How We Mitigate Them](https://testing.googleblog.com/2016/05/flaky-tests-at-google-and-how-we.html)(2016):提出 "deflake"(重复执行 N 次统计失败率)和 "quarantine"(将不稳定测试从主线 CI 隔离)。内部数据:约 1.5% 的测试运行会出现 flaky 结果,近 16% 的测试存在一定程度的 flakiness。
- **Meta** — [Predictive Test Selection](https://engineering.fb.com/2018/11/21/developer-tools/predictive-test-selection/)(2018):通过 aggressive retry 区分 flaky 失败与真实回归。
- **Spotify** — [Test Flakiness Methods](https://engineering.atspotify.com/2019/11/test-flakiness-methods-for-identifying-and-dealing-with-flaky-tests/)(2019):重复执行 + 隔离 + 追踪的三阶段治理框架。

## 核心思路:三层漏斗

采用"粗筛 → 精筛 → 定位"逐步缩小范围,避免在全量方法级别浪费算力:

```
第一层:模块级(16 模块 × 100 次)→ 筛出有失败的模块
第二层:类级(仅不稳定模块中的测试类 × 100 次)→ 筛出有失败的类
第三层:方法级(仅不稳定类中的测试方法 × 100 次)→ 精确定位每个方法的失败率
```

每层执行完后分析 Surefire XML 报告,输出不稳定列表作为下一层的输入。标记后重新全量执行验证,如仍有新 flaky 出现则循环标记 + 验证,直到零失败。

## 执行架构

- **控制节点(本地)**:编排任务分发、结果收集、数据分析
- **工作节点(10 台 ECS,16C 64G)**:每台最多 4 个 Docker 容器并行执行测试,互不干扰

## 执行流程

```
1. 构建 → Docker 内 JDK 8 编译 RocketMQ,打包为测试镜像
2. 分发 → 内网中转分发镜像到所有工作节点
3. 派发 → 生成任务列表,均匀拆分到各节点,启动 worker
4. 收集 → 轮询等待完成,回收 Surefire XML 报告
5. 分析 → 解析 XML,统计失败次数和失败率
6. 标记 → 对超过阈值的方法添加 @Ignore
7. 验证 → 重新构建并全量执行,确认主干稳定
```

## 关键设计决策

| 决策点 | 选择 | 原因 |
|--------|------|------|
| 编译环境 | Docker 内 JDK 8 | 本地 JDK 版本不一致,容器内保证一致性 |
| 镜像分发 | 先传一台再内网中转 | 内网带宽远大于公网 |
| 测试隔离 | 每轮独立容器 | 避免进程残留、端口占用等干扰 |
| 失败判定 | ≥1% 失败率 | 1000 次有效执行下 1% 约 10 次失败,平衡误判与漏判 |
| 标记方式 | `@Ignore` + 失败率注释 | 最小侵入,方便后续逐个启用 |
| 验证循环 | 标记后重新全量跑 | 处理"隐藏 flaky"问题 |

## 后续计划

- 对高失败率方法(>10%)优先根因分析并修复,修复后移除 `@Ignore` 并重新验证
- 考虑将检测工具集成到定期 CI 任务中,持续监控测试稳定性
56 changes: 56 additions & 0 deletions docs/en/Flaky_Test_Detector_Plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# RocketMQ Flaky Test Detection Plan

## Background & Goals

RocketMQ's mainline CI frequently suffers intermittent test failures, which erode developers' trust in red builds and make real regressions easy to miss. This plan runs the tests repeatedly at large scale to measure failure rates statistically, marks unstable methods (failure rate ≥1%) with `@Ignore` to restore CI reliability, and retains the data to prioritize subsequent fixes.

## Methodology References

- **Google** — [Flaky Tests at Google and How We Mitigate Them](https://testing.googleblog.com/2016/05/flaky-tests-at-google-and-how-we.html) (2016): Introduced "deflake" (run N times to measure failure rate) and "quarantine" (isolate flaky tests from mainline CI). Internal data: ~1.5% of all test runs report a flaky result, and almost 16% of tests show some level of flakiness.
- **Meta** — [Predictive Test Selection](https://engineering.fb.com/2018/11/21/developer-tools/predictive-test-selection/) (2018): Uses aggressive retry to separate flaky failures from real regressions.
- **Spotify** — [Test Flakiness Methods](https://engineering.atspotify.com/2019/11/test-flakiness-methods-for-identifying-and-dealing-with-flaky-tests/) (2019): Three-stage framework of repeated execution + isolation + tracking.

## Core Idea: Three-Layer Funnel

A "coarse → fine → pinpoint" strategy to progressively narrow scope and avoid wasting compute at the full method level:

```
Layer 1: Module level (16 modules × 100 runs) → identify modules with failures
Layer 2: Class level (only test classes in unstable modules × 100 runs) → identify classes with failures
Layer 3: Method level (only test methods in unstable classes × 100 runs) → pinpoint each method's failure rate
```

After each layer, Surefire XML reports are analyzed and the unstable list feeds the next layer. After marking, a full re-run verifies stability; if new flaky tests surface, the mark + verify cycle repeats until zero failures.

## Execution Architecture

- **Control node (local)**: Orchestrates task distribution, result collection, data analysis
- **Worker nodes (10 ECS, 16C 64G each)**: Max 4 Docker containers per node in parallel, each test run isolated

## Execution Flow

```
1. Build → Compile RocketMQ with JDK 8 inside Docker, package as test image
2. Distribute → Relay image via internal network to all worker nodes
3. Dispatch → Generate task list, split evenly across nodes, start workers
4. Collect → Poll until complete, retrieve Surefire XML reports
5. Analyze → Parse XML, compute failure count and rate per method
6. Mark → Add @Ignore to methods exceeding threshold
7. Verify → Rebuild and run full suite to confirm trunk stability
```

## Key Design Decisions

| Decision Point | Choice | Rationale |
|---------------|--------|-----------|
| Build environment | JDK 8 inside Docker | Local JDK versions vary; container ensures consistency |
| Image distribution | Upload to one node, relay via internal network | Internal bandwidth far exceeds public internet |
| Test isolation | Independent container per run | Avoids residual processes, port conflicts |
| Failure threshold | ≥1% failure rate | ~10 failures across 1000 effective runs; balances false positives vs. missed cases |
| Marking approach | `@Ignore` with the failure rate in its reason string | Minimal intrusion, easy to re-enable later |
| Verification loop | Full re-run after marking | Handles "hidden flaky" problem |

## Follow-up Plan

- Prioritize root-cause analysis and fixes for methods with a high failure rate (>10%); after each fix, remove `@Ignore` and re-verify
- Consider integrating the detection tool into periodic CI tasks for continuous stability monitoring
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Ignore;

public class BloomFilterTest {

Expand Down Expand Up @@ -142,6 +143,7 @@ public void testBloomFilterData() {
assertThat(bloomFilter.isValid(bloomFilterData)).isFalse();
}

@Ignore("Flaky: fails 1/100 runs (1.0%)")
@Test
public void testCheckFalseHit() {
BloomFilter bloomFilter = BloomFilter.createByFn(1, 300);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -48,6 +49,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@Ignore("Flaky: multiple methods fail intermittently in CI with 'expected:<8> but was:<2>' due to async race in pop priority consume")
@RunWith(Parameterized.class)
public class PopPriorityIT extends BasePopNormally {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ public void concurrentGetTest() throws InterruptedException {
storeConfig.setTieredStoreIndexFileMaxIndexNum(2000);
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
// Wait for service thread to complete its first iteration and enter 10s wait,
// preventing compaction from racing with queries below.
TimeUnit.MILLISECONDS.sleep(500);

int fileCount = 10;
for (int j = 0; j < fileCount; j++) {
Expand Down Expand Up @@ -357,35 +360,32 @@ public void queryCrossFileBoundaryTest() throws InterruptedException, ExecutionE
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();

// Create first file with early beginTime
long file1Begin = System.currentTimeMillis();
for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum() - 1; i++) {
long file1Begin = indexService.getTimeStoreTable().firstKey();

// Fill file1 completely to trigger SEALED and create file2.
// maxIndexNum=20, seals when indexItemCount + 1 >= 20, so 20 puts will seal and overflow.
for (int i = 0; i < storeConfig.getTieredStoreIndexFileMaxIndexNum(); i++) {
indexService.putKey(TOPIC_NAME, TOPIC_ID, QUEUE_ID,
Collections.singleton("crossKey"), i * 100L, MESSAGE_SIZE, file1Begin + i * 1000);
}

// Create second file with later beginTime (beyond query range)
long file2Begin = System.currentTimeMillis() + 100_000;
indexService.createNewIndexFile(file2Begin);
// Put five more items with later timestamps; these go into file2
long file2ItemTimestamp = file1Begin + 100_000;
for (int i = 0; i < 5; i++) {
indexService.putKey(TOPIC_NAME, TOPIC_ID, QUEUE_ID,
Collections.singleton("crossKey"), i * 100L, MESSAGE_SIZE, file2Begin + i);
Collections.singleton("crossKey"), (20 + i) * 100L, MESSAGE_SIZE, file2ItemTimestamp + i);
}

Assert.assertEquals(2, indexService.getTimeStoreTable().size());

// Query range: beginTime is AFTER file1's beginTime but BEFORE file1's last item timestamp
// This should select file1, NOT file2 (file2 beginTime > queryEnd)
// Query range starts AFTER file1's beginTimestamp but covers file1's items.
// This verifies headMap(endTime) correctly includes file1 even though file1.key < queryBegin.
long queryBegin = file1Begin + 5_000;
long queryEnd = file1Begin + 15_000;

List<IndexItem> results = indexService.queryAsync(
TOPIC_NAME, "crossKey", 10, queryBegin, queryEnd).get();
TOPIC_NAME, "crossKey", 50, queryBegin, queryEnd).get();

// file1 has items at timestamps: file1Begin, file1Begin+1000, ..., file1Begin+(N-1)*1000
// Items in range [file1Begin+5000, file1Begin+15000] should match
// The bug (subMap) would return empty because file1's key < queryBegin
Assert.assertFalse("Should find index items from file covering query range", results.isEmpty());
Assert.assertTrue("Should find items within query time range", results.size() > 0);
}
}
Loading