Skip to content

Commit c38638e

Browse files
authored
4.x: Flowable API reduction: remaining operators & other related changes (#8200)
* 4.x: Flowable API reduction: remaining operators & other related changes * Small cleanup and coverage
1 parent aac2c93 commit c38638e

67 files changed

Lines changed: 1567 additions & 3231 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.6.0-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.6.1-bin.zip
44
networkTimeout=10000
55
retries=0
66
retryBackOffMs=500

src/jmh/java/io/reactivex/rxjava4/core/OperatorMergePerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedE
8383
public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
8484
PerfSubscriber o = input.newLatchedObserver();
8585
Flowable<Integer> ob = Flowable.range(0, input.size).subscribeOn(Schedulers.computation());
86-
Flowable.merge(ob, ob).subscribe(o);
86+
Flowable.mergeArray(ob, ob).subscribe(o);
8787
if (input.size == 1) {
8888
while (o.latch.getCount() != 0) { }
8989
} else {

src/main/java/io/reactivex/rxjava4/core/Completable.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ public static Completable concatArray(@NonNull StandardBufferedConfig config, @N
220220
return wrap(sources[0]);
221221
}
222222
if (config.delayErrors()) {
223-
return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
223+
return Flowable.fromArray(sources)
224+
.concatMapCompletable(Functions.identity(), config);
224225
}
225226
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
226227

@@ -283,7 +284,6 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
283284
* @param config the operatar configuration record
284285
* @return the new {@code Completable} instance
285286
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
286-
* @throws IllegalArgumentException if {@code prefetch} is non-positive
287287
* @since 4.0.0
288288
*/
289289
@CheckReturnValue
@@ -295,7 +295,8 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
295295
Objects.requireNonNull(sources, "sources is null");
296296
Objects.requireNonNull(config, "config is null");
297297
if (config.delayErrors()) {
298-
return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
298+
return Flowable.fromPublisher(sources)
299+
.concatMapCompletable(Functions.identity(), config);
299300
}
300301
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, config.bufferSize()));
301302
}
@@ -321,7 +322,8 @@ public static Completable concat(@NonNull Iterable<@NonNull ? extends Completabl
321322
Objects.requireNonNull(sources, "sources is null");
322323
Objects.requireNonNull(config, "config is null");
323324
if (config.delayErrors()) {
324-
return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
325+
return Flowable.fromIterable(sources)
326+
.concatMapCompletable(Functions.identity(), config);
325327
}
326328
return RxJavaPlugins.onAssembly(new CompletableConcatIterable(sources));
327329
}
@@ -870,7 +872,6 @@ public static Completable merge(@NonNull Publisher<@NonNull ? extends Completabl
870872
* @param config the configuration record for this operator
871873
* @return the new {@code Completable} instance
872874
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
873-
* @throws IllegalArgumentException if {@code maxConcurrency} is less than 1
874875
* @since 4.0.0
875876
*/
876877
@CheckReturnValue
@@ -898,6 +899,7 @@ public static Completable merge(@NonNull Publisher<@NonNull ? extends Completabl
898899
* @param config the configuration record for this operator
899900
* @return the new {@code Completable} instance
900901
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
902+
* @since 4.0.0
901903
*/
902904
@CheckReturnValue
903905
@NonNull

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 337 additions & 2293 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava4/core/Maybe.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
234234
* @param sources the {@code Publisher} of {@code MaybeSource} instances
235235
* @param config the configuration record for this operator
236236
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
237-
* @throws IllegalArgumentException if {@code prefetch} is non-positive
238237
* @return the new {@code Flowable} instance
239238
* @since 4.0.0
240239
*/
@@ -521,7 +520,6 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
521520
* @param config the configuration record for this operator
522521
* @return the new {@code Flowable} instance with the specified concatenation behavior
523522
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
524-
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
525523
* @since 4.0.0
526524
*/
527525
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -1135,7 +1133,6 @@ public abstract class Maybe<@NonNull T> implements MaybeSource<T> {
11351133
* @param config the configuration record for this operator
11361134
* @return the new {@code Flowable} instance
11371135
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
1138-
* @throws IllegalArgumentException if {@code maxConcurrency} is non-positive
11391136
* @since 4.0.0
11401137
*/
11411138
@BackpressureSupport(BackpressureKind.FULL)
@@ -2686,32 +2683,6 @@ public final Maybe<T> delay(long time, @NonNull TimeUnit unit) {
26862683
return delay(time, unit, Schedulers.computation(), false);
26872684
}
26882685

2689-
/**
2690-
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
2691-
* specified delay.
2692-
* <p>
2693-
* <img width="640" height="340" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.delay.tb.png" alt="">
2694-
* <dl>
2695-
* <dt><b>Scheduler:</b></dt>
2696-
* <dd>This version of {@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
2697-
* </dl>
2698-
*
2699-
* @param time the delay to shift the source by
2700-
* @param unit the {@link TimeUnit} in which {@code time} is defined
2701-
* @param delayError if {@code true}, both success and error signals are delayed. if {@code false}, only success signals are delayed.
2702-
* @return the new {@code Maybe} instance
2703-
* @throws NullPointerException if {@code unit} is {@code null}
2704-
* @see <a href="http://reactivex.io/documentation/operators/delay.html">ReactiveX operators documentation: Delay</a>
2705-
* @see #delay(long, TimeUnit, Scheduler, boolean)
2706-
* @since 3.0.0
2707-
*/
2708-
@CheckReturnValue
2709-
@SchedulerSupport(SchedulerSupport.COMPUTATION)
2710-
@NonNull
2711-
public final Maybe<T> delay(long time, @NonNull TimeUnit unit, boolean delayError) {
2712-
return delay(time, unit, Schedulers.computation(), delayError);
2713-
}
2714-
27152686
/**
27162687
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
27172688
* specified delay.

0 commit comments

Comments
 (0)