Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.ignite.internal.sql.engine.exec;

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl.IndexMeta;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
Expand Down Expand Up @@ -92,8 +96,13 @@ private ExecutableTable loadTable(IgniteTable sqlTable) {
tableDescriptor, schemaRegistry, schemaDescriptor
);

Int2ObjectMap<IndexMeta> indexMeta = new Int2ObjectOpenHashMap<>();
for (IgniteIndex index : sqlTable.indexes().values()) {
indexMeta.put(index.id(), new ScannableTableImpl.IndexMeta(index.collation().getFieldCollations().size()));
}

InternalTable internalTable = table.internalTable();
ScannableTable scannableTable = new ScannableTableImpl(internalTable, converterFactory);
ScannableTable scannableTable = new ScannableTableImpl(internalTable, indexMeta, converterFactory);
TableRowConverter rowConverter = converterFactory.create(null);

UpdatableTableImpl updatableTable = new UpdatableTableImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@ public Node<RowT> visit(IgniteIndexScan rel) {
rowFactory,
idx,
scannableTable,
tbl.descriptor(),
partitionProvider,
comp,
ranges,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.sql.engine.exec;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
Expand Down Expand Up @@ -55,7 +54,6 @@ <RowT> Publisher<RowT> scan(
* @param partWithConsistencyToken Partition.
* @param rowFactory Row factory.
* @param indexId Index id.
* @param columns Index columns.
* @param cond Index condition.
* @param requiredColumns Required columns.
* @return A publisher that produces rows.
Expand All @@ -65,7 +63,6 @@ <RowT> Publisher<RowT> indexRangeScan(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
@Nullable RangeCondition<RowT> cond,
int @Nullable [] requiredColumns
);
Expand All @@ -78,7 +75,6 @@ <RowT> Publisher<RowT> indexRangeScan(
* @param partWithConsistencyToken Partition.
* @param rowFactory Row factory.
* @param indexId Index id.
* @param columns Index columns.
* @param key A key to lookup.
* @param requiredColumns Required columns.
* @return A publisher that produces rows.
Expand All @@ -88,7 +84,6 @@ <RowT> Publisher<RowT> indexLookup(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
RowT key,
int @Nullable [] requiredColumns
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS;
import static org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;

import java.util.List;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.binarytuple.BinaryTuple;
Expand All @@ -39,6 +39,7 @@
import org.apache.ignite.internal.table.OperationContext;
import org.apache.ignite.internal.table.TxContext;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.subscription.TransformingPublisher;
import org.jetbrains.annotations.Nullable;

Expand All @@ -49,11 +50,18 @@ public class ScannableTableImpl implements ScannableTable {

private final InternalTable internalTable;

private final Int2ObjectMap<IndexMeta> indexMeta;

private final TableRowConverterFactory converterFactory;

/** Constructor. */
public ScannableTableImpl(InternalTable internalTable, TableRowConverterFactory converterFactory) {
public ScannableTableImpl(
// Underlying table storage used to serve scans and lookups.
InternalTable internalTable,
// Per-index metadata keyed by index id; consulted by indexRangeScan/indexLookup
// to determine the number of columns in each index key.
Int2ObjectMap<IndexMeta> indexMeta,
// Factory producing converters from storage rows to execution-engine rows.
TableRowConverterFactory converterFactory
) {
this.internalTable = internalTable;
this.indexMeta = indexMeta;
this.converterFactory = converterFactory;
}

Expand Down Expand Up @@ -83,7 +91,6 @@ public <RowT> Publisher<RowT> indexRangeScan(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
@Nullable RangeCondition<RowT> cond,
int @Nullable [] requiredColumns
) {
Expand All @@ -94,15 +101,23 @@ public <RowT> Publisher<RowT> indexRangeScan(
BinaryTuplePrefix lower;
BinaryTuplePrefix upper;

IndexMeta meta = indexMeta.get(indexId);

if (meta == null) {
throw new IllegalStateException(format("Index metadata not found [tableId={}, tableName={}, indexId={}].",
internalTable.tableId(), internalTable.name(), indexId));
}

int indexKeySize = meta.indexKeySize;
int flags = 0;

if (cond == null) {
flags = LESS_OR_EQUAL | GREATER_OR_EQUAL;
lower = null;
upper = null;
} else {
lower = toBinaryTuplePrefix(columns.size(), handler, cond.lower());
upper = toBinaryTuplePrefix(columns.size(), handler, cond.upper());
lower = toBinaryTuplePrefix(indexKeySize, handler, cond.lower());
upper = toBinaryTuplePrefix(indexKeySize, handler, cond.upper());

flags |= (cond.lowerInclude()) ? GREATER_OR_EQUAL : GREATER;
flags |= (cond.upperInclude()) ? LESS_OR_EQUAL : LESS;
Expand Down Expand Up @@ -130,7 +145,6 @@ public <RowT> Publisher<RowT> indexLookup(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
RowT key,
int @Nullable [] requiredColumns
) {
Expand All @@ -140,8 +154,19 @@ public <RowT> Publisher<RowT> indexLookup(

BinaryTuple keyTuple = handler.toBinaryTuple(key);

assert keyTuple.elementCount() == columns.size()
: format("Key should contain exactly {} fields, but was {}", columns.size(), handler.toString(key));
if (IgniteUtils.assertionsEnabled()) {
IndexMeta meta = indexMeta.get(indexId);

if (meta == null) {
throw new IllegalStateException(format("Index metadata not found [tableId={}, tableName={}, indexId={}].",
internalTable.tableId(), internalTable.name(), indexId));
}

int indexKeySize = meta.indexKeySize;

assert keyTuple.elementCount() == indexKeySize
: format("Key should contain exactly {} fields, but was {}", indexKeySize, handler.toString(key));
}

int partId = partWithConsistencyToken.partId();

Expand Down Expand Up @@ -224,4 +249,13 @@ private static TxContext transactionalContextFrom(TxAttributes txAttributes, lon

return TxContext.readWrite(txAttributes.id(), txAttributes.coordinatorId(), commitPartition, enlistmentConsistencyToken);
}

/** Metadata required to process a scan over a particular index. */
public static class IndexMeta {
// Number of columns in the index key. Used to size binary tuple prefixes for
// range scans and to validate lookup keys; read directly by the enclosing class.
private final int indexKeySize;

IndexMeta(int indexKeySize) {
this.indexKeySize = indexKeySize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
Expand All @@ -38,9 +36,7 @@
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeCondition;
import org.apache.ignite.internal.sql.engine.exec.exp.RangeIterable;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
import org.jetbrains.annotations.Nullable;
Expand All @@ -66,14 +62,11 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> {

private final @Nullable Comparator<RowT> comp;

private final List<String> columns;

/**
* Constructor.
*
* @param ctx Execution context.
* @param rowFactory Row factory.
* @param tableDescriptor Table descriptor.
* @param partitionProvider Partition provider.
* @param comp Rows comparator.
* @param rangeConditions Range conditions.
Expand All @@ -86,7 +79,6 @@ public IndexScanNode(
RowFactory<RowT> rowFactory,
IgniteIndex schemaIndex,
ScannableTable table,
TableDescriptor tableDescriptor,
PartitionProvider<RowT> partitionProvider,
@Nullable Comparator<RowT> comp,
@Nullable RangeIterable<RowT> rangeConditions,
Expand All @@ -103,12 +95,6 @@ public IndexScanNode(
this.rangeConditions = rangeConditions;
this.comp = comp;
this.factory = rowFactory;

columns = schemaIndex.collation().getFieldCollations().stream()
.map(RelFieldCollation::getFieldIndex)
.map(tableDescriptor::columnDescriptor)
.map(ColumnDescriptor::name)
.collect(Collectors.toList());
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -159,12 +145,10 @@ private Publisher<RowT> partitionPublisher(

switch (schemaIndex.type()) {
case SORTED:
return table.indexRangeScan(ctx, partWithConsistencyToken, factory, indexId,
columns, cond, requiredColumns);
return table.indexRangeScan(ctx, partWithConsistencyToken, factory, indexId, cond, requiredColumns);

case HASH:
return table.indexLookup(ctx, partWithConsistencyToken, factory, indexId,
columns, cond.lower(), requiredColumns);
return table.indexLookup(ctx, partWithConsistencyToken, factory, indexId, cond.lower(), requiredColumns);

default:
throw new AssertionError("Unexpected index type: " + schemaIndex.type());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,12 @@ public IgniteTable table(int catalogVersion, int tableId) {
throw new IgniteInternalException(Common.INTERNAL_ERR, "Table with given id not found: " + tableId);
}

CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableDescriptor.updateTimestamp());
HybridTimestamp tableTime = deriveTableTimestamp(tableDescriptor, catalog);
CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableTime);

IgniteTableImpl igniteTable = tableCache.get(tableKey, (x) -> {
TableDescriptor descriptor = createTableDescriptorForTable(catalog, tableDescriptor);
return createTableDataOnlyTable(catalog, tableDescriptor, descriptor);
return createTableDataOnlyTable(catalog, tableTime, tableDescriptor, descriptor);
});

Map<String, IgniteIndex> tableIndexes = getIndexes(catalog,
Expand All @@ -202,6 +203,24 @@ public IgniteTable table(int catalogVersion, int tableId) {
});
}

private static HybridTimestamp deriveTableTimestamp(CatalogTableDescriptor table, Catalog catalog) {
// Consolidated update time covering the table itself plus every table-related
// object that affects planning or sql-related execution.
//
// Index timestamps are folded in because ScannableTable caches index-related
// metadata, so a fresh instance must be created whenever an index is added or
// changed. DROP is not reflected by this consolidation, but index drops are
// infrequent enough that this is acceptable.
HybridTimestamp latest = table.updateTimestamp();

for (CatalogIndexDescriptor index : catalog.indexes(table.id())) {
HybridTimestamp indexTime = index.updateTimestamp();

if (indexTime.compareTo(latest) > 0) {
latest = indexTime;
}
}

return latest;
}

private static long cacheKey(int part1, int part2) {
long cacheKey = part1;
cacheKey <<= 32;
Expand Down Expand Up @@ -232,12 +251,13 @@ private IgniteSchema createSqlSchema(Catalog catalog, CatalogSchemaDescriptor sc

// Assemble sql-engine.TableDescriptors as they are required by indexes.
for (CatalogTableDescriptor tableDescriptor : schemaDescriptor.tables()) {
CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableDescriptor.updateTimestamp());
HybridTimestamp tableTime = deriveTableTimestamp(tableDescriptor, catalog);
CacheKey tableKey = tableCacheKey(tableDescriptor.id(), tableTime);

// Load cached table by (id, version)
IgniteTableImpl igniteTable = tableCache.get(tableKey, (k) -> {
TableDescriptor descriptor = createTableDescriptorForTable(catalog, tableDescriptor);
return createTableDataOnlyTable(catalog, tableDescriptor, descriptor);
return createTableDataOnlyTable(catalog, tableTime, tableDescriptor, descriptor);
});

// Get actual indices
Expand Down Expand Up @@ -445,6 +465,7 @@ private static CatalogColumnDescriptor createColumnDescriptor(CatalogTableColumn

private IgniteTableImpl createTableDataOnlyTable(
Catalog catalog,
HybridTimestamp tableTime,
CatalogTableDescriptor table,
TableDescriptor descriptor
) {
Expand All @@ -455,7 +476,7 @@ private IgniteTableImpl createTableDataOnlyTable(

CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(catalog, table.zoneId());

return createTable(table, descriptor, tableIndexes, zoneDescriptor, sqlStatisticManager);
return createTable(table, tableTime, descriptor, tableIndexes, zoneDescriptor, sqlStatisticManager);
}

private Map<String, IgniteIndex> getIndexes(Catalog catalog, int tableId, int primaryKeyIndexId) {
Expand Down Expand Up @@ -501,6 +522,7 @@ private static CatalogZoneDescriptor getZoneDescriptor(Catalog catalog, int zone

private static IgniteTableImpl createTable(
CatalogTableDescriptor catalogTableDescriptor,
HybridTimestamp tableTime,
TableDescriptor tableDescriptor,
Map<String, IgniteIndex> indexes,
CatalogZoneDescriptor zoneDescriptor,
Expand All @@ -526,7 +548,7 @@ private static IgniteTableImpl createTable(
tableName,
tableId,
catalogTableDescriptor.latestSchemaVersion(),
catalogTableDescriptor.updateTimestamp().longValue(),
tableTime.longValue(),
tableDescriptor,
primaryKeyColumns,
statistic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.ignite.internal.sql.engine.exec;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
Expand All @@ -44,7 +43,6 @@ public <RowT> Publisher<RowT> indexRangeScan(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
@Nullable RangeCondition<RowT> cond,
int @Nullable [] requiredColumns
) {
Expand All @@ -57,7 +55,6 @@ public <RowT> Publisher<RowT> indexLookup(
PartitionWithConsistencyToken partWithConsistencyToken,
RowFactory<RowT> rowFactory,
int indexId,
List<String> columns,
RowT key,
int @Nullable [] requiredColumns
) {
Expand Down
Loading