Skip to content

Commit d9d985c

Browse files
authored
4.x: Detect un-awaited CompletionStageDisposables during tests (#8093)
* 4.x: Detect un-awaited CompletionStageDisposables during tests * Promote the Cleaner api use in readme.
1 parent 73241eb commit d9d985c

File tree

4 files changed

+201
-21
lines changed

4 files changed

+201
-21
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ It extends the [observer pattern](http://en.wikipedia.org/wiki/Observer_pattern)
1919
- :+1: `java.util.concurrent.Flow`-based implementation.
2020
- :+1: Virtual Thread support; `virtualCreate()`, `virtualTransform()`, :eye: `Schedulers.virtual()`.
2121
- :+1: New `Streamable<T>` built around Virtual Threads & virtual blocking. Think `IAsyncEnumerable` for Java. :satellite: in progress.
22+
- :+1: Using Java Cleaner API to detect resource leaks and using it for adaptive cleanups.
2223
- :information_source: Reactive Streams Test Compatibility Kit usage; [Reactive-Streams](https://github.com/reactive-streams/reactive-streams-jvm).
2324
- :satellite: Rewamp of the javadoc bloat in the base types via `sealed` interfaces.
2425
- :satellite: Reduce overload bloat by using `record`-based configurations.
2526
- :satellite: Internal optimizations now that I have the master :key:.
2627
- :eye: Possible usages for Scoped variables for context and per-item resource management.
27-
- :eye: Possible use for the Java Cleaner API.
2828
- :eye: Possible inclusion of 2nd and 3rd party operators.
2929
- :eye: Possible inclusion of the Iterable Extensions (Ix) 2nd party library. ju.Stream is sh|t wrt interfacing and composability.
3030
- :question: Android compatibility depends on your API level and what desugaring is available.

src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,77 @@
1313

1414
package io.reactivex.rxjava4.core;
1515

16+
import java.lang.ref.Cleaner;
17+
import java.util.Objects;
1618
import java.util.concurrent.CompletionStage;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.function.Consumer;
1721

1822
import io.reactivex.rxjava4.annotations.NonNull;
1923
import io.reactivex.rxjava4.disposables.*;
24+
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
2025

2126
/**
2227
* Consist of a terminal stage and a disposable to be able to cancel a sequence.
2328
* @param <T> the return and element type of the various stages
24-
* @param stage the embedded stage to work with
25-
* @param disposable the way to cancel the stage concurrently
2629
* @since 4.0.0
2730
*/
28-
public record CompletionStageDisposable<T>(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
31+
public final class CompletionStageDisposable<T> implements AutoCloseable {
2932

33+
// record classes can't have extra fields, why?
34+
// also I have to write out the constructor instead of declaring it in the record definition, FFS
35+
36+
static final Cleaner cleaner = Cleaner.create();
37+
38+
static volatile Consumer<Cleaner.Cleanable> trackAllocations;
39+
40+
static final class State extends AtomicBoolean implements Runnable {
41+
42+
/** */
43+
private static final long serialVersionUID = 262854674341831347L;
44+
45+
Throwable allocationTrace;
46+
47+
@Override
48+
public void run() {
49+
if (!get()) {
50+
RxJavaPlugins.onError(
51+
new IllegalStateException("CompletionStageDisposable was not awaited or ignored explicitly",
52+
allocationTrace));
53+
}
54+
}
55+
56+
}
57+
58+
final CompletionStage<T> stage;
59+
final Disposable disposable;
60+
final State state;
61+
final Cleaner.Cleanable cleanable;
62+
63+
/**
64+
* Construct an instance with parameters
65+
* @param stage the stage to be awaited
66+
* @param disposable the disposable to cancel asynchronously
67+
*/
68+
public CompletionStageDisposable(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
69+
Objects.requireNonNull(stage, "stage is null");
70+
Objects.requireNonNull(disposable, "disposable is null");
71+
this.stage = stage;
72+
this.disposable = disposable;
73+
this.state = new State();
74+
this.cleanable = cleaner.register(this, state);
75+
if (trackAllocations != null) {
76+
state.allocationTrace = new StackOverflowError("CompletionStageDisposable::AllocationTrace");
77+
trackAllocations.accept(this.cleanable);
78+
} else {
79+
state.allocationTrace = null;
80+
}
81+
}
3082
/**
3183
* Await the completion of the current stage.
3284
*/
3385
public void await() {
86+
state.lazySet(true);;
3487
Streamer.await(stage);
3588
}
3689

@@ -39,6 +92,56 @@ public void await() {
3992
* @param canceller the canceller link
4093
*/
4194
public void await(DisposableContainer canceller) {
95+
state.lazySet(true);;
4296
Streamer.await(stage, canceller);
4397
}
98+
99+
/**
100+
* Indicate this instance is deliberately not awaiting its stage.
101+
*/
102+
public void ignore() {
103+
state.lazySet(true);;
104+
}
105+
106+
@Override
107+
public void close() {
108+
try {
109+
state.lazySet(true);
110+
disposable.dispose();
111+
} finally {
112+
cleanable.clean();
113+
}
114+
}
115+
116+
/**
117+
* Set an allocator tracer callback to track where CompletionStageDisposables are leaking.
118+
* @param callback the callback to call when a new trace is being established
119+
*/
120+
public static void setAllocationTrace(Consumer<Cleaner.Cleanable> callback) {
121+
trackAllocations = callback;
122+
}
123+
124+
/**
125+
* Returns the current allocation stacktrace capturing consumer.
126+
* @return the current allocation stacktrace capturing consumer.
127+
*/
128+
public static Consumer<Cleaner.Cleanable> getAllocationTrace() {
129+
return trackAllocations;
130+
}
131+
132+
/***
133+
* Returns the associated completion stage value.
134+
* @return the associated completion stage value.
135+
*/
136+
public CompletionStage<T> stage() {
137+
return stage;
138+
}
139+
140+
/**
141+
* Returns the associated disposable value.
142+
* @return the associated disposable value.
143+
*/
144+
public Disposable disposable() {
145+
return disposable;
146+
}
44147
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.operators.streamable;
15+
16+
import java.lang.ref.Cleaner;
17+
import java.util.*;
18+
19+
import org.junit.jupiter.api.*;
20+
21+
import io.reactivex.rxjava4.core.CompletionStageDisposable;
22+
import io.reactivex.rxjava4.exceptions.CompositeException;
23+
import io.reactivex.rxjava4.functions.Consumer;
24+
import io.reactivex.rxjava4.plugins.RxJavaPlugins;
25+
26+
public abstract class StreamableBaseTest {
27+
28+
protected java.util.function.Consumer<Cleaner.Cleanable> stageTrackingState;
29+
30+
protected Consumer<? super Throwable> oldHandler;
31+
32+
protected List<Throwable> errors;
33+
34+
protected List<Cleaner.Cleanable> cleaners;
35+
36+
protected volatile boolean undeliverablesExpected;
37+
38+
@BeforeEach
39+
protected final void beforeTest() {
40+
errors = Collections.synchronizedList(new ArrayList<>());
41+
cleaners = Collections.synchronizedList(new ArrayList<>());
42+
undeliverablesExpected = false;
43+
44+
stageTrackingState = CompletionStageDisposable.getAllocationTrace();
45+
CompletionStageDisposable.setAllocationTrace(cleaners::add);
46+
47+
oldHandler = RxJavaPlugins.getErrorHandler();
48+
RxJavaPlugins.setErrorHandler(e -> {
49+
if (!undeliverablesExpected) {
50+
errors.add(e);
51+
}
52+
if (oldHandler != null) {
53+
oldHandler.accept(e);
54+
}
55+
});
56+
}
57+
58+
@AfterEach
59+
protected final void afterTest(TestInfo testInfo) {
60+
CompletionStageDisposable.setAllocationTrace(stageTrackingState);
61+
for (var c : cleaners) {
62+
c.clean();
63+
}
64+
if (errors.size() != 0) {
65+
throw new AssertionError("Undeliverable exceptions during test detected: " + testInfo.getDisplayName(),
66+
new CompositeException(errors));
67+
}
68+
}
69+
70+
protected final void setUndeliverablesExpected(boolean isExpected) {
71+
undeliverablesExpected = isExpected;
72+
}
73+
74+
}

src/test/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableTest.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313

1414
package io.reactivex.rxjava4.internal.operators.streamable;
1515

16-
import static org.junit.jupiter.api.Assertions.assertFalse;
17-
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.*;
1817

1918
import java.util.concurrent.TimeUnit;
2019
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,7 +28,7 @@
2928
import io.reactivex.rxjava4.testsupport.TestHelper;
3029

3130
@Isolated
32-
public class StreamableTest {
31+
public class StreamableTest extends StreamableBaseTest {
3332

3433
@Test
3534
public void empty() throws Throwable {
@@ -38,16 +37,18 @@ public void empty() throws Throwable {
3837
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
3938
ts.onSubscribe(EmptySubscription.INSTANCE);
4039

41-
var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec);
40+
try (var comp = Streamable.empty().forEach(e -> { ts.onError(new TestException("Element produced? " + e)); }, exec)) {
4241

43-
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join();
42+
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete())
43+
.exceptionally(e -> { ts.onError(e); return null; });
4444

45-
ts
46-
.awaitDone(5, TimeUnit.SECONDS)
47-
.assertResult();
45+
ts
46+
.awaitDone(5, TimeUnit.SECONDS)
47+
.assertResult();
4848

49-
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
50-
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
49+
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
50+
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
51+
}
5152
});
5253
}
5354

@@ -58,16 +59,18 @@ public void just() throws Throwable {
5859
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
5960
ts.onSubscribe(EmptySubscription.INSTANCE);
6061

61-
var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec);
62+
try (var comp = Streamable.just(1).forEach(e -> { ts.onNext(e); }, exec)) {
6263

63-
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete()).exceptionally(e -> { ts.onError(e); return null; }).join();
64+
comp.stage().toCompletableFuture().thenAccept(_ -> ts.onComplete())
65+
.exceptionally(e -> { ts.onError(e); return null; }).join();
6466

65-
ts
66-
.awaitDone(5, TimeUnit.SECONDS)
67-
.assertResult(1);
67+
ts
68+
.awaitDone(5, TimeUnit.SECONDS)
69+
.assertResult(1);
6870

69-
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
70-
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
71+
assertFalse(exec.isShutdown(), "Exec::IsShutdown");
72+
assertFalse(exec.isTerminated(), "Exec::IsTerminated");
73+
}
7174
});
7275
}
7376

0 commit comments

Comments
 (0)