Skip to content

Commit d8e5803

Browse files
authored
4.x: Cleanup, fix inferable types via var or diamond (#8189)
1 parent aeb5464 commit d8e5803

251 files changed

Lines changed: 1229 additions & 1162 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/main/java/io/reactivex/rxjava4/core/Streamable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source) {
115115
@NonNull
116116
static <T> Streamable<T> fromPublisher(@NonNull Flow.Publisher<T> source, @NonNull ExecutorService executor) {
117117
Objects.requireNonNull(source, "source is null");
118-
return new StreamableFromPublisher<T>(source, executor);
118+
return new StreamableFromPublisher<>(source, executor);
119119
}
120120

121121
/**
@@ -357,7 +357,7 @@ default CompletionStageDisposable<Void> forEach(@NonNull Consumer<? super T> con
357357
return null;
358358
});
359359
canceller.add(Disposable.fromFuture(future));
360-
return new CompletionStageDisposable<Void>(StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
360+
return new CompletionStageDisposable<>(StreamableHelper.toCompletionStage((Future<Void>) (Future<?>) future), canceller);
361361
}
362362

363363
/**
@@ -407,8 +407,8 @@ default CompletionStageDisposable<Void> forEach(
407407
return null;
408408
});
409409
canceller.add(Disposable.fromFuture(future));
410-
return new CompletionStageDisposable<Void>(
411-
StreamableHelper.toCompletionStage((Future<Void>)(Future<?>)future), canceller);
410+
return new CompletionStageDisposable<>(
411+
StreamableHelper.toCompletionStage((Future<Void>) (Future<?>) future), canceller);
412412
}
413413

414414
/**

src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowableCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public FlowableCache(Flowable<T> source, int capacityHint) {
103103
Node<T> n = new Node<>(capacityHint);
104104
this.head = n;
105105
this.tail = n;
106-
this.subscribers = new AtomicReference<>(EMPTY);
106+
this.subscribers = new AtomicReference<CacheSubscription<T>[]>(EMPTY);
107107
}
108108

109109
@Override

src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublish.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ static final class PublishConnection<T>
179179
this.upstream = new AtomicReference<>();
180180
this.connect = new AtomicBoolean();
181181
this.bufferSize = bufferSize;
182-
this.subscribers = new AtomicReference<>(EMPTY);
182+
this.subscribers = new AtomicReference<InnerSubscription<T>[]>(EMPTY);
183183
}
184184

185185
@SuppressWarnings("unchecked")

src/main/java/io/reactivex/rxjava4/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Flowable
158158
this.delayError = delayError;
159159
this.wip = new AtomicInteger();
160160
this.upstream = new AtomicReference<>();
161-
this.subscribers = new AtomicReference<>(EMPTY);
161+
this.subscribers = new AtomicReference<MulticastSubscription<T>[]>(EMPTY);
162162
}
163163

164164
@Override

src/main/java/io/reactivex/rxjava4/internal/operators/maybe/MaybeCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public final class MaybeCache<T> extends Maybe<T> implements MaybeObserver<T> {
4343
@SuppressWarnings("unchecked")
4444
public MaybeCache(MaybeSource<T> source) {
4545
this.source = new AtomicReference<>(source);
46-
this.observers = new AtomicReference<>(EMPTY);
46+
this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY);
4747
}
4848

4949
@Override

src/main/java/io/reactivex/rxjava4/internal/operators/single/SingleCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public final class SingleCache<T> extends Single<T> implements SingleObserver<T>
4040
public SingleCache(SingleSource<? extends T> source) {
4141
this.source = source;
4242
this.wip = new AtomicInteger();
43-
this.observers = new AtomicReference<>(EMPTY);
43+
this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY);
4444
}
4545

4646
@Override

src/main/java/io/reactivex/rxjava4/internal/operators/single/SingleOnErrorComplete.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,6 @@ public SingleOnErrorComplete(Single<T> source,
3838

3939
@Override
4040
protected void subscribeActual(MaybeObserver<? super T> observer) {
41-
source.subscribe(new MaybeOnErrorComplete.OnErrorCompleteMultiObserver<T>(observer, predicate));
41+
source.subscribe(new MaybeOnErrorComplete.OnErrorCompleteMultiObserver<>(observer, predicate));
4242
}
4343
}

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableEmpty.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public final class StreamableEmpty<T> implements Streamable<T> {
2424

2525
@Override
2626
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
27-
return new EmptyStreamer<T>();
27+
return new EmptyStreamer<>();
2828
}
2929

3030
static final class EmptyStreamer<T> implements Streamer<T> {

src/main/java/io/reactivex/rxjava4/internal/operators/streamable/StreamableJust.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public StreamableJust(T item) {
2828

2929
@Override
3030
public @NonNull Streamer<@NonNull T> stream(@NonNull DisposableContainer cancellation) {
31-
return new JustStreamer<T>(item, cancellation);
31+
return new JustStreamer<>(item, cancellation);
3232
}
3333

3434
static final class JustStreamer<T> implements Streamer<T>, Disposable {

src/main/java/io/reactivex/rxjava4/processors/AsyncProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public static <T> AsyncProcessor<T> create() {
148148
*/
149149
@SuppressWarnings("unchecked")
150150
AsyncProcessor() {
151-
this.subscribers = new AtomicReference<>(EMPTY);
151+
this.subscribers = new AtomicReference<AsyncSubscription<T>[]>(EMPTY);
152152
}
153153

154154
@Override

0 commit comments

Comments
 (0)