Skip to content

Commit 34217c7

Browse files
committed
Upgrade components, add Streamable.create for the universal generator
1 parent fdf14cf commit 34217c7

12 files changed

Lines changed: 680 additions & 63 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/CompletionStageDisposable.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,30 @@
1515

1616
import java.util.concurrent.CompletionStage;
1717

18-
import io.reactivex.rxjava4.disposables.Disposable;
18+
import io.reactivex.rxjava4.annotations.NonNull;
19+
import io.reactivex.rxjava4.disposables.*;
1920

20-
public record CompletionStageDisposable<T>(CompletionStage<T> stage, Disposable disposable) {
21+
/**
22+
* Consist of a terminal stage and a disposable to be able to cancel a sequence.
23+
* @param <T> the return and element type of the various stages
24+
* @param stage the embedded stage to work with
25+
* @param disposable the way to cancel the stage concurrently
26+
* @since 4.0.0
27+
*/
28+
public record CompletionStageDisposable<T>(@NonNull CompletionStage<T> stage, @NonNull Disposable disposable) {
29+
30+
/**
31+
* Await the completion of the current stage.
32+
*/
33+
public void await() {
34+
Streamer.await(stage);
35+
}
2136

37+
/**
38+
* Await the completion of the current stage.
39+
* @param canceller the canceller link
40+
*/
41+
public void await(DisposableContainer canceller) {
42+
Streamer.await(stage, canceller);
43+
}
2244
}

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21062,16 +21062,44 @@ public final Stream<T> blockingStream(int prefetch) {
2106221062
* relays the values one-by-one to the {@link Streamer } view of the sequence.
2106321063
* </dd>
2106421064
* <dt><b>Scheduler:</b></dt>
21065-
* <dd>The operator by design does not run on any scheduler or executor.</dd>
21065+
* <dd>The operator by design runs on the default virtual executor of the system.</dd>
2106621066
* </dl>
2106721067
* <p>
2106821068
* @return the new {@code Streamable} instance
21069+
* @since 4.0.0
2106921070
*/
2107021071
@CheckReturnValue
2107121072
@BackpressureSupport(BackpressureKind.FULL)
21072-
@SchedulerSupport(SchedulerSupport.NONE)
21073+
@SchedulerSupport(SchedulerSupport.VIRTUAL)
2107321074
@NonNull
2107421075
public final Streamable<T> toStreamable() {
21075-
return new StreamableFromPublisher <>(this);
21076+
return toStreamable(Executors.newVirtualThreadPerTaskExecutor());
21077+
}
21078+
21079+
/**
21080+
* Converts this {@code Flowable} into a {@link Streamable} instance,
21081+
* transparently relaying signals between the two async representations of a sequence.
21082+
* <p>
21083+
* <dl>
21084+
* <dt><b>Backpressure:</b></dt>
21085+
* <dd>This operator requests from the upstream in a bounded manner and
21086+
* relays the values one-by-one to the {@link Streamer } view of the sequence.
21087+
* </dd>
21088+
* <dt><b>Scheduler:</b></dt>
21089+
* <dd>The operator runs on the provided {@link ExecutorService}.</dd>
21090+
* </dl>
21091+
* <p>
21092+
* @param executor where the coordination will happen
21093+
* @return the new {@code Streamable} instance
21094+
* @throws NullPointerException if {@code executor} is {@code null}
21095+
* @since 4.0.0
21096+
*/
21097+
@CheckReturnValue
21098+
@BackpressureSupport(BackpressureKind.FULL)
21099+
@SchedulerSupport(SchedulerSupport.NONE)
21100+
@NonNull
21101+
public final Streamable<T> toStreamable(ExecutorService executor) {
21102+
Objects.requireNonNull(executor, "executor is null");
21103+
return new StreamableFromPublisher<>(this, executor);
2107621104
}
2107721105
}

src/main/java/io/reactivex/rxjava4/core/Scheduler.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
import java.util.Objects;
17-
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.*;
18+
import java.util.concurrent.atomic.AtomicReference;
1819

1920
import io.reactivex.rxjava4.annotations.*;
2021
import io.reactivex.rxjava4.disposables.Disposable;
@@ -381,6 +382,29 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
381382
return (S) new SchedulerWhen(combine, this);
382383
}
383384

