Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.table.distributed;

import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.anyOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
Expand Down Expand Up @@ -464,11 +463,11 @@ private CompletableFuture<Void> prepareTableResourcesOnRecovery(
schemaRegistry
);

tableRegistry.tables().put(tableId, table);
tableRegistry.register(tableId, table);

zoneCoordinator.addTableToZone(zoneDescriptor.id(), table);

tableRegistry.startedTables().put(tableId, table);
tableRegistry.markStarted(tableId);
}));
});
}
Expand Down Expand Up @@ -499,7 +498,7 @@ private CompletableFuture<Void> loadTableToZoneOnTableCreate(
return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(schemaRegistry -> {
TableImpl table = createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry);

tableRegistry.tables().put(tableId, table);
tableRegistry.register(tableId, table);
});
}));

Expand All @@ -524,7 +523,7 @@ private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, Completable

private void onTableDrop(DropTableEventParameters parameters) {
inBusyLock(busyLock, () -> {
unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId()));
unregisterMetricsSource(tableRegistry.startedTable(parameters.tableId()));

destructionEventsQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
});
Expand Down Expand Up @@ -569,7 +568,7 @@ private CompletableFuture<?> onTablePropertiesChanged(AlterTablePropertiesEventP
return failedFuture(e);
}

TableViewInternal table = tableRegistry.tables().get(parameters.tableId());
TableViewInternal table = tableRegistry.table(parameters.tableId());

table.updateStalenessConfiguration(parameters.staleRowsFraction(), parameters.minStaleRowsCount());

Expand All @@ -586,7 +585,7 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters
return failedFuture(e);
}

TableViewInternal table = tableRegistry.tables().get(parameters.tableId());
TableViewInternal table = tableRegistry.table(parameters.tableId());

// TODO: revisit this approach, see https://issues.apache.org/jira/browse/IGNITE-21235.
((TableImpl) table).name(parameters.newTableName());
Expand Down Expand Up @@ -633,7 +632,7 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
try {
closeAllManually(
zoneCoordinator::stop,
() -> closeAllManually(tableRegistry.tables().values().stream().map(table -> () -> closeTable(table))),
() -> closeAllManually(tableRegistry.allRegisteredTables().values().stream().map(table -> () -> closeTable(table))),
() -> shutdownAndAwaitTermination(scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS),
() -> streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds)
);
Expand Down Expand Up @@ -688,9 +687,7 @@ private TableImpl createTableImpl(
* @param tableId Table id to destroy.
*/
private CompletableFuture<Void> destroyTableLocally(int tableId) {
TableViewInternal table = tableRegistry.startedTables().remove(tableId);

tableRegistry.localPartsByTableId().remove(tableId);
TableViewInternal table = tableRegistry.removeStarted(tableId);

assert table != null : tableId;

Expand All @@ -699,7 +696,7 @@ private CompletableFuture<Void> destroyTableLocally(int tableId) {
return zoneCoordinator.stopAndDestroyTableProcessors(table)
.thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> internalTable.storage().destroy()), ioExecutor)
.thenAccept(unused -> inBusyLock(busyLock, () -> {
tableRegistry.tables().remove(tableId);
tableRegistry.unregister(tableId);
schemaManager.dropRegistry(tableId);
}))
.whenComplete((v, e) -> {
Expand Down Expand Up @@ -751,22 +748,22 @@ private CompletableFuture<List<Table>> tablesAsyncInternalBusy() {
* @see #assignmentsUpdatedVv
*/
private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long causalityToken) {
return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> unmodifiableMap(tableRegistry.startedTables()));
return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> tableRegistry.allStartedTables());
}

/**
* Returns an internal map, which contains all managed tables by their ID.
*/
private Map<Integer, TableViewInternal> tablesById() {
return unmodifiableMap(tableRegistry.tables());
return tableRegistry.allRegisteredTables();
}

/**
* Returns a map with started tables.
*/
@TestOnly
public Map<Integer, TableViewInternal> startedTables() {
return unmodifiableMap(tableRegistry.startedTables());
return tableRegistry.allStartedTables();
}

@Override
Expand Down Expand Up @@ -828,7 +825,7 @@ public CompletableFuture<PartitionSet> localPartitionSetAsync(long causalityToke

try {
return localPartitionsVv.get(causalityToken)
.thenApply(unused -> tableRegistry.localPartsByTableId().getOrDefault(tableId, PartitionSet.EMPTY_SET));
.thenApply(unused -> tableRegistry.localPartitions(tableId));
} finally {
busyLock.leaveBusy();
}
Expand Down Expand Up @@ -870,7 +867,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternal(QualifiedName na
}

private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId) {
TableViewInternal tableImpl = tableRegistry.startedTables().get(tableId);
TableViewInternal tableImpl = tableRegistry.startedTable(tableId);

if (tableImpl != null) {
return completedFuture(tableImpl);
Expand All @@ -886,7 +883,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId)
if (e != null) {
getLatestTableFuture.completeExceptionally(e);
} else {
getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId));
getLatestTableFuture.complete(tableRegistry.startedTable(tableId));
}
});
} else {
Expand All @@ -900,7 +897,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId)

