Skip to content

Commit a87d019

Browse files
authored
4.x: Cleanup API reduction - Observable.switchMap (#8193)
1 parent d19355f commit a87d019

5 files changed

Lines changed: 70 additions & 143 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 29 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -11332,13 +11332,13 @@ public final Observable<T> switchIfEmpty(@NonNull ObservableSource<? extends T>
1133211332
* @return the new {@code Observable} instance
1133311333
* @throws NullPointerException if {@code mapper} is {@code null}
1133411334
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
11335-
* @see #switchMapDelayError(Function)
11335+
* @see #switchMap(Function, ObservableSwitchConfig)
1133611336
*/
1133711337
@CheckReturnValue
1133811338
@SchedulerSupport(SchedulerSupport.NONE)
1133911339
@NonNull
1134011340
public final <@NonNull R> Observable<R> switchMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
11341-
return switchMap(mapper, bufferSize());
11341+
return switchMap(mapper, ObservableSwitchConfig.DEFAULT);
1134211342
}
1134311343

1134411344
/**
@@ -11359,20 +11359,20 @@ public final Observable<T> switchIfEmpty(@NonNull ObservableSource<? extends T>
1135911359
* @param mapper
1136011360
* a function that, when applied to an item emitted by the current {@code Observable}, returns an
1136111361
* {@code ObservableSource}
11362-
* @param bufferSize
11363-
* the number of elements expected from the current active inner {@code ObservableSource} to be buffered
11362+
* @param config
11363+
* the configuration record for this operator
1136411364
* @return the new {@code Observable} instance
11365-
* @throws NullPointerException if {@code mapper} is {@code null}
11366-
* @throws IllegalArgumentException if {@code bufferSize} is non-positive
11365+
* @throws NullPointerException if {@code mapper} or {@code config} is {@code null}
1136711366
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
11368-
* @see #switchMapDelayError(Function, int)
11367+
* @since 4.0.0
1136911368
*/
1137011369
@CheckReturnValue
1137111370
@SchedulerSupport(SchedulerSupport.NONE)
1137211371
@NonNull
11373-
public final <@NonNull R> Observable<R> switchMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize) {
11372+
public final <@NonNull R> Observable<R> switchMap(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper,
11373+
@NonNull ObservableSwitchConfig config) {
1137411374
Objects.requireNonNull(mapper, "mapper is null");
11375-
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
11375+
Objects.requireNonNull(config, "config is null");
1137611376
if (this instanceof ScalarSupplier) {
1137711377
@SuppressWarnings("unchecked")
1137811378
T v = ((ScalarSupplier<T>)this).get();
@@ -11381,7 +11381,7 @@ public final Observable<T> switchIfEmpty(@NonNull ObservableSource<? extends T>
1138111381
}
1138211382
return ObservableScalarXMap.scalarXMap(v, mapper);
1138311383
}
11384-
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<>(this, mapper, bufferSize, false));
11384+
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<>(this, mapper, config.bufferSize(), config.delayError()));
1138511385
}
1138611386

