Skip to content

Commit e2cde40

Browse files
authored
4.x: Cleanup API reduction: concat { Array, Eager, XYZ } (#8191)
* 4.x: Cleanup API reduction: concat { Array, Eager, XYZ } * Swap unit test contents due to bad copy-paste-edit
1 parent c073e5c commit e2cde40

16 files changed

Lines changed: 392 additions & 743 deletions

src/main/java/io/reactivex/rxjava4/core/ErrorMode.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,15 @@
1414
package io.reactivex.rxjava4.core;
1515

1616
/**
17-
* Indicates when an error from the main source should be reported.
17+
* Indicates when an error from the any of the involved sources should be handled.
18+
* <p>
19+
* Usually appears with {@code concat} and {@code concatMap} operators where the outer and inner source(s)
20+
* may error out in the middle of streaming and the user would like to finish the current source before
21+
* cancelling the rest and signaling the error(s) to the consumers.
1822
* @since 4.0.0
1923
*/
2024
public enum ErrorMode {
21-
/** Report the error immediately, cancelling the active inner source. */
25+
/** Report the error immediately, cancelling the active sources. */
2226
IMMEDIATE,
2327
/** Report error after an inner source terminated. */
2428
BOUNDARY,

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 61 additions & 561 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava4/core/config/ObservableConcatConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,17 @@ public record ObservableConcatConfig(@NonNull ErrorMode errorMode, int bufferSiz
2929
/**
3030
* The default configuration with no error delays and bufferSize of {@link Observable#bufferSize()}.
3131
*/
32-
public static final ObservableConcatConfig DEFAULT = new ObservableConcatConfig(ErrorMode.IMMEDIATE, Observable.bufferSize());
32+
public static final ObservableConcatConfig DEFAULT = new ObservableConcatConfig(ErrorMode.IMMEDIATE);
3333

3434
/**
35-
* The default configuration with error delays and bufferSize of {@link Observable#bufferSize()}.
35+
* The default configuration with error delays till the end and bufferSize of {@link Observable#bufferSize()}.
3636
*/
37-
public static final ObservableConcatConfig DELAY_ERROR = new ObservableConcatConfig(ErrorMode.END, Observable.bufferSize());
37+
public static final ObservableConcatConfig DELAY_ERROR = new ObservableConcatConfig(ErrorMode.END);
3838

3939
/**
40-
* The default configuration with error delays and bufferSize of {@link Observable#bufferSize()}.
40+
* The default configuration with error delays till the boundary and bufferSize of {@link Observable#bufferSize()}.
4141
*/
42-
public static final ObservableConcatConfig DELAY_ERROR_BOUNDARY = new ObservableConcatConfig(ErrorMode.BOUNDARY, Observable.bufferSize());
42+
public static final ObservableConcatConfig DELAY_ERROR_BOUNDARY = new ObservableConcatConfig(ErrorMode.BOUNDARY);
4343

4444
/**
4545
* Constructs a configuration record.

src/main/java/io/reactivex/rxjava4/core/config/ObservableConcatEagerConfig.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,32 +29,37 @@ public record ObservableConcatEagerConfig(@NonNull ErrorMode errorMode, int maxC
2929
/**
3030
* The default configuration with no error delays, maxConcurrency and bufferSize of {@link Observable#bufferSize()}.
3131
*/
32-
public static final ObservableConcatEagerConfig DEFAULT = new ObservableConcatEagerConfig(ErrorMode.IMMEDIATE, Observable.bufferSize());
32+
public static final ObservableConcatEagerConfig DEFAULT = new ObservableConcatEagerConfig(ErrorMode.IMMEDIATE);
3333

3434
/**
3535
* The default configuration with error delays, maxConcurrency and bufferSize of {@link Observable#bufferSize()}.
3636
*/
37-
public static final ObservableConcatEagerConfig DELAY_ERROR = new ObservableConcatEagerConfig(ErrorMode.END, Observable.bufferSize());
37+
public static final ObservableConcatEagerConfig DELAY_ERROR = new ObservableConcatEagerConfig(ErrorMode.END);
3838

3939
/**
4040
* The default configuration with error delays, maxConcurrency and bufferSize of {@link Observable#bufferSize()}.
4141
*/
42-
public static final ObservableConcatEagerConfig DELAY_ERROR_BOUNDARY = new ObservableConcatEagerConfig(ErrorMode.BOUNDARY, Observable.bufferSize());
42+
public static final ObservableConcatEagerConfig DELAY_ERROR_BOUNDARY = new ObservableConcatEagerConfig(ErrorMode.BOUNDARY);
4343

4444
/**
45-
* Optionally delay error, {@link Flowable#bufferSize()} sizes
45+
* The default configuration with no error delays, maxConcurrency of MAX_INT and bufferSize of {@link Observable#bufferSize()}.
46+
*/
47+
public static final ObservableConcatEagerConfig MAX_DEFAULT = new ObservableConcatEagerConfig(ErrorMode.IMMEDIATE, Integer.MAX_VALUE);
48+
49+
/**
50+
* Optionally delay error, {@link Observable#bufferSize()} sizes
4651
* @param errorMode how to handle when errors appear from the inner or outer sources
4752
*/
4853
public ObservableConcatEagerConfig(ErrorMode errorMode) {
49-
this(errorMode, Flowable.bufferSize(), Flowable.bufferSize());
54+
this(errorMode, Observable.bufferSize(), Observable.bufferSize());
5055
}
5156

5257
/**
5358
* Optionally set the buffer size, no delay errors.
5459
* @param maxConcurrency the maximum number of concurrent flows
5560
*/
5661
public ObservableConcatEagerConfig(int maxConcurrency) {
57-
this(ErrorMode.IMMEDIATE, maxConcurrency, Flowable.bufferSize());
62+
this(ErrorMode.IMMEDIATE, maxConcurrency, Observable.bufferSize());
5863
}
5964

6065
/**
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.annotations.NonNull;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
import io.reactivex.rxjava4.core.ErrorMode;
19+
20+
/**
21+
* Configuration record for Observable.concatMap() operators.
22+
* @param errorMode how to handle when errors appear from the inner or outer sources
23+
* @param bufferSize the expected number of outer sources to buffer while processing an inner source
24+
* @since 4.0.0
25+
*/
26+
public record ObservableConcatMapConfig(@NonNull ErrorMode errorMode, int bufferSize) {
27+
28+
/**
29+
* The default configuration with no error delays and bufferSize of 2.
30+
*/
31+
public static final ObservableConcatMapConfig DEFAULT = new ObservableConcatMapConfig(ErrorMode.IMMEDIATE, 2);
32+
33+
/**
34+
* The default configuration with error delays till the end and bufferSize of 2.
35+
*/
36+
public static final ObservableConcatMapConfig DELAY_ERROR = new ObservableConcatMapConfig(ErrorMode.END, 2);
37+
38+
/**
39+
* The default configuration with error delays till the boundary and bufferSize of 2.
40+
*/
41+
public static final ObservableConcatMapConfig DELAY_ERROR_BOUNDARY = new ObservableConcatMapConfig(ErrorMode.BOUNDARY, 2);
42+
43+
/**
44+
* Constructs a configuration record.
45+
* @param errorMode how to handle when errors appear from the inner or outer sources
46+
*/
47+
public ObservableConcatMapConfig(@NonNull ErrorMode errorMode) {
48+
this(errorMode, 2);
49+
}
50+
51+
/**
52+
* Constructs a configuration record.
53+
* @param bufferSize the expected number of outer sources to buffer while processing an inner source
54+
*/
55+
public ObservableConcatMapConfig(int bufferSize) {
56+
this(ErrorMode.IMMEDIATE, bufferSize);
57+
}
58+
59+
/**
60+
* Constructs a configuration record.
61+
* @param errorMode how to handle when errors appear from the inner or outer sources
62+
* @param bufferSize the expected number of outer sources to buffer while processing an inner source
63+
*/
64+
public ObservableConcatMapConfig {
65+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
66+
}
67+
}

src/test/java/io/reactivex/rxjava4/core/config/ObservableConcatConfigTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ public class ObservableConcatConfigTest extends RxJavaTest {
2323

2424
@Test
2525
public void validation() {
26-
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE).errorMode(), "errorMode - true");
27-
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY).errorMode(), "errorMode - false");
28-
assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END).errorMode(), "errorMode - false");
26+
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE).errorMode(), "errorMode - IMMEDIATE");
27+
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY).errorMode(), "errorMode - BOUNDARY");
28+
assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END).errorMode(), "errorMode - END");
2929
assertEquals(5, new ObservableConcatConfig(5).bufferSize(), "bufferSize - 5");
30-
assertEquals(5, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).bufferSize(), "bufferSize both - true, 5");
31-
assertEquals(5, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).bufferSize(), "bufferSize both - false, 5");
32-
assertEquals(5, new ObservableConcatConfig(ErrorMode.END, 5).bufferSize(), "bufferSize both - false, 5");
33-
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).errorMode(), "errorMode both - true, 5");
34-
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).errorMode(), "errorMode both - false, 5");
35-
assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END, 5).errorMode(), "errorMode both - false, 5");
30+
assertEquals(5, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).bufferSize(), "bufferSize both - IMMEDIATE, 5");
31+
assertEquals(5, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).bufferSize(), "bufferSize both - BOUNDARY, 5");
32+
assertEquals(5, new ObservableConcatConfig(ErrorMode.END, 5).bufferSize(), "bufferSize both - END, 5");
33+
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatConfig(ErrorMode.IMMEDIATE, 5).errorMode(), "errorMode both - IMMEDIATE, 5");
34+
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatConfig(ErrorMode.BOUNDARY, 5).errorMode(), "errorMode both - BOUNDARY, 5");
35+
assertEquals(ErrorMode.END, new ObservableConcatConfig(ErrorMode.END, 5).errorMode(), "errorMode both - END, 5");
3636
}
3737
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.ErrorMode;
17+
import io.reactivex.rxjava4.core.RxJavaTest;
18+
import org.junit.jupiter.api.Test;
19+
20+
import static org.junit.jupiter.api.Assertions.*;
21+
22+
public class ObservableConcatMapConfigTest extends RxJavaTest {
23+
24+
@Test
25+
public void validation() {
26+
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatMapConfig(ErrorMode.IMMEDIATE).errorMode(), "errorMode - IMMEDIATE");
27+
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatMapConfig(ErrorMode.BOUNDARY).errorMode(), "errorMode - BOUNDARY");
28+
assertEquals(ErrorMode.END, new ObservableConcatMapConfig(ErrorMode.END).errorMode(), "errorMode - END");
29+
assertEquals(5, new ObservableConcatMapConfig(5).bufferSize(), "bufferSize - 5");
30+
assertEquals(5, new ObservableConcatMapConfig(ErrorMode.IMMEDIATE, 5).bufferSize(), "bufferSize both - IMMEDIATE, 5");
31+
assertEquals(5, new ObservableConcatMapConfig(ErrorMode.BOUNDARY, 5).bufferSize(), "bufferSize both - BOUNDARY, 5");
32+
assertEquals(5, new ObservableConcatMapConfig(ErrorMode.END, 5).bufferSize(), "bufferSize both - END, 5");
33+
assertEquals(ErrorMode.IMMEDIATE, new ObservableConcatMapConfig(ErrorMode.IMMEDIATE, 5).errorMode(), "errorMode both - IMMEDIATE, 5");
34+
assertEquals(ErrorMode.BOUNDARY, new ObservableConcatMapConfig(ErrorMode.BOUNDARY, 5).errorMode(), "errorMode both - BOUNDARY, 5");
35+
assertEquals(ErrorMode.END, new ObservableConcatMapConfig(ErrorMode.END, 5).errorMode(), "errorMode both - END, 5");
36+
}
37+
}

