Skip to content

Commit ebdb8a9

Browse files
authored
4.x: Java Migration; records, newer API, newer syntax (#8187)
1 parent 557a600 commit ebdb8a9

68 files changed

Lines changed: 865 additions & 1487 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/jmh/java/io/reactivex/rxjava4/core/InputWithIncrementingInteger.java

Lines changed: 25 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -44,57 +44,45 @@ public void onNext(Integer t) {
4444
}
4545
}
4646

47-
static final class IncrementingIterable implements Iterable<Integer> {
47+
record IncrementingIterable(int size) implements Iterable<Integer> {
4848

49-
final class IncrementingIterator implements Iterator<Integer> {
50-
int i;
49+
final class IncrementingIterator implements Iterator<Integer> {
50+
int i;
5151

52-
@Override
53-
public boolean hasNext() {
54-
return i < size;
55-
}
52+
@Override
53+
public boolean hasNext() {
54+
return i < size;
55+
}
5656

57-
@Override
58-
public Integer next() {
59-
Blackhole.consumeCPU(10);
60-
return i++;
61-
}
57+
@Override
58+
public Integer next() {
59+
Blackhole.consumeCPU(10);
60+
return i++;
61+
}
6262

63-
@Override
64-
public void remove() {
63+
@Override
64+
public void remove() {
6565

66+
}
6667
}
67-
}
68-
69-
final int size;
70-
71-
IncrementingIterable(int size) {
72-
this.size = size;
73-
}
7468

7569
@Override
76-
public Iterator<Integer> iterator() {
77-
return new IncrementingIterator();
70+
public Iterator<Integer> iterator() {
71+
return new IncrementingIterator();
72+
}
7873
}
79-
}
80-
81-
static final class IncrementingPublisher implements Publisher<Integer> {
8274

83-
final int size;
84-
85-
IncrementingPublisher(int size) {
86-
this.size = size;
87-
}
75+
record IncrementingPublisher(int size) implements Publisher<Integer> {
8876

8977
@Override
90-
public void subscribe(Subscriber<? super Integer> s) {
91-
s.onSubscribe(EmptySubscription.INSTANCE);
92-
for (int i = 0; i < size; i++) {
93-
s.onNext(i);
78+
public void subscribe(Subscriber<? super Integer> s) {
79+
s.onSubscribe(EmptySubscription.INSTANCE);
80+
for (int i = 0; i < size; i++) {
81+
s.onNext(i);
82+
}
83+
s.onComplete();
9484
}
95-
s.onComplete();
9685
}
97-
}
9886

9987
public Iterable<Integer> iterable;
10088
public Flowable<Integer> flowable;

src/jmh/java/io/reactivex/rxjava4/core/StrictPerf.java

Lines changed: 40 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -54,67 +54,51 @@ public void external(Blackhole bh) {
5454
source.subscribe(new ExternalConsumer(bh, cpu));
5555
}
5656

57-
static final class InternalConsumer implements FlowableSubscriber<Object> {
58-
final Blackhole bh;
59-
60-
final int cycles;
61-
62-
InternalConsumer(Blackhole bh, int cycles) {
63-
this.bh = bh;
64-
this.cycles = cycles;
65-
}
66-
67-
@Override
68-
public void onNext(Object t) {
69-
bh.consume(t);
70-
Blackhole.consumeCPU(cycles);
71-
}
72-
73-
@Override
74-
public void onError(Throwable t) {
75-
bh.consume(t);
76-
}
77-
78-
@Override
79-
public void onComplete() {
80-
bh.consume(true);
81-
}
82-
83-
@Override
84-
public void onSubscribe(Subscription s) {
85-
s.request(Long.MAX_VALUE);
86-
}
87-
}
88-
89-
static final class ExternalConsumer implements Subscriber<Object> {
90-
final Blackhole bh;
91-
92-
final int cycles;
93-
94-
ExternalConsumer(Blackhole bh, int cycles) {
95-
this.bh = bh;
96-
this.cycles = cycles;
97-
}
57+
record InternalConsumer(Blackhole bh, int cycles) implements FlowableSubscriber<Object> {
9858

9959
@Override
100-
public void onNext(Object t) {
101-
bh.consume(t);
102-
Blackhole.consumeCPU(cycles);
60+
public void onNext(Object t) {
61+
bh.consume(t);
62+
Blackhole.consumeCPU(cycles);
63+
}
64+
65+
@Override
66+
public void onError(Throwable t) {
67+
bh.consume(t);
68+
}
69+
70+
@Override
71+
public void onComplete() {
72+
bh.consume(true);
73+
}
74+
75+
@Override
76+
public void onSubscribe(Subscription s) {
77+
s.request(Long.MAX_VALUE);
78+
}
10379
}
10480

105-
@Override
106-
public void onError(Throwable t) {
107-
bh.consume(t);
108-
}
109-
110-
@Override
111-
public void onComplete() {
112-
bh.consume(true);
113-
}
81+
record ExternalConsumer(Blackhole bh, int cycles) implements Subscriber<Object> {
11482

11583
@Override
116-
public void onSubscribe(Subscription s) {
117-
s.request(Long.MAX_VALUE);
84+
public void onNext(Object t) {
85+
bh.consume(t);
86+
Blackhole.consumeCPU(cycles);
87+
}
88+
89+
@Override
90+
public void onError(Throwable t) {
91+
bh.consume(t);
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
bh.consume(true);
97+
}
98+
99+
@Override
100+
public void onSubscribe(Subscription s) {
101+
s.request(Long.MAX_VALUE);
102+
}
118103
}
119-
}
120104
}

src/main/java/io/reactivex/rxjava4/core/Flowable.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2467,18 +2467,13 @@ public static int bufferSize() {
24672467
Objects.requireNonNull(source, "source is null");
24682468
Objects.requireNonNull(strategy, "strategy is null");
24692469
Flowable<T> f = new FlowableFromObservable<>(source);
2470-
switch (strategy) {
2471-
case DROP:
2472-
return f.onBackpressureDrop();
2473-
case LATEST:
2474-
return f.onBackpressureLatest();
2475-
case MISSING:
2476-
return f;
2477-
case ERROR:
2478-
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f));
2479-
default:
2480-
return f.onBackpressureBuffer();
2481-
}
2470+
return switch (strategy) {
2471+
case DROP -> f.onBackpressureDrop();
2472+
case LATEST -> f.onBackpressureLatest();
2473+
case MISSING -> f;
2474+
case ERROR -> RxJavaPlugins.onAssembly(new FlowableOnBackpressureError<>(f));
2475+
default -> f.onBackpressureBuffer();
2476+
};
24822477
}
24832478

24842479
/**

src/main/java/io/reactivex/rxjava4/core/Streamer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,10 @@ default void close() {
9393
*/
9494
default Streamer<T> finishVia(@NonNull DisposableContainer canceller) {
9595
Objects.requireNonNull(canceller, "canceller is null");
96-
if (this instanceof StreamerFinishViaDisposableContainerCanceller<T> augment) {
97-
if (augment.streamer == this && augment.canceller == canceller) {
96+
if (this instanceof StreamerFinishViaDisposableContainerCanceller<T>(
97+
Streamer<T> streamer, DisposableContainer canceller1
98+
)) {
99+
if (streamer == this && canceller1 == canceller) {
98100
// DO not rewrap!
99101
return this;
100102
}

src/main/java/io/reactivex/rxjava4/exceptions/CompositeException.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,11 @@ public CompositeException(@NonNull Iterable<? extends Throwable> errors) {
6464
Set<Throwable> deDupedExceptions = new LinkedHashSet<>();
6565
if (errors != null) {
6666
for (Throwable ex : errors) {
67-
if (ex instanceof CompositeException) {
68-
deDupedExceptions.addAll(((CompositeException) ex).getExceptions());
69-
} else
70-
if (ex != null) {
71-
deDupedExceptions.add(ex);
67+
if (ex instanceof CompositeException ce) {
68+
deDupedExceptions.addAll(ce.getExceptions());
7269
} else {
73-
deDupedExceptions.add(new NullPointerException("Throwable was null!"));
70+
deDupedExceptions.add(Objects.requireNonNullElseGet(ex,
71+
() -> new NullPointerException("Throwable was null!")));
7472
}
7573
}
7674
} else {

0 commit comments

Comments
 (0)