1138711387
/**
@@ -11401,7 +11401,7 @@ public final Observable<T> switchIfEmpty(@NonNull ObservableSource<? extends T>
1140111401
* <dt><b>Error handling:</b></dt>
1140211402
* <dd>If either the current {@code Observable} or the active {@code CompletableSource} signals an {@code onError},
1140311403
* the resulting {@code Completable} is terminated immediately with that {@link Throwable}.
11404-
* Use the {@link #switchMapCompletableDelayError(Function)} to delay such inner failures until
11404+
* Use the {@link #switchMapCompletable(Function, boolean)} to delay such inner failures until
1140511405
* every inner {@code CompletableSource}s and the main {@code Observable} terminates in some fashion.
1140611406
* If they fail concurrently, the operator may combine the {@code Throwable}s into a
1140711407
* {@link CompositeException}
@@ -11416,7 +11416,7 @@ public final Observable<T> switchIfEmpty(@NonNull ObservableSource<? extends T>
1141611416
* (non blockingly) for its terminal event
1141711417
* @return the new {@code Completable} instance
1141811418
* @throws NullPointerException if {@code mapper} is {@code null}
11419-
* @see #switchMapCompletableDelayError(Function)
11419+
* @see #switchMapCompletable(Function, boolean)
1142011420
* @since 2.2
1142111421
*/
1142211422
@CheckReturnValue
@@ -11441,7 +11441,7 @@ public final Completable switchMapCompletable(@NonNull Function<? super T, ? ext
1144111441
* {@code Observable}.
1144211442
* <dl>
1144311443
* <dt><b>Scheduler:</b></dt>
11444-
* <dd>{@code switchMapCompletableDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
11444+
* <dd>{@code switchMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
1144511445
* <dt><b>Error handling:</b></dt>
1144611446
* <dd>The errors of the current {@code Observable} and all the {@code CompletableSource}s, who had the chance
1144711447
* to run to their completion, are delayed until
@@ -11458,17 +11458,18 @@ public final Completable switchMapCompletable(@NonNull Function<? super T, ? ext
1145811458
* @param mapper the function called with each upstream item and should return a
1145911459
* {@code CompletableSource} to be subscribed to and awaited for
1146011460
* (non blockingly) for its terminal event
11461+
* @param delayError if {@code true}, errors from the outer and inner sources will be delayed
1146111462
* @return the new {@code Completable} instance
1146211463
* @throws NullPointerException if {@code mapper} is {@code null}
1146311464
* @see #switchMapCompletable(Function)
11464-
* @since 2.2
11465+
* @since 4.0.0
1146511466
*/
1146611467
@CheckReturnValue
1146711468
@SchedulerSupport(SchedulerSupport.NONE)
1146811469
@NonNull
11469-
public final Completable switchMapCompletableDelayError(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
11470+
public final Completable switchMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper, boolean delayError) {
1147011471
Objects.requireNonNull(mapper, "mapper is null");
11471-
return RxJavaPlugins.onAssembly(new ObservableSwitchMapCompletable<>(this, mapper, true));
11472+
return RxJavaPlugins.onAssembly(new ObservableSwitchMapCompletable<>(this, mapper, delayError));
1147211473
}
1147311474

1147411475
/**
@@ -11498,7 +11499,7 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
1149811499
* and get subscribed to.
1149911500
* @return the new {@code Observable} instance
1150011501
* @throws NullPointerException if {@code mapper} is {@code null}
11501-
* @see #switchMapMaybeDelayError(Function)
11502+
* @see #switchMapMaybe(Function, boolean)
1150211503
* @since 2.2
1150311504
*/
1150411505
@CheckReturnValue
@@ -11517,24 +11518,25 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
1151711518
* <img width="640" height="469" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapMaybeDelayError.o.png" alt="">
1151811519
* <dl>
1151911520
* <dt><b>Scheduler:</b></dt>
11520-
* <dd>{@code switchMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
11521+
* <dd>{@code switchMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
1152111522
* </dl>
1152211523
* <p>History: 2.1.11 - experimental
1152311524
* @param <R> the output value type
1152411525
* @param mapper the function called with the current upstream event and should
1152511526
* return a {@code MaybeSource} to replace the current active inner source
1152611527
* and get subscribed to.
11528+
* @param delayError if {@code true}, errors from the outer and inner sources will be delayed
1152711529
* @return the new {@code Observable} instance
1152811530
* @throws NullPointerException if {@code mapper} is {@code null}
1152911531
* @see #switchMapMaybe(Function)
11530-
* @since 2.2
11532+
* @since 4.0.0
1153111533
*/
1153211534
@CheckReturnValue
1153311535
@SchedulerSupport(SchedulerSupport.NONE)
1153411536
@NonNull
11535-
public final <@NonNull R> Observable<R> switchMapMaybeDelayError(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
11537+
public final <@NonNull R> Observable<R> switchMapMaybe(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayError) {
1153611538
Objects.requireNonNull(mapper, "mapper is null");
11537-
return RxJavaPlugins.onAssembly(new ObservableSwitchMapMaybe<>(this, mapper, true));
11539+
return RxJavaPlugins.onAssembly(new ObservableSwitchMapMaybe<>(this, mapper, delayError));
1153811540
}
1153911541

1154011542
/**
@@ -11558,7 +11560,7 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
1155811560
* @return the new {@code Observable} instance
1155911561
* @throws NullPointerException if {@code mapper} is {@code null}
1156011562
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
11561-
* @see #switchMapSingleDelayError(Function)
11563+
* @see #switchMapSingle(Function, boolean)
1156211564
* @since 2.2
1156311565
*/
1156411566
@CheckReturnValue
@@ -11581,102 +11583,26 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
1158111583
* <img width="640" height="467" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMapSingleDelayError.o.png" alt="">
1158211584
* <dl>
1158311585
* <dt><b>Scheduler:</b></dt>
11584-
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
11586+
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
1158511587
* </dl>
1158611588
* <p>History: 2.0.8 - experimental
1158711589
* @param <R> the element type of the inner {@code SingleSource}s and the output
1158811590
* @param mapper
1158911591
* a function that, when applied to an item emitted by the current {@code Observable}, returns a
1159011592
* {@code SingleSource}
11593+
* @param delayError if {@code true}, errors from the outer and inner sources will be delayed
1159111594
* @return the new {@code Observable} instance
1159211595
* @throws NullPointerException if {@code mapper} is {@code null}
1159311596
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1159411597
* @see #switchMapSingle(Function)
11595-
* @since 2.2
11596-
*/
11597-
@CheckReturnValue
11598-
@SchedulerSupport(SchedulerSupport.NONE)
11599-
@NonNull
11600-
public final <@NonNull R> Observable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
11601-
Objects.requireNonNull(mapper, "mapper is null");
11602-
return RxJavaPlugins.onAssembly(new ObservableSwitchMapSingle<>(this, mapper, true));
11603-
}
11604-
11605-
/**
11606-
* Returns a new {@code Observable} by applying a function that you supply to each item emitted by the current
11607-
* {@code Observable} that returns an {@link ObservableSource}, and then emitting the items emitted by the most recently emitted
11608-
* of these {@code ObservableSource}s and delays any error until all {@code ObservableSource}s terminate.
11609-
* <p>
11610-
* The resulting {@code Observable} completes if both the current {@code Observable} and the last inner {@code ObservableSource}, if any, complete.
11611-
* If the current {@code Observable} signals an {@code onError}, the termination of the last inner {@code ObservableSource} will emit that error as is
11612-
* or wrapped into a {@link CompositeException} along with the other possible errors the former inner {@code ObservableSource}s signaled.
11613-
* <p>
11614-
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.v3.png" alt="">
11615-
* <dl>
11616-
* <dt><b>Scheduler:</b></dt>
11617-
* <dd>{@code switchMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
11618-
* </dl>
11619-
*
11620-
* @param <R> the element type of the inner {@code ObservableSource}s and the output
11621-
* @param mapper
11622-
* a function that, when applied to an item emitted by the current {@code Observable}, returns an
11623-
* {@code ObservableSource}
11624-
* @return the new {@code Observable} instance
11625-
* @throws NullPointerException if {@code mapper} is {@code null}
11626-
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
11627-
* @see #switchMap(Function)
11628-
* @since 2.0
11629-
*/
11630-
@CheckReturnValue
11631-
@SchedulerSupport(SchedulerSupport.NONE)
11632-
@NonNull
11633-
public final <@NonNull R> Observable<R> switchMapDelayError(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
11634-
return switchMapDelayError(mapper, bufferSize());
11635-
}
11636-
11637-
/**
11638-
* Returns a new {@code Observable} by applying a function that you supply to each item emitted by the current
11639-
* {@code Observable} that returns an {@link ObservableSource}, and then emitting the items emitted by the most recently emitted
11640-
* of these {@code ObservableSource}s and delays any error until all {@code ObservableSource}s terminate.
11641-
* <p>
11642-
* The resulting {@code Observable} completes if both the current {@code Observable} and the last inner {@code ObservableSource}, if any, complete.
11643-
* If the current {@code Observable} signals an {@code onError}, the termination of the last inner {@code ObservableSource} will emit that error as is
11644-
* or wrapped into a {@link CompositeException} along with the other possible errors the former inner {@code ObservableSource}s signaled.
11645-
* <p>
11646-
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.v3.png" alt="">
11647-
* <dl>
11648-
* <dt><b>Scheduler:</b></dt>
11649-
* <dd>{@code switchMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
11650-
* </dl>
11651-
*
11652-
* @param <R> the element type of the inner {@code ObservableSource}s and the output
11653-
* @param mapper
11654-
* a function that, when applied to an item emitted by the current {@code Observable}, returns an
11655-
* {@code ObservableSource}
11656-
* @param bufferSize
11657-
* the number of elements expected from the current active inner {@code ObservableSource} to be buffered
11658-
* @return the new {@code Observable} instance
11659-
* @throws NullPointerException if {@code mapper} is {@code null}
11660-
* @throws IllegalArgumentException if {@code bufferSize} is non-positive
11661-
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
11662-
* @see #switchMap(Function, int)
11663-
* @since 2.0
11598+
* @since 4.0.0
1166411599
*/
1166511600
@CheckReturnValue
1166611601
@SchedulerSupport(SchedulerSupport.NONE)
1166711602
@NonNull
11668-
public final <@NonNull R> Observable<R> switchMapDelayError(@NonNull Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize) {
11603+
public final <@NonNull R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayError) {
1166911604
Objects.requireNonNull(mapper, "mapper is null");
11670-
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
11671-
if (this instanceof ScalarSupplier) {
11672-
@SuppressWarnings("unchecked")
11673-
T v = ((ScalarSupplier<T>)this).get();
11674-
if (v == null) {
11675-
return empty();
11676-
}
11677-
return ObservableScalarXMap.scalarXMap(v, mapper);
11678-
}
11679-
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<>(this, mapper, bufferSize, true));
11605+
return RxJavaPlugins.onAssembly(new ObservableSwitchMapSingle<>(this, mapper, delayError));
1168011606
}
1168111607

1168211608
/**

src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableSwitchMapCompletableTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ public void innerErrorDelayed() {
269269
final PublishSubject<Integer> ps = PublishSubject.create();
270270
final CompletableSubject cs = CompletableSubject.create();
271271

272-
TestObserver<Void> to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test();
272+
TestObserver<Void> to = ps.switchMapCompletable(Functions.justFunction(cs), true).test();
273273

274274
ps.onNext(1);
275275

@@ -289,7 +289,7 @@ public void mainCompletesinnerErrorDelayed() {
289289
final PublishSubject<Integer> ps = PublishSubject.create();
290290
final CompletableSubject cs = CompletableSubject.create();
291291

292-
TestObserver<Void> to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test();
292+
TestObserver<Void> to = ps.switchMapCompletable(Functions.justFunction(cs), true).test();
293293

294294
ps.onNext(1);
295295
ps.onComplete();
@@ -306,7 +306,7 @@ public void mainErrorDelayed() {
306306
final PublishSubject<Integer> ps = PublishSubject.create();
307307
final CompletableSubject cs = CompletableSubject.create();
308308

309-
TestObserver<Void> to = ps.switchMapCompletableDelayError(Functions.justFunction(cs)).test();
309+
TestObserver<Void> to = ps.switchMapCompletable(Functions.justFunction(cs), true).test();
310310

311311
ps.onNext(1);
312312

@@ -370,6 +370,6 @@ public void undeliverableUponCancel() {
370370
@Test
371371
public void undeliverableUponCancelDelayError() {
372372
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
373-
upstream.switchMapCompletableDelayError((Function<Integer, Completable>) _ -> Completable.complete().hide()));
373+
upstream.switchMapCompletable((Function<Integer, Completable>) _ -> Completable.complete().hide(), true));
374374
}
375375
}

0 commit comments

Comments
 (0)