Skip to content

Commit f5bfbc5

Browse files
authored
4.x: Intro Schedulers.virtual(), deprecate Schedulers.io(), fix tests (#8051)
* 4.x: Intro Schedulers.virtual(), deprecate Schedulers.io(), fix tests * Add TCK tests, fix other tests
1 parent a1741d5 commit f5bfbc5

File tree

103 files changed

+1881
-304
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

103 files changed

+1881
-304
lines changed

src/main/java/io/reactivex/rxjava4/annotations/SchedulerSupport.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,24 @@
4646
*/
4747
String COMPUTATION = "io.reactivex:computation";
4848
/**
49-
* The operator/class runs on RxJava's {@linkplain Schedulers#io() I/O scheduler} or takes
49+
* The operator/class runs on RxJava's {@linkplain Schedulers#cached() I/O scheduler} or takes
5050
* timing information from it.
51+
* @deprecated since 4.0.0, use the more specific {@link #CACHED} or {@link #VIRTUAL} constants
5152
*/
53+
@Deprecated(since = "4.0.0")
5254
String IO = "io.reactivex:io";
55+
/**
56+
* The operator/class runs on RxJava's {@linkplain Schedulers#cached() I/O scheduler} or takes
57+
* timing information from it.
58+
* @since 4.0.0
59+
*/
60+
String CACHED = "io.reactivex:cached";
61+
/**
62+
* The operator/class runs on RxJava's {@linkplain Schedulers#virtual() Virtual scheduler} or takes
63+
* timing information from it.
64+
* @since 4.0.0
65+
*/
66+
String VIRTUAL = "io.reactivex:virtual";
5367
/**
5468
* The operator/class runs on RxJava's {@linkplain Schedulers#newThread() new thread scheduler}
5569
* or takes timing information from it.

src/main/java/io/reactivex/rxjava4/core/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
* Example:
7070
* <pre><code>
7171
* Disposable d = Completable.complete()
72-
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
72+
* .delay(10, TimeUnit.SECONDS, Schedulers.cached())
7373
* .subscribeWith(new DisposableCompletableObserver() {
7474
* &#64;Override
7575
* public void onStart() {

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 175 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
* <em>Reactive Streams</em> implementations.
5252
* <p>
5353
* The {@code Flowable} hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
54-
* that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
54+
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
5555
* overloads that allow setting their internal buffer size explicitly.
5656
* <p>
5757
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -161,7 +161,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
161161
/** The default buffer size. */
162162
static final int BUFFER_SIZE;
163163
static {
164-
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
164+
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx4.buffer-size", 128));
165165
}
166166

167167
/**
@@ -250,7 +250,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
250250

251251
/**
252252
* Returns the default internal buffer size used by most async operators.
253-
* <p>The value can be overridden via system parameter {@code rx3.buffer-size}
253+
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
254254
* <em>before</em> the {@code Flowable} class is loaded.
255255
* @return the default internal buffer size.
256256
*/
@@ -20815,7 +20815,100 @@ public final Stream<T> blockingStream(int prefetch) {
2081520815
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator, @NonNull ExecutorService executor) {
2081620816
Objects.requireNonNull(generator, "generator is null");
2081720817
Objects.requireNonNull(executor, "executor is null");
20818-
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor));
20818+
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor, null));
20819+
}
20820+
20821+
/**
20822+
* Construct a {@code Flowable} and use the given {@code generator}
20823+
* to generate items on demand while running on the {@link Schedulers#virtual()}.
20824+
* <p>
20825+
* <dl>
20826+
* <dt><b>Backpressure:</b></dt>
20827+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20828+
* the downstream is not ready.
20829+
* </dd>
20830+
* <dt><b>Scheduler:</b></dt>
20831+
* <dd>The operator by default runs on the {@link Schedulers#virtual()} scheduler.</dd>
20832+
* </dl>
20833+
* <p>
20834+
* Note that backpressure is handled via blocking so it is recommended the default
20835+
* {@link Scheduler} uses virtual threads, such as the one returned by
20836+
* {@link Schedulers#virtual()}.
20837+
* <p>
20838+
* Examples:
20839+
* <pre><code>
20840+
* Flowable.&lt;Integer&gt;virtualCreate(emitter -> {
20841+
* for (int i = 0; i &lt; 10; i++) {
20842+
* Thread.sleep(1000);
20843+
* emitter.emit(i);
20844+
* }
20845+
* })
20846+
* .subscribe(
20847+
* System.out::println,
20848+
* Throwable::printStackTrace,
20849+
* () -&gt; System.out.println("Done")
20850+
* );
20851+
* </code></pre>
20852+
* @param <T> the element type to emit
20853+
* @param generator the callback used to generate items on demand by the downstream
20854+
* @return the new {@code Flowable} instance
20855+
* @throws NullPointerException if {@code generator} or {@code executor} is {@code null}
20856+
* @since 4.0.0
20857+
*/
20858+
@CheckReturnValue
20859+
@BackpressureSupport(BackpressureKind.FULL)
20860+
@SchedulerSupport(SchedulerSupport.VIRTUAL)
20861+
@NonNull
20862+
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator) {
20863+
return virtualCreate(generator, Schedulers.virtual());
20864+
}
20865+
20866+
/**
20867+
* Construct a {@code Flowable} and use the given {@code generator}
20868+
* to generate items on demand while running on the given {@link Scheduler}.
20869+
* <p>
20870+
* <dl>
20871+
* <dt><b>Backpressure:</b></dt>
20872+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20873+
* the downstream is not ready.
20874+
* </dd>
20875+
* <dt><b>Scheduler:</b></dt>
20876+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
20877+
* </dl>
20878+
* <p>
20879+
* Note that backpressure is handled via blocking so it is recommended the provided
20880+
* {@code Scheduler} uses virtual threads, such as the one returned by
20881+
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
20882+
* <p>
20883+
* Examples:
20884+
* <pre><code>
20885+
* Flowable.&lt;Integer&gt;virtualCreate(emitter -> {
20886+
* for (int i = 0; i &lt; 10; i++) {
20887+
* Thread.sleep(1000);
20888+
* emitter.emit(i);
20889+
* }
20890+
* }, Schedulers.virtual())
20891+
* .subscribe(
20892+
* System.out::println,
20893+
* Throwable::printStackTrace,
20894+
* () -&gt; System.out.println("Done")
20895+
* );
20896+
* </code></pre>
20897+
* @param <T> the element type to emit
20898+
* @param generator the callback used to generate items on demand by the downstream
20899+
* @param scheduler the target {@code Scheduler} to use for running the callback
20900+
* @return the new {@code Flowable} instance
20901+
* @throws NullPointerException if {@code generator} or {@code scheduler} is {@code null}
20902+
* @since 4.0.0
20903+
*/
20904+
@CheckReturnValue
20905+
@BackpressureSupport(BackpressureKind.FULL)
20906+
@SchedulerSupport(SchedulerSupport.CUSTOM)
20907+
@NonNull
20908+
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator, @NonNull Scheduler scheduler) {
20909+
Objects.requireNonNull(generator, "generator is null");
20910+
Objects.requireNonNull(scheduler, "scheduler is null");
20911+
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, null, scheduler));
2081920912
}
2082020913

