Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,24 @@
*/
String COMPUTATION = "io.reactivex:computation";
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#io() I/O scheduler} or takes
* The operator/class runs on RxJava's {@linkplain Schedulers#cached() I/O scheduler} or takes
* timing information from it.
* @deprecated since 4.0.0, use the more specific {@link #CACHED} or {@link #VIRTUAL} constants
*/
@Deprecated(since = "4.0.0")
String IO = "io.reactivex:io";
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#cached() I/O scheduler} or takes
* timing information from it.
* @since 4.0.0
*/
String CACHED = "io.reactivex:cached";
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#virtual() Virtual scheduler} or takes
* timing information from it.
* @since 4.0.0
*/
String VIRTUAL = "io.reactivex:virtual";
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#newThread() new thread scheduler}
* or takes timing information from it.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/rxjava4/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
* Example:
* <pre><code>
* Disposable d = Completable.complete()
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
* .delay(10, TimeUnit.SECONDS, Schedulers.cached())
* .subscribeWith(new DisposableCompletableObserver() {
* &#64;Override
* public void onStart() {
Expand Down
184 changes: 175 additions & 9 deletions src/main/java/io/reactivex/rxjava4/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
* <em>Reactive Streams</em> implementations.
* <p>
* The {@code Flowable} hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
* that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
Expand Down Expand Up @@ -161,7 +161,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx4.buffer-size", 128));
}

/**
Expand Down Expand Up @@ -250,7 +250,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,

/**
* Returns the default internal buffer size used by most async operators.
* <p>The value can be overridden via system parameter {@code rx3.buffer-size}
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
* <em>before</em> the {@code Flowable} class is loaded.
* @return the default internal buffer size.
*/
Expand Down Expand Up @@ -20815,7 +20815,100 @@ public final Stream<T> blockingStream(int prefetch) {
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator, @NonNull ExecutorService executor) {
Objects.requireNonNull(generator, "generator is null");
Objects.requireNonNull(executor, "executor is null");
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor));
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor, null));
}

