Skip to content

Commit aac2c93

Browse files
authored
4.x: Flowable API reduction: concat*, merge, concatMap*, flatMap* (#8195)
* 4.x: Flowable API reduction: concat*, merge, concatMap*, flatMap* * Observable to use MAX_VALUE concurrency in flatMap
1 parent 5ad2973 commit aac2c93

39 files changed

Lines changed: 826 additions & 1966 deletions

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22-
import io.reactivex.rxjava4.core.config.StandardConcurrentBufferedConfig;
22+
import io.reactivex.rxjava4.core.config.*;
2323
import io.reactivex.rxjava4.functions.Action;
2424
import io.reactivex.rxjava4.internal.functions.Functions;
2525
import io.reactivex.rxjava4.schedulers.Schedulers;
@@ -57,7 +57,8 @@ public void setup() {
5757
Arrays.fill(array, 777);
5858

5959
flatMapCompletable = Flowable.fromArray(array)
60-
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
60+
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())),
61+
new StandardConcurrentConfig(false, maxConcurrency));
6162

6263
flatMap = Flowable.fromArray(array)
6364
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()),

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22-
import io.reactivex.rxjava4.core.config.StandardConcurrentBufferedConfig;
22+
import io.reactivex.rxjava4.core.config.*;
2323
import io.reactivex.rxjava4.internal.functions.Functions;
2424

2525
@SuppressWarnings("exports")
@@ -47,7 +47,7 @@ public void setup() {
4747
Arrays.fill(array, 777);
4848

4949
flatMapCompletable = Flowable.fromArray(array)
50-
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);
50+
.flatMapCompletable(Functions.justFunction(Completable.complete()), new StandardConcurrentConfig(false, maxConcurrency));
5151

5252
flatMap = Flowable.fromArray(array)
5353
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), new StandardConcurrentBufferedConfig(false, maxConcurrency));

src/main/java/io/reactivex/rxjava4/core/Completable.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,8 @@ public static Completable mergeArray(@NonNull StandardConcurrentConfig config, @
917917
}
918918
return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
919919
}
920-
return Flowable.fromArray(sources).flatMapCompletable(Functions.identity(), config.delayErrors(), config.maxConcurrency());
920+
return Flowable.fromArray(sources)
921+
.flatMapCompletable(Functions.identity(), config);
921922
}
922923

