Skip to content

Commit 2f1b8fb

Browse files
committed
More progress.
1 parent cb910ef commit 2f1b8fb

4 files changed

Lines changed: 249 additions & 15 deletions

File tree

randomizedtesting-jupiter/src/main/java/com/carrotsearch/randomizedtesting/jupiter/DetectThreadLeaks.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,18 @@ enum Scope {
3030
/** Check for leaked threads after each individual test method. */
3131
TEST
3232
}
33+
34+
/**
35+
* Milliseconds to wait for leaked threads to self-terminate before declaring a failure. If all
36+
* leaked threads terminate within this window, the test passes. If this annotation is absent, the linger time is 0 (no lingering); when present, {@code millis} must be specified explicitly.
37+
*
38+
* <p>Place this annotation on the same class as {@link DetectThreadLeaks}, or on an individual test method; a method-level value takes precedence over the class-level one.
39+
*/
40+
@Target({ElementType.TYPE, ElementType.METHOD})
41+
@Retention(RetentionPolicy.RUNTIME)
42+
@Documented
43+
@Inherited
44+
@interface LingerTime {
45+
int millis();
46+
}
3347
}

randomizedtesting-jupiter/src/main/java/com/carrotsearch/randomizedtesting/jupiter/DetectThreadLeaksExtension.java

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.carrotsearch.randomizedtesting.jupiter;
22

33
import java.util.HashSet;
4+
import java.util.Map;
5+
import java.util.concurrent.TimeUnit;
46
import java.util.logging.Logger;
57
import java.util.stream.Collectors;
68
import org.junit.jupiter.api.extension.AfterAllCallback;
@@ -20,6 +22,9 @@ public class DetectThreadLeaksExtension
2022
private static final String SNAPSHOT_KEY = "snapshot";
2123
private static final String CONCURRENT_KEY = "concurrent";
2224

