diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 81bf849dab2..21002594b3d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.table.distributed; -import static java.util.Collections.unmodifiableMap; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.anyOf; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -464,11 +463,11 @@ private CompletableFuture prepareTableResourcesOnRecovery( schemaRegistry ); - tableRegistry.tables().put(tableId, table); + tableRegistry.register(tableId, table); zoneCoordinator.addTableToZone(zoneDescriptor.id(), table); - tableRegistry.startedTables().put(tableId, table); + tableRegistry.markStarted(tableId); })); }); } @@ -499,7 +498,7 @@ private CompletableFuture loadTableToZoneOnTableCreate( return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(schemaRegistry -> { TableImpl table = createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry); - tableRegistry.tables().put(tableId, table); + tableRegistry.register(tableId, table); }); })); @@ -524,7 +523,7 @@ private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, Completable private void onTableDrop(DropTableEventParameters parameters) { inBusyLock(busyLock, () -> { - unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId())); + unregisterMetricsSource(tableRegistry.startedTable(parameters.tableId())); destructionEventsQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId())); }); @@ -569,7 +568,7 @@ private CompletableFuture onTablePropertiesChanged(AlterTablePropertiesEventP return failedFuture(e); } - TableViewInternal table = tableRegistry.tables().get(parameters.tableId()); + TableViewInternal table = tableRegistry.table(parameters.tableId()); table.updateStalenessConfiguration(parameters.staleRowsFraction(), parameters.minStaleRowsCount()); @@ -586,7 +585,7 @@ private CompletableFuture onTableRename(RenameTableEventParameters parameters return failedFuture(e); } - TableViewInternal table = tableRegistry.tables().get(parameters.tableId()); + TableViewInternal table = tableRegistry.table(parameters.tableId()); // TODO: revisit this approach, see https://issues.apache.org/jira/browse/IGNITE-21235. ((TableImpl) table).name(parameters.newTableName()); @@ -633,7 +632,7 @@ public CompletableFuture stopAsync(ComponentContext componentContext) { try { closeAllManually( zoneCoordinator::stop, - () -> closeAllManually(tableRegistry.tables().values().stream().map(table -> () -> closeTable(table))), + () -> closeAllManually(tableRegistry.allRegisteredTables().values().stream().map(table -> () -> closeTable(table))), () -> shutdownAndAwaitTermination(scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS), () -> streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds) ); @@ -688,9 +687,7 @@ private TableImpl createTableImpl( * @param tableId Table id to destroy. */ private CompletableFuture destroyTableLocally(int tableId) { - TableViewInternal table = tableRegistry.startedTables().remove(tableId); - - tableRegistry.localPartsByTableId().remove(tableId); + TableViewInternal table = tableRegistry.removeStarted(tableId); assert table != null : tableId; @@ -699,7 +696,7 @@ private CompletableFuture destroyTableLocally(int tableId) { return zoneCoordinator.stopAndDestroyTableProcessors(table) .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> internalTable.storage().destroy()), ioExecutor) .thenAccept(unused -> inBusyLock(busyLock, () -> { - tableRegistry.tables().remove(tableId); + tableRegistry.unregister(tableId); schemaManager.dropRegistry(tableId); })) .whenComplete((v, e) -> { @@ -751,14 +748,14 @@ private CompletableFuture> tablesAsyncInternalBusy() { * @see #assignmentsUpdatedVv */ private CompletableFuture> tablesById(long causalityToken) { - return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> unmodifiableMap(tableRegistry.startedTables())); + return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> tableRegistry.allStartedTables()); } /** * Returns an internal map, which contains all managed tables by their ID. */ private Map tablesById() { - return unmodifiableMap(tableRegistry.tables()); + return tableRegistry.allRegisteredTables(); } /** @@ -766,7 +763,7 @@ private Map tablesById() { */ @TestOnly public Map startedTables() { - return unmodifiableMap(tableRegistry.startedTables()); + return tableRegistry.allStartedTables(); } @Override @@ -828,7 +825,7 @@ public CompletableFuture localPartitionSetAsync(long causalityToke try { return localPartitionsVv.get(causalityToken) - .thenApply(unused -> tableRegistry.localPartsByTableId().getOrDefault(tableId, PartitionSet.EMPTY_SET)); + .thenApply(unused -> tableRegistry.localPartitions(tableId)); } finally { busyLock.leaveBusy(); } @@ -870,7 +867,7 @@ private CompletableFuture tableAsyncInternal(QualifiedName na } private CompletableFuture tableAsyncInternalBusy(int tableId) { - TableViewInternal tableImpl = tableRegistry.startedTables().get(tableId); + TableViewInternal tableImpl = tableRegistry.startedTable(tableId); if (tableImpl != null) { return completedFuture(tableImpl); @@ -886,7 +883,7 @@ private CompletableFuture tableAsyncInternalBusy(int tableId) if (e != null) { getLatestTableFuture.completeExceptionally(e); } else { - getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId)); + getLatestTableFuture.complete(tableRegistry.startedTable(tableId)); } }); } else { @@ -900,7 +897,7 @@ private CompletableFuture tableAsyncInternalBusy(int tableId) // This check is needed for the case when we have registered tablesListener, // but tablesVv has already been completed, so listener would be triggered only for the next versioned value update. - tableImpl = tableRegistry.startedTables().get(tableId); + tableImpl = tableRegistry.startedTable(tableId); if (tableImpl != null) { assignmentsUpdatedVv.removeWhenComplete(tablesListener); @@ -923,7 +920,7 @@ private static T sync(CompletableFuture future) { */ @Override public @Nullable TableViewInternal cachedTable(int tableId) { - return tableRegistry.tables().get(tableId); + return tableRegistry.table(tableId); } /** @@ -933,7 +930,7 @@ private static T sync(CompletableFuture future) { */ @TestOnly public @Nullable TableViewInternal cachedTable(String name) { - return findTableImplByName(tableRegistry.tables().values(), name); + return findTableImplByName(tableRegistry.allRegisteredTables().values(), name); } private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java index 5194a951678..f6a23aceeb3 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java @@ -17,18 +17,18 @@ package org.apache.ignite.internal.table.distributed; +import static java.util.Collections.unmodifiableMap; + import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.internal.table.TableViewInternal; +import org.jetbrains.annotations.Nullable; /** - * Holds shared mutable state for table tracking, shared between {@link TableManager} and {@link TableZoneCoordinator}. + * Tracks table lifecycle state shared between {@link TableManager} and {@link TableZoneCoordinator}. * - *
    - *
  • {@link #tables} — all registered tables by ID.
  • - *
  • {@link #startedTables} — tables that are fully started (partition resources prepared).
  • - *
  • {@link #localPartsByTableId} — local partition sets per table.
  • - *
+ *

A table progresses through: registered → started → (removed from started) → unregistered. */ class TableRegistry { /** All registered tables by ID. */ @@ -40,15 +40,68 @@ class TableRegistry { /** Local partitions by table ID. */ private final Map localPartsByTableId = new ConcurrentHashMap<>(); - Map tables() { - return tables; + /** Registers a newly created table. Does not mark it as started. */ + void register(int tableId, TableViewInternal table) { + tables.put(tableId, table); + } + + /** Promotes an already-registered table to the started. */ + void markStarted(int tableId) { + TableViewInternal table = tables.get(tableId); + + assert table != null : "Table must be registered before marking as started: tableId=" + tableId; + + startedTables.put(tableId, table); + } + + /** Returns a registered table by ID, or null if not found. */ + @Nullable TableViewInternal table(int tableId) { + return tables.get(tableId); + } + + /** Returns a started table by ID, or null if not started. */ + @Nullable TableViewInternal startedTable(int tableId) { + return startedTables.get(tableId); + } + + /** Returns an unmodifiable view of all registered tables. */ + Map allRegisteredTables() { + return unmodifiableMap(tables); + } + + /** Returns an unmodifiable view of all started tables. */ + Map allStartedTables() { + return unmodifiableMap(startedTables); + } + + /** Removes the table from started tables and clears its local partitions. Returns the removed table, or null. */ + @Nullable TableViewInternal removeStarted(int tableId) { + TableViewInternal removed = startedTables.remove(tableId); + localPartsByTableId.remove(tableId); + return removed; + } + + /** Removes the table from the registry entirely. */ + void unregister(int tableId) { + tables.remove(tableId); + } + + /** Sets the local partition set for a table. */ + void setLocalPartitions(int tableId, PartitionSet partitions) { + localPartsByTableId.put(tableId, partitions); } - Map startedTables() { - return startedTables; + /** Atomically extends local partitions by adding a partition index. Creates a new set if absent. */ + void extendLocalPartitions(int tableId, int partitionIndex) { + localPartsByTableId.compute(tableId, (id, old) -> { + PartitionSet set = Objects.requireNonNullElseGet(old, BitSetPartitionSet::new); + set.set(partitionIndex); + return set; + }); } - Map localPartsByTableId() { - return localPartsByTableId; + /** Returns local partitions for a table, or {@link PartitionSet#EMPTY_SET} if absent. */ + PartitionSet localPartitions(int tableId) { + return localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET); } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java index 9a2822d9be6..44a4986c38e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java @@ -316,10 +316,10 @@ CompletableFuture loadTableToZone( } } - var table = (TableImpl) tableRegistry.tables().get(tableId); + var table = (TableImpl) tableRegistry.table(tableId); return createPartitionStoragesIfAbsent(table, parts) - .thenRun(() -> tableRegistry.localPartsByTableId().put(tableId, parts)); + .thenRun(() -> tableRegistry.setLocalPartitions(tableId, parts)); }, ioExecutor)) // If the table is already closed, it's not a problem (probably the node is stopping). .exceptionally(ignoreTableClosedException()) @@ -331,7 +331,7 @@ CompletableFuture loadTableToZone( } return localPartsUpdateFuture.thenRunAsync(() -> inBusyLock(busyLock, () -> { - var table = (TableImpl) tableRegistry.tables().get(tableId); + var table = (TableImpl) tableRegistry.table(tableId); for (int i = 0; i < zoneDescriptor.partitions(); i++) { var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i); @@ -350,11 +350,9 @@ CompletableFuture loadTableToZone( // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation. return createPartsFut.thenAccept(ignore -> { - var table = (TableImpl) tableRegistry.tables().get(tableId); + tableRegistry.markStarted(tableId); - tableRegistry.startedTables().put(tableId, table); - - addTableToZone(zoneDescriptor.id(), table); + addTableToZone(zoneDescriptor.id(), (TableImpl) tableRegistry.startedTable(tableId)); }); } @@ -458,10 +456,7 @@ private CompletableFuture createPartitionsAndLoadResourcesToZoneReplica( CompletableFuture[] futures = zoneTables.stream() .map(tbl -> inBusyLockAsync(busyLock, () -> { return runAsync(() -> inBusyLock(busyLock, () -> { - tableRegistry.localPartsByTableId().compute( - tbl.tableId(), - (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex) - ); + tableRegistry.extendLocalPartitions(tbl.tableId(), partitionIndex); lowWatermark.getLowWatermarkSafe(lwm -> registerIndexesToTable( @@ -667,7 +662,7 @@ private CompletableFuture stopAndDestroyTablePartition(TablePartitionId ta return tokenFuture .thenCompose(ignore -> { - TableViewInternal table = tableRegistry.tables().get(tablePartitionId.tableId()); + TableViewInternal table = tableRegistry.table(tablePartitionId.tableId()); assert table != null : tablePartitionId; return stopAndDestroyTablePartition(tablePartitionId, table); @@ -793,12 +788,6 @@ private Set zoneTablesRawSet(int zoneId) { return tablesPerZone.getOrDefault(zoneId, Set.of()); } - private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) { - PartitionSet newPartitionSet = Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new); - newPartitionSet.set(partitionId); - return newPartitionSet; - } - private static Function ignoreTableClosedException() { return ex -> { if (hasCause(ex, TableClosedException.class)) { diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java new file mode 100644 index 00000000000..fdf87f122d5 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableRegistryTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; +import org.junit.jupiter.api.Test; + +/** Tests for {@link TableRegistry}. */ +class TableRegistryTest extends BaseIgniteAbstractTest { + + private final TableRegistry registry = new TableRegistry(); + + @Test + void registerAddsToTablesOnly() { + TableViewInternal table = mock(TableViewInternal.class); + + registry.register(1, table); + + assertSame(table, registry.table(1)); + assertNull(registry.startedTable(1)); + } + + @Test + void markStartedPromotesRegisteredTable() { + TableViewInternal table = mock(TableViewInternal.class); + + registry.register(1, table); + assertNull(registry.startedTable(1)); + + registry.markStarted(1); + assertSame(table, registry.startedTable(1)); + } + + @Test + void tableReturnsNullForUnknownId() { + assertNull(registry.table(42)); + } + + @Test + void startedTableReturnsNullForNonStarted() { + registry.register(1, mock(TableViewInternal.class)); + + assertNull(registry.startedTable(1)); + } + + @Test + void allTablesReturnsUnmodifiableMap() { + registry.register(1, mock(TableViewInternal.class)); + + assertThrows(UnsupportedOperationException.class, () -> + registry.allRegisteredTables().put(2, mock(TableViewInternal.class))); + } + + @Test + void allStartedTablesReturnsUnmodifiableMap() { + registry.register(1, mock(TableViewInternal.class)); + registry.markStarted(1); + + assertThrows(UnsupportedOperationException.class, () -> + registry.allStartedTables().put(2, mock(TableViewInternal.class))); + } + + @Test + void allTablesReflectsRegistrations() { + assertTrue(registry.allRegisteredTables().isEmpty()); + + registry.register(1, mock(TableViewInternal.class)); + registry.register(2, mock(TableViewInternal.class)); + + assertEquals(2, registry.allRegisteredTables().size()); + } + + @Test + void allStartedTablesReflectsState() { + assertTrue(registry.allStartedTables().isEmpty()); + + registry.register(1, mock(TableViewInternal.class)); + registry.markStarted(1); + + assertEquals(1, registry.allStartedTables().size()); + } + + @Test + void removeStartedReturnsTableAndClearsLocalPartitions() { + TableViewInternal table = mock(TableViewInternal.class); + + registry.register(1, table); + registry.markStarted(1); + registry.setLocalPartitions(1, new BitSetPartitionSet()); + + TableViewInternal removed = registry.removeStarted(1); + + assertSame(table, removed); + assertNull(registry.startedTable(1)); + assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(1)); + // Table should still be in the tables map. + assertSame(table, registry.table(1)); + } + + @Test + void removeStartedOnUnknownIdReturnsNull() { + assertNull(registry.removeStarted(42)); + } + + @Test + void unregisterRemovesFromTablesMap() { + registry.register(1, mock(TableViewInternal.class)); + + registry.unregister(1); + + assertNull(registry.table(1)); + } + + @Test + void setAndGetLocalPartitions() { + PartitionSet parts = new BitSetPartitionSet(); + parts.set(0); + parts.set(3); + + registry.setLocalPartitions(1, parts); + + assertSame(parts, registry.localPartitions(1)); + } + + @Test + void extendLocalPartitionsOnAbsentEntry() { + registry.extendLocalPartitions(1, 5); + + PartitionSet parts = registry.localPartitions(1); + assertNotNull(parts); + assertTrue(parts.get(5)); + assertEquals(1, parts.size()); + } + + @Test + void extendLocalPartitionsOnExistingEntry() { + registry.extendLocalPartitions(1, 2); + registry.extendLocalPartitions(1, 7); + + PartitionSet parts = registry.localPartitions(1); + assertTrue(parts.get(2)); + assertTrue(parts.get(7)); + assertEquals(2, parts.size()); + } + + @Test + void localPartitionsReturnsEmptySetIfAbsent() { + assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(99)); + } + + @Test + void markStartedOnUnregisteredTableFails() { + assertThrows(AssertionError.class, () -> registry.markStarted(42)); + } + + @Test + void fullLifecycle() { + TableViewInternal table = mock(TableViewInternal.class); + + // Register. + registry.register(1, table); + assertSame(table, registry.table(1)); + assertNull(registry.startedTable(1)); + + // Mark started. + registry.markStarted(1); + assertSame(table, registry.startedTable(1)); + + // Add partitions. + registry.extendLocalPartitions(1, 0); + assertTrue(registry.localPartitions(1).get(0)); + + // Remove started (deactivate). + TableViewInternal removed = registry.removeStarted(1); + assertSame(table, removed); + assertNull(registry.startedTable(1)); + assertSame(PartitionSet.EMPTY_SET, registry.localPartitions(1)); + assertSame(table, registry.table(1)); // still registered + + // Unregister (final cleanup). + registry.unregister(1); + assertNull(registry.table(1)); + } + + @Test + void recoveryLifecycle() { + TableViewInternal table = mock(TableViewInternal.class); + + // Recovery registers and starts. + registry.register(1, table); + registry.markStarted(1); + assertSame(table, registry.table(1)); + assertSame(table, registry.startedTable(1)); + + // Destroy. + registry.removeStarted(1); + registry.unregister(1); + + assertNull(registry.table(1)); + assertNull(registry.startedTable(1)); + } +}