diff --git a/src/main/java/io/reactivex/rxjava4/annotations/SchedulerSupport.java b/src/main/java/io/reactivex/rxjava4/annotations/SchedulerSupport.java
index 9ddcdea883..2a07964427 100644
--- a/src/main/java/io/reactivex/rxjava4/annotations/SchedulerSupport.java
+++ b/src/main/java/io/reactivex/rxjava4/annotations/SchedulerSupport.java
@@ -46,10 +46,24 @@
*/
String COMPUTATION = "io.reactivex:computation";
/**
- * The operator/class runs on RxJava's {@linkplain Schedulers#io() I/O scheduler} or takes
+ * The operator/class runs on RxJava's {@linkplain Schedulers#cached() I/O scheduler} or takes
* timing information from it.
+ * @deprecated since 4.0.0, use the more specific {@link #CACHED} or {@link #VIRTUAL} constants
*/
+ @Deprecated(since = "4.0.0")
String IO = "io.reactivex:io";
+ /**
+ * The operator/class runs on RxJava's {@linkplain Schedulers#cached() cached scheduler} or takes
+ * timing information from it.
+ * @since 4.0.0
+ */
+ String CACHED = "io.reactivex:cached";
+ /**
+ * The operator/class runs on RxJava's {@linkplain Schedulers#virtual() Virtual scheduler} or takes
+ * timing information from it.
+ * @since 4.0.0
+ */
+ String VIRTUAL = "io.reactivex:virtual";
/**
* The operator/class runs on RxJava's {@linkplain Schedulers#newThread() new thread scheduler}
* or takes timing information from it.
diff --git a/src/main/java/io/reactivex/rxjava4/core/Completable.java b/src/main/java/io/reactivex/rxjava4/core/Completable.java
index b0a68a7696..58b6645152 100644
--- a/src/main/java/io/reactivex/rxjava4/core/Completable.java
+++ b/src/main/java/io/reactivex/rxjava4/core/Completable.java
@@ -69,7 +69,7 @@
* Example:
*
* The {@code Flowable} hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
- * that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
+ * that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
*
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -161,7 +161,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher,
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
- BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
+ BUFFER_SIZE = Math.max(1, Integer.getInteger("rx4.buffer-size", 128));
}
/**
@@ -250,7 +250,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher,
/**
* Returns the default internal buffer size used by most async operators.
- *
The value can be overridden via system parameter {@code rx3.buffer-size}
+ *
The value can be overridden via system parameter {@code rx4.buffer-size}
* before the {@code Flowable} class is loaded.
* @return the default internal buffer size.
*/
@@ -20815,7 +20815,100 @@ public final Stream blockingStream(int prefetch) {
public static <@NonNull T> Flowable virtualCreate(@NonNull VirtualGenerator generator, @NonNull ExecutorService executor) {
Objects.requireNonNull(generator, "generator is null");
Objects.requireNonNull(executor, "executor is null");
- return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor));
+ return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, executor, null));
+ }
+
+ /**
+ * Construct a {@code Flowable} and use the given {@code generator}
+ * to generate items on demand while running on the {@link Schedulers#virtual()}.
+ *
+ *
+ *
Backpressure:
+ *
This operator honors backpressure from downstream and blocks the emitter if
+ * the downstream is not ready.
+ *
+ *
Scheduler:
+ *
The operator by default runs on the {@link Schedulers#virtual()} scheduler.
+ *
+ *
+ * Note that backpressure is handled via blocking so it is recommended the default
+ * {@link Scheduler} uses virtual threads, such as the one returned by
+ * {@link Schedulers#virtual()}.
+ *
+ * @param <T> the element type to emit
+ * @param generator the callback used to generate items on demand by the downstream
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code generator} is {@code null}
+ * @since 4.0.0
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.VIRTUAL)
+ @NonNull
+ public static <@NonNull T> Flowable virtualCreate(@NonNull VirtualGenerator generator) {
+ return virtualCreate(generator, Schedulers.virtual());
+ }
+
+ /**
+ * Construct a {@code Flowable} and use the given {@code generator}
+ * to generate items on demand while running on the given {@link Scheduler}.
+ *
+ *
+ *
Backpressure:
+ *
This operator honors backpressure from downstream and blocks the emitter if
+ * the downstream is not ready.
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * Note that backpressure is handled via blocking so it is recommended the provided
+ * {@code Scheduler} uses virtual threads, such as the one returned by
+ * {@link Schedulers#virtual()}.
+ *
+ * @param <T> the element type to emit
+ * @param generator the callback used to generate items on demand by the downstream
+ * @param scheduler the target {@code Scheduler} to use for running the callback
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code generator} or {@code scheduler} is {@code null}
+ * @since 4.0.0
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ public static <@NonNull T> Flowable virtualCreate(@NonNull VirtualGenerator generator, @NonNull Scheduler scheduler) {
+ Objects.requireNonNull(generator, "generator is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ return RxJavaPlugins.onAssembly(new FlowableVirtualCreateExecutor<>(generator, null, scheduler));
}
/**
@@ -20835,7 +20928,8 @@ public final Stream blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param the downstream element type
- * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
+ * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
+ * is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code transformer} or {@code executor} is {@code null}
@@ -20845,10 +20939,80 @@ public final Stream blockingStream(int prefetch) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer, @NonNull ExecutorService executor) {
+ public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer,
+ @NonNull ExecutorService executor) {
return virtualTransform(transformer, executor, Flowable.bufferSize());
}
+ /**
+ * Returns a {@code Flowable} that turns an upstream item into
+ * zero or more downstream values by running on the {@link Schedulers#virtual()} scheduler.
+ *
+ *
+ *
Backpressure:
+ *
This operator honors backpressure from downstream and blocks the emitter if
+ * the downstream is not ready.
+ *
+ *
Scheduler:
+ *
The operator by default runs on the {@link Schedulers#virtual()} scheduler.
+ *
+ *
+ * Note that backpressure is handled via blocking so it is recommended the default
+ * {@link Scheduler} uses virtual threads, such as the one returned by
+ * {@link Schedulers#virtual()}.
+ * @param <R> the downstream element type
+ * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
+ * is invoked for each upstream item
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code transformer} is {@code null}
+ * @since 4.0.0
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.VIRTUAL)
+ @NonNull
+ public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer) {
+ return virtualTransform(transformer, Schedulers.virtual(), Flowable.bufferSize());
+ }
+
+ /**
+ * Returns a {@code Flowable} that turns an upstream item into
+ * zero or more downstream values by running on the given {@link Scheduler}.
+ *
+ *
+ *
Backpressure:
+ *
This operator honors backpressure from downstream and blocks the emitter if
+ * the downstream is not ready.
+ *
+ *
Scheduler:
+ *
You specify which {@code Scheduler} this operator will use.
+ *
+ *
+ * Note that backpressure is handled via blocking so it is recommended the provided
+ * {@code Scheduler} uses virtual threads, such as the one returned by
+ * {@link Schedulers#virtual()}.
+ * @param <R> the downstream element type
+ * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
+ * is invoked for each upstream item
+ * @param scheduler the target {@code Scheduler} to use for running the callback
+ * @param prefetch the number of items to fetch from the upstream.
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code transformer} or {@code scheduler} is {@code null}
+ * @throws IllegalArgumentException if {@code prefetch} is non-positive
+ * @since 4.0.0
+ */
+ @CheckReturnValue
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.CUSTOM)
+ @NonNull
+ public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer,
+ @NonNull Scheduler scheduler, int prefetch) {
+ Objects.requireNonNull(transformer, "transformer is null");
+ Objects.requireNonNull(scheduler, "scheduler is null");
+ ObjectHelper.verifyPositive(prefetch, "prefetch");
+ return new FlowableVirtualTransformExecutor<>(this, transformer, null, scheduler, prefetch);
+ }
+
/**
* Returns a {@code Flowable} that turns an upstream item into zero or more downstream
* values by running on the given {@link ExecutorService}.
@@ -20866,7 +21030,8 @@ public final Stream blockingStream(int prefetch) {
* {@code ExecutorService} uses virtual threads, such as the one returned by
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
* @param the downstream element type
- * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)} is invoked for each upstream item
+ * @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
+ * is invoked for each upstream item
* @param executor the target {@code ExecutorService} to use for running the callback
* @param prefetch the number of items to fetch from the upstream.
* @return the new {@code Flowable} instance
@@ -20878,11 +21043,12 @@ public final Stream blockingStream(int prefetch) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
- public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer, @NonNull ExecutorService executor, int prefetch) {
+ public final <@NonNull R> Flowable virtualTransform(@NonNull VirtualTransformer transformer,
+ @NonNull ExecutorService executor, int prefetch) {
Objects.requireNonNull(transformer, "transformer is null");
Objects.requireNonNull(executor, "executor is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
- return new FlowableVirtualTransformExecutor<>(this, transformer, executor, prefetch);
+ return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch);
}
}
diff --git a/src/main/java/io/reactivex/rxjava4/core/Maybe.java b/src/main/java/io/reactivex/rxjava4/core/Maybe.java
index 8b14019411..a040c2a8e8 100644
--- a/src/main/java/io/reactivex/rxjava4/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava4/core/Maybe.java
@@ -68,7 +68,7 @@
* Example:
*
* Disposable d = Maybe.just("Hello World")
- * .delay(10, TimeUnit.SECONDS, Schedulers.io())
+ * .delay(10, TimeUnit.SECONDS, Schedulers.cached())
* .subscribeWith(new DisposableMaybeObserver<String>() {
* @Override
* public void onStart() {
diff --git a/src/main/java/io/reactivex/rxjava4/core/Observable.java b/src/main/java/io/reactivex/rxjava4/core/Observable.java
index 4cfa9094b7..c68130859d 100644
--- a/src/main/java/io/reactivex/rxjava4/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava4/core/Observable.java
@@ -47,7 +47,7 @@
* for such non-backpressured flows, which {@code Observable} itself implements as well.
*
* The {@code Observable}'s operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
- * that can be overridden globally via the system parameter {@code rx3.buffer-size}. Most operators, however, have
+ * that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
*
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -181,7 +181,7 @@ public abstract class Observable<@NonNull T> implements ObservableSource {
/**
* Returns the default 'island' size or capacity-increment hint for unbounded buffers.
*
Delegates to {@link Flowable#bufferSize} but is public for convenience.
- *
The value can be overridden via system parameter {@code rx3.buffer-size}
+ *
The value can be overridden via system parameter {@code rx4.buffer-size}
* before the {@link Flowable} class is loaded.
* @return the default 'island' size or capacity-increment hint
*/
diff --git a/src/main/java/io/reactivex/rxjava4/core/Scheduler.java b/src/main/java/io/reactivex/rxjava4/core/Scheduler.java
index fc500b374c..8a4ead7afb 100644
--- a/src/main/java/io/reactivex/rxjava4/core/Scheduler.java
+++ b/src/main/java/io/reactivex/rxjava4/core/Scheduler.java
@@ -62,7 +62,7 @@
* can detect the earlier hook and not apply a new one over again.
*
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
- * value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
+ * value in the desired time unit, unless {@code rx4.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
@@ -75,7 +75,7 @@
* based on the relative time between it and {@link Worker#now(TimeUnit)}. However, drifts or changes in the
* system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
- * {@code rx3.scheduler.drift-tolerance} and {@code rx3.scheduler.drift-tolerance-unit}) to detect a
+ * {@code rx4.scheduler.drift-tolerance} and {@code rx4.scheduler.drift-tolerance-unit}) to detect a
* drift in {@link Worker#now(TimeUnit)} and re-adjust the absolute/relative time calculation accordingly.
*
* The default implementations of {@link #start()} and {@link #shutdown()} do nothing and should be overridden if the
@@ -96,10 +96,10 @@ public abstract class Scheduler {
*
*/
- static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx3.scheduler.use-nanotime");
+ static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx4.scheduler.use-nanotime");
/**
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
@@ -123,15 +123,15 @@ static long computeNow(TimeUnit unit) {
*
* @return the tolerance in nanoseconds
@@ -393,7 +393,7 @@ public S when(@NonNull Function
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
- * value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
+ * value in the desired time unit, unless {@code rx4.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
@@ -406,7 +406,7 @@ public S when(@NonNull Function
* If the {@code Worker} is disposed, the {@code schedule} methods
diff --git a/src/main/java/io/reactivex/rxjava4/core/Single.java b/src/main/java/io/reactivex/rxjava4/core/Single.java
index 06516c2ff2..00c8ecfcba 100644
--- a/src/main/java/io/reactivex/rxjava4/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava4/core/Single.java
@@ -76,7 +76,7 @@
* Example:
*
* Disposable d = Single.just("Hello World")
- * .delay(10, TimeUnit.SECONDS, Schedulers.io())
+ * .delay(10, TimeUnit.SECONDS, Schedulers.cached())
* .subscribeWith(new DisposableSingleObserver<String>() {
* @Override
* public void onStart() {
diff --git a/src/main/java/io/reactivex/rxjava4/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/rxjava4/internal/schedulers/CachedScheduler.java
similarity index 94%
rename from src/main/java/io/reactivex/rxjava4/internal/schedulers/IoScheduler.java
rename to src/main/java/io/reactivex/rxjava4/internal/schedulers/CachedScheduler.java
index 36a877d482..60ece46d8a 100644
--- a/src/main/java/io/reactivex/rxjava4/internal/schedulers/IoScheduler.java
+++ b/src/main/java/io/reactivex/rxjava4/internal/schedulers/CachedScheduler.java
@@ -24,7 +24,7 @@
/**
* Scheduler that creates and caches a set of thread pools and reuses them if possible.
*/
-public final class IoScheduler extends Scheduler {
+public final class CachedScheduler extends Scheduler {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
@@ -32,7 +32,7 @@ public final class IoScheduler extends Scheduler {
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
/** The name of the system property for setting the keep-alive time (in seconds) for this Scheduler workers. */
- private static final String KEY_KEEP_ALIVE_TIME = "rx3.io-keep-alive-time";
+ private static final String KEY_KEEP_ALIVE_TIME = "rx4.cached-keep-alive-time";
public static final long KEEP_ALIVE_TIME_DEFAULT = 60;
private static final long KEEP_ALIVE_TIME;
@@ -43,10 +43,10 @@ public final class IoScheduler extends Scheduler {
final AtomicReference pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
- private static final String KEY_IO_PRIORITY = "rx3.io-priority";
+ private static final String KEY_IO_PRIORITY = "rx4.cached-priority";
/** The name of the system property for setting the release behaviour for this Scheduler. */
- private static final String KEY_SCHEDULED_RELEASE = "rx3.io-scheduled-release";
+ private static final String KEY_SCHEDULED_RELEASE = "rx4.cached-scheduled-release";
static boolean USE_SCHEDULED_RELEASE;
static final CachedWorkerPool NONE;
@@ -156,16 +156,16 @@ void shutdown() {
}
}
- public IoScheduler() {
+ public CachedScheduler() {
this(WORKER_THREAD_FACTORY);
}
/**
- * Constructs an IoScheduler with the given thread factory and starts the pool of workers.
+ * Constructs a CachedScheduler with the given thread factory and starts the pool of workers.
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
- public IoScheduler(ThreadFactory threadFactory) {
+ public CachedScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
diff --git a/src/main/java/io/reactivex/rxjava4/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/rxjava4/internal/schedulers/ComputationScheduler.java
index 701317dbc3..533e31f6b6 100644
--- a/src/main/java/io/reactivex/rxjava4/internal/schedulers/ComputationScheduler.java
+++ b/src/main/java/io/reactivex/rxjava4/internal/schedulers/ComputationScheduler.java
@@ -36,7 +36,7 @@ public final class ComputationScheduler extends Scheduler implements SchedulerMu
* Key to setting the maximum number of computation scheduler threads.
* Zero or less is interpreted as use available. Capped by available.
*/
- static final String KEY_MAX_THREADS = "rx3.computation-threads";
+ static final String KEY_MAX_THREADS = "rx4.computation-threads";
/** The maximum number of computation scheduler threads. */
static final int MAX_THREADS;
@@ -45,7 +45,7 @@ public final class ComputationScheduler extends Scheduler implements SchedulerMu
final ThreadFactory threadFactory;
final AtomicReference pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
- private static final String KEY_COMPUTATION_PRIORITY = "rx3.computation-priority";
+ private static final String KEY_COMPUTATION_PRIORITY = "rx4.computation-priority";
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
diff --git a/src/main/java/io/reactivex/rxjava4/internal/schedulers/DeferredExecutorScheduler.java b/src/main/java/io/reactivex/rxjava4/internal/schedulers/DeferredExecutorScheduler.java
new file mode 100644
index 0000000000..632c032401
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava4/internal/schedulers/DeferredExecutorScheduler.java
@@ -0,0 +1,470 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.rxjava4.internal.schedulers;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import io.reactivex.rxjava4.annotations.NonNull;
+import io.reactivex.rxjava4.core.Scheduler;
+import io.reactivex.rxjava4.disposables.*;
+import io.reactivex.rxjava4.exceptions.Exceptions;
+import io.reactivex.rxjava4.functions.Supplier;
+import io.reactivex.rxjava4.internal.disposables.*;
+import io.reactivex.rxjava4.internal.functions.Functions;
+import io.reactivex.rxjava4.internal.queue.MpscLinkedQueue;
+import io.reactivex.rxjava4.plugins.RxJavaPlugins;
+import io.reactivex.rxjava4.schedulers.*;
+
+/**
+ * Wraps an Executor supplier and provides the Scheduler API over an instance of Executor
+ * created on demand.
+ */
+public final class DeferredExecutorScheduler extends Scheduler {
+
+ final boolean interruptibleWorker;
+
+ final boolean fair;
+
+ @NonNull
+ final Supplier extends Executor> executorSupplier;
+
+ static final class SingleHolder {
+ static final Scheduler HELPER = Schedulers.single();
+ }
+
+ public DeferredExecutorScheduler(@NonNull Supplier extends Executor> executorSupplier, boolean interruptibleWorker, boolean fair) {
+ this.executorSupplier = executorSupplier;
+ this.interruptibleWorker = interruptibleWorker;
+ this.fair = fair;
+ }
+
+ @NonNull
+ @Override
+ public Worker createWorker() {
+ try {
+ return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
+ } catch (Throwable t) {
+ Exceptions.throwIfFatal(t);
+ throw Exceptions.propagate(t);
+ }
+ }
+
+ /* public: test support. */
+ public static final class ExecutorWorker extends Scheduler.Worker implements Runnable {
+
+ final boolean interruptibleWorker;
+
+ final boolean fair;
+
+ final Executor executor;
+
+ final MpscLinkedQueue queue;
+
+ volatile boolean disposed;
+
+ final AtomicInteger wip = new AtomicInteger();
+
+ final CompositeDisposable tasks = new CompositeDisposable();
+
+ public ExecutorWorker(Executor executor, boolean interruptibleWorker, boolean fair) {
+ this.executor = executor;
+ this.queue = new MpscLinkedQueue<>();
+ this.interruptibleWorker = interruptibleWorker;
+ this.fair = fair;
+ }
+
+ @NonNull
+ @Override
+ public Disposable schedule(@NonNull Runnable run) {
+ if (disposed) {
+ return EmptyDisposable.INSTANCE;
+ }
+
+ Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
+
+ Runnable task;
+ Disposable disposable;
+
+ if (interruptibleWorker) {
+ InterruptibleRunnable interruptibleTask = new InterruptibleRunnable(decoratedRun, tasks);
+ tasks.add(interruptibleTask);
+
+ task = interruptibleTask;
+ disposable = interruptibleTask;
+ } else {
+ @SuppressWarnings("resource")
+ BooleanRunnable runnableTask = new BooleanRunnable(decoratedRun);
+
+ task = runnableTask;
+ disposable = runnableTask;
+ }
+
+ queue.offer(task);
+
+ if (wip.getAndIncrement() == 0) {
+ try {
+ executor.execute(this);
+ } catch (RejectedExecutionException ex) {
+ disposed = true;
+ queue.clear();
+ RxJavaPlugins.onError(ex);
+ return EmptyDisposable.INSTANCE;
+ }
+ }
+
+ return disposable;
+ }
+
+ @NonNull
+ @Override
+ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
+ if (delay <= 0) {
+ return schedule(run);
+ }
+ if (disposed) {
+ return EmptyDisposable.INSTANCE;
+ }
+
+ SequentialDisposable first = new SequentialDisposable();
+
+ final SequentialDisposable mar = new SequentialDisposable(first);
+
+ final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
+
+ ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker);
+ tasks.add(sr);
+
+ if (executor instanceof ScheduledExecutorService) {
+ try {
+ Future> f = ((ScheduledExecutorService)executor).schedule((Callable