Skip to content

Commit 73241eb

Browse files
authored
4.x: Enhance Streamable/Streamer and DisposableContainer APIs (#8092)
* 4.x: Enhance Streamable/Streamer and DisposableContainer APIs * Create the "await" keyword, as static helper method. 10 foot pile of shit, 11 foot high ladder, Brian. * Add conversion operators for the minimum viable testing * Upgrade components, add Streamable.create for the universal generator * Fix flaky tests, Fix a missing await() call. F java
1 parent 377194b commit 73241eb

37 files changed

+1793
-320
lines changed

src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,30 @@
1515

1616
import java.util.concurrent.CompletionStage;
1717

18-
import io.reactivex.rxjava4.disposables.Disposable;
18+
import io.reactivex.rxjava4.annotations.NonNull;
19+
import io.reactivex.rxjava4.disposables.*;
1920

20-
public record CompletionStageDisposable<T>(CompletionStage<T> stage, Disposable disposable) {
21+
/**
22+
* Consist of a terminal stage and a disposable to be able to cancel a sequence.
23+
* @param <T> the return and element type of the various stages
24+
* @param stage the embedded stage to work with
25+
* @param disposable the way to cancel the stage concurrently
26+
* @since 4.0.0
27+
*/
28+
public record CompletionStageDisposable<T>(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
29+
30+
/**
31+
* Await the completion of the current stage.
32+
*/
33+
public void await() {
34+
Streamer.await(stage);
35+
}
2136

37+
/**
38+
* Await the completion of the current stage.
39+
* @param canceller the canceller link
40+
*/
41+
public void await(DisposableContainer canceller) {
42+
Streamer.await(stage, canceller);
43+
}
2244
}

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.reactivex.rxjava4.internal.operators.mixed.*;
3333
import io.reactivex.rxjava4.internal.operators.observable.ObservableFromPublisher;
3434
import io.reactivex.rxjava4.internal.operators.single.SingleToFlowable;
35+
import io.reactivex.rxjava4.internal.operators.streamable.StreamableFromPublisher;
3536
import io.reactivex.rxjava4.internal.schedulers.ImmediateThinScheduler;
3637
import io.reactivex.rxjava4.internal.subscribers.*;
3738
import io.reactivex.rxjava4.internal.util.*;
@@ -16023,7 +16024,7 @@ public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
1602316024
// can't call onSubscribe because the call might have set a Subscription already
1602416025
RxJavaPlugins.onError(e);
1602516026

16026-
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
16027+
var npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
1602716028
npe.initCause(e);
1602816029
throw npe;
1602916030
}
@@ -20928,7 +20929,7 @@ public final Stream<T> blockingStream(int prefetch) {
2092820929
* {@code ExecutorService} uses virtual threads, such as the one returned by
2092920930
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2093020931
* @param <R> the downstream element type
20931-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20932+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2093220933
* is invoked for each upstream item
2093320934
* @param executor the target {@code ExecutorService} to use for running the callback
2093420935
* @return the new {@code Flowable} instance
@@ -20961,7 +20962,7 @@ public final Stream<T> blockingStream(int prefetch) {
2096120962
* {@link Scheduler} uses virtual threads, such as the one returned by
2096220963
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2096320964
* @param <R> the downstream element type
20964-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20965+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2096520966
* is invoked for each upstream item
2096620967
* @return the new {@code Flowable} instance
2096720968
* @throws NullPointerException if {@code transformer} is {@code null}
@@ -20992,7 +20993,7 @@ public final Stream<T> blockingStream(int prefetch) {
2099220993
* {@code Scheduler} uses virtual threads, such as the one returned by
2099320994
* {@link Schedulers#virtual()}.
2099420995
* @param <R> the downstream element type
20995-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20996+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2099620997
* is invoked for each upstream item
2099720998
* @param scheduler the target {@code Scheduler} to use for running the callback
2099820999
* @param prefetch the number of items to fetch from the upstream.
@@ -21030,7 +21031,7 @@ public final Stream<T> blockingStream(int prefetch) {
2103021031
* {@code ExecutorService} uses virtual threads, such as the one returned by
2103121032
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2103221033
* @param <R> the downstream element type
21033-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
21034+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2103421035
* is invoked for each upstream item
2103521036
* @param executor the target {@code ExecutorService} to use for running the callback
2103621037
* @param prefetch the number of items to fetch from the upstream.
@@ -21051,4 +21052,54 @@ public final Stream<T> blockingStream(int prefetch) {
2105121052
return new FlowableVirtualTransformExecutor<>(this, transformer, executor, null, prefetch);
2105221053
}
2105321054

21055+
/**
21056+
* Converts this {@code Flowable} into a {@link Streamable} instance,
21057+
* transparently relaying signals between the two async representations of a sequence.
21058+
* <p>
21059+
* <dl>
21060+
* <dt><b>Backpressure:</b></dt>
21061+
* <dd>This operator requests from the upstream in a bounded manner and
21062+
* relays the values one-by-one to the {@link Streamer } view of the sequence.
21063+
* </dd>
21064+
* <dt><b>Scheduler:</b></dt>
21065+
* <dd>The operator by design runs on the default virtual executor of the system.</dd>
21066+
* </dl>
21067+
* <p>
21068+
* @return the new {@code Streamable} instance
21069+
* @since 4.0.0
21070+
*/
21071+
@CheckReturnValue
21072+
@BackpressureSupport(BackpressureKind.FULL)
21073+
@SchedulerSupport(SchedulerSupport.VIRTUAL)
21074+
@NonNull
21075+
public final Streamable<T> toStreamable() {
21076+
return toStreamable(Executors.newVirtualThreadPerTaskExecutor());
21077+
}
21078+
21079+
/**
21080+
* Converts this {@code Flowable} into a {@link Streamable} instance,
21081+
* transparently relaying signals between the two async representations of a sequence.
21082+
* <p>
21083+
* <dl>
21084+
* <dt><b>Backpressure:</b></dt>
21085+
* <dd>This operator requests from the upstream in a bounded manner and
21086+
* relays the values one-by-one to the {@link Streamer } view of the sequence.
21087+
* </dd>
21088+
* <dt><b>Scheduler:</b></dt>
21089+
* <dd>The operator runs on the provided {@link ExecutorService}.</dd>
21090+
* </dl>
21091+
* <p>
21092+
* @param executor where the coordination will happen
21093+
* @return the new {@code Streamable} instance
21094+
* @throws NullPointerException if {@code executor} is {@code null}
21095+
* @since 4.0.0
21096+
*/
21097+
@CheckReturnValue
21098+
@BackpressureSupport(BackpressureKind.FULL)
21099+
@SchedulerSupport(SchedulerSupport.NONE)
21100+
@NonNull
21101+
public final Streamable<T> toStreamable(ExecutorService executor) {
21102+
Objects.requireNonNull(executor, "executor is null");
21103+
return new StreamableFromPublisher<>(this, executor);
21104+
}
2105421105
}

src/main/java/io/reactivex/rxjava4/core/Scheduler.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.util.Objects;
17-
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.*;
18+
import java.util.concurrent.atomic.AtomicReference;
1819

1920
import io.reactivex.rxjava4.annotations.*;
2021
import io.reactivex.rxjava4.disposables.Disposable;
@@ -381,6 +382,29 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
381382
return (S) new SchedulerWhen(combine, this);
382383
}
383384

385+
/**
386+
* Turn this Scheduler into an ExecutorService implementation
387+
* using its various *Direct() methods instead of workers.
388+
* @return the ExecutorService view of this Scheduler
389+
* @since 4.0.0
390+
*/
391+
public ExecutorService toExecutorService() {
392+
return new SchedulerToExecutorService(this, null);
393+
}
394+
395+
/**
396+
* Turn this Scheduler into an ExecutorService implementation
397+
* using its various *Direct() methods or worker methods,
398+
* depending on the parameter.
399+
* @param useWorker if true, one of the workers is used as an executorservice,
400+
* if false, the whole scheduler and its *Direct methods are used.
401+
* @return the ExecutorService view of this Scheduler
402+
* @since 4.0.0
403+
*/
404+
public ExecutorService toExecutorService(boolean useWorker) {
405+
return new SchedulerToExecutorService(this, new AtomicReference<>());
406+
}
407+
384408
/**
385409
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
386410
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
@@ -572,6 +596,35 @@ public Runnable getWrappedRunnable() {
572596
return this.decoratedRun;
573597
}
574598
}
599+
600+
/**
601+
* A stateless Worker that reports itself as shutdown and doesn't do anything.
602+
* @since 4.0.0
603+
*/
604+
@NonNull
605+
public static final Worker SHUTDOWN = new ShutdownWorker();
606+
}
607+
608+
/**
609+
* Implementation of a stateless, shutdown worker. For cleanup and termination purposes.
610+
* @since 4.0.0
611+
*/
612+
static final class ShutdownWorker extends Worker {
613+
614+
@Override
615+
public void dispose() {
616+
}
617+
618+
@Override
619+
public boolean isDisposed() {
620+
return true;
621+
}
622+
623+
@Override
624+
public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
625+
return Disposable.disposed();
626+
}
627+
575628
}
576629

577630
static final class PeriodicDirectTask

0 commit comments

Comments
 (0)