Skip to content

Commit 81a5688

Browse files
committed
IGNITE-28327 Improve TableRegistry api
1 parent ebedbb7 commit 81a5688

4 files changed

Lines changed: 333 additions & 51 deletions

File tree

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.internal.table.distributed;
1919

20-
import static java.util.Collections.unmodifiableMap;
2120
import static java.util.concurrent.CompletableFuture.allOf;
2221
import static java.util.concurrent.CompletableFuture.anyOf;
2322
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -464,11 +463,11 @@ private CompletableFuture<Void> prepareTableResourcesOnRecovery(
464463
schemaRegistry
465464
);
466465

467-
tableRegistry.tables().put(tableId, table);
466+
tableRegistry.register(tableId, table);
468467

469468
zoneCoordinator.addTableToZone(zoneDescriptor.id(), table);
470469

471-
tableRegistry.startedTables().put(tableId, table);
470+
tableRegistry.markStarted(tableId);
472471
}));
473472
});
474473
}
@@ -499,7 +498,7 @@ private CompletableFuture<Void> loadTableToZoneOnTableCreate(
499498
return schemaManager.schemaRegistry(causalityToken, tableId).thenAccept(schemaRegistry -> {
500499
TableImpl table = createTableImpl(causalityToken, tableDescriptor, zoneDescriptor, schemaDescriptor, schemaRegistry);
501500

502-
tableRegistry.tables().put(tableId, table);
501+
tableRegistry.register(tableId, table);
503502
});
504503
}));
505504

