Skip to content

Commit 3ff3dd8

Browse files
committed
Fix flaky tests, Fix a missing await() call. F java
1 parent 34217c7 commit 3ff3dd8

26 files changed

Lines changed: 592 additions & 302 deletions

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16024,7 +16024,7 @@ public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
1602416024
// can't call onSubscribe because the call might have set a Subscription already
1602516025
RxJavaPlugins.onError(e);
1602616026

16027-
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
16027+
var npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
1602816028
npe.initCause(e);
1602916029
throw npe;
1603016030
}
@@ -20929,7 +20929,7 @@ public final Stream<T> blockingStream(int prefetch) {
2092920929
* {@code ExecutorService} uses virtual threads, such as the one returned by
2093020930
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2093120931
* @param <R> the downstream element type
20932-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20932+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2093320933
* is invoked for each upstream item
2093420934
* @param executor the target {@code ExecutorService} to use for running the callback
2093520935
* @return the new {@code Flowable} instance
@@ -20962,7 +20962,7 @@ public final Stream<T> blockingStream(int prefetch) {
2096220962
* {@link Scheduler} uses virtual threads, such as the one returned by
2096320963
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2096420964
* @param <R> the downstream element type
20965-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20965+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2096620966
* is invoked for each upstream item
2096720967
* @return the new {@code Flowable} instance
2096820968
* @throws NullPointerException if {@code transformer} is {@code null}
@@ -20993,7 +20993,7 @@ public final Stream<T> blockingStream(int prefetch) {
2099320993
* {@code Scheduler} uses virtual threads, such as the one returned by
2099420994
* {@link Schedulers#virtual()}.
2099520995
* @param <R> the downstream element type
20996-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
20996+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2099720997
* is invoked for each upstream item
2099820998
* @param scheduler the target {@code Scheduler} to use for running the callback
2099920999
* @param prefetch the number of items to fetch from the upstream.
@@ -21031,7 +21031,7 @@ public final Stream<T> blockingStream(int prefetch) {
2103121031
* {@code ExecutorService} uses virtual threads, such as the one returned by
2103221032
* {@link Executors#newVirtualThreadPerTaskExecutor()}.
2103321033
* @param <R> the downstream element type
21034-
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter)}
21034+
* @param transformer the callback whose {@link VirtualTransformer#transform(Object, VirtualEmitter, Disposable)}
2103521035
* is invoked for each upstream item
2103621036
* @param executor the target {@code ExecutorService} to use for running the callback
2103721037
* @param prefetch the number of items to fetch from the upstream.

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
import java.util.Objects;
1818
import java.util.concurrent.*;
1919

20-
import io.reactivex.rxjava4.annotations.NonNull;
20+
import io.reactivex.rxjava4.annotations.*;
2121
import io.reactivex.rxjava4.disposables.*;
2222
import io.reactivex.rxjava4.exceptions.Exceptions;
23-
import io.reactivex.rxjava4.functions.Consumer;
23+
import io.reactivex.rxjava4.functions.*;
2424
import io.reactivex.rxjava4.internal.operators.streamable.*;
2525
import io.reactivex.rxjava4.schedulers.Schedulers;
2626
import io.reactivex.rxjava4.subscribers.TestSubscriber;
@@ -43,6 +43,7 @@ public interface Streamable<@NonNull T> {
4343
* @param cancellation where to register and listen for cancellation calls.
4444
* @return the Streamer instance to consume.
4545
*/
46+
@CheckReturnValue
4647
@NonNull
4748
Streamer<T> stream(@NonNull DisposableContainer cancellation);
4849

@@ -55,6 +56,7 @@ public interface Streamable<@NonNull T> {
5556
* Realizes the stream and returns an interface that let's one consume it.
5657
* @return the Streamer instance to consume.
5758
*/
59+
@CheckReturnValue
5860
@NonNull
5961
default Streamer<T> stream() {
6062
return stream(new CompositeDisposable()); // FIXME, use a practically no-op disposable container instead
@@ -69,6 +71,7 @@ default Streamer<T> stream() {
6971
* @param <T> the element type
7072
* @return the {@code Streamable} instance
7173
*/
74+
@CheckReturnValue
7275
@NonNull
7376
static <@NonNull T> Streamable<T> empty() {
7477
return new StreamableEmpty<>();
@@ -80,6 +83,7 @@ default Streamer<T> stream() {
8083
* @param item the constant item to produce
8184
* @return the {@code Streamable} instance
8285
*/
86+
@CheckReturnValue
8387
@NonNull
8488
static <@NonNull T> Streamable<T> just(@NonNull T item) {
8589
Objects.requireNonNull(item, "item is null");
@@ -92,6 +96,7 @@ default Streamer<T> stream() {
9296
* @param source Flow.Publisher to convert
9397
* @return the new Streamable instance
9498
*/
99+
@CheckReturnValue
95100
@NonNull
96101
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source) {
97102
Objects.requireNonNull(source, "source is null");
@@ -105,6 +110,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source) {
105110
* @param executor where the conversion will run
106111
* @return the new Streamable instance
107112
*/
113+
@CheckReturnValue
108114
@NonNull
109115
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNull ExecutorService executor) {
110116
Objects.requireNonNull(source, "source is null");
@@ -120,6 +126,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
120126
* @param generator the generator to use
121127
* @return the streamable instance
122128
*/
129+
@CheckReturnValue
123130
@NonNull
124131
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator) {
125132
// FIXME native implementation
@@ -137,6 +144,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
137144
* @param scheduler the scheduler to run the virtual generator on
138145
* @return the streamable instance
139146
*/
147+
@CheckReturnValue
140148
@NonNull
141149
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator, @NonNull Scheduler scheduler) {
142150
// FIXME native implementation
@@ -154,6 +162,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
154162
* @param executor the executor to run the virtual generator on
155163
* @return the streamable instance
156164
*/
165+
@CheckReturnValue
157166
@NonNull
158167
static <@NonNull T> Streamable<T> create(@NonNull VirtualGenerator<T> generator, @NonNull ExecutorService executor) {
159168
// FIXME native implementation
@@ -170,6 +179,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNu
170179
* on the default Executors.newVirtualThreadPerTaskExecutor() virtual thread.
171180
* @return the new Flowable instance
172181
*/
182+
@CheckReturnValue
173183
@NonNull
174184
default Flowable<T> toFlowable() {
175185
return toFlowable(Executors.newVirtualThreadPerTaskExecutor());
@@ -181,6 +191,7 @@ default Flowable<T> toFlowable() {
181191
* @param executor the executor to use
182192
* @return the new Flowable instance
183193
*/
194+
@CheckReturnValue
184195
@NonNull
185196
default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
186197
Objects.requireNonNull(executor, "executir is null");
@@ -196,6 +207,7 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
196207
* @param transformer the interface to implement the transforming logic
197208
* @return the new Streamable instance
198209
*/
210+
@CheckReturnValue
199211
@NonNull
200212
default <@NonNull R> Streamable<R> transform(@NonNull VirtualTransformer<T, R> transformer) {
201213
return transform(transformer, Executors.newVirtualThreadPerTaskExecutor());
@@ -208,17 +220,19 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
208220
* @param executor where to run the transform and blocking operations
209221
* @return the new Streamable instance
210222
*/
223+
@CheckReturnValue
211224
@NonNull
212225
default <@NonNull R> Streamable<R> transform(@NonNull VirtualTransformer<T, R> transformer,
213226
@NonNull ExecutorService executor) {
214227
Objects.requireNonNull(transformer, "transformer is null");
215228
Objects.requireNonNull(executor, "executor is null");
216229
var me = this;
217230
return create(emitter -> {
218-
me.forEach(item -> {
219-
System.out.println("item " + item);
220-
transformer.transform(item, emitter);
221-
}, executor).await(emitter.canceller());
231+
me.forEach((item, stopper) -> {
232+
// System.out.println("item " + item);
233+
transformer.transform(item, emitter, stopper);
234+
}, emitter.canceller(), executor)
235+
.await(emitter.canceller());
222236
}, executor);
223237
}
224238

@@ -231,6 +245,7 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
231245
* @param consumer the callback that gets the elements until completion
232246
* @return a Disposable that let's one cancel the sequence asynchronously.
233247
*/
248+
@CheckReturnValue
234249
@NonNull
235250
default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer) {
236251
CompositeDisposable canceller = new CompositeDisposable();
@@ -243,6 +258,8 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
243258
* @param canceller the container to trigger cancellation of the sequence
244259
* @return the {@code CompletionStage} that gets notified when the sequence ends
245260
*/
261+
@CheckReturnValue
262+
@NonNull
246263
default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller) {
247264
return forEach(consumer, canceller, Executors.newVirtualThreadPerTaskExecutor());
248265
}
@@ -253,6 +270,7 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
253270
* @param executor the service that hosts the blocking waits.
254271
* @return a Disposable that let's one cancel the sequence asynchronously.
255272
*/
273+
@CheckReturnValue
256274
@NonNull
257275
default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull ExecutorService executor) {
258276
CompositeDisposable canceller = new CompositeDisposable();
@@ -266,6 +284,8 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
266284
* @param executor the service that hosts the blocking waits.
267285
* @return the {@code CompletionStage} that gets notified when the sequence ends
268286
*/
287+
@CheckReturnValue
288+
@NonNull
269289
@SuppressWarnings("unchecked")
270290
default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> consumer, @NonNull DisposableContainer canceller, @NonNull ExecutorService executor) {
271291
Objects.requireNonNull(consumer, "consumer is null");
@@ -276,14 +296,18 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
276296
try (var str = me.stream(canceller)) {
277297
while (!canceller.isDisposed()) {
278298
if (str.awaitNext(canceller)) {
279-
System.out.println("Received " + str.current());
299+
// System.out.println("Received " + str.current());
280300
consumer.accept(Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!"));
281301
} else {
302+
// System.out.println("EOF ");
282303
break;
283304
}
284305
}
306+
// System.out.println("Canceller status after loop: " + canceller.isDisposed());
285307
} catch (final Throwable crash) {
286308
Exceptions.throwIfFatal(crash);
309+
// System.out.println("Canceller status in error: " + canceller.isDisposed());
310+
crash.printStackTrace();
287311
if (crash instanceof RuntimeException ex) {
288312
throw ex;
289313
}
@@ -298,6 +322,57 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
298322
return new CompletionStageDisposable<Void>(StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
299323
}
300324

325+
/**
326+
* Consumes elements from this {@code Streamable} via the provided executor service.
327+
* @param consumer the callback that gets the elements until completion
328+
* @param canceller the container to trigger cancellation of the sequence
329+
* @param executor the service that hosts the blocking waits.
330+
* @return the {@code CompletionStage} that gets notified when the sequence ends
331+
*/
332+
@CheckReturnValue
333+
@NonNull
334+
@SuppressWarnings("unchecked")
335+
default CompletionStageDisposable<Void> forEach(
336+
@NonNull BiConsumer<? super T, ? super Disposable> consumer,
337+
@NonNull DisposableContainer canceller,
338+
@NonNull ExecutorService executor) {
339+
Objects.requireNonNull(consumer, "consumer is null");
340+
Objects.requireNonNull(canceller, "canceller is null");
341+
Objects.requireNonNull(executor, "executor is null");
342+
final Streamable<T> me = this;
343+
var future = executor.submit(() -> {
344+
try (var str = me.stream(canceller)) {
345+
var stopper = Disposable.empty();
346+
while (!canceller.isDisposed() && !stopper.isDisposed()) {
347+
if (str.awaitNext(canceller)) {
348+
// System.out.println("Received " + str.current());
349+
var v = Objects.requireNonNull(str.current(), "The upstream Streamable " + me.getClass() + " produced a null element!");
350+
consumer.accept(v, stopper);
351+
} else {
352+
// System.out.println("EOF ");
353+
break;
354+
}
355+
}
356+
// System.out.println("Canceller status after loop: " + canceller.isDisposed());
357+
} catch (final Throwable crash) {
358+
Exceptions.throwIfFatal(crash);
359+
// System.out.println("Canceller status in error: " + canceller.isDisposed());
360+
// crash.printStackTrace();
361+
if (crash instanceof RuntimeException ex) {
362+
throw ex;
363+
}
364+
if (crash instanceof Exception ex) {
365+
throw ex;
366+
}
367+
throw new InvocationTargetException(crash);
368+
}
369+
return null;
370+
});
371+
canceller.add(Disposable.fromFuture(future));
372+
return new CompletionStageDisposable<Void>(
373+
StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
374+
}
375+
301376
/**
302377
* Consume this {@code Streamable} via the given flow-reactive-streams subscriber.
303378
* @param subscriber the subscriber to consume with.
@@ -310,7 +385,7 @@ default void subscribe(@NonNull Flow.Subscriber<? super T> subscriber, @NonNull
310385
me.forEach(v -> {
311386
// System.out.println("subscribe::virtualCreate::forEach::emit");
312387
emitter.emit(v);
313-
}).await();
388+
}).await(emitter.canceller());
314389
}, executor)
315390
.subscribe(subscriber);
316391
}
@@ -325,7 +400,7 @@ default void subscribe(@NonNull Flow.Subscriber<? super T> subscriber) {
325400
me.forEach(v -> {
326401
// System.out.println("Emitting " + v);
327402
emitter.emit(v);
328-
});
403+
}).await(emitter.canceller());
329404
})
330405
.subscribe(subscriber);
331406
}
@@ -334,6 +409,8 @@ default void subscribe(@NonNull Flow.Subscriber<? super T> subscriber) {
334409
* Creates a new {@link TestSubscriber} and subscribes it to this {@code Streamable}.
335410
* @return the created test subscriber
336411
*/
412+
@CheckReturnValue
413+
@NonNull
337414
default TestSubscriber<T> test() {
338415
var ts = new TestSubscriber<T>();
339416
subscribe(ts);
@@ -345,7 +422,9 @@ default TestSubscriber<T> test() {
345422
* @param executor the executor to use
346423
* @return the created test subscriber
347424
*/
348-
default TestSubscriber<T> test(ExecutorService executor) {
425+
@CheckReturnValue
426+
@NonNull
427+
default TestSubscriber<T> test(@NonNull ExecutorService executor) {
349428
var ts = new TestSubscriber<T>();
350429
subscribe(ts, executor);
351430
return ts;

src/main/java/io/reactivex/rxjava4/core/VirtualTransformer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
*/
1313

1414
package io.reactivex.rxjava4.core;
15+
16+
import io.reactivex.rxjava4.disposables.Disposable;
17+
1518
/**
1619
* Interface called by the {@link Flowable#virtualTransform(VirtualTransformer, java.util.concurrent.ExecutorService)}
1720
* operator to generate any number of output values based of the current input of the upstream.
@@ -29,7 +32,8 @@ public interface VirtualTransformer<T, R> {
2932
*
3033
* @param value the upstream value
3134
* @param emitter the emitter to use to generate result value(s)
35+
* @param stopper call to stop the upstream
3236
* @throws Throwable signaled as {@code onError} for the downstream.
3337
*/
34-
void transform(T value, VirtualEmitter<R> emitter) throws Throwable;
38+
void transform(T value, VirtualEmitter<R> emitter, Disposable stopper) throws Throwable;
3539
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.functions;
15+
16+
import io.reactivex.rxjava4.annotations.NonNull;
17+
18+
/**
19+
* A functional interface (callback) that accepts two values (of possibly different types).
20+
* @param <T1> the first value type
21+
* @param <T2> the second value type
22+
* @param <T3> the third value type
23+
* @since 4.0.0
24+
*/
25+
@FunctionalInterface
26+
public interface Consumer3<@NonNull T1, @NonNull T2, @NonNull T3> {
27+
28+
/**
29+
* Performs an operation on the given values.
30+
* @param t1 the first value
31+
* @param t2 the second value
32+
* @param t3 the third value
33+
* @throws Throwable if the implementation wishes to throw any type of exception
34+
*/
35+
void accept(T1 t1, T2 t2, T3 t3) throws Throwable;
36+
}

0 commit comments

Comments
 (0)