Skip to content

Commit d6d7b42

Browse files
authored
IGNITE-28076 Use watch executor in SchemaSafeTimeTrackerImpl (#7729)
1 parent 3acbf3e commit d6d7b42

11 files changed

Lines changed: 73 additions & 10 deletions

File tree

modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1505,7 +1505,7 @@ private class Node {
15051505

15061506
schemaManager = new SchemaManager(registry, catalogManager);
15071507

1508-
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
1508+
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
15091509
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
15101510

15111511
schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);

modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@
128128
* <li>Providing corresponding Meta storage service proxy interface</li>
129129
* </ul>
130130
*/
131-
public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGroupMaintenance {
131+
public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGroupMaintenance, WatchProcessorAccess {
132132
private static final IgniteLogger LOG = Loggers.forClass(MetaStorageManagerImpl.class);
133133

134134
private final ClusterService clusterService;
@@ -1409,4 +1409,9 @@ CompletableFuture<Void> sendCompactionCommand(long compactionRevision) {
14091409
public void markAsStopping() {
14101410
metaStorageSvcFut.thenAccept(MetaStorageServiceImpl::markAsStopping);
14111411
}
1412+
1413+
@Override
1414+
public Executor watchExecutor() {
1415+
return storage.watchExecutor();
1416+
}
14121417
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.metastorage.impl;
19+
20+
import java.util.concurrent.Executor;
21+
22+
/**
23+
* Provides access to watch processing required by internal components.
24+
*/
25+
public interface WatchProcessorAccess {
26+
/**
27+
* Returns executor for watch processing.
28+
*/
29+
Executor watchExecutor();
30+
}

modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.TreeSet;
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.CopyOnWriteArrayList;
34+
import java.util.concurrent.Executor;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.function.Predicate;
3637
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -396,4 +397,9 @@ protected void drainNotifyWatchProcessorEventsBeforeStartingWatches() {
396397
notifyWatchProcessorEventsBeforeStartingWatches = null;
397398
}
398399
}
400+
401+
@Override
402+
public Executor watchExecutor() {
403+
return watchProcessor.watchExecutor();
404+
}
399405
}

modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Collection;
2222
import java.util.List;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Executor;
2425
import org.apache.ignite.internal.close.ManuallyCloseable;
2526
import org.apache.ignite.internal.hlc.HybridTimestamp;
2627
import org.apache.ignite.internal.metastorage.CommandId;
@@ -558,4 +559,9 @@ boolean invoke(
558559
* @return Future that's completed when flushing of the data is completed.
559560
*/
560561
CompletableFuture<Void> flush();
562+
563+
/**
564+
* Returns executor used to execute watches.
565+
*/
566+
Executor watchExecutor();
561567
}

modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Set;
4040
import java.util.concurrent.CompletableFuture;
4141
import java.util.concurrent.CopyOnWriteArrayList;
42+
import java.util.concurrent.Executor;
4243
import java.util.concurrent.ExecutorService;
4344
import java.util.concurrent.LinkedBlockingQueue;
4445
import java.util.concurrent.ThreadFactory;
@@ -513,4 +514,8 @@ private static boolean isNotIdempotentCacheCommand(Entry entry) {
513514
IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength
514515
);
515516
}
517+
518+
Executor watchExecutor() {
519+
return watchExecutor;
520+
}
516521
}

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,
639639

640640
volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name));
641641

642-
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
642+
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(), metaStorageManager.watchExecutor());
643643
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
644644

645645
LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;

modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ public CompletableFuture<Boolean> invoke(Condition condition, List<Operation> su
602602
zoneId -> completedFuture(Set.of())
603603
);
604604

605-
var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
605+
var schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
606606
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
607607

608608
LongSupplier delayDurationMsSupplier = () -> TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;

modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,7 @@ public class IgniteImpl implements Ignite {
935935
volatileLogStorageManagerCreator
936936
);
937937

938-
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
938+
schemaSafeTimeTracker = new SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(), metaStorageMgr.watchExecutor());
939939
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
940940

941941
SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);

modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Executor;
2526
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
2627
import org.apache.ignite.internal.hlc.HybridTimestamp;
2728
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -39,15 +40,18 @@
3940
public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker, IgniteComponent, NotificationEnqueuedListener {
4041
private final ClusterTime clusterTime;
4142

43+
private final Executor watchExecutor;
44+
4245
private final PendingComparableValuesTracker<HybridTimestamp, Void> schemaSafeTime =
4346
new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
4447

4548
private CompletableFuture<Void> schemaSafeTimeUpdateFuture = nullCompletedFuture();
4649

4750
private final Object futureMutex = new Object();
4851

49-
public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime) {
52+
public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime, Executor watchExecutor) {
5053
this.clusterTime = clusterTime;
54+
this.watchExecutor = watchExecutor;
5155
}
5256

5357
@Override
@@ -74,16 +78,16 @@ public void onEnqueued(CompletableFuture<Void> newNotificationFuture, List<Entry
7478
// The update touches the Catalog (i.e. schemas), so we must chain with the core notification future
7579
// as Catalog listeners will be included in it (because we need to wait for those listeners to finish execution
7680
// before updating the schema safe time).
77-
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenCompose(unused -> newNotificationFuture);
81+
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture.thenComposeAsync(unused -> newNotificationFuture, watchExecutor);
7882
} else {
7983
// The update does not concern the Catalog (schemas), so we can update schema safe time as soon as previous updates to it
8084
// get applied.
8185
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture;
8286
}
8387

84-
newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRun(() -> {
88+
newSchemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture.thenRunAsync(() -> {
8589
schemaSafeTime.update(timestamp, null);
86-
});
90+
}, watchExecutor);
8791

8892
schemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture;
8993
}

0 commit comments

Comments
 (0)