src/test/java/io/reactivex/rxjava4/internal/operators/mixed/ObservableConcatMapCompletableTest.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
import org.junit.Test;
2222

2323
import io.reactivex.rxjava4.core.*;
24+
import io.reactivex.rxjava4.core.config.ObservableConcatMapConfig;
2425
import io.reactivex.rxjava4.exceptions.*;
25-
import io.reactivex.rxjava4.functions.*;
26+
import io.reactivex.rxjava4.functions.Function;
2627
import io.reactivex.rxjava4.internal.functions.Functions;
2728
import io.reactivex.rxjava4.internal.schedulers.ImmediateThinScheduler;
2829
import io.reactivex.rxjava4.observers.TestObserver;
@@ -54,7 +55,7 @@ public void simple2() {
5455
@Test
5556
public void simpleLongPrefetch() {
5657
Observable.range(1, 1024)
57-
.concatMapCompletable(Functions.justFunction(Completable.complete()), 32)
58+
.concatMapCompletable(Functions.justFunction(Completable.complete()), new ObservableConcatMapConfig(32))
5859
.test()
5960
.assertResult();
6061
}
@@ -78,8 +79,9 @@ public void innerError() {
7879
@Test
7980
public void innerErrorDelayed() {
8081
TestObserverEx<Void> to = Observable.range(1, 5)
81-
.concatMapCompletableDelayError(
82-
_ -> Completable.error(new TestException())
82+
.concatMapCompletable(
83+
_ -> Completable.error(new TestException()),
84+
ObservableConcatMapConfig.DELAY_ERROR
8385
)
8486
.to(TestHelper.<Void>testConsumer())
8587
.assertFailure(CompositeException.class)
@@ -161,8 +163,9 @@ public void boundaryError() {
161163
PublishSubject<Integer> ps = PublishSubject.create();
162164
CompletableSubject cs = CompletableSubject.create();
163165

164-
TestObserver<Void> to = ps.concatMapCompletableDelayError(
165-
Functions.justFunction(cs), false).test();
166+
TestObserver<Void> to = ps.concatMapCompletable(
167+
Functions.justFunction(cs), ObservableConcatMapConfig.DELAY_ERROR_BOUNDARY)
168+
.test();
166169

167170
to.assertEmpty();
168171

@@ -190,13 +193,13 @@ public void endError() {
190193
final CompletableSubject cs = CompletableSubject.create();
191194
final CompletableSubject cs2 = CompletableSubject.create();
192195

193-
TestObserver<Void> to = ps.concatMapCompletableDelayError(
196+
TestObserver<Void> to = ps.concatMapCompletable(
194197
v -> {
195198
if (v == 1) {
196199
return cs;
197200
}
198201
return cs2;
199-
}, true, 32
202+
}, new ObservableConcatMapConfig(ErrorMode.END, 32)
200203
)
201204
.test();
202205

@@ -391,13 +394,15 @@ public void undeliverableUponCancel() {
391394
@Test
392395
public void undeliverableUponCancelDelayError() {
393396
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
394-
upstream.concatMapCompletableDelayError((Function<Integer, Completable>) _ -> Completable.complete().hide(), false, 2));
397+
upstream.concatMapCompletable((Function<Integer, Completable>) _ ->
398+
Completable.complete().hide(), new ObservableConcatMapConfig(ErrorMode.BOUNDARY, 2)));
395399
}
396400

397401
@Test
398402
public void undeliverableUponCancelDelayErrorTillEnd() {
399403
TestHelper.checkUndeliverableUponCancel((ObservableConverter<Integer, Completable>) upstream ->
400-
upstream.concatMapCompletableDelayError((Function<Integer, Completable>) _ -> Completable.complete().hide(), true, 2));
404+
upstream.concatMapCompletable((Function<Integer, Completable>) _ ->
405+
Completable.complete().hide(), new ObservableConcatMapConfig(ErrorMode.END, 2)));
401406
}
402407

403408
@Test

0 commit comments

Comments
 (0)