923924
/**
@@ -949,7 +950,8 @@ public static Completable merge(@NonNull Iterable<@NonNull ? extends Completable
949950
}
950951
return RxJavaPlugins.onAssembly(new CompletableMergeIterable(sources));
951952
}
952-
return Flowable.fromIterable(sources).flatMapCompletable(Functions.identity(), config.delayErrors(), config.maxConcurrency());
953+
return Flowable.fromIterable(sources)
954+
.flatMapCompletable(Functions.identity(), config);
953955
}
954956

955957
/**
@@ -2614,7 +2616,7 @@ public final Completable startWith(@NonNull CompletableSource other) {
26142616
@BackpressureSupport(BackpressureKind.FULL)
26152617
public final <@NonNull T> Flowable<T> startWith(@NonNull SingleSource<T> other) {
26162618
Objects.requireNonNull(other, "other is null");
2617-
return Flowable.concat(Single.wrap(other).toFlowable(), toFlowable());
2619+
return Flowable.concatArray(Single.wrap(other).toFlowable(), toFlowable());
26182620
}
26192621

26202622
/**
@@ -2640,7 +2642,7 @@ public final Completable startWith(@NonNull CompletableSource other) {
26402642
@BackpressureSupport(BackpressureKind.FULL)
26412643
public final <@NonNull T> Flowable<T> startWith(@NonNull MaybeSource<T> other) {
26422644
Objects.requireNonNull(other, "other is null");
2643-
return Flowable.concat(Maybe.wrap(other).toFlowable(), toFlowable());
2645+
return Flowable.concatArray(Maybe.wrap(other).toFlowable(), toFlowable());
26442646
}
26452647

26462648
/**

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 325 additions & 1315 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava4/core/Maybe.java

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -342,14 +342,13 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
342342
* @return the new {@code Flowable} instance with the specified concatenation behavior
343343
* @throws NullPointerException if {@code sources} is {@code null}
344344
*/
345-
@SuppressWarnings({ "rawtypes", "unchecked" })
346345
@BackpressureSupport(BackpressureKind.FULL)
347346
@CheckReturnValue
348347
@SchedulerSupport(SchedulerSupport.NONE)
349348
@NonNull
350349
@SafeVarargs
351350
public static <@NonNull T> Flowable<T> concatArrayEager(@NonNull MaybeSource<? extends T>... sources) {
352-
return Flowable.fromArray(sources).concatMapEager((Function)MaybeToPublisher.instance());
351+
return concatArrayEager(StandardConcurrentBufferedConfig.DELAY_ERRORS_BOUNDARY, sources);
353352
}
354353
/**
355354
* Concatenates a sequence of {@link MaybeSource} eagerly into a {@link Flowable} sequence.
@@ -379,10 +378,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
379378
@NonNull
380379
@SafeVarargs
381380
public static <@NonNull T> Flowable<T> concatArrayEager(@NonNull StandardConcurrentBufferedConfig config, @NonNull MaybeSource<? extends T>... sources) {
382-
if (config.delayErrors()) {
383-
return Flowable.fromArray(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true, config.maxConcurrency(), config.bufferSize());
384-
}
385-
return Flowable.fromArray(sources).concatMapEager((Function)MaybeToPublisher.instance(), config.maxConcurrency(), config.bufferSize());
381+
return Flowable.fromArray(sources).concatMapEager((Function)MaybeToPublisher.instance(), config);
386382
}
387383

388384
/**
@@ -410,11 +406,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
410406
@NonNull
411407
@SchedulerSupport(SchedulerSupport.NONE)
412408
public static <@NonNull T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources, @NonNull StandardBufferedConfig config) {
413-
Objects.requireNonNull(config, "config is null");
414-
if (config.delayErrors()) {
415-
return Flowable.fromIterable(sources).concatMapMaybeDelayError(Functions.identity(), true, config.bufferSize());
416-
}
417-
return Flowable.fromIterable(sources).concatMapMaybe(Functions.identity(), config.bufferSize());
409+
return Flowable.fromIterable(sources).concatMapMaybe(Functions.identity(), config);
418410
}
419411

420412
/**
@@ -442,7 +434,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
442434
@SchedulerSupport(SchedulerSupport.NONE)
443435
@NonNull
444436
public static <@NonNull T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
445-
return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), false);
437+
return Flowable.fromIterable(sources).concatMapEager((Function)MaybeToPublisher.instance(), StandardConcurrentBufferedConfig.DELAY_ERRORS_BOUNDARY);
446438
}
447439

448440
/**
@@ -474,11 +466,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
474466
@NonNull
475467
public static <@NonNull T> Flowable<T> concatEager(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources,
476468
@NonNull StandardConcurrentBufferedConfig config) {
477-
Objects.requireNonNull(config, "config is null");
478-
if (config.delayErrors()) {
479-
return Flowable.fromIterable(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true, config.maxConcurrency(), config.bufferSize());
480-
}
481-
return Flowable.fromIterable(sources).concatMapEager((Function)MaybeToPublisher.instance(), config.maxConcurrency(), config.bufferSize());
469+
return Flowable.fromIterable(sources).concatMapEager((Function)MaybeToPublisher.instance(), config);
482470
}
483471

484472
/**
@@ -543,11 +531,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
543531
@NonNull
544532
public static <@NonNull T> Flowable<T> concatEager(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources,
545533
@NonNull StandardConcurrentBufferedConfig config) {
546-
Objects.requireNonNull(config, "config is null");
547-
if (config.delayErrors()) {
548-
return Flowable.fromPublisher(sources).concatMapEagerDelayError((Function)MaybeToPublisher.instance(), true, config.maxConcurrency(), config.bufferSize());
549-
}
550-
return Flowable.fromPublisher(sources).concatMapEager((Function)MaybeToPublisher.instance(), config.maxConcurrency(), config.bufferSize());
534+
return Flowable.fromPublisher(sources).concatMapEager((Function)MaybeToPublisher.instance(), config);
551535
}
552536

553537
/**
@@ -1086,7 +1070,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
10861070
@SchedulerSupport(SchedulerSupport.NONE)
10871071
@NonNull
10881072
public static <@NonNull T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
1089-
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), false, Integer.MAX_VALUE);
1073+
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), StandardConcurrentConfig.MAX_DEFAULT);
10901074
}
10911075

10921076
/**
@@ -1278,7 +1262,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
12781262
public static <@NonNull T> Flowable<T> mergeArray(@NonNull StandardConcurrentConfig config, @NonNull MaybeSource<? extends T>... sources) {
12791263
Objects.requireNonNull(sources, "sources is null");
12801264
Objects.requireNonNull(config, "config is null");
1281-
return Flowable.fromArray(sources).flatMapMaybe(Functions.identity(), config.delayErrors(), config.maxConcurrency());
1265+
return Flowable.fromArray(sources).flatMapMaybe(Functions.identity(), config);
12821266
}
12831267

12841268
/**
@@ -1318,7 +1302,7 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
13181302
@NonNull
13191303
public static <@NonNull T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources, @NonNull StandardConcurrentConfig config) {
13201304
Objects.requireNonNull(config, "config is null");
1321-
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), config.delayErrors(), config.maxConcurrency());
1305+
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), config);
13221306
}
13231307

13241308
/**
@@ -4478,7 +4462,7 @@ public final void safeSubscribe(@NonNull MaybeObserver<? super T> observer) {
44784462
@BackpressureSupport(BackpressureKind.FULL)
44794463
public final Flowable<T> startWith(@NonNull CompletableSource other) {
44804464
Objects.requireNonNull(other, "other is null");
4481-
return Flowable.concat(Completable.wrap(other).<T>toFlowable(), toFlowable());
4465+
return Flowable.concatArray(Completable.wrap(other).<T>toFlowable(), toFlowable());
44824466
}
44834467

44844468
/**
@@ -4503,7 +4487,7 @@ public final Flowable<T> startWith(@NonNull CompletableSource other) {
45034487
@BackpressureSupport(BackpressureKind.FULL)
45044488
public final Flowable<T> startWith(@NonNull SingleSource<T> other) {
45054489
Objects.requireNonNull(other, "other is null");
4506-
return Flowable.concat(Single.wrap(other).toFlowable(), toFlowable());
4490+
return Flowable.concatArray(Single.wrap(other).toFlowable(), toFlowable());
45074491
}
45084492

45094493
/**
@@ -4528,7 +4512,7 @@ public final Flowable<T> startWith(@NonNull SingleSource<T> other) {
45284512
@BackpressureSupport(BackpressureKind.FULL)
45294513
public final Flowable<T> startWith(@NonNull MaybeSource<T> other) {
45304514
Objects.requireNonNull(other, "other is null");
4531-
return Flowable.concat(Maybe.wrap(other).toFlowable(), toFlowable());
4515+
return Flowable.concatArray(Maybe.wrap(other).toFlowable(), toFlowable());
45324516
}
45334517

45344518
/**

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7312,7 +7312,7 @@ public final Single<T> firstOrError() {
73127312
@SchedulerSupport(SchedulerSupport.NONE)
73137313
@NonNull
73147314
public final <@NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
7315-
return flatMap(mapper, StandardConcurrentBufferedConfig.DEFAULT);
7315+
return flatMap(mapper, StandardConcurrentBufferedConfig.MAX_DEFAULT);
73167316
}
73177317

73187318
/**
@@ -7387,7 +7387,7 @@ public final Single<T> firstOrError() {
73877387
@NonNull Function<? super T, ? extends ObservableSource<? extends R>> onNextMapper,
73887388
@NonNull Function<? super Throwable, ? extends ObservableSource<? extends R>> onErrorMapper,
73897389
@NonNull Supplier<? extends ObservableSource<? extends R>> onCompleteSupplier) {
7390-
return flatMap(onNextMapper, onErrorMapper, onCompleteSupplier, StandardConcurrentBufferedConfig.DEFAULT);
7390+
return flatMap(onNextMapper, onErrorMapper, onCompleteSupplier, StandardConcurrentBufferedConfig.MAX_DEFAULT);
73917391
}
73927392

73937393
/**
@@ -7461,7 +7461,7 @@ public final Single<T> firstOrError() {
74617461
@NonNull
74627462
public final <@NonNull U, @NonNull R> Observable<R> flatMap(@NonNull Function<? super T, ? extends ObservableSource<? extends U>> mapper,
74637463
@NonNull BiFunction<? super T, ? super U, ? extends R> combiner) {
7464-
return flatMap(mapper, combiner, StandardConcurrentBufferedConfig.DEFAULT);
7464+
return flatMap(mapper, combiner, StandardConcurrentBufferedConfig.MAX_DEFAULT);
74657465
}
74667466

74677467
/**
@@ -7519,7 +7519,7 @@ public final Single<T> firstOrError() {
75197519
@SchedulerSupport(SchedulerSupport.NONE)
75207520
@NonNull
75217521
public final Completable flatMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
7522-
return flatMapCompletable(mapper, StandardConcurrentBufferedConfig.DEFAULT);
7522+
return flatMapCompletable(mapper, StandardConcurrentBufferedConfig.MAX_DEFAULT);
75237523
}
75247524

75257525
/**
@@ -7628,7 +7628,7 @@ public final Completable flatMapCompletable(@NonNull Function<? super T, ? exten
76287628
@SchedulerSupport(SchedulerSupport.NONE)
76297629
@NonNull
76307630
public final <@NonNull R> Observable<R> flatMapMaybe(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
7631-
return flatMapMaybe(mapper, StandardConcurrentBufferedConfig.DEFAULT);
7631+
return flatMapMaybe(mapper, StandardConcurrentBufferedConfig.MAX_DEFAULT);
76327632
}
76337633

76347634
/**
@@ -7675,7 +7675,7 @@ public final Completable flatMapCompletable(@NonNull Function<? super T, ? exten
76757675
@SchedulerSupport(SchedulerSupport.NONE)
76767676
@NonNull
76777677
public final <@NonNull R> Observable<R> flatMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
7678-
return flatMapSingle(mapper, StandardConcurrentBufferedConfig.DEFAULT);
7678+
return flatMapSingle(mapper, StandardConcurrentBufferedConfig.MAX_DEFAULT);
76797679
}
76807680

76817681
/**

0 commit comments

Comments
 (0)