Skip to content

Commit 9b63f1e

Browse files
committed
IGNITE-28681 Introduce dedicated method for obtaining queued elements.
1 parent c778688 commit 9b63f1e

3 files changed

Lines changed: 20 additions & 25 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ public static Object rebalanceTopic(int idx) {
851851
// Finish all exchange futures.
852852
ExchangeFutureSet exchFuts0 = exchFuts;
853853

854-
for (CachePartitionExchangeWorkerTask task : exchWorker) {
854+
for (CachePartitionExchangeWorkerTask task : exchWorker.queuedElements()) {
855855
if (task instanceof GridDhtPartitionsExchangeFuture)
856856
((GridDhtPartitionsExchangeFuture)task).onDone(stopErr);
857857
}
@@ -2445,7 +2445,7 @@ public boolean mergeExchanges(final GridDhtPartitionsExchangeFuture curFut, Grid
24452445
if (exchWorker.waitForExchangeFuture(resVer))
24462446
return true;
24472447

2448-
for (CachePartitionExchangeWorkerTask task : exchWorker) {
2448+
for (CachePartitionExchangeWorkerTask task : exchWorker.queuedElements()) {
24492449
if (task instanceof GridDhtPartitionsExchangeFuture) {
24502450
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
24512451

@@ -2525,7 +2525,7 @@ public boolean mergeExchangesOnCoordinator(
25252525
synchronized (curFut.mutex()) {
25262526
int awaited = 0;
25272527

2528-
for (CachePartitionExchangeWorkerTask task : exchWorker) {
2528+
for (CachePartitionExchangeWorkerTask task : exchWorker.queuedElements()) {
25292529
if (task instanceof GridDhtPartitionsExchangeFuture) {
25302530
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
25312531

@@ -2624,7 +2624,7 @@ private void waitForTestVersion(AffinityTopologyVersion exchMergeTestWaitVer, Gr
26242624
while (U.currentTimeMillis() < end) {
26252625
boolean found = false;
26262626

2627-
for (CachePartitionExchangeWorkerTask task : exchWorker) {
2627+
for (CachePartitionExchangeWorkerTask task : exchWorker.queuedElements()) {
26282628
if (task instanceof GridDhtPartitionsExchangeFuture) {
26292629
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)task;
26302630

@@ -2864,7 +2864,7 @@ private void removeMergedFutures(AffinityTopologyVersion resVer, GridDhtPartitio
28642864
if (resVer.compareTo(exchFut.initialVersion()) != 0) {
28652865
waitForExchangeFuture(resVer);
28662866

2867-
for (CachePartitionExchangeWorkerTask task : this) {
2867+
for (CachePartitionExchangeWorkerTask task : queuedElements()) {
28682868
if (task instanceof GridDhtPartitionsExchangeFuture) {
28692869
GridDhtPartitionsExchangeFuture fut0 = (GridDhtPartitionsExchangeFuture)task;
28702870

@@ -2914,7 +2914,7 @@ void processCustomTask(CachePartitionExchangeWorkerTask task) {
29142914
*/
29152915
boolean hasPendingExchange() {
29162916
if (!isQueueEmpty()) {
2917-
for (CachePartitionExchangeWorkerTask task : this) {
2917+
for (CachePartitionExchangeWorkerTask task : queuedElements()) {
29182918
if (isExchangeTask(task))
29192919
return true;
29202920
}
@@ -2928,7 +2928,7 @@ boolean hasPendingExchange() {
29282928
*/
29292929
boolean hasPendingServerExchange() {
29302930
if (!isQueueEmpty()) {
2931-
for (CachePartitionExchangeWorkerTask task : this) {
2931+
for (CachePartitionExchangeWorkerTask task : queuedElements()) {
29322932
if (task instanceof GridDhtPartitionsExchangeFuture) {
29332933
if (((GridDhtPartitionsExchangeFuture)task).changedAffinity())
29342934
return true;
@@ -2948,7 +2948,7 @@ void dumpExchangeDebugInfo() {
29482948
if (DIAGNOSTIC_WARN_LIMIT > 0) {
29492949
int cnt = 0;
29502950

2951-
for (CachePartitionExchangeWorkerTask task : this) {
2951+
for (CachePartitionExchangeWorkerTask task : queuedElements()) {
29522952
if (task instanceof GridDhtPartitionsExchangeFuture) {
29532953
U.warn(log, ">>> " + ((GridDhtPartitionsExchangeFuture)task).shortInfo());
29542954

modules/core/src/main/java/org/apache/ignite/internal/util/worker/AsynchronousQueueProcessor.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.ignite.internal.thread.context.OperationContext;
2727
import org.apache.ignite.internal.thread.context.OperationContextSnapshot;
2828
import org.apache.ignite.internal.thread.context.function.OperationContextAwareWrapper;
29+
import org.apache.ignite.internal.util.typedef.F;
2930
import org.apache.ignite.internal.worker.WorkersRegistry;
3031
import org.apache.ignite.thread.IgniteThread;
3132
import org.jetbrains.annotations.NotNull;
@@ -41,7 +42,7 @@
4142
* @param <T> Type of items to be processed.
4243
* @param <W> Type of wrapper over processing item that are stored in the underlying queue.
4344
*/
44-
public abstract class AsynchronousQueueProcessor<T, W extends OperationContextAwareWrapper<T>> extends GridWorker implements Iterable<T> {
45+
public abstract class AsynchronousQueueProcessor<T, W extends OperationContextAwareWrapper<T>> extends GridWorker {
4546
/** */
4647
private final BlockingQueue<W> workerQueue;
4748

@@ -137,6 +138,15 @@ public boolean removeQueuedElement(Object o) {
137138
return workerQueue.removeIf(w -> o.equals(w.delegate()));
138139
}
139140

141+
/** */
142+
@NotNull public Iterable<T> queuedElements() {
143+
return new Iterable<>() {
144+
@Override public @NotNull Iterator<T> iterator() {
145+
return F.iterator(workerQueue, OperationContextAwareWrapper::delegate, true);
146+
}
147+
};
148+
}
149+
140150
/** */
141151
public void clearQueue() {
142152
workerQueue.clear();
@@ -152,21 +162,6 @@ public boolean isQueueEmpty() {
152162
return workerQueue.isEmpty();
153163
}
154164

155-
/** {@inheritDoc} */
156-
@Override public @NotNull Iterator<T> iterator() {
157-
Iterator<W> iter = workerQueue.iterator();
158-
159-
return new Iterator<>() {
160-
@Override public boolean hasNext() {
161-
return iter.hasNext();
162-
}
163-
164-
@Override public T next() {
165-
return iter.next().delegate();
166-
}
167-
};
168-
}
169-
170165
/** */
171166
public void drainQueue(Consumer<? super T> consumer) {
172167
W element;

modules/core/src/test/java/org/apache/ignite/cache/affinity/PendingExchangeTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ private void waitForExchnagesBegin(GridCachePartitionExchangeManager exchangeMan
264264
assertTrue(GridTestUtils.waitForCondition(() -> {
265265
int exFuts = 0;
266266

267-
for (CachePartitionExchangeWorkerTask task : exchWorker) {
267+
for (CachePartitionExchangeWorkerTask task : exchWorker.queuedElements()) {
268268
if (task instanceof GridDhtPartitionsExchangeFuture)
269269
exFuts++;
270270
}

0 commit comments

Comments
 (0)