Skip to content

Commit ebedbb7

Browse files
authored
IGNITE-28096 Extract TableImplFactory from TableManager (#7898)
1 parent c91c153 commit ebedbb7

12 files changed

Lines changed: 585 additions & 231 deletions

File tree

modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@
232232
import org.apache.ignite.internal.table.StreamerReceiverRunner;
233233
import org.apache.ignite.internal.table.TableTestUtils;
234234
import org.apache.ignite.internal.table.TableViewInternal;
235+
import org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
235236
import org.apache.ignite.internal.table.distributed.TableManager;
236237
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
237238
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -1595,7 +1596,8 @@ private class Node {
15951596
minTimeCollectorService,
15961597
systemDistributedConfiguration,
15971598
metricManager,
1598-
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
1599+
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
1600+
new DefaultMvTableStorageFactory(dataStorageMgr, catalogManager, lowWatermark)
15991601
);
16001602

16011603
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));

modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@
5757
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
5858
import org.apache.ignite.internal.catalog.CatalogTestUtils;
5959
import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner;
60-
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
61-
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
6260
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
6361
import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
6462
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -169,6 +167,8 @@
169167
import org.apache.ignite.internal.systemview.api.SystemViewManager;
170168
import org.apache.ignite.internal.table.StreamerReceiverRunner;
171169
import org.apache.ignite.internal.table.TableTestUtils;
170+
import org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
171+
import org.apache.ignite.internal.table.distributed.MvTableStorageFactory;
172172
import org.apache.ignite.internal.table.distributed.TableManager;
173173
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
174174
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -800,30 +800,9 @@ public CompletableFuture<Boolean> invoke(Condition condition, Operation success,
800800
minTimeCollectorService,
801801
systemDistributedConfiguration,
802802
metricManager,
803-
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
804-
) {
805-
806-
@Override
807-
protected MvTableStorage createTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
808-
MvTableStorage storage = createSpy(super.createTableStorage(tableDescriptor, zoneDescriptor));
809-
810-
var partitionStorages = new ConcurrentHashMap<Integer, MvPartitionStorage>();
811-
812-
doAnswer(invocation -> {
813-
Integer partitionId = invocation.getArgument(0);
814-
815-
return partitionStorages.computeIfAbsent(partitionId, id -> {
816-
try {
817-
return (MvPartitionStorage) createSpy(invocation.callRealMethod());
818-
} catch (Throwable e) {
819-
throw new RuntimeException(e);
820-
}
821-
});
822-
}).when(storage).getMvPartition(anyInt());
823-
824-
return storage;
825-
}
826-
};
803+
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
804+
spyingMvTableStorageFactory(new DefaultMvTableStorageFactory(dataStorageMgr, catalogManager, lowWatermark))
805+
);
827806

828807
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
829808

@@ -1050,6 +1029,28 @@ public ReplicaMeta getPrimaryReplica(int zoneId) throws InterruptedException {
10501029
return primaryReplicaFuture.join();
10511030
}
10521031