385+
/**
386+
* Turn this Scheduler into an ExecutorService implementation
387+
* using its various *Direct() methods instead of workers.
388+
* @return the ExecutorService view of this Scheduler
389+
* @since 4.0.0
390+
*/
391+
public ExecutorService toExecutorService() {
392+
return new SchedulerToExecutorService(this, null);
393+
}
394+
395+
/**
396+
* Turn this Scheduler into an ExecutorService implementation
397+
* using its various *Direct() methods or worker methods,
398+
* depending on the parameter.
399+
* @param useWorker if true, one of the workers is used as an executorservice,
400+
* if false, the whole scheduler and its *Direct methods are used.
401+
* @return the ExecutorService view of this Scheduler
402+
* @since 4.0.0
403+
*/
404+
public ExecutorService toExecutorService(boolean useWorker) {
405+
return new SchedulerToExecutorService(this, new AtomicReference<>());
406+
}
407+
384408
/**
385409
* Represents an isolated, sequential worker of a parent Scheduler for executing {@code Runnable} tasks on
386410
* an underlying task-execution scheme (such as custom Threads, event loop, {@link java.util.concurrent.Executor Executor} or Actor system).
@@ -572,6 +596,35 @@ public Runnable getWrappedRunnable() {
572596
return this.decoratedRun;
573597
}
574598
}
599+
600+
/**
601+
* A stateless Worker that reports itself as shutdown and doesn't do anything.
602+
* @since 4.0.0
603+
*/
604+
@NonNull
605+
public static final Worker SHUTDOWN = new ShutdownWorker();
606+
}
607+
608+
/**
609+
* Implementation of a stateless, shutdown worker. For cleanup and termination purposes.
610+
* @since 4.0.0
611+
*/
612+
static final class ShutdownWorker extends Worker {
613+
614+
@Override
615+
public void dispose() {
616+
}
617+
618+
@Override
619+
public boolean isDisposed() {
620+
return true;
621+
}
622+
623+
@Override
624+
public @NonNull Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
625+
return Disposable.disposed();
626+
}
627+
575628
}
576629

577630
static final class PeriodicDirectTask

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 122 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.rxjava4.exceptions.Exceptions;
2323
import io.reactivex.rxjava4.functions.Consumer;
2424
import io.reactivex.rxjava4.internal.operators.streamable.*;
25+
import io.reactivex.rxjava4.schedulers.Schedulers;
2526
import io.reactivex.rxjava4.subscribers.TestSubscriber;
2627

