Skip to content

Commit c073e5c

Browse files
authored
4.x: API reductions: sequenceEqual, switchOnNext, zip (#8190)
* 4.x: API reductions: sequenceEqual, switchOnNext, zip * Fix copy-paste error prefetch -> bufferSize param naming
1 parent d8e5803 commit c073e5c

17 files changed

Lines changed: 424 additions & 351 deletions

src/main/java/io/reactivex/rxjava4/core/Observable.java

Lines changed: 89 additions & 314 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava4/core/config/CompletableMergeConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.rxjava4.core.config;
1515

16-
import io.reactivex.rxjava4.core.Flowable;
16+
import io.reactivex.rxjava4.core.*;
1717
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
1818

1919
/**
@@ -39,7 +39,7 @@ public record CompletableMergeConfig(boolean delayErrors, int maxConcurrency) {
3939
* @param delayErrors should the error propagation be delayed?
4040
*/
4141
public CompletableMergeConfig(boolean delayErrors) {
42-
this(delayErrors, 2);
42+
this(delayErrors, Flowable.bufferSize());
4343
}
4444

4545
/**

src/main/java/io/reactivex/rxjava4/core/config/MaybeConcatEagerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.rxjava4.core.config;
1515

16-
import io.reactivex.rxjava4.core.Flowable;
16+
import io.reactivex.rxjava4.core.*;
1717
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
1818

1919
/**
@@ -40,7 +40,7 @@ public record MaybeConcatEagerConfig(boolean delayError, int maxConcurrency, int
4040
* @param delayError should the error propagation be delayed?
4141
*/
4242
public MaybeConcatEagerConfig(boolean delayError) {
43-
this(delayError, 2, 2);
43+
this(delayError, Flowable.bufferSize(), Flowable.bufferSize());
4444
}
4545

4646
/**

src/main/java/io/reactivex/rxjava4/core/config/ObservableCombineLatestConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
public record ObservableCombineLatestConfig(boolean delayError, int bufferSize) {
2626

2727
/**
28-
* The default configuration with no error delays and prefetch of 2.
28+
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
2929
*/
3030
public static final ObservableCombineLatestConfig DEFAULT = new ObservableCombineLatestConfig(false, Observable.bufferSize());
3131

3232
/**
33-
* The default configuration with error delays and prefetch of 2.
33+
* The default configuration with error delays and bufferSize of Observable.bufferSize().
3434
*/
3535
public static final ObservableCombineLatestConfig DELAY_ERROR = new ObservableCombineLatestConfig(true, Observable.bufferSize());
3636

@@ -56,6 +56,6 @@ public ObservableCombineLatestConfig(int bufferSize) {
5656
* @param bufferSize the expected number of row combination items to be buffered internally
5757
*/
5858
public ObservableCombineLatestConfig {
59-
ObjectHelper.verifyPositive(bufferSize, "prefetch");
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
6060
}
6161
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import java.util.Objects;
17+
18+
import io.reactivex.rxjava4.core.Observable;
19+
import io.reactivex.rxjava4.functions.BiPredicate;
20+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
21+
22+
/**
23+
* Configuration record for Observable.sequenceEqual() operators.
24+
* @param <T> the element type of the sequences being compared
25+
* @param bufferSize the expected number of items to cache from the inner {@code ObservableSource}s
26+
* @param isEqual the custom lambda to compare two elements
27+
* @since 4.0.0
28+
*/
29+
public record ObservableSequenceEqualConfig<T>(int bufferSize, BiPredicate<? super T, ? super T> isEqual) {
30+
31+
/**
32+
* The default configuration with bufferSize of Observable.bufferSize() and a default Objects.equals predicate.
33+
*/
34+
public static final ObservableSequenceEqualConfig<Object> DEFAULT =
35+
new ObservableSequenceEqualConfig<>(Observable.bufferSize(), ObjectHelper.equalsPredicate());
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param bufferSize the expected number of row combination items to be buffered internally
40+
*/
41+
public ObservableSequenceEqualConfig(int bufferSize) {
42+
this(bufferSize, ObjectHelper.equalsPredicate());
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param isEqual the custom lambda to compare two elements
48+
*/
49+
public ObservableSequenceEqualConfig(BiPredicate<? super T, ? super T> isEqual) {
50+
this(Observable.bufferSize(), isEqual);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param bufferSize the expected number of items to cache from the inner {@code ObservableSource}s
56+
* @param isEqual the custom lambda to compare two elements
57+
*/
58+
public ObservableSequenceEqualConfig {
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
60+
Objects.requireNonNull(isEqual, "isEqual is null");
61+
}
62+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Observable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Configuration record for Observable.switchOnNext() operators.
21+
* @param delayError should the error propagation be delayed?
22+
* @param bufferSize the expected number of items to cache from the inner {@code ObservableSource}s
23+
* @since 4.0.0
24+
*/
25+
public record ObservableSwitchConfig(boolean delayError, int bufferSize) {
26+
27+
/**
28+
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
29+
*/
30+
public static final ObservableSwitchConfig DEFAULT = new ObservableSwitchConfig(false, Observable.bufferSize());
31+
32+
/**
33+
* The default configuration with error delays and bufferSize of Observable.bufferSize().
34+
*/
35+
public static final ObservableSwitchConfig DELAY_ERROR = new ObservableSwitchConfig(true, Observable.bufferSize());
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param delayError should the error propagation be delayed?
40+
*/
41+
public ObservableSwitchConfig(boolean delayError) {
42+
this(delayError, Observable.bufferSize());
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param bufferSize the expected number of row combination items to be buffered internally
48+
*/
49+
public ObservableSwitchConfig(int bufferSize) {
50+
this(false, bufferSize);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param delayError should the error propagation be delayed?
56+
* @param bufferSize the expected number of row combination items to be buffered internally
57+
*/
58+
public ObservableSwitchConfig {
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
60+
}
61+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import io.reactivex.rxjava4.core.Observable;
17+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
18+
19+
/**
20+
* Configuration record for Observable.zip() operators.
21+
* @param delayError should the error propagation be delayed?
22+
* @param bufferSize the expected number of items to cache from the inner {@code ObservableSource}s
23+
* @since 4.0.0
24+
*/
25+
public record ObservableZipConfig(boolean delayError, int bufferSize) {
26+
27+
/**
28+
* The default configuration with no error delays and bufferSize of Observable.bufferSize().
29+
*/
30+
public static final ObservableZipConfig DEFAULT = new ObservableZipConfig(false, Observable.bufferSize());
31+
32+
/**
33+
* The default configuration with error delays and bufferSize of Observable.bufferSize().
34+
*/
35+
public static final ObservableZipConfig DELAY_ERROR = new ObservableZipConfig(true, Observable.bufferSize());
36+
37+
/**
38+
* Constructs a configuration record.
39+
* @param delayError should the error propagation be delayed?
40+
*/
41+
public ObservableZipConfig(boolean delayError) {
42+
this(delayError, Observable.bufferSize());
43+
}
44+
45+
/**
46+
* Constructs a configuration record.
47+
* @param bufferSize the expected number of row combination items to be buffered internally
48+
*/
49+
public ObservableZipConfig(int bufferSize) {
50+
this(false, bufferSize);
51+
}
52+
53+
/**
54+
* Constructs a configuration record.
55+
* @param delayError should the error propagation be delayed?
56+
* @param bufferSize the expected number of row combination items to be buffered internally
57+
*/
58+
public ObservableZipConfig {
59+
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
60+
}
61+
}

src/main/java/io/reactivex/rxjava4/core/config/SingleConcatEagerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
package io.reactivex.rxjava4.core.config;
1515

16-
import io.reactivex.rxjava4.core.Flowable;
16+
import io.reactivex.rxjava4.core.*;
1717
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
1818

1919
/**
@@ -40,7 +40,7 @@ public record SingleConcatEagerConfig(boolean delayError, int maxConcurrency, in
4040
* @param delayError should the error propagation be delayed?
4141
*/
4242
public SingleConcatEagerConfig(boolean delayError) {
43-
this(delayError, 2, 2);
43+
this(delayError, Flowable.bufferSize(), Flowable.bufferSize());
4444
}
4545

4646
/**

src/test/java/io/reactivex/rxjava4/core/config/ObservableCombineLatestConfigTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ public class ObservableCombineLatestConfigTest extends RxJavaTest {
2222

2323
@Test
2424
public void validation() {
25-
assertTrue(new ObservableCombineLatestConfig(true).delayError(), "delayError - true");
26-
assertFalse(new ObservableCombineLatestConfig(false).delayError(), "delayError - false");
27-
assertEquals(5, new ObservableCombineLatestConfig(5).bufferSize(), "bufferSize - 5");
28-
assertEquals(5, new ObservableCombineLatestConfig(true, 5).bufferSize(), "bufferSize both - true, 5");
29-
assertEquals(5, new ObservableCombineLatestConfig(false, 5).bufferSize(), "bufferSize both - false, 5");
30-
assertTrue(new ObservableCombineLatestConfig(true, 5).delayError(), "delayError both - true, 5");
31-
assertFalse(new ObservableCombineLatestConfig(false, 5).delayError(), "delayError both - false, 5");
25+
assertTrue(new ObservableSwitchConfig(true).delayError(), "delayError - true");
26+
assertFalse(new ObservableSwitchConfig(false).delayError(), "delayError - false");
27+
assertEquals(5, new ObservableSwitchConfig(5).bufferSize(), "bufferSize - 5");
28+
assertEquals(5, new ObservableSwitchConfig(true, 5).bufferSize(), "bufferSize both - true, 5");
29+
assertEquals(5, new ObservableSwitchConfig(false, 5).bufferSize(), "bufferSize both - false, 5");
30+
assertTrue(new ObservableSwitchConfig(true, 5).delayError(), "delayError both - true, 5");
31+
assertFalse(new ObservableSwitchConfig(false, 5).delayError(), "delayError both - false, 5");
3232
}
3333
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava4.core.config;
15+
16+
import static org.junit.jupiter.api.Assertions.assertEquals;
17+
18+
import java.util.Objects;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import io.reactivex.rxjava4.core.RxJavaTest;
23+
import io.reactivex.rxjava4.functions.BiPredicate;
24+
import io.reactivex.rxjava4.internal.functions.ObjectHelper;
25+
26+
public class ObservableSequenceEqualConfigTest extends RxJavaTest {
27+
28+
@Test
29+
public void validation() {
30+
BiPredicate<Object, Object> predicate = Objects::equals;
31+
32+
assertEquals(5, new ObservableSequenceEqualConfig<>(5).bufferSize(), "bufferSize - 5");
33+
assertEquals(ObjectHelper.equalsPredicate(), new ObservableSequenceEqualConfig<>(5).isEqual(), "isEqual - std");
34+
assertEquals(5, new ObservableSequenceEqualConfig<>(5, predicate).bufferSize(), "bufferSize - 5");
35+
assertEquals(predicate, new ObservableSequenceEqualConfig<>(5, predicate).isEqual(), "isEqual - custom");
36+
}
37+
}

0 commit comments

Comments
 (0)