Skip to content

Commit 5ad2973

Browse files
authored
4.x: Standardize config records into 3 main variants (#8194)
* 4.x: Standardize config records into 3 main variants * Fix coverage
1 parent a87d019 commit 5ad2973

99 files changed

Lines changed: 1253 additions & 2556 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableAsyncPerf.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22-
import io.reactivex.rxjava4.core.config.FlatMapConfig;
22+
import io.reactivex.rxjava4.core.config.StandardConcurrentBufferedConfig;
2323
import io.reactivex.rxjava4.functions.Action;
2424
import io.reactivex.rxjava4.internal.functions.Functions;
2525
import io.reactivex.rxjava4.schedulers.Schedulers;
@@ -60,7 +60,8 @@ public void setup() {
6060
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
6161

6262
flatMap = Flowable.fromArray(array)
63-
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), new FlatMapConfig(false, maxConcurrency));
63+
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()),
64+
new StandardConcurrentBufferedConfig(false, maxConcurrency));
6465
}
6566

6667
// @Benchmark

src/jmh/java/io/reactivex/rxjava4/core/FlowableFlatMapCompletableSyncPerf.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.openjdk.jmh.annotations.*;
2020
import org.openjdk.jmh.infra.Blackhole;
2121

22-
import io.reactivex.rxjava4.core.config.FlatMapConfig;
22+
import io.reactivex.rxjava4.core.config.StandardConcurrentBufferedConfig;
2323
import io.reactivex.rxjava4.internal.functions.Functions;
2424

2525
@SuppressWarnings("exports")
@@ -50,7 +50,7 @@ public void setup() {
5050
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);
5151

5252
flatMap = Flowable.fromArray(array)
53-
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), new FlatMapConfig(false, maxConcurrency));
53+
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), new StandardConcurrentBufferedConfig(false, maxConcurrency));
5454
}
5555

5656
@Benchmark

src/jmh/java/io/reactivex/rxjava4/parallel/ParallelPerf.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
package io.reactivex.rxjava4.parallel;
1515

1616
import java.util.Arrays;
17+
import java.util.concurrent.Flow.Publisher;
1718
import java.util.concurrent.TimeUnit;
1819

1920
import org.openjdk.jmh.annotations.*;
2021
import org.openjdk.jmh.infra.Blackhole;
21-
import static java.util.concurrent.Flow.*;
2222

2323
import io.reactivex.rxjava4.core.*;
24-
import io.reactivex.rxjava4.core.config.FlatMapConfig;
24+
import io.reactivex.rxjava4.core.config.StandardConcurrentBufferedConfig;
2525
import io.reactivex.rxjava4.flowables.GroupedFlowable;
2626
import io.reactivex.rxjava4.functions.Function;
2727
import io.reactivex.rxjava4.schedulers.Schedulers;
@@ -67,7 +67,7 @@ public void setup() {
6767
Flowable<Integer> source = Flowable.fromArray(ints);
6868

6969
flatMap = source.flatMap((Function<Integer, Publisher<Integer>>) v -> Flowable.just(v).subscribeOn(Schedulers.computation())
70-
.map(ParallelPerf.this), new FlatMapConfig(cpu));
70+
.map(ParallelPerf.this), new StandardConcurrentBufferedConfig(cpu));
7171