2728
/**
@@ -79,6 +80,7 @@ default Streamer<T> stream() {
7980
* @param item the constant item to produce
8081
* @return the {@code Streamable} instance
8182
*/
83+
@NonNull
8284
static <@NonNull T> Streamable<T> just(@NonNull T item) {
8385
Objects.requireNonNull(item, "item is null");
8486
return new StreamableJust<>(item);
@@ -90,10 +92,75 @@ default Streamer<T> stream() {
9092
* @param source Flow.Publisher to convert
9193
* @return the new Streamable instance
9294
*/
93-
static <T> Streamable<T> fromPublisher(Flow.Publisher<T> source) {
95+
@NonNull
96+
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source) {
9497
Objects.requireNonNull(source, "source is null");
95-
return new StreamableFromPublisher<T>(source);
98+
return fromPublisher(source, Executors.newVirtualThreadPerTaskExecutor());
99+
}
100+
101+
/**
102+
* Convert any Flow.Publisher into a Streamable sequence.
103+
* @param <T> the element type
104+
* @param source Flow.Publisher to convert
105+
* @param executor where the conversion will run
106+
* @return the new Streamable instance
107+
*/
108+
@NonNull
109+
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNull ExecutorService executor) {
110+
Objects.requireNonNull(source, "source is null");
111+
return new StreamableFromPublisher<T>(source, executor);
112+
}
113+
114+
/**
115+
* Generate a sequence of values via a virtual generator callback (yielder)
116+
* which is free to block and is natively backpressured.
117+
* <p>
118+
* Runs on the {@link Schedulers#virtual()} scheduler.
119+
* @param <T> the element type
120+
* @param generator the generator to use
121+
* @return the streamable instance
122+
*/
123+
@NonNull
124+
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator) {
125+
// FIXME native implementation
126+
return Flowable.virtualCreate(generator)
127+
.toStreamable();
128+
}
129+
130+
/**
131+
* Generate a sequence of values via a virtual generator callback (yielder)
132+
* which is free to block and is natively backpressured.
133+
* <p>
134+
* Runs on the given scheduler.
135+
* @param <T> the element type
136+
* @param generator the generator to use
137+
* @param scheduler the scheduler to run the virtual generator on
138+
* @return the streamable instance
139+
*/
140+
@NonNull
141+
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator, @NonNull Scheduler scheduler) {
142+
// FIXME native implementation
143+
return Flowable.virtualCreate(generator, scheduler)
144+
.toStreamable();
145+
}
146+
147+
/**
148+
* Generate a sequence of values via a virtual generator callback (yielder)
149+
* which is free to block and is natively backpressured.
150+
* <p>
151+
* Runs on the given executor service.
152+
* @param <T> the element type
153+
* @param generator the generator to use
154+
* @param executor the executor to run the virtual generator on
155+
* @return the streamable instance
156+
*/
157+
@NonNull
158+
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator, @NonNull ExecutorService executor) {
159+
// FIXME native implementation
160+
return Flowable.virtualCreate(generator, executor)
161+
.toStreamable();
96162
}
163+
97164
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
98165
// Operators
99166
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo
@@ -103,6 +170,7 @@ static <T> Streamable<T> fromPublisher(Flow.Publisher<T> source) {
103170
* on the default Executors.newVirtualThreadPerTaskExecutor() virtual thread.
104171
* @return the new Flowable instance
105172
*/
173+
@NonNull
106174
default Flowable<T> toFlowable() {
107175
return toFlowable(Executors.newVirtualThreadPerTaskExecutor());
108176
}
@@ -113,10 +181,44 @@ default Flowable<T> toFlowable() {
113181
* @param executor the executor to use
114182
* @return the new Flowable instance
115183
*/
116-
default Flowable<T> toFlowable(ExecutorService executor) {
184+
@NonNull
185+
default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
186+
Objects.requireNonNull(executor, "executir is null");
117187
var me = this;
118188
return Flowable.virtualCreate(emitter -> {
119-
me.forEach(emitter::emit);
189+
me.forEach(emitter::emit).await(emitter.canceller());
190+
}, executor);
191+
}
192+
193+
/**
194+
* Transforms the upstream sequence into zero or more elements for the downstream.
195+
* @param <R> the result element type
196+
* @param transformer the interface to implement the transforming logic
197+
* @return the new Streamable instance
198+
*/
199+
@NonNull
200+
default <@NonNull R> Streamable<R> transform(@NonNull VirtualTransformer<T, R> transformer) {
201+
return transform(transformer, Executors.newVirtualThreadPerTaskExecutor());
202+
}
203+
204+
/**
205+
* Transforms the upstream sequence into zero or more elements for the downstream.
206+
* @param <R> the result element type
207+
* @param transformer the interface to implement the transforming logic
208+
* @param executor where to run the transform and blocking operations
209+
* @return the new Streamable instance
210+
*/
211+
@NonNull
212+
default <@NonNull R> Streamable<R> transform(@NonNull VirtualTransformer<T, R> transformer,
213+
@NonNull ExecutorService executor) {
214+
Objects.requireNonNull(transformer, "transformer is null");
215+
Objects.requireNonNull(executor, "executor is null");
216+
var me = this;
217+
return create(emitter -> {
218+
me.forEach(item -> {
219+
System.out.println("item " + item);
220+
transformer.transform(item, emitter);
221+
}, executor).await(emitter.canceller());
120222
}, executor);
121223
}
122224

@@ -174,6 +276,7 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
174276
try (var str = me.stream(canceller)) {
175277
while (!canceller.isDisposed()) {
176278
if (str.awaitNext(canceller)) {
279+
System.out.println("Received " + str.current());
177280
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
178281
} else {
179282
break;
@@ -203,9 +306,11 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
203306
default void subscribe(@NonNull Flow.Subscriber<? super T> subscriber, @NonNull ExecutorService executor) {
204307
final Streamable<T> me = this;
205308
Flowable.<T>virtualCreate(emitter -> {
309+
// System.out.println("subscribe::virtualCreate");
206310
me.forEach(v -> {
311+
// System.out.println("subscribe::virtualCreate::forEach::emit");
207312
emitter.emit(v);
208-
});
313+
}).await();
209314
}, executor)
210315
.subscribe(subscriber);
211316
}
@@ -218,6 +323,7 @@ default void subscribe(@NonNull Flow.Subscriber<? super T> subscriber) {
218323
final Streamable<T> me = this;
219324
Flowable.<T>virtualCreate(emitter -> {
220325
me.forEach(v -> {
326+
// System.out.println("Emitting " + v);
221327
emitter.emit(v);
222328
});
223329
})
@@ -233,4 +339,15 @@ default TestSubscriber<T> test() {
233339
subscribe(ts);
234340
return ts;
235341
}
342+
343+
/**
344+
* Creates a new {@link TestSubscriber} and subscribes it to this {@code Streamable}.
345+
* @param executor the executor to use
346+
* @return the created test subscriber
347+
*/
348+
default TestSubscriber<T> test(ExecutorService executor) {
349+
var ts = new TestSubscriber<T>();
350+
subscribe(ts, executor);
351+
return ts;
352+
}
236353
}

0 commit comments

Comments
 (0)