Skip to content

Commit 4d8e19d

Browse files
authored
4.x: API turn Completable concatX and mergeX into config-based (#8166)
1 parent 78c35cb commit 4d8e19d

11 files changed

Lines changed: 348 additions & 260 deletions

File tree

src/main/java/io/reactivex/rxjava4/core/Completable.java

Lines changed: 78 additions & 194 deletions
Large diffs are not rendered by default.
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
17+
18+
/**
19+
* Configuration record for Completable.concat() operators.
20+
* @param delayError should the error propagation be delayed?
21+
* @param prefetch the number of source sequences to request from a backpressured source
22+
* @since 4.0.0
23+
*/
24+
public record CompletableConcatConfig(boolean delayError, int prefetch) {
25+
26+
/**
27+
* The default configuration with no error delays and prefetch of 2.
28+
*/
29+
public static final CompletableConcatConfig DEFAULT = new CompletableConcatConfig(false, 2);
30+
31+
/**
32+
* The default configuration with error delays and prefetch of 2.
33+
*/
34+
public static final CompletableConcatConfig DELAY_ERROR = new CompletableConcatConfig(true, 2);
35+
36+
/**
37+
* Constructs a configuration record.
38+
* @param delayError should the error propagation be delayed?
39+
*/
40+
public CompletableConcatConfig(boolean delayError) {
41+
this(delayError, 2);
42+
}
43+
44+
/**
45+
* Constructs a configuration record.
46+
* @param prefetch the number of source sequences to request from a backpressured source
47+
*/
48+
public CompletableConcatConfig(int prefetch) {
49+
this(false, prefetch);
50+
}
51+
52+
/**
53+
* Constructs a configuration record.
54+
* @param delayError should the error propagation be delayed?
55+
* @param prefetch the number of source sequences to request from a backpressured source
56+
*/
57+
public CompletableConcatConfig(boolean delayError, int prefetch) {
58+
ObjectHelper.verifyPositive(prefetch, "prefetch");
59+
this.delayError = delayError;
60+
this.prefetch = prefetch;
61+
}
62+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Flowable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Configuration record for Completable.merge() operators.
21+
* @param delayErrors should the error propagation be delayed?
22+
* @param maxConcurrency the number of source sequences run concurrently
23+
* @since 4.0.0
24+
*/
25+
public record CompletableMergeConfig(boolean delayErrors, int maxConcurrency) {
26+
27+
/**
28+
* The default config with no error delay and Flowable#bufferSize() as the maximum concurrency setting.
29+
*/
30+
public static final CompletableMergeConfig DEFAULT = new CompletableMergeConfig(false, Flowable.bufferSize());
31+
32+
/**
33+
* The default config with error delay and Flowable#bufferSize() as the maximum concurrency setting.
34+
*/
35+
public static final CompletableMergeConfig DELAY_ERRORS = new CompletableMergeConfig(true, Flowable.bufferSize());
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param delayErrors should the error propagation be delayed?
40+
*/
41+
public CompletableMergeConfig(boolean delayErrors) {
42+
this(delayErrors, 2);
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param maxConcurrency the number of source sequences run concurrently
48+
*/
49+
public CompletableMergeConfig(int maxConcurrency) {
50+
this(false, maxConcurrency);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param delayErrors should the error propagation be delayed?
56+
* @param maxConcurrency the number of source sequences run concurrently
57+
*/
58+
public CompletableMergeConfig(boolean delayErrors, int maxConcurrency) {
59+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
60+
this.delayErrors = delayErrors;
61+
this.maxConcurrency = maxConcurrency;
62+
}
63+
}

src/test/java/io/reactivex/rxjava4/completable/CompletableTest.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.reactivex.rxjava4.annotations.NonNull;
2929
import io.reactivex.rxjava4.core.*;
3030
import io.reactivex.rxjava4.core.Observable;
31+
import io.reactivex.rxjava4.core.config.*;
3132
import io.reactivex.rxjava4.disposables.Disposable;
3233
import io.reactivex.rxjava4.exceptions.*;
3334
import io.reactivex.rxjava4.functions.*;
@@ -321,7 +322,7 @@ public void concatObservablePrefetch() {
321322
.repeat(10)
322323
.doOnRequest(requested::add);
323324

324-
Completable c = Completable.concat(cs, 5);
325+
Completable c = Completable.concat(cs, new CompletableConcatConfig(5));
325326

326327
c.blockingAwait();
327328

@@ -683,7 +684,7 @@ public void mergeObservableMaxConcurrent() {
683684
.repeat(10)
684685
.doOnRequest(requested::add);
685686

686-
Completable c = Completable.merge(cs, 5);
687+
Completable c = Completable.merge(cs, new CompletableMergeConfig(5));
687688

688689
c.blockingAwait();
689690

@@ -693,14 +694,14 @@ public void mergeObservableMaxConcurrent() {
693694

694695
@Test
695696
public void mergeDelayErrorEmpty() {
696-
Completable c = Completable.mergeArrayDelayError();
697+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS);
697698

698699
c.blockingAwait();
699700
}
700701

701702
@Test
702703
public void mergeDelayErrorSingleSource() {
703-
Completable c = Completable.mergeArrayDelayError(normal.completable);
704+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS, normal.completable);
704705

705706
c.blockingAwait();
706707

@@ -709,14 +710,14 @@ public void mergeDelayErrorSingleSource() {
709710

710711
@Test(expected = TestException.class)
711712
public void mergeDelayErrorSingleSourceThrows() {
712-
Completable c = Completable.mergeArrayDelayError(error.completable);
713+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS, error.completable);
713714

714715
c.blockingAwait();
715716
}
716717

717718
@Test
718719
public void mergeDelayErrorMultipleSources() {
719-
Completable c = Completable.mergeArrayDelayError(normal.completable, normal.completable, normal.completable);
720+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS, normal.completable, normal.completable, normal.completable);
720721

721722
c.blockingAwait();
722723

@@ -725,7 +726,7 @@ public void mergeDelayErrorMultipleSources() {
725726

726727
@Test
727728
public void mergeDelayErrorMultipleOneThrows() {
728-
Completable c = Completable.mergeArrayDelayError(normal.completable, error.completable, normal.completable);
729+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS, normal.completable, error.completable, normal.completable);
729730

730731
try {
731732
c.blockingAwait();
@@ -736,28 +737,28 @@ public void mergeDelayErrorMultipleOneThrows() {
736737

737738
@Test(expected = NullPointerException.class)
738739
public void mergeDelayErrorMultipleOneIsNull() {
739-
Completable c = Completable.mergeArrayDelayError(normal.completable, null);
740+
Completable c = Completable.mergeArray(CompletableMergeConfig.DELAY_ERRORS, normal.completable, null);
740741

741742
c.blockingAwait();
742743
}
743744

744745
@Test
745746
public void mergeDelayErrorIterableEmpty() {
746-
Completable c = Completable.mergeDelayError(Collections.<Completable>emptyList());
747+
Completable c = Completable.merge(Collections.<Completable>emptyList(), CompletableMergeConfig.DELAY_ERRORS);
747748

748749
c.blockingAwait();
749750
}
750751

751752
@Test(expected = NullPointerException.class)
752753
public void mergeDelayErrorIterableIteratorNull() {
753-
Completable c = Completable.mergeDelayError((Iterable<Completable>) () -> null);
754+
Completable c = Completable.merge((Iterable<Completable>) () -> null, CompletableMergeConfig.DELAY_ERRORS);
754755

755756
c.blockingAwait();
756757
}
757758

758759
@Test
759760
public void mergeDelayErrorIterableSingle() {
760-
Completable c = Completable.mergeDelayError(Collections.singleton(normal.completable));
761+
Completable c = Completable.merge(Collections.singleton(normal.completable), CompletableMergeConfig.DELAY_ERRORS);
761762

762763
c.blockingAwait();
763764

@@ -766,7 +767,8 @@ public void mergeDelayErrorIterableSingle() {
766767

767768
@Test
768769
public void mergeDelayErrorIterableMany() {
769-
Completable c = Completable.mergeDelayError(Arrays.asList(normal.completable, normal.completable, normal.completable));
770+
Completable c = Completable.merge(
771+
Arrays.asList(normal.completable, normal.completable, normal.completable), CompletableMergeConfig.DELAY_ERRORS);
770772

771773
c.blockingAwait();
772774

@@ -775,14 +777,15 @@ public void mergeDelayErrorIterableMany() {
775777

776778
@Test(expected = TestException.class)
777779
public void mergeDelayErrorIterableOneThrows() {
778-
Completable c = Completable.mergeDelayError(Collections.singleton(error.completable));
780+
Completable c = Completable.merge(Collections.singleton(error.completable), CompletableMergeConfig.DELAY_ERRORS);
779781

780782
c.blockingAwait();
781783
}
782784

783785
@Test
784786
public void mergeDelayErrorIterableManyOneThrows() {
785-
Completable c = Completable.mergeDelayError(Arrays.asList(normal.completable, error.completable, normal.completable));
787+
Completable c = Completable.merge(
788+
Arrays.asList(normal.completable, error.completable, normal.completable), CompletableMergeConfig.DELAY_ERRORS);
786789

787790
try {
788791
c.blockingAwait();
@@ -793,44 +796,44 @@ public void mergeDelayErrorIterableManyOneThrows() {
793796

794797
@Test(expected = TestException.class)
795798
public void mergeDelayErrorIterableIterableThrows() {
796-
Completable c = Completable.mergeDelayError((Iterable<Completable>) () -> {
799+
Completable c = Completable.merge((Iterable<Completable>) () -> {
797800
throw new TestException();
798-
});
801+
}, CompletableMergeConfig.DELAY_ERRORS);
799802

800803
c.blockingAwait();
801804
}
802805

803806
@Test(expected = TestException.class)
804807
public void mergeDelayErrorIterableIteratorHasNextThrows() {
805-
Completable c = Completable.mergeDelayError(new IterableIteratorHasNextThrows());
808+
Completable c = Completable.merge(new IterableIteratorHasNextThrows(), CompletableMergeConfig.DELAY_ERRORS);
806809

807810
c.blockingAwait();
808811
}
809812

810813
@Test(expected = TestException.class)
811814
public void mergeDelayErrorIterableIteratorNextThrows() {
812-
Completable c = Completable.mergeDelayError(new IterableIteratorNextThrows());
815+
Completable c = Completable.merge(new IterableIteratorNextThrows(), CompletableMergeConfig.DELAY_ERRORS);
813816

814817
c.blockingAwait();
815818
}
816819

817820
@Test
818821
public void mergeDelayErrorObservableEmpty() {
819-
Completable c = Completable.mergeDelayError(Flowable.<Completable>empty());
822+
Completable c = Completable.merge(Flowable.<Completable>empty(), new CompletableMergeConfig(true));
820823

821824
c.blockingAwait();
822825
}
823826

824827
@Test(expected = TestException.class)
825828
public void mergeDelayErrorObservableError() {
826-
Completable c = Completable.mergeDelayError(Flowable.<Completable>error(TestException::new));
829+
Completable c = Completable.merge(Flowable.<Completable>error(TestException::new), new CompletableMergeConfig(true));
827830

828831
c.blockingAwait();
829832
}
830833

831834
@Test
832835
public void mergeDelayErrorObservableSingle() {
833-
Completable c = Completable.mergeDelayError(Flowable.just(normal.completable));
836+
Completable c = Completable.merge(Flowable.just(normal.completable), new CompletableMergeConfig(true));
834837

835838
c.blockingAwait();
836839

@@ -839,14 +842,14 @@ public void mergeDelayErrorObservableSingle() {
839842

840843
@Test(expected = TestException.class)
841844
public void mergeDelayErrorObservableSingleThrows() {
842-
Completable c = Completable.mergeDelayError(Flowable.just(error.completable));
845+
Completable c = Completable.merge(Flowable.just(error.completable), new CompletableMergeConfig(true));
843846

844847
c.blockingAwait();
845848
}
846849

847850
@Test
848851
public void mergeDelayErrorObservableMany() {
849-
Completable c = Completable.mergeDelayError(Flowable.just(normal.completable).repeat(3));
852+
Completable c = Completable.merge(Flowable.just(normal.completable).repeat(3), new CompletableMergeConfig(true));
850853

851854
c.blockingAwait();
852855

@@ -855,7 +858,7 @@ public void mergeDelayErrorObservableMany() {
855858

856859
@Test(expected = TestException.class)
857860
public void mergeDelayErrorObservableManyOneThrows() {
858-
Completable c = Completable.mergeDelayError(Flowable.just(normal.completable, error.completable));
861+
Completable c = Completable.merge(Flowable.just(normal.completable, error.completable), new CompletableMergeConfig(true));
859862

860863
c.blockingAwait();
861864
}
@@ -868,7 +871,7 @@ public void mergeDelayErrorObservableMaxConcurrent() {
868871
.repeat(10)
869872
.doOnRequest(requested::add);
870873

871-
Completable c = Completable.mergeDelayError(cs, 5);
874+
Completable c = Completable.merge(cs, new CompletableMergeConfig(true, 5));
872875

873876
c.blockingAwait();
874877

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import static org.junit.jupiter.api.Assertions.*;
17+
18+
import org.junit.jupiter.api.Test;
19+
20+
import io.reactivex.rxjava4.core.RxJavaTest;
21+
22+
public class CompletableConcatConfigTest extends RxJavaTest {
23+
24+
@Test
25+
public void validation() {
26+
assertTrue(new CompletableConcatConfig(true).delayError(), "delayError - true");
27+
assertFalse(new CompletableConcatConfig(false).delayError(), "delayError - false");
28+
assertEquals(5, new CompletableConcatConfig(5).prefetch(), "prefetch - 5");
29+
assertEquals(5, new CompletableConcatConfig(true, 5).prefetch(), "prefetch both - true, 5");
30+
assertEquals(5, new CompletableConcatConfig(false, 5).prefetch(), "prefetch both - false, 5");
31+
assertTrue(new CompletableConcatConfig(true, 5).delayError(), "delayError both - true, 5");
32+
assertFalse(new CompletableConcatConfig(false, 5).delayError(), "delayError both - false, 5");
33+
}
34+
}

0 commit comments

Comments
 (0)