7272
groupBy = source.groupBy(new Function<Integer, Integer>() {
7373
int i;

src/main/java/io/reactivex/rxjava4/core/Completable.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ public static Completable complete() {
190190
@NonNull
191191
@SchedulerSupport(SchedulerSupport.NONE)
192192
public static Completable concatArray(@NonNull CompletableSource... sources) {
193-
return concatArray(CompletableConcatConfig.DEFAULT, sources);
193+
return concatArray(StandardBufferedConfig.DEFAULT, sources);
194194
}
195195

196196
/**
@@ -199,7 +199,7 @@ public static Completable concatArray(@NonNull CompletableSource... sources) {
199199
* <img width="640" height="324" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.concatArrayDelayError.png" alt="">
200200
* <dl>
201201
* <dt><b>Scheduler:</b></dt>
202-
* <dd>{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
202+
* <dd>{@code concatArray} does not operate by default on a particular {@link Scheduler}.</dd>
203203
* </dl>
204204
* @param sources the sources to concatenate
205205
* @param config the configuration record for this operator
@@ -210,7 +210,7 @@ public static Completable concatArray(@NonNull CompletableSource... sources) {
210210
@CheckReturnValue
211211
@NonNull
212212
@SchedulerSupport(SchedulerSupport.NONE)
213-
public static Completable concatArray(@NonNull CompletableConcatConfig config, @NonNull CompletableSource... sources) {
213+
public static Completable concatArray(@NonNull StandardBufferedConfig config, @NonNull CompletableSource... sources) {
214214
Objects.requireNonNull(sources, "sources is null");
215215
Objects.requireNonNull(config, "config is null");
216216
if (sources.length == 0) {
@@ -219,8 +219,8 @@ public static Completable concatArray(@NonNull CompletableConcatConfig config, @
219219
if (sources.length == 1) {
220220
return wrap(sources[0]);
221221
}
222-
if (config.delayError()) {
223-
return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, config.prefetch());
222+
if (config.delayErrors()) {
223+
return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
224224
}
225225
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
226226

@@ -242,7 +242,7 @@ public static Completable concatArray(@NonNull CompletableConcatConfig config, @
242242
@NonNull
243243
@SchedulerSupport(SchedulerSupport.NONE)
244244
public static Completable concat(@NonNull Iterable<@NonNull ? extends CompletableSource> sources) {
245-
return concat(sources, CompletableConcatConfig.DEFAULT);
245+
return concat(sources, StandardBufferedConfig.DEFAULT);
246246
}
247247

248248
/**
@@ -265,7 +265,7 @@ public static Completable concat(@NonNull Iterable<@NonNull ? extends Completabl
265265
@BackpressureSupport(BackpressureKind.FULL)
266266
@NonNull
267267
public static Completable concat(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
268-
return concat(sources, CompletableConcatConfig.DEFAULT);
268+
return concat(sources, StandardBufferedConfig.DEFAULT);
269269
}
270270

271271
/**
@@ -291,13 +291,13 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
291291
@SchedulerSupport(SchedulerSupport.NONE)
292292
@BackpressureSupport(BackpressureKind.FULL)
293293
public static Completable concat(@NonNull Publisher<@NonNull ? extends CompletableSource> sources,
294-
@NonNull CompletableConcatConfig config) {
294+
@NonNull StandardBufferedConfig config) {
295295
Objects.requireNonNull(sources, "sources is null");
296296
Objects.requireNonNull(config, "config is null");
297-
if (config.delayError()) {
298-
return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, config.prefetch());
297+
if (config.delayErrors()) {
298+
return Flowable.fromPublisher(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
299299
}
300-
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, config.prefetch()));
300+
return RxJavaPlugins.onAssembly(new CompletableConcat(sources, config.bufferSize()));
301301
}
302302

303303
/**
@@ -317,11 +317,11 @@ public static Completable concat(@NonNull Publisher<@NonNull ? extends Completab
317317
@CheckReturnValue
318318
@NonNull
319319
@SchedulerSupport(SchedulerSupport.NONE)
320-
public static Completable concat(@NonNull Iterable<@NonNull ? extends CompletableSource> sources, @NonNull CompletableConcatConfig config) {
320+
public static Completable concat(@NonNull Iterable<@NonNull ? extends CompletableSource> sources, @NonNull StandardBufferedConfig config) {
321321
Objects.requireNonNull(sources, "sources is null");
322322
Objects.requireNonNull(config, "config is null");
323-
if (config.delayError()) {
324-
return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity(), true, config.prefetch());
323+
if (config.delayErrors()) {
324+
return Flowable.fromIterable(sources).concatMapCompletableDelayError(Functions.identity(), true, config.bufferSize());
325325
}
326326
return RxJavaPlugins.onAssembly(new CompletableConcatIterable(sources));
327327
}
@@ -396,7 +396,7 @@ public static Completable create(@NonNull CompletableOnSubscribe source) {
396396
public static Single<Boolean> sequenceEqual(@NonNull CompletableSource source1, @NonNull CompletableSource source2) { // NOPMD
397397
Objects.requireNonNull(source1, "source1 is null");
398398
Objects.requireNonNull(source2, "source2 is null");
399-
return mergeArray(CompletableMergeConfig.DEFAULT, source1, source2).andThen(Single.just(true));
399+
return mergeArray(StandardConcurrentConfig.MAX_DEFAULT, source1, source2).andThen(Single.just(true));
400400
}
401401

402402
/**
@@ -773,7 +773,7 @@ public static Completable fromSupplier(@NonNull Supplier<?> supplier) {
773773
@NonNull
774774
@SchedulerSupport(SchedulerSupport.NONE)
775775
public static Completable mergeArray(@NonNull CompletableSource... sources) {
776-
return mergeArray(CompletableMergeConfig.DEFAULT, sources);
776+
return mergeArray(StandardConcurrentConfig.MAX_DEFAULT, sources);
777777
}
778778

779779
/**
@@ -804,7 +804,7 @@ public static Completable mergeArray(@NonNull CompletableSource... sources) {
804804
@NonNull
805805
@SchedulerSupport(SchedulerSupport.NONE)
806806
public static Completable merge(@NonNull Iterable<@NonNull ? extends CompletableSource> sources) {
807-
return merge(sources, CompletableMergeConfig.DEFAULT);
807+
return merge(sources, StandardConcurrentConfig.MAX_DEFAULT);
808808
}
809809

810810
/**
@@ -839,7 +839,7 @@ public static Completable merge(@NonNull Iterable<@NonNull ? extends Completable
839839
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
840840
@NonNull
841841
public static Completable merge(@NonNull Publisher<@NonNull ? extends CompletableSource> sources) {
842-
return merge(sources, new CompletableMergeConfig(false, Integer.MAX_VALUE));
842+
return merge(sources, StandardConcurrentConfig.MAX_DEFAULT);
843843
}
844844

845845
/**
@@ -878,7 +878,7 @@ public static Completable merge(@NonNull Publisher<@NonNull ? extends Completabl
878878
@BackpressureSupport(BackpressureKind.FULL)
879879
@NonNull
880880
public static Completable merge(@NonNull Publisher<@NonNull ? extends CompletableSource> sources,
881-
@NonNull CompletableMergeConfig config) {
881+
@NonNull StandardConcurrentConfig config) {
882882
Objects.requireNonNull(sources, "sources is null");
883883
Objects.requireNonNull(config, "config is null");
884884
return RxJavaPlugins.onAssembly(new CompletableMerge(sources, config.maxConcurrency(), config.delayErrors()));
@@ -902,7 +902,7 @@ public static Completable merge(@NonNull Publisher<@NonNull ? extends Completabl
902902
@CheckReturnValue
903903
@NonNull
904904
@SchedulerSupport(SchedulerSupport.NONE)
905-
public static Completable mergeArray(@NonNull CompletableMergeConfig config, @NonNull CompletableSource... sources) {
905+
public static Completable mergeArray(@NonNull StandardConcurrentConfig config, @NonNull CompletableSource... sources) {
906906
Objects.requireNonNull(sources, "sources is null");
907907
Objects.requireNonNull(config, "config is null");
908908
if (sources.length == 0) {
@@ -911,11 +911,13 @@ public static Completable mergeArray(@NonNull CompletableMergeConfig config, @No
911911
if (sources.length == 1) {
912912
return wrap(sources[0]);
913913
}
914-
if (config.delayErrors()) {
915-
return RxJavaPlugins.onAssembly(new CompletableMergeArrayDelayError(sources /* TODO , config.maxConcurrency() */));
914+
if (config.maxConcurrency() >= sources.length) {
915+
if (config.delayErrors()) {
916+
return RxJavaPlugins.onAssembly(new CompletableMergeArrayDelayError(sources));
917+
}
918+
return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources));
916919
}
917-
return RxJavaPlugins.onAssembly(new CompletableMergeArray(sources /* TODO , config.maxConcurrency() */));
918-
920+
return Flowable.fromArray(sources).flatMapCompletable(Functions.identity(), config.delayErrors(), config.maxConcurrency());
919921
}
920922

