Skip to content

Commit d19355f

Browse files
authored
4.x: Cleanu API: Observable flatMap*, groupBy, observeOn (#8192)
1 parent e2cde40 commit d19355f

22 files changed

Lines changed: 380 additions & 456 deletions

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 71 additions & 378 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava4/core/config/FlatMapConfig.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@
2828
public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
2929

3030
/**
31-
* The default config with no error delay and Flowable#bufferSize() as the maximum concurrency setting.
31+
* The default configuration with no error delay and Flowable#bufferSize() as the maximum concurrency setting.
3232
*/
33-
public static final FlatMapConfig DEFAULT = new FlatMapConfig(false, Flowable.bufferSize());
33+
public static final FlatMapConfig DEFAULT = new FlatMapConfig(false);
3434

3535
/**
36-
* The default config with error delay and Flowable#bufferSize() as the maximum concurrency setting.
36+
* The default configuration with error delay and Flowable#bufferSize() as the maximum concurrency setting.
3737
*/
38-
public static final FlatMapConfig DELAY_ERRORS = new FlatMapConfig(true, Flowable.bufferSize());
38+
public static final FlatMapConfig DELAY_ERRORS = new FlatMapConfig(true);
3939

4040
/**
4141
* Optionally delay error, {@link Flowable#bufferSize()} sizes
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Observable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Configuration record for Observable.groupBy() operators.
21+
* @param delayError should the error propagation be delayed?
22+
* @param bufferSize the expected number of items to buffer until it can be processed
23+
* @since 4.0.0
24+
*/
25+
public record ObservableGroupByConfig(boolean delayError, int bufferSize) {
26+
27+
/**
28+
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
29+
*/
30+
public static final ObservableGroupByConfig DEFAULT = new ObservableGroupByConfig(false);
31+
32+
/**
33+
* The default configuration with error delays and bufferSize of Observable.bufferSize().
34+
*/
35+
public static final ObservableGroupByConfig DELAY_ERROR = new ObservableGroupByConfig(true);
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param delayError should the error propagation be delayed?
40+
*/
41+
public ObservableGroupByConfig(boolean delayError) {
42+
this(delayError, Observable.bufferSize());
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param bufferSize the expected number of items to buffer until it can be processed
48+
*/
49+
public ObservableGroupByConfig(int bufferSize) {
50+
this(false, bufferSize);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param delayError should the error propagation be delayed?
56+
* @param bufferSize the expected number of items to buffer until it can be processed
57+
*/
58+
public ObservableGroupByConfig {
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
60+
}
61+
}

src/main/java/io/reactivex/rxjava4/core/config/ObservableMergeConfig.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313

1414
package io.reactivex.rxjava4.core.config;
1515

16-
import io.reactivex.rxjava4.core.Flowable;
17-
import io.reactivex.rxjava4.core.Observable;
16+
import io.reactivex.rxjava4.core.*;
1817
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
1918

2019
/**
21-
* Configuration record for Observable.merge() operators.
20+
* Configuration record for Observable.merge() and Observable.flatMap operators.
2221
* @param delayErrors should the error be delayed?
2322
* @param maxConcurrency the maximum number of concurrent flows?
2423
* @param bufferSize what would be the buffer size?
@@ -27,25 +26,25 @@
2726
public record ObservableMergeConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
2827

2928
/**
30-
* The default configuration with no error delays and bufferSize of {@link Observable#bufferSize()}.
29+
* The default configuration with no error delays, MAX_VALUE concurrency and bufferSize of {@link Observable#bufferSize()}.
3130
*/
32-
public static final ObservableMergeConfig DEFAULT = new ObservableMergeConfig(false, Observable.bufferSize());
31+
public static final ObservableMergeConfig DEFAULT = new ObservableMergeConfig(false);
3332

3433
/**
35-
* The default configuration with error delays and bufferSize of {@link Observable#bufferSize()}.
34+
* The default configuration with error delays, MAX_VALUE concurrency and bufferSize of {@link Observable#bufferSize()}.
3635
*/
37-
public static final ObservableMergeConfig DELAY_ERROR = new ObservableMergeConfig(true, Observable.bufferSize());
36+
public static final ObservableMergeConfig DELAY_ERROR = new ObservableMergeConfig(true);
3837

3938
/**
40-
* Optionally delay error, {@link Flowable#bufferSize()} sizes
39+
* Optionally delay error, {@link Observable#bufferSize()} sizes
4140
* @param delayErrors should the error be delayed?
4241
*/
4342
public ObservableMergeConfig(boolean delayErrors) {
44-
this(delayErrors, Flowable.bufferSize(), Flowable.bufferSize());
43+
this(delayErrors, Integer.MAX_VALUE, Observable.bufferSize());
4544
}
4645

4746
/**
48-
* Optionally set the buffer size, no delay errors.
47+
* Optionally set the maximum concurrency levels, no errors and a buffer size of {@link Observable#bufferSize()}.
4948
* @param maxConcurrency the maximum number of concurrent flows
5049
*/
5150
public ObservableMergeConfig(int maxConcurrency) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Observable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Configuration record for Observable.observeOn() operators.
21+
* @param delayError should the error propagation be delayed?
22+
* @param bufferSize the expected number of items to cache from the upstream
23+
* @since 4.0.0
24+
*/
25+
public record ObservableObserveOnConfig(boolean delayError, int bufferSize) {
26+
27+
/**
28+
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
29+
*/
30+
public static final ObservableObserveOnConfig DEFAULT = new ObservableObserveOnConfig(false);
31+
32+
/**
33+
* The default configuration with error delays and bufferSize of Observable.bufferSize().
34+
*/
35+
public static final ObservableObserveOnConfig DELAY_ERROR = new ObservableObserveOnConfig(true);
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param delayError should the error propagation be delayed?
40+
*/
41+
public ObservableObserveOnConfig(boolean delayError) {
42+
this(delayError, Observable.bufferSize());
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param bufferSize the expected number of items to cache from the upstream
48+
*/
49+
public ObservableObserveOnConfig(int bufferSize) {
50+
this(false, bufferSize);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param delayError should the error propagation be delayed?
56+
* @param bufferSize the expected number of items to cache from the upstream
57+
*/
58+
public ObservableObserveOnConfig {
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
60+
}
61+
}

src/main/java/io/reactivex/rxjava4/core/config/ObservableZipConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ public record ObservableZipConfig(boolean delayError, int bufferSize) {
2727
/**
2828
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
2929
*/
30-
public static final ObservableZipConfig DEFAULT = new ObservableZipConfig(false, Observable.bufferSize());
30+
public static final ObservableZipConfig DEFAULT = new ObservableZipConfig(false);
3131

3232
/**
3333
* The default configuration with error delays and bufferSize of Observable.bufferSize().
3434
*/
35-
public static final ObservableZipConfig DELAY_ERROR = new ObservableZipConfig(true, Observable.bufferSize());
35+
public static final ObservableZipConfig DELAY_ERROR = new ObservableZipConfig(true);
3636

3737
/**
3838
* Constructs a configuration record.

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapCompletableCompletable.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,19 @@ public final class ObservableFlatMapCompletableCompletable<T> extends Completabl
3838

3939
final boolean delayErrors;
4040

41+
final int bufferSize;
42+
4143
public ObservableFlatMapCompletableCompletable(ObservableSource<T> source,
42-
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
44+
Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors, int bufferSize) {
4345
this.source = source;
4446
this.mapper = mapper;
4547
this.delayErrors = delayErrors;
48+
this.bufferSize = bufferSize;
4649
}
4750

4851
@Override
4952
protected void subscribeActual(CompletableObserver observer) {
50-
source.subscribe(new FlatMapCompletableMainObserver<>(observer, mapper, delayErrors));
53+
source.subscribe(new FlatMapCompletableMainObserver<>(observer, mapper, delayErrors, bufferSize));
5154
}
5255

5356
@Override
@@ -67,16 +70,20 @@ static final class FlatMapCompletableMainObserver<T> extends AtomicInteger imple
6770

6871
final boolean delayErrors;
6972

73+
final int bufferSize;
74+
7075
final CompositeDisposable set;
7176

7277
Disposable upstream;
7378

7479
volatile boolean disposed;
7580

76-
FlatMapCompletableMainObserver(CompletableObserver observer, Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
81+
FlatMapCompletableMainObserver(CompletableObserver observer, Function<? super T, ? extends CompletableSource> mapper,
82+
boolean delayErrors, int bufferSize) {
7783
this.downstream = observer;
7884
this.mapper = mapper;
7985
this.delayErrors = delayErrors;
86+
this.bufferSize = bufferSize;
8087
this.errors = new AtomicThrowable();
8188
this.set = new CompositeDisposable();
8289
this.lazySet(1);

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapMaybe.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,20 @@ public final class ObservableFlatMapMaybe<T, R> extends AbstractObservableWithUp
3636

3737
final boolean delayErrors;
3838

39+
final int bufferSize;
40+
3941
public ObservableFlatMapMaybe(ObservableSource<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper,
40-
boolean delayError) {
42+
boolean delayError,
43+
int bufferSize) {
4144
super(source);
4245
this.mapper = mapper;
4346
this.delayErrors = delayError;
47+
this.bufferSize = bufferSize;
4448
}
4549

4650
@Override
4751
protected void subscribeActual(Observer<? super R> observer) {
48-
source.subscribe(new FlatMapMaybeObserver<>(observer, mapper, delayErrors));
52+
source.subscribe(new FlatMapMaybeObserver<>(observer, mapper, delayErrors, bufferSize));
4953
}
5054

5155
static final class FlatMapMaybeObserver<T, R>
@@ -59,6 +63,8 @@ static final class FlatMapMaybeObserver<T, R>
5963

6064
final boolean delayErrors;
6165

66+
final int bufferSize;
67+
6268
final CompositeDisposable set;
6369

6470
final AtomicInteger active;
@@ -74,10 +80,11 @@ static final class FlatMapMaybeObserver<T, R>
7480
volatile boolean cancelled;
7581

7682
FlatMapMaybeObserver(Observer<? super R> actual,
77-
Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors) {
83+
Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors, int bufferSize) {
7884
this.downstream = actual;
7985
this.mapper = mapper;
8086
this.delayErrors = delayErrors;
87+
this.bufferSize = bufferSize;
8188
this.set = new CompositeDisposable();
8289
this.errors = new AtomicThrowable();
8390
this.active = new AtomicInteger(1);
@@ -178,7 +185,7 @@ SpscLinkedArrayQueue<R> getOrCreateQueue() {
178185
if (current != null) {
179186
return current;
180187
}
181-
current = new SpscLinkedArrayQueue<>(Observable.bufferSize());
188+
current = new SpscLinkedArrayQueue<>(bufferSize);
182189
if (queue.compareAndSet(null, current)) {
183190
return current;
184191
}

src/main/java/io/reactivex/rxjava4/internal/operators/observable/ObservableFlatMapSingle.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,21 @@ public final class ObservableFlatMapSingle<T, R> extends AbstractObservableWithU
3636

3737
final boolean delayErrors;
3838

39-
public ObservableFlatMapSingle(ObservableSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper,
40-
boolean delayError) {
39+
final int bufferSize;
40+
41+
public ObservableFlatMapSingle(ObservableSource<T> source,
42+
Function<? super T, ? extends SingleSource<? extends R>> mapper,
43+
boolean delayError,
44+
int bufferSize) {
4145
super(source);
4246
this.mapper = mapper;
4347
this.delayErrors = delayError;
48+
this.bufferSize = bufferSize;
4449
}
4550

4651
@Override
4752
protected void subscribeActual(Observer<? super R> observer) {
48-
source.subscribe(new FlatMapSingleObserver<>(observer, mapper, delayErrors));
53+
source.subscribe(new FlatMapSingleObserver<>(observer, mapper, delayErrors, bufferSize));
4954
}
5055

5156
static final class FlatMapSingleObserver<T, R>
@@ -59,6 +64,8 @@ static final class FlatMapSingleObserver<T, R>
5964

6065
final boolean delayErrors;
6166

67+
final int bufferSize;
68+
6269
final CompositeDisposable set;
6370

6471
final AtomicInteger active;
@@ -74,10 +81,12 @@ static final class FlatMapSingleObserver<T, R>
7481
volatile boolean cancelled;
7582

7683
FlatMapSingleObserver(Observer<? super R> actual,
77-
Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayErrors) {
84+
Function<? super T, ? extends SingleSource<? extends R>> mapper,
85+
boolean delayErrors, int bufferSize) {
7886
this.downstream = actual;
7987
this.mapper = mapper;
8088
this.delayErrors = delayErrors;
89+
this.bufferSize = bufferSize;
8190
this.set = new CompositeDisposable();
8291
this.errors = new AtomicThrowable();
8392
this.active = new AtomicInteger(1);
@@ -178,7 +187,7 @@ SpscLinkedArrayQueue<R> getOrCreateQueue() {
178187
if (current != null) {
179188
return current;
180189
}
181-
current = new SpscLinkedArrayQueue<>(Observable.bufferSize());
190+
current = new SpscLinkedArrayQueue<>(bufferSize);
182191
if (queue.compareAndSet(null, current)) {
183192
return current;
184193
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.RxJavaTest;
17+
import org.junit.jupiter.api.Test;
18+
19+
import static org.junit.jupiter.api.Assertions.*;
20+
21+
public class ObservableGroupByConfigTest extends RxJavaTest {
22+
23+
@Test
24+
public void validation() {
25+
assertTrue(new ObservableGroupByConfig(true).delayError(), "delayError - true");
26+
assertFalse(new ObservableGroupByConfig(false).delayError(), "delayError - false");
27+
assertEquals(5, new ObservableGroupByConfig(5).bufferSize(), "bufferSize - 5");
28+
assertEquals(5, new ObservableGroupByConfig(true, 5).bufferSize(), "bufferSize both - true, 5");
29+
assertEquals(5, new ObservableGroupByConfig(false, 5).bufferSize(), "bufferSize both - false, 5");
30+
assertTrue(new ObservableGroupByConfig(true, 5).delayError(), "delayError both - true, 5");
31+
assertFalse(new ObservableGroupByConfig(false, 5).delayError(), "delayError both - false, 5");
32+
}
33+
}

0 commit comments

Comments
 (0)