1032+
private static MvTableStorageFactory spyingMvTableStorageFactory(MvTableStorageFactory delegate) {
1033+
return (tableDesc, zoneDesc) -> {
1034+
MvTableStorage storage = createSpy(delegate.createMvTableStorage(tableDesc, zoneDesc));
1035+
1036+
var partitionStorages = new ConcurrentHashMap<Integer, MvPartitionStorage>();
1037+
1038+
doAnswer(invocation -> {
1039+
Integer partitionId = invocation.getArgument(0);
1040+
1041+
return partitionStorages.computeIfAbsent(partitionId, id -> {
1042+
try {
1043+
return (MvPartitionStorage) createSpy(invocation.callRealMethod());
1044+
} catch (Throwable e) {
1045+
throw new RuntimeException(e);
1046+
}
1047+
});
1048+
}).when(storage).getMvPartition(anyInt());
1049+
1050+
return storage;
1051+
};
1052+
}
1053+
10531054
@Contract("null -> null")
10541055
private static <T> @Nullable T createSpy(@Nullable T object) {
10551056
if (object == null) {

modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@
212212
import org.apache.ignite.internal.table.TableImpl;
213213
import org.apache.ignite.internal.table.TableTestUtils;
214214
import org.apache.ignite.internal.table.TableViewInternal;
215+
import org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
215216
import org.apache.ignite.internal.table.distributed.TableManager;
216217
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
217218
import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -818,7 +819,8 @@ public CompletableFuture<Set<String>> dataNodes(HybridTimestamp timestamp, int c
818819
minTimeCollectorService,
819820
systemDistributedConfiguration,
820821
metricManager,
821-
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY
822+
TableTestUtils.NOOP_PARTITION_MODIFICATION_COUNTER_FACTORY,
823+
new DefaultMvTableStorageFactory(dataStorageManager, catalogManager, lowWatermark)
822824
);
823825

824826
var indexManager = new IndexManager(

modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,7 @@
268268
import org.apache.ignite.internal.system.JvmCpuInformationProvider;
269269
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
270270
import org.apache.ignite.internal.systemview.api.SystemViewManager;
271+
import org.apache.ignite.internal.table.distributed.DefaultMvTableStorageFactory;
271272
import org.apache.ignite.internal.table.distributed.PartitionModificationCounterFactory;
272273
import org.apache.ignite.internal.table.distributed.PublicApiThreadingIgniteTables;
273274
import org.apache.ignite.internal.table.distributed.TableManager;
@@ -393,6 +394,9 @@ public class IgniteImpl implements Ignite {
393394
/** Replica manager. */
394395
private final ReplicaManager replicaMgr;
395396

397+
/** Replica service. */
398+
private final ReplicaService replicaSvc;
399+
396400
/** Transactions manager. */
397401
private final TxManagerImpl txManager;
398402

@@ -918,7 +922,7 @@ public class IgniteImpl implements Ignite {
918922

919923
TransactionConfiguration txConfig = clusterConfigRegistry.getConfiguration(TransactionExtensionConfiguration.KEY).transaction();
920924

921-
ReplicaService replicaSvc = new ReplicaService(
925+
replicaSvc = new ReplicaService(
922926
messagingServiceReturningToStorageOperationsPool,
923927
clockService,
924928
threadPoolsManager.partitionOperationsExecutor(),
@@ -1195,7 +1199,8 @@ public class IgniteImpl implements Ignite {
11951199
minTimeCollectorService,
11961200
systemDistributedConfiguration,
11971201
metricManager,
1198-
partitionModificationCounterFactory
1202+
partitionModificationCounterFactory,
1203+
new DefaultMvTableStorageFactory(dataStorageMgr, catalogManager, lowWatermark)
11991204
);
12001205

12011206
disasterRecoveryManager = new DisasterRecoveryManager(
@@ -1837,6 +1842,11 @@ public DisasterRecoveryManager disasterRecoveryManager() {
18371842
return disasterRecoveryManager;
18381843
}
18391844

1845+
@TestOnly
1846+
public ReplicaService replicaService() {
1847+
return replicaSvc;
1848+
}
1849+
18401850
@TestOnly
18411851
public VaultManager vault() {
18421852
return vaultMgr;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.table.distributed;
19+
20+
import org.apache.ignite.internal.catalog.CatalogService;
21+
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
22+
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
23+
import org.apache.ignite.internal.lowwatermark.LowWatermark;
24+
import org.apache.ignite.internal.storage.DataStorageManager;
25+
import org.apache.ignite.internal.storage.engine.MvTableStorage;
26+
import org.apache.ignite.internal.storage.engine.StorageEngine;
27+
import org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
28+
import org.apache.ignite.internal.table.distributed.storage.NullStorageEngine;
29+
30+
/**
31+
* Default implementation of {@link MvTableStorageFactory} that creates table storages via {@link DataStorageManager}.
32+
*/
33+
public final class DefaultMvTableStorageFactory implements MvTableStorageFactory {
34+
private final DataStorageManager dataStorageMgr;
35+
36+
private final CatalogService catalogService;
37+
38+
private final LowWatermark lowWatermark;
39+
40+
/**
41+
* Constructor.
42+
*
43+
* @param dataStorageMgr Data storage manager.
44+
* @param catalogService Catalog service.
45+
* @param lowWatermark Low watermark.
46+
*/
47+
public DefaultMvTableStorageFactory(
48+
DataStorageManager dataStorageMgr,
49+
CatalogService catalogService,
50+
LowWatermark lowWatermark
51+
) {
52+
this.dataStorageMgr = dataStorageMgr;
53+
this.catalogService = catalogService;
54+
this.lowWatermark = lowWatermark;
55+
}
56+
57+
@Override
58+
public MvTableStorage createMvTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
59+
StorageEngine engine = dataStorageMgr.engineByStorageProfile(tableDescriptor.storageProfile());
60+
61+
if (engine == null) {
62+
// Create a placeholder to allow Table object being created.
63+
engine = new NullStorageEngine();
64+
}
65+
66+
return engine.createMvTable(
67+
new StorageTableDescriptor(tableDescriptor.id(), zoneDescriptor.partitions(), tableDescriptor.storageProfile()),
68+
new CatalogStorageIndexDescriptorSupplier(catalogService, lowWatermark)
69+
);
70+
}
71+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.table.distributed;
19+
20+
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
21+
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
22+
import org.apache.ignite.internal.storage.engine.MvTableStorage;
23+
24+
/**
25+
* Factory for creating {@link MvTableStorage} instances.
26+
*/
27+
public interface MvTableStorageFactory {
28+
/**
29+
* Creates a new MV table storage for the given table and zone descriptors.
30+
*
31+
* @param tableDescriptor Catalog table descriptor.
32+
* @param zoneDescriptor Catalog zone descriptor.
33+
* @return New MV table storage.
34+
*/
35+
MvTableStorage createMvTableStorage(CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor);
36+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.table.distributed;
19+
20+
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
21+
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
22+
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
23+
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.TimeUnit;
27+
import org.apache.ignite.internal.lang.NodeStoppingException;
28+
import org.apache.ignite.internal.logger.IgniteLogger;
29+
import org.apache.ignite.internal.logger.Loggers;
30+
import org.apache.ignite.internal.network.InternalClusterNode;
31+
import org.apache.ignite.internal.thread.IgniteThreadFactory;
32+
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
33+
import org.apache.ignite.lang.IgniteException;
34+
import org.jetbrains.annotations.Nullable;
35+
36+
/**
37+
* Lazily creates and manages the lifecycle of a single streamer flush {@link ScheduledExecutorService}.
38+
*
39+
* <p>The executor is created on first call to {@link #get()} and shut down on {@link #stop}.
40+
* {@link #beforeStop()} must be called before {@link #stop} to prevent new executor creation during shutdown.
41+
*/
42+
public final class StreamerFlushExecutorFactory {
43+
private static final IgniteLogger LOG = Loggers.forClass(StreamerFlushExecutorFactory.class);
44+
45+
private final InternalClusterNode localNode;
46+
47+
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
48+
49+
@Nullable
50+
private ScheduledExecutorService executor;
51+
52+
/**
53+
* Constructor.
54+
*
55+
* @param localNode Local node, used for thread naming.
56+
*/
57+
public StreamerFlushExecutorFactory(InternalClusterNode localNode) {
58+
this.localNode = localNode;
59+
}
60+
61+
/**
62+
* Returns the executor, creating it lazily if needed.
63+
*
64+
* @return Streamer flush executor.
65+
* @throws IgniteException If the node is stopping.
66+
*/
67+
public synchronized ScheduledExecutorService get() {
68+
if (!busyLock.enterBusy()) {
69+
throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
70+
}
71+
72+
try {
73+
if (executor == null) {
74+
executor = Executors.newSingleThreadScheduledExecutor(
75+
IgniteThreadFactory.create(localNode.name(), "streamer-flush-executor", LOG, STORAGE_WRITE));
76+
}
77+
78+
return executor;
79+
} finally {
80+
busyLock.leaveBusy();
81+
}
82+
}
83+
84+
/**
85+
* Blocks the busy lock to prevent new executor creation during shutdown.
86+
*/
87+
public void beforeStop() {
88+
busyLock.block();
89+
}
90+
91+
/**
92+
* Shuts down the executor if it was created.
93+
*
94+
* @param timeoutSeconds Timeout in seconds to wait for termination.
95+
*/
96+
public void stop(int timeoutSeconds) {
97+
ScheduledExecutorService local;
98+
99+
synchronized (this) {
100+
local = this.executor;
101+
}
102+
103+
shutdownAndAwaitTermination(local, timeoutSeconds, TimeUnit.SECONDS);
104+
}
105+
}

0 commit comments

Comments
 (0)