921923
/**
@@ -932,18 +934,22 @@ public static Completable mergeArray(@NonNull CompletableMergeConfig config, @No
932934
* @param config the configuration record for this operator
933935
* @return the new {@code Completable} instance
934936
* @throws NullPointerException if {@code sources} or {@code config} is {@code null}
937+
* @see Completable#merge(Iterable)
935938
* @since 4.0.0
936939
*/
937940
@CheckReturnValue
938941
@NonNull
939942
@SchedulerSupport(SchedulerSupport.NONE)
940-
public static Completable merge(@NonNull Iterable<@NonNull ? extends CompletableSource> sources, @NonNull CompletableMergeConfig config) {
943+
public static Completable merge(@NonNull Iterable<@NonNull ? extends CompletableSource> sources, @NonNull StandardConcurrentConfig config) {
941944
Objects.requireNonNull(sources, "sources is null");
942945
Objects.requireNonNull(config, "config is null");
943-
if (config.delayErrors()) {
944-
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorIterable(sources /* TODO , config.maxConcurrency() */));
946+
if (config.maxConcurrency() == Integer.MAX_VALUE) {
947+
if (config.delayErrors()) {
948+
return RxJavaPlugins.onAssembly(new CompletableMergeDelayErrorIterable(sources));
949+
}
950+
return RxJavaPlugins.onAssembly(new CompletableMergeIterable(sources));
945951
}
946-
return RxJavaPlugins.onAssembly(new CompletableMergeIterable(sources /* TODO , config.maxConcurrency() */));
952+
return Flowable.fromIterable(sources).flatMapCompletable(Functions.identity(), config.delayErrors(), config.maxConcurrency());
947953
}
948954

949955
/**

0 commit comments

Comments
 (0)