// This check is needed for the case when we have registered tablesListener,
// but tablesVv has already been completed, so listener would be triggered only for the next versioned value update.
tableImpl = tableRegistry.startedTables().get(tableId);
tableImpl = tableRegistry.startedTable(tableId);

if (tableImpl != null) {
assignmentsUpdatedVv.removeWhenComplete(tablesListener);
Expand All @@ -923,7 +920,7 @@ private static <T> T sync(CompletableFuture<T> future) {
*/
@Override
public @Nullable TableViewInternal cachedTable(int tableId) {
return tableRegistry.tables().get(tableId);
return tableRegistry.table(tableId);
}

/**
Expand All @@ -933,7 +930,7 @@ private static <T> T sync(CompletableFuture<T> future) {
*/
@TestOnly
public @Nullable TableViewInternal cachedTable(String name) {
return findTableImplByName(tableRegistry.tables().values(), name);
return findTableImplByName(tableRegistry.allRegisteredTables().values(), name);
}

private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.apache.ignite.internal.table.distributed;

import static java.util.Collections.unmodifiableMap;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.table.TableViewInternal;
import org.jetbrains.annotations.Nullable;

/**
* Holds shared mutable state for table tracking, shared between {@link TableManager} and {@link TableZoneCoordinator}.
* Tracks table lifecycle state shared between {@link TableManager} and {@link TableZoneCoordinator}.
*
* <ul>
* <li>{@link #tables} — all registered tables by ID.</li>
* <li>{@link #startedTables} — tables that are fully started (partition resources prepared).</li>
* <li>{@link #localPartsByTableId} — local partition sets per table.</li>
* </ul>
* <p>A table progresses through: registered → started → (removed from started) → unregistered.
*/
class TableRegistry {
/** All registered tables by ID. */
Expand All @@ -40,15 +40,68 @@ class TableRegistry {
/** Local partitions by table ID. */
private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>();

Map<Integer, TableViewInternal> tables() {
return tables;
/** Registers a newly created table. Does not mark it as started. */
void register(int tableId, TableViewInternal table) {
tables.put(tableId, table);
}

/** Promotes an already-registered table to the started. */
void markStarted(int tableId) {
TableViewInternal table = tables.get(tableId);

assert table != null : "Table must be registered before marking as started: tableId=" + tableId;

startedTables.put(tableId, table);
}

/** Returns a registered table by ID, or null if not found. */
@Nullable TableViewInternal table(int tableId) {
return tables.get(tableId);
}

/** Returns a started table by ID, or null if not started. */
@Nullable TableViewInternal startedTable(int tableId) {
return startedTables.get(tableId);
}

/** Returns an unmodifiable view of all registered tables. */
Map<Integer, TableViewInternal> allRegisteredTables() {
return unmodifiableMap(tables);
}

/** Returns an unmodifiable view of all started tables. */
Map<Integer, TableViewInternal> allStartedTables() {
return unmodifiableMap(startedTables);
}

/** Removes the table from started tables and clears its local partitions. Returns the removed table, or null. */
@Nullable TableViewInternal removeStarted(int tableId) {
TableViewInternal removed = startedTables.remove(tableId);
localPartsByTableId.remove(tableId);
return removed;
}

/** Removes the table from the registry entirely. */
void unregister(int tableId) {
tables.remove(tableId);
}

/** Sets the local partition set for a table. */
void setLocalPartitions(int tableId, PartitionSet partitions) {
localPartsByTableId.put(tableId, partitions);
}

Map<Integer, TableViewInternal> startedTables() {
return startedTables;
/** Atomically extends local partitions by adding a partition index. Creates a new set if absent. */
void extendLocalPartitions(int tableId, int partitionIndex) {
localPartsByTableId.compute(tableId, (id, old) -> {
PartitionSet set = Objects.requireNonNullElseGet(old, BitSetPartitionSet::new);
set.set(partitionIndex);
return set;
});
}

Map<Integer, PartitionSet> localPartsByTableId() {
return localPartsByTableId;
/** Returns local partitions for a table, or {@link PartitionSet#EMPTY_SET} if absent. */
PartitionSet localPartitions(int tableId) {
return localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ CompletableFuture<Void> loadTableToZone(
}
}

var table = (TableImpl) tableRegistry.tables().get(tableId);
var table = (TableImpl) tableRegistry.table(tableId);

return createPartitionStoragesIfAbsent(table, parts)
.thenRun(() -> tableRegistry.localPartsByTableId().put(tableId, parts));
.thenRun(() -> tableRegistry.setLocalPartitions(tableId, parts));
}, ioExecutor))
// If the table is already closed, it's not a problem (probably the node is stopping).
.exceptionally(ignoreTableClosedException())
Expand All @@ -331,7 +331,7 @@ CompletableFuture<Void> loadTableToZone(
}

return localPartsUpdateFuture.thenRunAsync(() -> inBusyLock(busyLock, () -> {
var table = (TableImpl) tableRegistry.tables().get(tableId);
var table = (TableImpl) tableRegistry.table(tableId);

for (int i = 0; i < zoneDescriptor.partitions(); i++) {
var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
Expand All @@ -350,11 +350,9 @@ CompletableFuture<Void> loadTableToZone(

// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
return createPartsFut.thenAccept(ignore -> {
var table = (TableImpl) tableRegistry.tables().get(tableId);
tableRegistry.markStarted(tableId);

tableRegistry.startedTables().put(tableId, table);

addTableToZone(zoneDescriptor.id(), table);
addTableToZone(zoneDescriptor.id(), (TableImpl) tableRegistry.startedTable(tableId));
});
}

Expand Down Expand Up @@ -458,10 +456,7 @@ private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
CompletableFuture<?>[] futures = zoneTables.stream()
.map(tbl -> inBusyLockAsync(busyLock, () -> {
return runAsync(() -> inBusyLock(busyLock, () -> {
tableRegistry.localPartsByTableId().compute(
tbl.tableId(),
(tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
);
tableRegistry.extendLocalPartitions(tbl.tableId(), partitionIndex);

lowWatermark.getLowWatermarkSafe(lwm ->
registerIndexesToTable(
Expand Down Expand Up @@ -667,7 +662,7 @@ private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId ta

return tokenFuture
.thenCompose(ignore -> {
TableViewInternal table = tableRegistry.tables().get(tablePartitionId.tableId());
TableViewInternal table = tableRegistry.table(tablePartitionId.tableId());
assert table != null : tablePartitionId;

return stopAndDestroyTablePartition(tablePartitionId, table);
Expand Down Expand Up @@ -793,12 +788,6 @@ private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
return tablesPerZone.getOrDefault(zoneId, Set.of());
}

private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
PartitionSet newPartitionSet = Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
newPartitionSet.set(partitionId);
return newPartitionSet;
}

private static <T> Function<Throwable, T> ignoreTableClosedException() {
return ex -> {
if (hasCause(ex, TableClosedException.class)) {
Expand Down
Loading