@@ -524,7 +523,7 @@ private void unlockZoneForRead(CatalogZoneDescriptor zoneDescriptor, Completable
524523

525524
private void onTableDrop(DropTableEventParameters parameters) {
526525
inBusyLock(busyLock, () -> {
527-
unregisterMetricsSource(tableRegistry.startedTables().get(parameters.tableId()));
526+
unregisterMetricsSource(tableRegistry.startedTable(parameters.tableId()));
528527

529528
destructionEventsQueue.enqueue(new DestroyTableEvent(parameters.catalogVersion(), parameters.tableId()));
530529
});
@@ -569,7 +568,7 @@ private CompletableFuture<?> onTablePropertiesChanged(AlterTablePropertiesEventP
569568
return failedFuture(e);
570569
}
571570

572-
TableViewInternal table = tableRegistry.tables().get(parameters.tableId());
571+
TableViewInternal table = tableRegistry.table(parameters.tableId());
573572

574573
table.updateStalenessConfiguration(parameters.staleRowsFraction(), parameters.minStaleRowsCount());
575574

@@ -586,7 +585,7 @@ private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters
586585
return failedFuture(e);
587586
}
588587

589-
TableViewInternal table = tableRegistry.tables().get(parameters.tableId());
588+
TableViewInternal table = tableRegistry.table(parameters.tableId());
590589

591590
// TODO: revisit this approach, see https://issues.apache.org/jira/browse/IGNITE-21235.
592591
((TableImpl) table).name(parameters.newTableName());
@@ -633,7 +632,7 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
633632
try {
634633
closeAllManually(
635634
zoneCoordinator::stop,
636-
() -> closeAllManually(tableRegistry.tables().values().stream().map(table -> () -> closeTable(table))),
635+
() -> closeAllManually(tableRegistry.allRegisteredTables().values().stream().map(table -> () -> closeTable(table))),
637636
() -> shutdownAndAwaitTermination(scanRequestExecutor, shutdownTimeoutSeconds, TimeUnit.SECONDS),
638637
() -> streamerFlushExecutorFactory.stop(shutdownTimeoutSeconds)
639638
);
@@ -688,9 +687,7 @@ private TableImpl createTableImpl(
688687
* @param tableId Table id to destroy.
689688
*/
690689
private CompletableFuture<Void> destroyTableLocally(int tableId) {
691-
TableViewInternal table = tableRegistry.startedTables().remove(tableId);
692-
693-
tableRegistry.localPartsByTableId().remove(tableId);
690+
TableViewInternal table = tableRegistry.removeStarted(tableId);
694691

695692
assert table != null : tableId;
696693

@@ -699,7 +696,7 @@ private CompletableFuture<Void> destroyTableLocally(int tableId) {
699696
return zoneCoordinator.stopAndDestroyTableProcessors(table)
700697
.thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> internalTable.storage().destroy()), ioExecutor)
701698
.thenAccept(unused -> inBusyLock(busyLock, () -> {
702-
tableRegistry.tables().remove(tableId);
699+
tableRegistry.unregister(tableId);
703700
schemaManager.dropRegistry(tableId);
704701
}))
705702
.whenComplete((v, e) -> {
@@ -751,22 +748,22 @@ private CompletableFuture<List<Table>> tablesAsyncInternalBusy() {
751748
* @see #assignmentsUpdatedVv
752749
*/
753750
private CompletableFuture<Map<Integer, TableViewInternal>> tablesById(long causalityToken) {
754-
return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> unmodifiableMap(tableRegistry.startedTables()));
751+
return assignmentsUpdatedVv.get(causalityToken).thenApply(v -> tableRegistry.allStartedTables());
755752
}
756753

757754
/**
758755
* Returns an internal map, which contains all managed tables by their ID.
759756
*/
760757
private Map<Integer, TableViewInternal> tablesById() {
761-
return unmodifiableMap(tableRegistry.tables());
758+
return tableRegistry.allRegisteredTables();
762759
}
763760

764761
/**
765762
* Returns a map with started tables.
766763
*/
767764
@TestOnly
768765
public Map<Integer, TableViewInternal> startedTables() {
769-
return unmodifiableMap(tableRegistry.startedTables());
766+
return tableRegistry.allStartedTables();
770767
}
771768

772769
@Override
@@ -828,7 +825,7 @@ public CompletableFuture<PartitionSet> localPartitionSetAsync(long causalityToke
828825

829826
try {
830827
return localPartitionsVv.get(causalityToken)
831-
.thenApply(unused -> tableRegistry.localPartsByTableId().getOrDefault(tableId, PartitionSet.EMPTY_SET));
828+
.thenApply(unused -> tableRegistry.localPartitions(tableId));
832829
} finally {
833830
busyLock.leaveBusy();
834831
}
@@ -870,7 +867,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternal(QualifiedName na
870867
}
871868

872869
private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId) {
873-
TableViewInternal tableImpl = tableRegistry.startedTables().get(tableId);
870+
TableViewInternal tableImpl = tableRegistry.startedTable(tableId);
874871

875872
if (tableImpl != null) {
876873
return completedFuture(tableImpl);
@@ -886,7 +883,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId)
886883
if (e != null) {
887884
getLatestTableFuture.completeExceptionally(e);
888885
} else {
889-
getLatestTableFuture.complete(tableRegistry.startedTables().get(tableId));
886+
getLatestTableFuture.complete(tableRegistry.startedTable(tableId));
890887
}
891888
});
892889
} else {
@@ -900,7 +897,7 @@ private CompletableFuture<TableViewInternal> tableAsyncInternalBusy(int tableId)
900897

901898
// This check is needed for the case when we have registered tablesListener,
902899
// but tablesVv has already been completed, so listener would be triggered only for the next versioned value update.
903-
tableImpl = tableRegistry.startedTables().get(tableId);
900+
tableImpl = tableRegistry.startedTable(tableId);
904901

905902
if (tableImpl != null) {
906903
assignmentsUpdatedVv.removeWhenComplete(tablesListener);
@@ -923,7 +920,7 @@ private static <T> T sync(CompletableFuture<T> future) {
923920
*/
924921
@Override
925922
public @Nullable TableViewInternal cachedTable(int tableId) {
926-
return tableRegistry.tables().get(tableId);
923+
return tableRegistry.table(tableId);
927924
}
928925

929926
/**
@@ -933,7 +930,7 @@ private static <T> T sync(CompletableFuture<T> future) {
933930
*/
934931
@TestOnly
935932
public @Nullable TableViewInternal cachedTable(String name) {
936-
return findTableImplByName(tableRegistry.tables().values(), name);
933+
return findTableImplByName(tableRegistry.allRegisteredTables().values(), name);
937934
}
938935

939936
private CatalogZoneDescriptor getZoneDescriptor(CatalogTableDescriptor tableDescriptor, int catalogVersion) {

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableRegistry.java

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,18 @@
1717

1818
package org.apache.ignite.internal.table.distributed;
1919

20+
import static java.util.Collections.unmodifiableMap;
21+
2022
import java.util.Map;
23+
import java.util.Objects;
2124
import java.util.concurrent.ConcurrentHashMap;
2225
import org.apache.ignite.internal.table.TableViewInternal;
26+
import org.jetbrains.annotations.Nullable;
2327

2428
/**
25-
* Holds shared mutable state for table tracking, shared between {@link TableManager} and {@link TableZoneCoordinator}.
29+
* Tracks table lifecycle state shared between {@link TableManager} and {@link TableZoneCoordinator}.
2630
*
27-
* <ul>
28-
* <li>{@link #tables} — all registered tables by ID.</li>
29-
* <li>{@link #startedTables} — tables that are fully started (partition resources prepared).</li>
30-
* <li>{@link #localPartsByTableId} — local partition sets per table.</li>
31-
* </ul>
31+
* <p>A table progresses through: registered → started → (removed from started) → unregistered.
3232
*/
3333
class TableRegistry {
3434
/** All registered tables by ID. */
@@ -40,15 +40,76 @@ class TableRegistry {
4040
/** Local partitions by table ID. */
4141
private final Map<Integer, PartitionSet> localPartsByTableId = new ConcurrentHashMap<>();
4242

43-
Map<Integer, TableViewInternal> tables() {
44-
return tables;
43+
// ── Registration ──
44+
45+
/** Registers a newly created table. Does not mark it as started. */
46+
void register(int tableId, TableViewInternal table) {
47+
tables.put(tableId, table);
48+
}
49+
50+
/** Promotes an already-registered table to started. */
51+
void markStarted(int tableId) {
52+
TableViewInternal table = tables.get(tableId);
53+
54+
assert table != null : "Table must be registered before marking as started: tableId=" + tableId;
55+
56+
startedTables.put(tableId, table);
57+
}
58+
59+
// ── Lookup ──
60+
61+
/** Returns a registered table by ID, or null if not found. */
62+
@Nullable TableViewInternal table(int tableId) {
63+
return tables.get(tableId);
64+
}
65+
66+
/** Returns a started table by ID, or null if not started. */
67+
@Nullable TableViewInternal startedTable(int tableId) {
68+
return startedTables.get(tableId);
69+
}
70+
71+
/** Returns an unmodifiable view of all registered tables. */
72+
Map<Integer, TableViewInternal> allRegisteredTables() {
73+
return unmodifiableMap(tables);
74+
}
75+
76+
/** Returns an unmodifiable view of all started tables. */
77+
Map<Integer, TableViewInternal> allStartedTables() {
78+
return unmodifiableMap(startedTables);
79+
}
80+
81+
// ── Removal ──
82+
83+
/** Removes the table from started tables and clears its local partitions. Returns the removed table, or null. */
84+
@Nullable TableViewInternal removeStarted(int tableId) {
85+
TableViewInternal removed = startedTables.remove(tableId);
86+
localPartsByTableId.remove(tableId);
87+
return removed;
88+
}
89+
90+
/** Removes the table from the registry entirely. */
91+
void unregister(int tableId) {
92+
tables.remove(tableId);
93+
}
94+
95+
// ── Local partitions ──
96+
97+
/** Sets the local partition set for a table. */
98+
void setLocalPartitions(int tableId, PartitionSet partitions) {
99+
localPartsByTableId.put(tableId, partitions);
45100
}
46101

47-
Map<Integer, TableViewInternal> startedTables() {
48-
return startedTables;
102+
/** Atomically extends local partitions by adding a partition index. Creates a new set if absent. */
103+
void extendLocalPartitions(int tableId, int partitionIndex) {
104+
localPartsByTableId.compute(tableId, (id, old) -> {
105+
PartitionSet set = Objects.requireNonNullElseGet(old, BitSetPartitionSet::new);
106+
set.set(partitionIndex);
107+
return set;
108+
});
49109
}
50110

51-
Map<Integer, PartitionSet> localPartsByTableId() {
52-
return localPartsByTableId;
111+
/** Returns local partitions for a table, or {@link PartitionSet#EMPTY_SET} if absent. */
112+
PartitionSet localPartitions(int tableId) {
113+
return localPartsByTableId.getOrDefault(tableId, PartitionSet.EMPTY_SET);
53114
}
54115
}

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableZoneCoordinator.java

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -316,10 +316,10 @@ CompletableFuture<Void> loadTableToZone(
316316
}
317317
}
318318

319-
var table = (TableImpl) tableRegistry.tables().get(tableId);
319+
var table = (TableImpl) tableRegistry.table(tableId);
320320

321321
return createPartitionStoragesIfAbsent(table, parts)
322-
.thenRun(() -> tableRegistry.localPartsByTableId().put(tableId, parts));
322+
.thenRun(() -> tableRegistry.setLocalPartitions(tableId, parts));
323323
}, ioExecutor))
324324
// If the table is already closed, it's not a problem (probably the node is stopping).
325325
.exceptionally(ignoreTableClosedException())
@@ -331,7 +331,7 @@ CompletableFuture<Void> loadTableToZone(
331331
}
332332

333333
return localPartsUpdateFuture.thenRunAsync(() -> inBusyLock(busyLock, () -> {
334-
var table = (TableImpl) tableRegistry.tables().get(tableId);
334+
var table = (TableImpl) tableRegistry.table(tableId);
335335

336336
for (int i = 0; i < zoneDescriptor.partitions(); i++) {
337337
var zonePartitionId = new ZonePartitionId(zoneDescriptor.id(), i);
@@ -350,11 +350,9 @@ CompletableFuture<Void> loadTableToZone(
350350

351351
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation.
352352
return createPartsFut.thenAccept(ignore -> {
353-
var table = (TableImpl) tableRegistry.tables().get(tableId);
353+
tableRegistry.markStarted(tableId);
354354

355-
tableRegistry.startedTables().put(tableId, table);
356-
357-
addTableToZone(zoneDescriptor.id(), table);
355+
addTableToZone(zoneDescriptor.id(), (TableImpl) tableRegistry.startedTable(tableId));
358356
});
359357
}
360358

@@ -458,10 +456,7 @@ private CompletableFuture<Void> createPartitionsAndLoadResourcesToZoneReplica(
458456
CompletableFuture<?>[] futures = zoneTables.stream()
459457
.map(tbl -> inBusyLockAsync(busyLock, () -> {
460458
return runAsync(() -> inBusyLock(busyLock, () -> {
461-
tableRegistry.localPartsByTableId().compute(
462-
tbl.tableId(),
463-
(tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
464-
);
459+
tableRegistry.extendLocalPartitions(tbl.tableId(), partitionIndex);
465460

466461
lowWatermark.getLowWatermarkSafe(lwm ->
467462
registerIndexesToTable(
@@ -667,7 +662,7 @@ private CompletableFuture<Void> stopAndDestroyTablePartition(TablePartitionId ta
667662

668663
return tokenFuture
669664
.thenCompose(ignore -> {
670-
TableViewInternal table = tableRegistry.tables().get(tablePartitionId.tableId());
665+
TableViewInternal table = tableRegistry.table(tablePartitionId.tableId());
671666
assert table != null : tablePartitionId;
672667

673668
return stopAndDestroyTablePartition(tablePartitionId, table);
@@ -793,12 +788,6 @@ private Set<TableViewInternal> zoneTablesRawSet(int zoneId) {
793788
return tablesPerZone.getOrDefault(zoneId, Set.of());
794789
}
795790

796-
private static PartitionSet extendPartitionSet(@Nullable PartitionSet oldPartitionSet, int partitionId) {
797-
PartitionSet newPartitionSet = Objects.requireNonNullElseGet(oldPartitionSet, BitSetPartitionSet::new);
798-
newPartitionSet.set(partitionId);
799-
return newPartitionSet;
800-
}
801-
802791
private static <T> Function<Throwable, T> ignoreTableClosedException() {
803792
return ex -> {
804793
if (hasCause(ex, TableClosedException.class)) {

0 commit comments

Comments
 (0)