Skip to content

Commit bceab69

Browse files
authored
IGNITE-28437 Sql. Reduce number of rows fetched during ordered index scan (#7919)
1 parent 45b6124 commit bceab69

2 files changed

Lines changed: 14 additions & 9 deletions

File tree

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.ignite.internal.sql.engine.exec.rel;
1919

20+
import static java.lang.Integer.max;
21+
import static java.lang.Integer.min;
22+
2023
import java.util.Collection;
2124
import java.util.Comparator;
2225
import java.util.Iterator;
@@ -38,7 +41,6 @@
3841
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
3942
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
4043
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
41-
import org.apache.ignite.internal.sql.engine.util.Commons;
4244
import org.apache.ignite.internal.util.SubscriptionUtils;
4345
import org.apache.ignite.internal.util.TransformingIterator;
4446
import org.jetbrains.annotations.Nullable;
@@ -131,8 +133,18 @@ private Publisher<RowT> indexPublisher(
131133
partWithConsistencyToken -> partitionPublisher(partWithConsistencyToken, cond)
132134
);
133135

136+
int bufferSize = context().bufferSize();
137+
138+
// Let's prefetch equal share of a buffer from each partition.
139+
int fetchSize = max(context().bufferSize() / partsWithConsistencyTokens.size(), 1);
140+
141+
// Adds some buffer to improve chances to fulfill entire request without need go to storage once again.
142+
// This renders over-prefetching over all local partitions in total, but at least it's capped now at
143+
// 2x bufferSize within up to 512 local partitions.
144+
fetchSize = min(fetchSize * 2, bufferSize);
145+
134146
if (comp != null) {
135-
return SubscriptionUtils.orderedMerge(comp, Commons.SORTED_IDX_PART_PREFETCH_SIZE, it);
147+
return SubscriptionUtils.orderedMerge(comp, fetchSize, it);
136148
} else {
137149
return SubscriptionUtils.concat(it);
138150
}

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,6 @@ public final class Commons {
138138
SqlKind.OTHER_DDL
139139
);
140140

141-
/**
142-
* The number of elements to be prefetched from each partition when scanning the sorted index.
143-
* The higher the value, the fewer calls to the upstream will be, but at the same time, the bigger
144-
* internal buffer will be.
145-
*/
146-
public static final int SORTED_IDX_PART_PREFETCH_SIZE = 100;
147-
148141
@SuppressWarnings("rawtypes")
149142
public static final List<RelTraitDef> DISTRIBUTED_TRAITS_SET = List.of(
150143
ConventionTraitDef.INSTANCE,

0 commit comments

Comments
 (0)