/**
* Construct a {@code Flowable} and use the given {@code generator}
* to generate items on demand while running on the {@link Schedulers#virtual()}.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator honors backpressure from downstream and blocks the emitter if
* the downstream is not ready.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>The operator by default runs on the {@link Schedulers#virtual()} scheduler.</dd>
* </dl>
* <p>
* Note that backpressure is handled via blocking so it is recommended the default
* {@link Scheduler} uses virtual threads, such as the one returned by
* {@link Schedulers#virtual()}.
* <p>
* Examples:
* <pre><code>
* Flowable.&lt;Integer&gt;virtualCreate(emitter -> {
* for (int i = 0; i &lt; 10; i++) {
* Thread.sleep(1000);
* emitter.emit(i);
* }
* })
* .subscribe(
* System.out::println,
* Throwable::printStackTrace,
* () -&gt; System.out.println("Done")
* );
* </code></pre>
* @param <T> the element type to emit
* @param generator the callback used to generate items on demand by the downstream
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code generator} or {@code executor} is {@code null}
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.VIRTUAL)
@NonNull
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator) {
return virtualCreate(generator, Schedulers.virtual());
}

/**
* Construct a {@code Flowable} and use the given {@code generator}
* to generate items on demand while running on the given {@link Scheduler}.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator honors backpressure from downstream and blocks the emitter if
* the downstream is not ready.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
* <p>
* Note that backpressure is handled via blocking so it is recommended the provided
* {@code Scheduler} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* <p>
* Examples:
* <pre><code>
* Flowable.&lt;Integer&gt;virtualCreate(emitter -> {
* for (int i = 0; i &lt; 10; i++) {
* Thread.sleep(1000);
* emitter.emit(i);
* }
* }, Schedulers.virtual())
* .subscribe(
* System.out::println,
* Throwable::printStackTrace,
* () -&gt; System.out.println("Done")
* );
* </code></pre>
* @param <T> the element type to emit
* @param generator the callback used to generate items on demand by the downstream
* @param scheduler the target {@code Scheduler} to use for running the callback
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code generator} or {@code scheduler} is {@code null}
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public static <@NonNull T> Flowable<T> virtualCreate(@NonNull VirtualGenerator<T> generator, @NonNull Scheduler scheduler) {
Objects.requireNonNull(generator, "generator is null");
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, null, scheduler));
}

/**
Expand All @@ -20835,7 +20928,8 @@ public final Stream<T> blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code transformer} or {@code executor} is {@code null}
Expand All @@ -20845,10 +20939,80 @@ public final Stream<T> blockingStream(int prefetch) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor) {
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
@NonNull ExecutorService executor) {
return virtualTransform(transformer, executor, Flowable.bufferSize());
}

/**
* Returns a {@code Flowable} that turns an upstream item an upstream item into
* zero or more downstream values by running on the {@link Schedulers#virtual()} scheduler.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator honors backpressure from downstream and blocks the emitter if
* the downstream is not ready.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>The operator by default runs on the {@link Schedulers#virtual()} scheduler.</dd>
* </dl>
* <p>
* Note that backpressure is handled via blocking so it is recommended the default
* {@link Scheduler} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* is invoked for each upstream item
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code transformer} is {@code null}
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.VIRTUAL)
@NonNull
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer) {
return virtualTransform(transformer, Schedulers.virtual(), Flowable.bufferSize());
}

/**
* Returns a {@code Flowable} that turns an upstream item an upstream item into
* zero or more downstream values by running on the given {@link Scheduler}.
* <p>
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator honors backpressure from downstream and blocks the emitter if
* the downstream is not ready.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
* </dl>
* <p>
* Note that backpressure is handled via blocking so it is recommended the provided
* {@code Scheduler} uses virtual threads, such as the one returned by
* {@link Schedulers#virtual()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* is invoked for each upstream item
* @param scheduler the target {@code Scheduler} to use for running the callback
* @param prefetch the number of items to fetch from the upstream.
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code transformer} or {@code scheduler} is {@code null}
* @throws IllegalArgumentException if {@code prefetch} is non-positive
* @since 4.0.0
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@NonNull
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
@NonNull Scheduler scheduler, int prefetch) {
Objects.requireNonNull(transformer, "transformer is null");
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return new FlowableVirtualTransformExecutor<>(this, transformer, null, scheduler, prefetch);
}

/**
* Returns a {@code Flowable} that turns an upstream item into zero or more downstream
* values by running on the given {@link ExecutorService}.
Expand All @@ -20866,7 +21030,8 @@ public final Stream<T> blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param <R> the downstream element type
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
* is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @param prefetch the number of items to fetch from the upstream.
* @return the new {@code Flowable} instance
Expand All @@ -20878,11 +21043,12 @@ public final Stream<T> blockingStream(int prefetch) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer, @NonNull ExecutorService executor, int prefetch) {
public final <@NonNull R> Flowable<R> virtualTransform(@NonNull VirtualTransformer<T, R> transformer,
@NonNull ExecutorService executor, int prefetch) {
Objects.requireNonNull(transformer, "transformer is null");
Objects.requireNonNull(executor, "executor is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, prefetch);
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch);
}

}
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/rxjava4/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
* Example:
* <pre><code>
* Disposable d = Maybe.just("Hello World")
* .delay(10, TimeUnit.SECONDS, Schedulers.io())
* .delay(10, TimeUnit.SECONDS, Schedulers.cached())
* .subscribeWith(new DisposableMaybeObserver&lt;String&gt;() {
* &#64;Override
* public void onStart() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/rxjava4/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* for such non-backpressured flows, which {@code Observable} itself implements as well.
* <p>
* The {@code Observable}'s operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
* that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
Expand Down Expand Up @@ -181,7 +181,7 @@ public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Returns the default 'island' size or capacity-increment hint for unbounded buffers.
* <p>Delegates to {@link Flowable#bufferSize} but is public for convenience.
* <p>The value can be overridden via system parameter {@code rx3.buffer-size}
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
* <em>before</em> the {@link Flowable} class is loaded.
* @return the default 'island' size or capacity-increment hint
*/
Expand Down
Loading