Skip to content

Commit 7c9661e

Browse files
authored
4.x: +shared, +parallel, +blocking Scheduler (#8154)
* 4.x: +shared, +parallel, +blocking Scheduler * Improve new scheduler tests, fix access issues * BlockingScheduler clarification * Fix documentation style error * Fix unit mistakes in the config record
1 parent 8af1ab4 commit 7c9661e

18 files changed

Lines changed: 2724 additions & 44 deletions

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
* <em>Reactive Streams</em> implementations.
5353
* <p>
5454
* The {@code Flowable} hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
55-
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
55+
* that can be overridden globally via the system parameter {@code rxjava4.buffer-size}. Most operators, however, have
5656
* overloads that allow setting their internal buffer size explicitly.
5757
* <p>
5858
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -162,7 +162,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
162162
/** The default buffer size. */
163163
static final int BUFFER_SIZE;
164164
static {
165-
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx4.buffer-size", 128));
165+
BUFFER_SIZE = Math.max(1, Integer.getInteger("rxjava4.buffer-size", 128));
166166
}
167167

168168
/**
@@ -251,7 +251,7 @@ public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
251251

252252
/**
253253
* Returns the default internal buffer size used by most async operators.
254-
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
254+
* <p>The value can be overridden via system parameter {@code rxjava4.buffer-size}
255255
* <em>before</em> the {@code Flowable} class is loaded.
256256
* @return the default internal buffer size.
257257
*/

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
* for such non-backpressured flows, which {@code Observable} itself implements as well.
4848
* <p>
4949
* The {@code Observable}'s operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
50-
* that can be overridden globally via the system parameter {@code rx4.buffer-size}. Most operators, however, have
50+
* that can be overridden globally via the system parameter {@code rxjava4.buffer-size}. Most operators, however, have
5151
* overloads that allow setting their internal buffer size explicitly.
5252
* <p>
5353
* The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:
@@ -181,7 +181,7 @@ public abstract class Observable<@NonNull T> implements ObservableSource<T> {
181181
/**
182182
* Returns the default 'island' size or capacity-increment hint for unbounded buffers.
183183
* <p>Delegates to {@link Flowable#bufferSize} but is public for convenience.
184-
* <p>The value can be overridden via system parameter {@code rx4.buffer-size}
184+
* <p>The value can be overridden via system parameter {@code rxjava4.buffer-size}
185185
* <em>before</em> the {@link Flowable} class is loaded.
186186
* @return the default 'island' size or capacity-increment hint
187187
*/

src/main/java/io/reactivex/rxjava4/core/Scheduler.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
* can detect the earlier hook and not apply a new one over again.
6262
* <p>
6363
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
64-
* value in the desired time unit, unless {@code rx4.scheduler.use-nanotime} (boolean) is set. When the property is set to
64+
* value in the desired time unit, unless {@code rxjava4.scheduler.use-nanotime} (boolean) is set. When the property is set to
6565
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
6666
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
6767
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
@@ -74,7 +74,7 @@
7474
* based on the relative time between it and {@link Worker#now(TimeUnit)}. However, drifts or changes in the
7575
* system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart.
7676
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
77-
* {@code rx4.scheduler.drift-tolerance} and {@code rx4.scheduler.drift-tolerance-unit}) to detect a
77+
* {@code rxjava4.scheduler.drift-tolerance} and {@code rxjava4.scheduler.drift-tolerance-unit}) to detect a
7878
* drift in {@link Worker#now(TimeUnit)} and re-adjust the absolute/relative time calculation accordingly.
7979
* <p>
8080
* The default implementations of {@link #start()} and {@link #shutdown()} do nothing and should be overridden if the
@@ -95,10 +95,10 @@ public abstract class Scheduler {
9595
* <p>
9696
* Associated system parameter:
9797
* <ul>
98-
* <li>{@code rx4.scheduler.use-nanotime}, boolean, default {@code false}
98+
* <li>{@code rxjava4.scheduler.use-nanotime}, boolean, default {@code false}
9999
* </ul>
100100
*/
101-
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx4.scheduler.use-nanotime");
101+
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rxjava4.scheduler.use-nanotime");
102102

103103
/**
104104
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
@@ -122,15 +122,15 @@ static long computeNow(TimeUnit unit) {
122122
* <p>
123123
* Associated system parameters:
124124
* <ul>
125-
* <li>{@code rx4.scheduler.drift-tolerance}, long, default {@code 15}</li>
126-
* <li>{@code rx4.scheduler.drift-tolerance-unit}, string, default {@code minutes},
125+
* <li>{@code rxjava4.scheduler.drift-tolerance}, long, default {@code 15}</li>
126+
* <li>{@code rxjava4.scheduler.drift-tolerance-unit}, string, default {@code minutes},
127127
* supports {@code seconds} and {@code milliseconds}.
128128
* </ul>
129129
*/
130130
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS =
131131
computeClockDrift(
132-
Long.getLong("rx4.scheduler.drift-tolerance", 15),
133-
System.getProperty("rx4.scheduler.drift-tolerance-unit", "minutes")
132+
Long.getLong("rxjava4.scheduler.drift-tolerance", 15),
133+
System.getProperty("rxjava4.scheduler.drift-tolerance-unit", "minutes")
134134
);
135135

136136
/**
@@ -152,8 +152,8 @@ static long computeClockDrift(long time, String timeUnit) {
152152
* Returns the clock drift tolerance in nanoseconds.
153153
* <p>Related system properties:
154154
* <ul>
155-
* <li>{@code rx4.scheduler.drift-tolerance}, long, default {@code 15}</li>
156-
* <li>{@code rx4.scheduler.drift-tolerance-unit}, string, default {@code minutes},
155+
* <li>{@code rxjava4.scheduler.drift-tolerance}, long, default {@code 15}</li>
156+
* <li>{@code rxjava4.scheduler.drift-tolerance-unit}, string, default {@code minutes},
157157
* supports {@code seconds} and {@code milliseconds}.
158158
* </ul>
159159
* @return the tolerance in nanoseconds
@@ -320,6 +320,24 @@ public ExecutorService toExecutorService(boolean useWorker) {
320320
return new SchedulerToExecutorService(this, new AtomicReference<>(useWorker ? createWorker() : null));
321321
}
322322

323+
/**
324+
* Creates a new {@code Scheduler} that uses one of the Workers from this Scheduler
325+
* and shares the access to it through its own Workers.
326+
* <p>
327+
* Disposing a worker doesn't dispose the underlying shared worker so other
328+
* workers of this class can continue their work; use {@link #shutdown()} to release
329+
* the underlying shared worker.
330+
* <p>
331+
* This scheduler doesn't support {@link #start()} (it's a no-op) and once {@link #shutdown()}
332+
* it can't be revived.
333+
* @return the shared Scheduler instance
334+
* @since 4.0.0
335+
*/
336+
@NonNull
337+
public Scheduler share() {
338+
return createWorker().share();
339+
}
340+
323341
/**
324342
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
325343
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
@@ -332,7 +350,7 @@ public ExecutorService toExecutorService(boolean useWorker) {
332350
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
333351
* <p>
334352
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
335-
* value in the desired time unit, unless {@code rx4.scheduler.use-nanotime} (boolean) is set. When the property is set to
353+
* value in the desired time unit, unless {@code rxjava4.scheduler.use-nanotime} (boolean) is set. When the property is set to
336354
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
337355
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
338356
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
@@ -345,7 +363,7 @@ public ExecutorService toExecutorService(boolean useWorker) {
345363
* based on the relative time between it and {@link #now(TimeUnit)}. However, drifts or changes in the
346364
* system clock would affect this calculation either by scheduling subsequent runs too frequently or too far apart.
347365
* Therefore, the default implementation uses the {@link #clockDriftTolerance()} value (set via
348-
* {@code rx4.scheduler.drift-tolerance} and {@code rx4.scheduler.drift-tolerance-unit}) to detect a drift in {@link #now(TimeUnit)} and
366+
* {@code rxjava4.scheduler.drift-tolerance} and {@code rxjava4.scheduler.drift-tolerance-unit}) to detect a drift in {@link #now(TimeUnit)} and
349367
* re-adjust the absolute/relative time calculation accordingly.
350368
* <p>
351369
* If the {@code Worker} is disposed, the {@code schedule} methods
@@ -455,6 +473,24 @@ public long now(@NonNull TimeUnit unit) {
455473
return computeNow(unit);
456474
}
457475

476+
/**
477+
* Creates a new {@code Scheduler} that uses this Worker
478+
* and shares the access to it through its own Workers.
479+
* <p>
480+
* Disposing a worker doesn't dispose the underlying shared worker so other
481+
* workers of this class can continue their work; use {@link #shutdown()} to release
482+
* the underlying shared worker.
483+
* <p>
484+
* This scheduler doesn't support {@link #start()} (it's a no-op) and once {@link #shutdown()}
485+
* it can't be revived.
486+
* @return the shared Scheduler instance
487+
* @since 4.0.0
488+
*/
489+
@NonNull
490+
public Scheduler share() {
491+
return new SharedScheduler(this);
492+
}
493+
458494
/**
459495
* Holds state and logic to calculate when the next delayed invocation
460496
* of this task has to happen (accounting for clock drifts).
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import java.util.concurrent.ThreadFactory;
17+
18+
import io.reactivex.rxjava4.annotations.*;
19+
import io.reactivex.rxjava4.schedulers.Schedulers;
20+
21+
/**
22+
* Configuration record for {@link Schedulers#createParallel(ParallelSchedulerConfig)}.
23+
* @param parallelism the number of concurrent threads, default the number of CPUs.
24+
* @param tracking if true, tasks submitted to it will be tracked and can be en-masse disposed
25+
* @param priority the thread priority of the created platform threads.
26+
* @param threadNamePrefix the prefix to name the scheduler's threads
27+
* @param factory the customizable factory for the underlying Executor, if non-null, the priority and threadNamePrefix
28+
* are ignored
29+
*/
30+
public record ParallelSchedulerConfig(
31+
int parallelism,
32+
boolean tracking,
33+
int priority,
34+
@NonNull String threadNamePrefix,
35+
@Nullable ThreadFactory factory) {
36+
37+
/**
38+
* Creates a default config with available CPUs parallelism,
39+
* normal priority, tracking and RxParallelScheduler thread name prefix.
40+
*/
41+
public ParallelSchedulerConfig() {
42+
this(Runtime.getRuntime().availableProcessors(), true, Thread.NORM_PRIORITY, "RxParallelScheduler", null);
43+
}
44+
45+
/**
46+
* Creates a default config with the given parallelism,
47+
* normal priority, tracking and RxParallelScheduler thread name prefix.
48+
* @param parallelism the number of threads to work with in the scheduler
49+
*/
50+
public ParallelSchedulerConfig(int parallelism) {
51+
this(parallelism, true, Thread.NORM_PRIORITY, "RxParallelScheduler", null);
52+
}
53+
54+
/**
55+
* Creates a default config with the given parallelism,
56+
* normal priority, optionally tracking and RxParallelScheduler thread name prefix.
57+
* @param parallelism the number of threads to work with in the scheduler
58+
* @param tracking if true, tasks submitted to it will be tracked and can be en-masse disposed
59+
*/
60+
public ParallelSchedulerConfig(int parallelism, boolean tracking) {
61+
this(parallelism, tracking, Thread.NORM_PRIORITY, "RxParallelScheduler", null);
62+
}
63+
64+
/**
65+
* Creates a default config with the given parallelism,
66+
* normal priority, optionally tracking and RxParallelScheduler thread name prefix.
67+
* @param parallelism the number of threads to work with in the scheduler
68+
* @param threadNamePrefix the prefix to name the scheduler's threads
69+
*/
70+
public ParallelSchedulerConfig(int parallelism, @NonNull String threadNamePrefix) {
71+
this(parallelism, true, Thread.NORM_PRIORITY, threadNamePrefix, null);
72+
}
73+
74+
/**
75+
* Creates a default config with the given parallelism,
76+
* normal priority, optionally tracking and RxParallelScheduler thread name prefix.
77+
* @param parallelism the number of threads to work with in the scheduler
78+
* @param tracking if true, tasks submitted to it will be tracked and can be en-masse disposed
79+
* @param threadNamePrefix the prefix to name the scheduler's threads
80+
*/
81+
public ParallelSchedulerConfig(int parallelism, boolean tracking, @NonNull String threadNamePrefix) {
82+
this(parallelism, tracking, Thread.NORM_PRIORITY, threadNamePrefix, null);
83+
}
84+
85+
/**
86+
* Creates a default config with the given parallelism,
87+
* normal priority, optionally tracking and RxParallelScheduler thread name prefix.
88+
* @param parallelism the number of threads to work with in the scheduler
89+
* @param tracking if true, tasks submitted to it will be tracked and can be en-masse disposed
90+
* @param factory the customizable factory for the underlying Executor
91+
*/
92+
public ParallelSchedulerConfig(int parallelism, boolean tracking, @NonNull ThreadFactory factory) {
93+
this(parallelism, tracking, Thread.NORM_PRIORITY, "", factory);
94+
}
95+
96+
/**
97+
* Creates a fully configurable ParallelSchedulerConfig object.
98+
* @param parallelism the number of threads to work with in the scheduler
99+
* @param tracking if true, tasks submitted to it will be tracked and can be en-masse disposed
100+
* @param priority
101+
* @param threadNamePrefix the prefix to name the scheduler's threads
102+
* @param factory the customizable factory for the underlying Executor, if non-null, the priority
103+
* and threadNamePrefix are ignored
104+
*/
105+
public ParallelSchedulerConfig(
106+
int parallelism,
107+
boolean tracking,
108+
int priority,
109+
@NonNull String threadNamePrefix,
110+
@Nullable ThreadFactory factory) {
111+
this.parallelism = parallelism;
112+
this.tracking = tracking;
113+
this.priority = priority;
114+
this.threadNamePrefix = threadNamePrefix;
115+
this.factory = factory;
116+
}
117+
}

0 commit comments

Comments
 (0)