Skip to content

Commit 1d3d529

Browse files
authored
Fix #8086: Isolate windowUnsubscribeNonOverlappingAsyncSource to JUnit 5 (#8087)
1 parent e54d3ee commit 1d3d529

File tree

2 files changed

+64
-33
lines changed

2 files changed

+64
-33
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.operators.observable;
15+
16+
import static org.junit.jupiter.api.Assertions.assertTrue;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.parallel.Isolated;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import io.reactivex.rxjava4.core.Observable;
24+
import io.reactivex.rxjava4.core.RxJavaTest;
25+
import io.reactivex.rxjava4.functions.Consumer;
26+
import io.reactivex.rxjava4.schedulers.Schedulers;
27+
import io.reactivex.rxjava4.testsupport.*;
28+
29+
@Isolated
30+
public class ObservableWindowWithSizeIsolatedTest extends RxJavaTest {
31+
32+
@Test
33+
public void windowUnsubscribeNonOverlappingAsyncSource() {
34+
TestObserverEx<Integer> to = new TestObserverEx<>();
35+
36+
final AtomicInteger count = new AtomicInteger();
37+
Observable.merge(Observable.range(1, 100000)
38+
.doOnNext(new Consumer<Integer>() {
39+
40+
@Override
41+
public void accept(Integer t1) {
42+
if (count.incrementAndGet() == 500000) {
43+
// give it a small break halfway through
44+
try {
45+
Thread.sleep(50);
46+
} catch (InterruptedException ex) {
47+
// ignored
48+
}
49+
}
50+
}
51+
52+
})
53+
.observeOn(Schedulers.computation())
54+
.window(5)
55+
.take(2))
56+
.subscribe(to);
57+
58+
to.awaitDone(500, TimeUnit.MILLISECONDS);
59+
to.assertTerminated();
60+
to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
61+
// make sure we don't emit all values ... the unsubscribe should propagate
62+
assertTrue(count.get() < 100000);
63+
}
64+
}

src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableWindowWithSizeTest.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -126,39 +126,6 @@ public void accept(Integer t1) {
126126
assertEquals(10, count.get());
127127
}
128128

129-
@Test
130-
public void windowUnsubscribeNonOverlappingAsyncSource() {
131-
TestObserverEx<Integer> to = new TestObserverEx<>();
132-
133-
final AtomicInteger count = new AtomicInteger();
134-
Observable.merge(Observable.range(1, 100000)
135-
.doOnNext(new Consumer<Integer>() {
136-
137-
@Override
138-
public void accept(Integer t1) {
139-
if (count.incrementAndGet() == 500000) {
140-
// give it a small break halfway through
141-
try {
142-
Thread.sleep(50);
143-
} catch (InterruptedException ex) {
144-
// ignored
145-
}
146-
}
147-
}
148-
149-
})
150-
.observeOn(Schedulers.computation())
151-
.window(5)
152-
.take(2))
153-
.subscribe(to);
154-
155-
to.awaitDone(500, TimeUnit.MILLISECONDS);
156-
to.assertTerminated();
157-
to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
158-
// make sure we don't emit all values ... the unsubscribe should propagate
159-
assertTrue(count.get() < 100000);
160-
}
161-
162129
@Test
163130
public void windowUnsubscribeOverlapping() {
164131
TestObserverEx<Integer> to = new TestObserverEx<>();

0 commit comments

Comments
 (0)