Skip to content

Commit aeb5464

Browse files
authored
4.x: Cleanup - Migration; lambdas expressions (#8188)
* 4.x: Cleanup - Migration; lambdas expressions * Fix message format error accidentally introduced in test
1 parent ebdb8a9 commit aeb5464

31 files changed

Lines changed: 249 additions & 380 deletions

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,7 @@ default Flowable<T> toFlowable() {
238238
default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
239239
Objects.requireNonNull(executor, "executir is null");
240240
var me = this;
241-
return Flowable.virtualCreate(emitter -> {
242-
me.forEach(emitter::emit).await(emitter.canceller());
243-
}, executor);
241+
return Flowable.virtualCreate(emitter -> me.forEach(emitter::emit).await(emitter.canceller()), executor);
244242
}
245243

246244
/**
@@ -269,13 +267,11 @@ default Flowable<T> toFlowable(@NonNull ExecutorService executor) {
269267
Objects.requireNonNull(transformer, "transformer is null");
270268
Objects.requireNonNull(executor, "executor is null");
271269
var me = this;
272-
return create(emitter -> {
273-
me.forEach((item, stopper) -> {
274-
// System.out.println("item " + item);
275-
transformer.transform(item, emitter, stopper);
276-
}, emitter.canceller(), executor)
277-
.await(emitter.canceller());
278-
}, executor);
270+
return create(emitter -> me.forEach((item, stopper) -> {
271+
// System.out.println("item " + item);
272+
transformer.transform(item, emitter, stopper);
273+
}, emitter.canceller(), executor)
274+
.await(emitter.canceller()), executor);
279275
}
280276

281277
// oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo

src/main/java/io/reactivex/rxjava4/internal/operators/parallel/ParallelJoin.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,6 @@ void drainLoop() {
216216
int missed = 1;
217217

218218
JoinInnerSubscriber<T>[] s = this.subscribers;
219-
int n = s.length;
220219
Subscriber<? super T> a = this.downstream;
221220

222221
for (;;) {
@@ -387,7 +386,6 @@ void drainLoop() {
387386
int missed = 1;
388387

389388
JoinInnerSubscriber<T>[] s = this.subscribers;
390-
int n = s.length;
391389
Subscriber<? super T> a = this.downstream;
392390

393391
for (;;) {

src/main/java/io/reactivex/rxjava4/internal/util/AwaitCoordinatorStatic.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,7 @@ static <T> T awaitFirst(Iterable<? extends CompletionStage<? extends T>> stages,
122122
var winner = new CompletableFuture<CompletionStage<? extends T>>();
123123

124124
for (var stage : stages) {
125-
stage.whenComplete((_, _) -> {
126-
winner.complete(stage);
127-
});
125+
stage.whenComplete((_, _) -> winner.complete(stage));
128126
}
129127
return await(winner, canceller).toCompletableFuture().getNow(null);
130128
}
@@ -144,9 +142,7 @@ static <T> CompletionStage<T> awaitFirstStage(
144142
var winner = new CompletableFuture<CompletionStage<? extends T>>();
145143

146144
for (var stage : stages) {
147-
stage.whenComplete((_, _) -> {
148-
winner.complete(stage);
149-
});
145+
stage.whenComplete((_, _) -> winner.complete(stage));
150146
}
151147
return (CompletionStage<T>)await(winner, canceller).toCompletableFuture().getNow(null);
152148
}
@@ -167,9 +163,7 @@ static <T> int awaitFirstIndex(
167163
for (int i = 0; i < stages.size(); i++) {
168164
var stage = stages.get(i);
169165
var fi = i;
170-
stage.whenComplete((_, _) -> {
171-
winner.complete(fi);
172-
});
166+
stage.whenComplete((_, _) -> winner.complete(fi));
173167
}
174168
return await(winner, canceller);
175169
}

src/test/java/io/reactivex/rxjava4/internal/observers/FutureSingleObserverTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,7 @@ public void cancel() {
4747
fail("Should have thrown!");
4848
} catch (CancellationException ex) {
4949
// expected
50-
} catch (InterruptedException ex) {
51-
throw new AssertionError(ex);
52-
} catch (ExecutionException ex) {
50+
} catch (InterruptedException | ExecutionException ex) {
5351
throw new AssertionError(ex);
5452
}
5553

@@ -58,11 +56,7 @@ public void cancel() {
5856
fail("Should have thrown!");
5957
} catch (CancellationException ex) {
6058
// expected
61-
} catch (InterruptedException ex) {
62-
throw new AssertionError(ex);
63-
} catch (ExecutionException ex) {
64-
throw new AssertionError(ex);
65-
} catch (TimeoutException ex) {
59+
} catch (InterruptedException | ExecutionException | TimeoutException ex) {
6660
throw new AssertionError(ex);
6761
}
6862
}

src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupByTest.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1736,11 +1736,11 @@ public void delayErrorCompleteMoreWorkInGroup() {
17361736
public void groupSyncFusionRejected() {
17371737
Flowable.just(1)
17381738
.groupBy(_ -> 1)
1739-
.doOnNext(g -> {
1740-
g.subscribeWith(new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.SYNC))
1741-
.assertFuseable()
1742-
.assertFusionMode(QueueFuseable.NONE);
1743-
})
1739+
.doOnNext(g ->
1740+
g.subscribeWith(new TestSubscriberEx<Integer>()
1741+
.setInitialFusionMode(QueueFuseable.SYNC))
1742+
.assertFuseable()
1743+
.assertFusionMode(QueueFuseable.NONE))
17441744
.test()
17451745
.assertComplete();
17461746
}
@@ -1755,11 +1755,7 @@ public void subscribeAbandonRace() throws Throwable {
17551755
CountDownLatch cdl = new CountDownLatch(1);
17561756

17571757
pp.groupBy(_ -> 1)
1758-
.doOnNext(g -> {
1759-
TestHelper.raceOther(() -> {
1760-
g.subscribe(ts);
1761-
}, cdl);
1762-
})
1758+
.doOnNext(g -> TestHelper.raceOther(() -> g.subscribe(ts), cdl))
17631759
.test();
17641760

17651761
pp.onNext(1);
@@ -1895,9 +1891,7 @@ public void issue6974Part2Case1ObserveOn() {
18951891
Flowable
18961892
.range(1, 500_000)
18971893
.map(i -> i % groups)
1898-
.doOnCancel(() -> {
1899-
System.out.println("Cancelling upstream");
1900-
})
1894+
.doOnCancel(() -> System.out.println("Cancelling upstream"))
19011895
.groupBy(i -> i, i -> i, false, groupByBufferSize,
19021896
sizeCap(groups * 2, notifyOnExplicitEviction))
19031897
.flatMap(gf -> gf
@@ -1946,9 +1940,7 @@ public void issue6974Part2Case1ObserveOnNoCap() {
19461940
Flowable
19471941
.range(1, 500_000)
19481942
.map(i -> i % groups)
1949-
.doOnRequest(v -> {
1950-
System.out.println("Source: " + v);
1951-
})
1943+
.doOnRequest(v -> System.out.println("Source: " + v))
19521944
.groupBy(i -> i)
19531945
.flatMap(gf -> gf
19541946
.observeOn(Schedulers.computation())
@@ -1971,9 +1963,7 @@ public void issue6974Part2Case1ObserveOnNoCapHide() {
19711963
Flowable
19721964
.range(1, 500_000)
19731965
.map(i -> i % groups)
1974-
.doOnRequest(v -> {
1975-
System.out.println("Source: " + v);
1976-
})
1966+
.doOnRequest(v -> System.out.println("Source: " + v))
19771967
.groupBy(i -> i)
19781968
.flatMap(gf -> gf
19791969
.hide()

src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableGroupJoinTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,7 @@ public void disposeAfterOnNext() {
551551
TestSubscriber<Integer> ts = new TestSubscriber<>();
552552

553553
pp1.groupJoin(pp2, _ -> Flowable.never(), _ -> Flowable.never(), (a, _) -> a)
554-
.doOnNext(_ -> {
555-
ts.cancel();
556-
})
554+
.doOnNext(_ -> ts.cancel())
557555
.subscribe(ts);
558556

559557
pp2.onNext(1);

src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableSwitchTest.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -947,9 +947,7 @@ public void fusedBoundary() {
947947
Flowable.range(1, 10000)
948948
.switchMap((Function<Integer, Flowable<Object>>) _ -> Flowable.just(2).hide()
949949
.observeOn(Schedulers.single())
950-
.map((Function<Integer, Object>) _ -> {
951-
return Thread.currentThread().getName();
952-
}))
950+
.map((Function<Integer, Object>) _ -> Thread.currentThread().getName()))
953951
.to(TestHelper.<Object>testConsumer())
954952
.awaitDone(5, TimeUnit.SECONDS)
955953
.assertNever(thread)
@@ -1110,9 +1108,7 @@ public void cancellationShouldTriggerInnerCancellationRace() throws Throwable {
11101108

11111109
int n = 10_000;
11121110
for (int i = 0; i < n; i++) {
1113-
Flowable.<Integer>create(it -> {
1114-
it.onNext(0);
1115-
}, BackpressureStrategy.MISSING)
1111+
Flowable.<Integer>create(it -> it.onNext(0), BackpressureStrategy.MISSING)
11161112
.switchMap(_ -> createFlowable(inner))
11171113
.observeOn(Schedulers.computation())
11181114
.doFinally(outer::incrementAndGet)
@@ -1129,12 +1125,8 @@ Flowable<Integer> createFlowable(AtomicInteger inner) {
11291125
return Flowable.<Integer>unsafeCreate(s -> {
11301126
SerializedSubscriber<Integer> it = new SerializedSubscriber<>(s);
11311127
it.onSubscribe(new BooleanSubscription());
1132-
Schedulers.cached().scheduleDirect(() -> {
1133-
it.onNext(1);
1134-
}, 0, TimeUnit.MILLISECONDS);
1135-
Schedulers.cached().scheduleDirect(() -> {
1136-
it.onNext(2);
1137-
}, 0, TimeUnit.MILLISECONDS);
1128+
Schedulers.cached().scheduleDirect(() -> it.onNext(1), 0, TimeUnit.MILLISECONDS);
1129+
Schedulers.cached().scheduleDirect(() -> it.onNext(2), 0, TimeUnit.MILLISECONDS);
11381130
})
11391131
.doFinally(inner::incrementAndGet);
11401132
}

src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableToListTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,7 @@ public void listWithBlockingFirst() {
174174
static void await(CyclicBarrier cb) {
175175
try {
176176
cb.await();
177-
} catch (InterruptedException ex) {
178-
ex.printStackTrace();
179-
} catch (BrokenBarrierException ex) {
177+
} catch (InterruptedException | BrokenBarrierException ex) {
180178
ex.printStackTrace();
181179
}
182180
}

src/test/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableToSortedListTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,7 @@ public void withFollowingFirst() {
145145
static void await(CyclicBarrier cb) {
146146
try {
147147
cb.await();
148-
} catch (InterruptedException ex) {
149-
ex.printStackTrace();
150-
} catch (BrokenBarrierException ex) {
148+
} catch (InterruptedException | BrokenBarrierException ex) {
151149
ex.printStackTrace();
152150
}
153151
}

src/test/java/io/reactivex/rxjava4/internal/operators/observable/ObservableCreateTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -586,9 +586,7 @@ public void serializedEnqueueAndDrainRace() throws Throwable {
586586
})
587587
.doOnNext(v -> {
588588
if (v == 1) {
589-
TestHelper.raceOther(() -> {
590-
ref.get().onNext(2);
591-
}, cdl);
589+
TestHelper.raceOther(() -> ref.get().onNext(2), cdl);
592590
ref.get().onNext(3);
593591
}
594592
})

0 commit comments

Comments
 (0)