2082120914
/**
@@ -20835,7 +20928,8 @@ public final Stream<T> blockingStream(int prefetch) {
2083520928
* {@code ExecutorService} uses virtual threads, such as the one returned by
2083620929
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2083720930
* @param <R> the downstream element type
20838-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
20931+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20932+
* is invoked for each upstream item
2083920933
* @param executor the target {@code ExecutorService} to use for running the callback
2084020934
* @return the new {@code Flowable} instance
2084120935
* @throws NullPointerException if {@code transformer} or {@code executor} is {@code null}
@@ -20845,10 +20939,80 @@ public final Stream<T> blockingStream(int prefetch) {
2084520939
@BackpressureSupport(BackpressureKind.FULL)
2084620940
@SchedulerSupport(SchedulerSupport.NONE)
2084720941
@NonNull
20848-
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor) {
20942+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
20943+
@NonNull ExecutorService executor) {
2084920944
return virtualTransform(transformer, executor, Flowable.bufferSize());
2085020945
}
2085120946

20947+
/**
20948+
* Returns a {@code Flowable} that turns an upstream item an upstream item into
20949+
* zero or more downstream values by running on the {@link Schedulers#virtual()} scheduler.
20950+
* <p>
20951+
* <dl>
20952+
* <dt><b>Backpressure:</b></dt>
20953+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20954+
* the downstream is not ready.
20955+
* </dd>
20956+
* <dt><b>Scheduler:</b></dt>
20957+
* <dd>The operator by default runs on the {@link Schedulers#virtual()} scheduler.</dd>
20958+
* </dl>
20959+
* <p>
20960+
* Note that backpressure is handled via blocking so it is recommended the default
20961+
* {@link Scheduler} uses virtual threads, such as the one returned by
20962+
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
20963+
* @param <R> the downstream element type
20964+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20965+
* is invoked for each upstream item
20966+
* @return the new {@code Flowable} instance
20967+
* @throws NullPointerException if {@code transformer} is {@code null}
20968+
* @since 4.0.0
20969+
*/
20970+
@CheckReturnValue
20971+
@BackpressureSupport(BackpressureKind.FULL)
20972+
@SchedulerSupport(SchedulerSupport.VIRTUAL)
20973+
@NonNull
20974+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer) {
20975+
return virtualTransform(transformer, Schedulers.virtual(), Flowable.bufferSize());
20976+
}
20977+
20978+
/**
20979+
* Returns a {@code Flowable} that turns an upstream item an upstream item into
20980+
* zero or more downstream values by running on the given {@link Scheduler}.
20981+
* <p>
20982+
* <dl>
20983+
* <dt><b>Backpressure:</b></dt>
20984+
* <dd>This operator honors backpressure from downstream and blocks the emitter if
20985+
* the downstream is not ready.
20986+
* </dd>
20987+
* <dt><b>Scheduler:</b></dt>
20988+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
20989+
* </dl>
20990+
* <p>
20991+
* Note that backpressure is handled via blocking so it is recommended the provided
20992+
* {@code Scheduler} uses virtual threads, such as the one returned by
20993+
* {@link Schedulers#virtual()}.
20994+
* @param <R> the downstream element type
20995+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20996+
* is invoked for each upstream item
20997+
* @param scheduler the target {@code Scheduler} to use for running the callback
20998+
* @param prefetch the number of items to fetch from the upstream.
20999+
* @return the new {@code Flowable} instance
21000+
* @throws NullPointerException if {@code transformer} or {@code scheduler} is {@code null}
21001+
* @throws IllegalArgumentException if {@code prefetch} is non-positive
21002+
* @since 4.0.0
21003+
*/
21004+
@CheckReturnValue
21005+
@BackpressureSupport(BackpressureKind.FULL)
21006+
@SchedulerSupport(SchedulerSupport.CUSTOM)
21007+
@NonNull
21008+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
21009+
@NonNull Scheduler scheduler, int prefetch) {
21010+
Objects.requireNonNull(transformer, "transformer is null");
21011+
Objects.requireNonNull(scheduler, "scheduler is null");
21012+
ObjectHelper.verifyPositive(prefetch, "prefetch");
21013+
return new FlowableVirtualTransformExecutor<>(this, transformer, null, scheduler, prefetch);
21014+
}
21015+
2085221016
/**
2085321017
* Returns a {@code Flowable} that turns an upstream item into zero or more downstream
2085421018
* values by running on the given {@link ExecutorService}.
@@ -20866,7 +21030,8 @@ public final Stream<T> blockingStream(int prefetch) {
2086621030
* {@code ExecutorService} uses virtual threads, such as the one returned by
2086721031
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2086821032
* @param <R> the downstream element type
20869-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
21033+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
21034+
* is invoked for each upstream item
2087021035
* @param executor the target {@code ExecutorService} to use for running the callback
2087121036
* @param prefetch the number of items to fetch from the upstream.
2087221037
* @return the new {@code Flowable} instance
@@ -20878,11 +21043,12 @@ public final Stream<T> blockingStream(int prefetch) {
2087821043
@BackpressureSupport(BackpressureKind.FULL)
2087921044
@SchedulerSupport(SchedulerSupport.NONE)
2088021045
@NonNull
20881-
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor, int prefetch) {
21046+
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
21047+
@NonNull ExecutorService executor, int prefetch) {
2088221048
Objects.requireNonNull(transformer, "transformer is null");
2088321049
Objects.requireNonNull(executor, "executor is null");
2088421050
ObjectHelper.verifyPositive(prefetch, "prefetch");
20885-
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, prefetch);
21051+
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch);
2088621052
}
2088721053

2088821054
}

src/main/java/io/reactivex/rxjava4/core/Maybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
* Example:
6969
* <pre><code>
7070
* Disposable d = Maybe.just("Hello World")
71-
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
71+
* .delay(10, TimeUnit.SECONDS, Schedulers.cached())
7272
* .subscribeWith(new DisposableMaybeObserver&lt;String&gt;() {
7373
* &#64;Override
7474
* public void onStart() {

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* for such non-backpressured flows, which {@code Observable} itself implements as well.
4848
* <p>
4949
* The {@code Observable}'s operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
50-
* that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
50+
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
5151
* overloads that allow setting their internal buffer size explicitly.
5252
* <p>
5353
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -181,7 +181,7 @@ public abstract class Observable<@NonNull T> implements ObservableSource<T> {
181181
/**
182182
* Returns the default 'island' size or capacity-increment hint for unbounded buffers.
183183
* <p>Delegates to {@link Flowable#bufferSize} but is public for convenience.
184-
* <p>The value can be overridden via system parameter {@code rx3.buffer-size}
184+
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
185185
* <em>before</em> the {@link Flowable} class is loaded.
186186
* @return the default 'island' size or capacity-increment hint
187187
*/

0 commit comments

Comments
 (0)