Skip to content

Commit 8b3af84

Browse files
committed
Add TCK tests, fix other tests
1 parent b2b05a0 commit 8b3af84

7 files changed

Lines changed: 326 additions & 4 deletions

src/test/java/io/reactivex/rxjava4/internal/virtual/VirtualCreateVirtualTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,14 @@ public void checkIsInsideVirtualThread() {
4949
})
5050
.test()
5151
.awaitDone(5, TimeUnit.SECONDS)
52-
.assertResult(true); }
52+
.assertResult(true);
53+
}
5354

5455
@Test
5556
public void checkIsInsideVirtualThreadExec() throws Throwable {
5657
Flowable.virtualCreate(emitter -> {
5758
emitter.emit(Thread.currentThread().isVirtual());
58-
})
59+
}, Schedulers.cached())
5960
.test()
6061
.awaitDone(5, TimeUnit.SECONDS)
6162
.assertResult(false);
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual.tck;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.Flow.Publisher;
18+
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.rxjava4.core.Flowable;
22+
import io.reactivex.rxjava4.tck.BaseTck;
23+
24+
@Test
25+
public class FlowableVirtualCreateVirtualTckTest extends BaseTck<Long> {
26+
27+
@Override
28+
public Publisher<Long> createFlowPublisher(final long elements) {
29+
return
30+
Flowable.virtualCreate(emitter -> {
31+
for (var i = 0L; i < elements; i++) {
32+
emitter.emit(i);
33+
}
34+
});
35+
}
36+
37+
@Override
38+
public Publisher<Long> createFailedFlowPublisher() {
39+
return Flowable.<Long>virtualCreate(_ -> {
40+
throw new IOException();
41+
});
42+
}
43+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual.tck;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.Flow.Publisher;
18+
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.rxjava4.core.Flowable;
22+
import io.reactivex.rxjava4.schedulers.Schedulers;
23+
import io.reactivex.rxjava4.tck.BaseTck;
24+
25+
@Test
26+
public class FlowableVirtualTransformVirtual1TckTest extends BaseTck<Long> {
27+
28+
@Override
29+
public Publisher<Long> createFlowPublisher(final long elements) {
30+
var half = elements >> 1;
31+
var rest = elements - half;
32+
return Flowable.rangeLong(0, rest)
33+
.virtualTransform((v, emitter) -> {
34+
emitter.emit(v);
35+
if (v < rest - 1 || half == rest) {
36+
emitter.emit(v);
37+
}
38+
}, Schedulers.virtual(), 1);
39+
}
40+
41+
@Override
42+
public Publisher<Long> createFailedFlowPublisher() {
43+
return Flowable.just(1)
44+
.virtualTransform((_, _) -> {
45+
throw new IOException();
46+
}, Schedulers.virtual(), 1);
47+
}
48+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual.tck;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.Flow.Publisher;
18+
import java.util.concurrent.TimeUnit;
19+
20+
import org.testng.annotations.Test;
21+
22+
import io.reactivex.rxjava4.core.Flowable;
23+
import io.reactivex.rxjava4.schedulers.Schedulers;
24+
import io.reactivex.rxjava4.tck.BaseTck;
25+
26+
@Test
27+
public class FlowableVirtualTransformVirtual2TckTest extends BaseTck<Long> {
28+
29+
@Override
30+
public Publisher<Long> createFlowPublisher(final long elements) {
31+
var half = elements >> 1;
32+
var rest = elements - half;
33+
return Flowable.rangeLong(0, rest)
34+
.virtualTransform((v, emitter) -> {
35+
emitter.emit(v);
36+
if (v < rest - 1 || half == rest) {
37+
emitter.emit(v);
38+
}
39+
}, Schedulers.virtual(), 2);
40+
}
41+
42+
@Override
43+
public Publisher<Long> createFailedFlowPublisher() {
44+
return Flowable.just(1)
45+
.virtualTransform((_, _) -> {
46+
throw new IOException();
47+
}, Schedulers.virtual(), 2);
48+
}
49+
50+
@Test
51+
public void slowProducer() {
52+
Flowable.range(1, 10)
53+
.subscribeOn(Schedulers.computation())
54+
.map(v -> {
55+
Thread.interrupted();
56+
Thread.sleep(10);
57+
return v;
58+
})
59+
.virtualTransform((v, emitter) -> {
60+
emitter.emit(v);
61+
}, Schedulers.virtual(), 2)
62+
.test()
63+
.awaitDone(5, TimeUnit.SECONDS)
64+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
65+
}
66+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual.tck;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.Flow.Publisher;
18+
19+
import org.testng.annotations.Test;
20+
21+
import io.reactivex.rxjava4.core.Flowable;
22+
import io.reactivex.rxjava4.schedulers.Schedulers;
23+
import io.reactivex.rxjava4.tck.BaseTck;
24+
25+
@Test
26+
public class FlowableVirtualTransformVirtual3TckTest extends BaseTck<Long> {
27+
28+
@Override
29+
public Publisher<Long> createFlowPublisher(final long elements) {
30+
var half = elements >> 1;
31+
var rest = elements - half;
32+
return Flowable.rangeLong(0, rest)
33+
.virtualTransform((v, emitter) -> {
34+
emitter.emit(v);
35+
if (v < rest - 1 || half == rest) {
36+
emitter.emit(v);
37+
}
38+
}, Schedulers.virtual(), Flowable.bufferSize());
39+
}
40+
41+
@Override
42+
public Publisher<Long> createFailedFlowPublisher() {
43+
return Flowable.error(new IOException())
44+
.virtualTransform((_, _) -> {
45+
}, Schedulers.virtual(), Flowable.bufferSize());
46+
}
47+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.internal.virtual.tck;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.*;
18+
import java.util.concurrent.Flow.Publisher;
19+
20+
import org.testng.annotations.Test;
21+
22+
import io.reactivex.rxjava4.core.Flowable;
23+
import io.reactivex.rxjava4.schedulers.Schedulers;
24+
import io.reactivex.rxjava4.tck.BaseTck;
25+
import io.reactivex.rxjava4.testsupport.TestHelper;
26+
27+
@Test
28+
public class FlowableVirtualTransformVirtualTckTest extends BaseTck<Long> {
29+
30+
@Override
31+
public Publisher<Long> createFlowPublisher(final long elements) {
32+
var half = elements >> 1;
33+
var rest = elements - half;
34+
return Flowable.rangeLong(0, rest)
35+
.virtualTransform((v, emitter) -> {
36+
emitter.emit(v);
37+
if (v < rest - 1 || half == rest) {
38+
emitter.emit(v);
39+
}
40+
}, Schedulers.virtual(), Flowable.bufferSize());
41+
}
42+
43+
@Override
44+
public Publisher<Long> createFailedFlowPublisher() {
45+
return Flowable.just(1)
46+
.virtualTransform((_, _) -> {
47+
throw new IOException();
48+
}, Schedulers.virtual(), Flowable.bufferSize());
49+
}
50+
51+
@Test
52+
public void slowProducer() {
53+
var log = new ConcurrentLinkedQueue<String>();
54+
55+
var ts = Flowable.range(1, 10)
56+
.doOnNext(v -> log.offer("Range: " + v))
57+
.doOnRequest(v -> log.offer("SubscribeOn requested: " + v))
58+
.doOnSubscribe(_ -> log.offer("Range subscribed to"))
59+
.subscribeOn(Schedulers.computation())
60+
.doOnRequest(v -> log.offer("Map requested: " + v))
61+
.doOnSubscribe(_ -> log.offer("subscribeOn subscribed to"))
62+
.map(v -> {
63+
log.offer("Map: " + v);
64+
log.offer("Map interrupted? " + Thread.interrupted());
65+
try {
66+
Thread.sleep(10);
67+
} catch (InterruptedException ex) {
68+
log.offer("Map sleep interrupted");
69+
}
70+
return v;
71+
})
72+
.doOnRequest(v -> log.offer("Transform requested: " + v))
73+
.virtualTransform((v, emitter) -> {
74+
log.offer("Tansform before emit: " + v);
75+
emitter.emit(v);
76+
log.offer("Tansform after emit: " + v);
77+
}, Schedulers.virtual(), Flowable.bufferSize())
78+
.doOnRequest(v -> log.offer("Test requested: " + v))
79+
.doOnNext(v -> log.offer("Test received: " + v))
80+
.test()
81+
.awaitDone(5, TimeUnit.SECONDS)
82+
;
83+
84+
try {
85+
ts.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
86+
} catch (AssertionError ex) {
87+
var sb = new StringBuilder();
88+
log.forEach(v -> sb.append("\r\n").append(v));
89+
var exc = new AssertionError(sb.append("\r\n").toString(), ex);
90+
throw exc;
91+
}
92+
}
93+
94+
@Test
95+
public void slowProducerService() {
96+
TestHelper.checkObstruction();
97+
98+
Flowable.range(1, 10)
99+
.subscribeOn(Schedulers.computation())
100+
.map(v -> {
101+
Thread.interrupted();
102+
try {
103+
Thread.sleep(10);
104+
} catch (InterruptedException ex) {
105+
// ignored
106+
}
107+
return v;
108+
})
109+
.virtualTransform((v, emitter) -> {
110+
emitter.emit(v);
111+
}, Schedulers.virtual(), Flowable.bufferSize())
112+
.test()
113+
.awaitDone(5, TimeUnit.SECONDS)
114+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
115+
}
116+
}

src/test/java/io/reactivex/rxjava4/schedulers/VirtualThreadSchedulerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import java.util.concurrent.*;
1919

20-
import org.junit.Test;
20+
import org.junit.*;
2121

2222
import io.reactivex.rxjava4.core.*;
2323
import io.reactivex.rxjava4.core.Scheduler.Worker;
@@ -45,7 +45,7 @@ public final void virtualScheduler() {
4545

4646
@Override
4747
public String apply(Integer t) {
48-
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
48+
assertTrue(Thread.currentThread().isVirtual());
4949
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
5050
}
5151
});
@@ -91,6 +91,7 @@ public void workerDisposed() {
9191
assertTrue(((Disposable)w).isDisposed());
9292
}
9393

94+
@Ignore("FIXME DeferredExecutorScheduler doesn't support shutdown yet")
9495
@Test
9596
@SuppressUndeliverable
9697
public void shutdownRejects() {

0 commit comments

Comments
 (0)