25+
/** Total time budget (ms) to join interrupted threads before giving up. */
26+
private static final long INTERRUPT_JOIN_MS = 2_000L;
27+
2328
@Override
2429
public void beforeAll(ExtensionContext context) {
2530
if (context.getExecutionMode() != ExecutionMode.SAME_THREAD) {
@@ -40,7 +45,10 @@ public void afterAll(ExtensionContext context) {
4045
if (isConcurrentMode(context) || scope(context) != DetectThreadLeaks.Scope.SUITE) {
4146
return;
4247
}
43-
checkLeaks(context.getStore(EXTENSION_NAMESPACE), "suite [" + context.getDisplayName() + "]");
48+
checkLeaks(
49+
context.getStore(EXTENSION_NAMESPACE),
50+
"suite [" + context.getDisplayName() + "]",
51+
linger(context));
4452
}
4553

4654
@Override
@@ -56,13 +64,29 @@ public void afterEach(ExtensionContext context) {
5664
if (isConcurrentMode(context) || scope(context) != DetectThreadLeaks.Scope.TEST) {
5765
return;
5866
}
59-
checkLeaks(context.getStore(EXTENSION_NAMESPACE), "test [" + context.getDisplayName() + "]");
67+
checkLeaks(
68+
context.getStore(EXTENSION_NAMESPACE),
69+
"test [" + context.getDisplayName() + "]",
70+
linger(context));
6071
}
6172

6273
private static DetectThreadLeaks.Scope scope(ExtensionContext context) {
6374
return context.getRequiredTestClass().getAnnotation(DetectThreadLeaks.class).scope();
6475
}
6576

77+
private static int linger(ExtensionContext context) {
78+
// Method-level annotation takes precedence over class-level.
79+
var methodAnn =
80+
context
81+
.getTestMethod()
82+
.map(m -> m.getAnnotation(DetectThreadLeaks.LingerTime.class))
83+
.orElse(null);
84+
if (methodAnn != null) return methodAnn.millis();
85+
86+
var classAnn = context.getRequiredTestClass().getAnnotation(DetectThreadLeaks.LingerTime.class);
87+
return classAnn == null ? 0 : classAnn.millis();
88+
}
89+
6690
private static boolean isConcurrentMode(ExtensionContext context) {
6791
// Check the concurrent flag stored in beforeAll (class-level context = parent of method ctx).
6892
return context
@@ -74,26 +98,70 @@ private static boolean isConcurrentMode(ExtensionContext context) {
7498
.orElse(false);
7599
}
76100

77-
private static void checkLeaks(ExtensionContext.Store store, String description) {
101+
private static void checkLeaks(ExtensionContext.Store store, String description, int lingerMs) {
78102
var snapshot = store.get(SNAPSHOT_KEY, HashSet.class);
79103
if (snapshot == null) return;
80104

81-
var leaked = liveThreads();
82-
leaked.removeAll(snapshot);
83-
leaked.removeIf(t -> !t.isAlive());
105+
var leaked = leakedSince(snapshot);
106+
if (leaked.isEmpty()) return;
107+
108+
// Linger: poll until threads self-terminate or the window expires.
109+
if (lingerMs > 0) {
110+
long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(lingerMs);
111+
while (!leaked.isEmpty() && System.nanoTime() < deadline) {
112+
try {
113+
long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
114+
Thread.sleep(Math.max(1L, Math.min(100L, remainingMs)));
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
break;
118+
}
119+
leaked = leakedSince(snapshot);
120+
}
121+
if (leaked.isEmpty()) return;
122+
}
123+
124+
// Interrupt leaked threads for cleanup, then wait briefly for them to terminate.
125+
leaked.keySet().forEach(Thread::interrupt);
126+
long joinDeadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(INTERRUPT_JOIN_MS);
127+
for (Thread t : leaked.keySet()) {
128+
long remaining = TimeUnit.NANOSECONDS.toMillis(joinDeadline - System.nanoTime());
129+
if (remaining <= 0) break;
130+
try {
131+
t.join(remaining);
132+
} catch (InterruptedException e) {
133+
Thread.currentThread().interrupt();
134+
break;
135+
}
136+
}
84137

85-
if (!leaked.isEmpty()) {
86-
var sb = new StringBuilder(leaked.size() + " thread(s) leaked from " + description + ":");
87-
leaked.forEach(t -> sb.append("\n ").append(Threads.threadName(t)));
88-
throw new AssertionError(sb.toString());
138+
// Report failure with stack traces captured before the interrupt.
139+
var sb = new StringBuilder(leaked.size() + " thread(s) leaked from " + description + ":");
140+
int cnt = 1;
141+
for (var entry : leaked.entrySet()) {
142+
sb.append(String.format("%n %2d) %s", cnt++, Threads.threadName(entry.getKey())));
143+
for (var ste : entry.getValue()) {
144+
sb.append(String.format("%n at %s", ste));
145+
}
89146
}
147+
throw new AssertionError(sb.toString());
148+
}
149+
150+
private static Map<Thread, StackTraceElement[]> leakedSince(HashSet<?> snapshot) {
151+
var current = liveThreadsWithStacks();
152+
current.keySet().removeAll(snapshot);
153+
return current;
90154
}
91155

92156
private static HashSet<Thread> liveThreads() {
93-
return Thread.getAllStackTraces().keySet().stream()
94-
.filter(Thread::isAlive)
95-
.filter(t -> !isKnownSystemThread(t))
96-
.collect(Collectors.toCollection(HashSet::new));
157+
return new HashSet<>(liveThreadsWithStacks().keySet());
158+
}
159+
160+
private static Map<Thread, StackTraceElement[]> liveThreadsWithStacks() {
161+
return Thread.getAllStackTraces().entrySet().stream()
162+
.filter(e -> e.getKey().isAlive())
163+
.filter(e -> !isKnownSystemThread(e.getKey()))
164+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
97165
}
98166

99167
private static boolean isKnownSystemThread(Thread t) {

randomizedtesting-jupiter/src/test/java/com/carrotsearch/randomizedtesting/jupiter/F005_ThreadLeaks.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.carrotsearch.randomizedtesting.jupiter.infra.IgnoreInStandaloneRuns;
88
import java.util.concurrent.TimeUnit;
99
import java.util.stream.Stream;
10+
import org.assertj.core.api.Condition;
1011
import org.junit.jupiter.api.AfterAll;
1112
import org.junit.jupiter.api.BeforeAll;
1213
import org.junit.jupiter.api.DynamicTest;
@@ -70,6 +71,33 @@ static void beforeAll() {
7071
}
7172
}
7273

74+
@Nested
75+
class TestStackTracesInMessage {
76+
@Test
77+
void leakErrorMessageContainsStackTrace() {
78+
collectExecutionResults(testKitBuilder(SuiteScopeWithLeak.class))
79+
.results()
80+
.allEvents()
81+
.finished()
82+
.failed()
83+
.assertEventsMatchExactly(
84+
event(
85+
finishedWithFailure(
86+
instanceOf(AssertionError.class),
87+
new Condition<>(
88+
t -> t.getMessage().contains(getClass().getPackageName()),
89+
"error message contains stack frames"))));
90+
}
91+
92+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.SUITE)
93+
static class SuiteScopeWithLeak extends IgnoreInStandaloneRuns {
94+
@Test
95+
void testMethod() {
96+
startSleepingThread();
97+
}
98+
}
99+
}
100+
73101
@Nested
74102
class TestTestScope {
75103
@Test
@@ -91,6 +119,114 @@ void testMethod() {
91119
}
92120
}
93121

122+
@Nested
123+
class TestLinger {
124+
@Test
125+
void threadTerminatingWithinLingerWindowPasses() {
126+
collectExecutionResults(testKitBuilder(ShortLivedLeak.class))
127+
.results()
128+
.allEvents()
129+
.assertThatEvents()
130+
.doNotHave(event(finishedWithFailure()));
131+
}
132+
133+
@Test
134+
void methodLingerTakesPrecedenceOverAbsentClassLinger() {
135+
collectExecutionResults(testKitBuilder(MethodLingerOverridesAbsentClassLinger.class))
136+
.results()
137+
.allEvents()
138+
.assertThatEvents()
139+
.doNotHave(event(finishedWithFailure()));
140+
}
141+
142+
@Test
143+
void methodLingerTakesPrecedenceOverClassLinger() {
144+
collectExecutionResults(testKitBuilder(MethodLingerOverridesClassLinger.class))
145+
.results()
146+
.allEvents()
147+
.assertThatEvents()
148+
.doNotHave(event(finishedWithFailure()));
149+
}
150+
151+
@Test
152+
void threadOutlastingLingerWindowFails() {
153+
collectExecutionResults(testKitBuilder(LongLivedLeak.class))
154+
.results()
155+
.allEvents()
156+
.finished()
157+
.failed()
158+
.assertEventsMatchExactly(event(finishedWithFailure(instanceOf(AssertionError.class))));
159+
}
160+
161+
// Class linger 10s; thread sleeps 100ms → terminates before linger expires → pass.
162+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.SUITE)
163+
@DetectThreadLeaks.LingerTime(millis = 10_000)
164+
static class ShortLivedLeak extends IgnoreInStandaloneRuns {
165+
@Test
166+
void testMethod() {
167+
var t =
168+
new Thread(
169+
() -> {
170+
try {
171+
Thread.sleep(100);
172+
} catch (InterruptedException ignored) {
173+
}
174+
});
175+
t.setDaemon(true);
176+
t.start();
177+
}
178+
}
179+
180+
// Class linger 50ms; thread sleeps 1 min → outlasts linger → fail.
181+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.SUITE)
182+
@DetectThreadLeaks.LingerTime(millis = 50)
183+
static class LongLivedLeak extends IgnoreInStandaloneRuns {
184+
@Test
185+
void testMethod() {
186+
startSleepingThread();
187+
}
188+
}
189+
190+
// Method linger 10s overrides absent class linger; thread sleeps 100ms → pass.
191+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.TEST)
192+
static class MethodLingerOverridesAbsentClassLinger extends IgnoreInStandaloneRuns {
193+
@Test
194+
@DetectThreadLeaks.LingerTime(millis = 10_000)
195+
void testMethod() {
196+
var t =
197+
new Thread(
198+
() -> {
199+
try {
200+
Thread.sleep(100);
201+
} catch (InterruptedException ignored) {
202+
}
203+
});
204+
t.setDaemon(true);
205+
t.start();
206+
}
207+
}
208+
209+
// Method linger 10s overrides class linger 50ms; thread sleeps 100ms → pass.
210+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.TEST)
211+
@DetectThreadLeaks.LingerTime(millis = 50)
212+
static class MethodLingerOverridesClassLinger extends IgnoreInStandaloneRuns {
213+
@Test
214+
@DetectThreadLeaks.LingerTime(millis = 10_000)
215+
void testMethod() {
216+
var t =
217+
new Thread(
218+
() -> {
219+
try {
220+
Thread.sleep(100);
221+
} catch (InterruptedException ignored) {
222+
}
223+
});
224+
t.setDaemon(true);
225+
t.start();
226+
}
227+
}
228+
}
229+
94230
@Nested
95231
class TestConcurrentMode {
96232
@Test

randomizedtesting-jupiter/src/test/java/com/carrotsearch/randomizedtesting/jupiter/F005_ThreadLeaks.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
## Functionality
44

55
* It should be possible to add a `@DetectThreadLeaks` extension which detects new threads forked within the test
6-
container. This extension takes a single parameter - the scope of detection. Either we care about threads leaked
6+
container. This extension takes a parameter - the scope of detection. Either we care about threads leaked
77
from the entire container or from each individual test. Here is an example of use:
88

99
```java
@@ -22,6 +22,22 @@ public class TestClass {
2222
* The extension is only functional in sequential mode. It should emit a warning and do nothing if tests are
2323
run in concurrent mode.
2424

25+
* Occasionally there will be threads that cannot be joined but will eventually terminate on their own. One can specify an additional
26+
"linger" time to wait before the thread leak is reported, for example one second, as shown below:
27+
28+
```java
29+
@DetectThreadLeaks(scope = DetectThreadLeaks.Scope.SUITE)
30+
@DetectThreadLeaks.LingerTime(millis = 1_000)
31+
public class TestClass {
32+
@Test
33+
public void testMethod() {
34+
new Thread(() -> {
35+
try { Thread.sleep(1000); } catch (Exception e) {}
36+
}).start();
37+
}
38+
}
39+
```
40+
2541
## Migration notes (from randomizedtesting for junit4)
2642

2743
*

0 commit comments

Comments
 (0)