Skip to content

Commit c416dc8

Browse files
committed
Create the "await" keyword, as static helper method.
10 foot pile of shit, 11 foot high ladder, Brian.
1 parent 5072e1e commit c416dc8

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.reactivex.rxjava4.functions.Consumer;
2424
import io.reactivex.rxjava4.internal.operators.streamable.*;
2525

26+
import static io.reactivex.rxjava4.core.Streamer.await;
27+
2628
/**
2729
* The {@code IAsyncEnumerable} of the Java world.
2830
* Runs best with Virtual Threads.
@@ -119,7 +121,7 @@ public final CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T
119121
var future = executor.submit(() -> {
120122
try (var str = me.stream(canceller)) {
121123
while (!canceller.isDisposed()) {
122-
if (str.next().toCompletableFuture().join()) {
124+
if (await(str.next(canceller), canceller)) {
123125
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
124126
} else {
125127
break;

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import java.util.NoSuchElementException;
1717
import java.util.concurrent.CompletionStage;
1818

19-
import io.reactivex.rxjava4.annotations.NonNull;
19+
import io.reactivex.rxjava4.annotations.*;
2020
import io.reactivex.rxjava4.disposables.*;
2121

2222
/**
@@ -69,4 +69,34 @@ default CompletionStage<Boolean> next() {
6969
default void close() {
7070
cancel().toCompletableFuture().join();
7171
}
72+
73+
/**
74+
* The {@code await} keyword for async/await.
75+
* @param <T> the type of the returned value if any.
76+
* @param stage the stage to await virtual-blockingly
77+
* @return the awaited value
78+
*/
79+
@Nullable
80+
static <T> T await(@NonNull CompletionStage<T> stage) {
81+
return await(stage, null);
82+
}
83+
84+
/**
85+
* The cancellable {@code await} keyword for async/await.
86+
* @param <T> the type of the returned value if any.
87+
* @param stage the stage to await virtual-blockingly
88+
* @param cancellation the container that can trigger a cancellation on demand
89+
* @return the awaited value
90+
*/
91+
@Nullable
92+
static <T> T await(@NonNull CompletionStage<T> stage, @Nullable DisposableContainer cancellation) {
93+
var f = stage.toCompletableFuture();
94+
if (cancellation == null) {
95+
return f.join();
96+
}
97+
var d = Disposable.fromFuture(f, true);
98+
try (var _ = cancellation.subscribe(d)) {
99+
return f.join();
100+
}
101+
}
72102
}

0 commit comments

Comments
 (0)