diff --git a/docs/document/content/user-manual/common-config/props.cn.md b/docs/document/content/user-manual/common-config/props.cn.md index 47e6c7a650866..ea2c3acd7594d 100644 --- a/docs/document/content/user-manual/common-config/props.cn.md +++ b/docs/document/content/user-manual/common-config/props.cn.md @@ -16,6 +16,7 @@ Apache ShardingSphere 提供属性配置的方式配置系统级配置。 | sql-simple (?) | boolean | 是否在日志中打印简单风格的 SQL | false | | kernel-executor-size (?) | int | 用于设置任务处理线程池的大小
每个 ShardingSphereDataSource 使用一个独立的线程池,同一个 JVM 的不同数据源不共享线程池 | infinite | | max-connections-size-per-query (?) | int | 一次查询请求在每个数据库实例中所能使用的最大连接数 | 1 | +| max-union-size-per-datasource (?) | int | 每个数据源允许合并的最大 UNION ALL 数量。当路由到同一数据源的路由单元数量超过此值时,将分批合并以恢复并行执行能力 | Integer.MAX_VALUE | | check-table-metadata-enabled (?) | boolean | 在程序启动和更新时,是否检查分片元数据的结构一致性 | false | | load-table-metadata-batch-size (?) | int | 在程序启动或刷新元数据时,单个批次加载表元数据的数量 | 1000 | diff --git a/docs/document/content/user-manual/common-config/props.en.md b/docs/document/content/user-manual/common-config/props.en.md index 17e749ec45046..441dcadf95e58 100644 --- a/docs/document/content/user-manual/common-config/props.en.md +++ b/docs/document/content/user-manual/common-config/props.en.md @@ -16,6 +16,7 @@ Apache ShardingSphere provides the way of property configuration to configure sy | sql-simple (?) | boolean | Whether show SQL details in simple style | false | | kernel-executor-size (?) | int | The max thread size of worker group to execute SQL. One ShardingSphereDataSource will use a independent thread pool, it does not share thread pool even different data source in same JVM | infinite | | max-connections-size-per-query (?) | int | Max opened connection size for each query | 1 | +| max-union-size-per-datasource (?) | int | Max union size per datasource for aggregate rewrite. When route units count for a datasource exceeds this value, they will be split into batches to restore parallel execution capability | Integer.MAX_VALUE | | check-table-metadata-enabled (?) | boolean | Whether validate table meta data consistency when application startup or updated | false | | load-table-metadata-batch-size (?) | int | The number of table metadata loaded at a time when application startup or refreshes table metadata | 1000 | diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java index d1735da3ee598..e27fd89b943cb 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java @@ -53,6 +53,12 @@ public enum ConfigurationPropertyKey implements TypedPropertyKey { */ MAX_CONNECTIONS_SIZE_PER_QUERY("max-connections-size-per-query", String.valueOf(1), int.class, false), + /** + * Max union size per datasource for aggregate rewrite. + * When route units count for a datasource exceeds this value, they will be split into batches. + */ + MAX_UNION_SIZE_PER_DATASOURCE("max-union-size-per-datasource", String.valueOf(Integer.MAX_VALUE), int.class, false), + /** * Whether validate table metadata consistency when application startup or updated. */ diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertiesTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertiesTest.java index 1311933928db1..70a3899259716 100644 --- a/infra/common/src/test/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertiesTest.java +++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertiesTest.java @@ -40,6 +40,7 @@ void assertGetValue() { assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE)); assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(20)); assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(20)); + assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(100)); assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED)); assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(500)); assertThat(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE), is(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL"))); @@ -63,6 +64,7 @@ private Properties createProperties() { new Property(ConfigurationPropertyKey.SQL_SIMPLE.getKey(), Boolean.TRUE.toString()), new Property(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), "20"), new Property(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), "20"), + new Property(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), "100"), new Property(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED.getKey(), Boolean.TRUE.toString()), new Property(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE.getKey(), "500"), new Property(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE.getKey(), "PostgreSQL"), @@ -87,6 +89,7 @@ void assertGetDefaultValue() { assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE)); assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(0)); assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(1)); + assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(Integer.MAX_VALUE)); assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED)); assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(1000)); assertNull(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE)); diff --git a/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java b/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java index 17e03cc1a5516..0429e7679e77e 100644 --- a/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java +++ b/infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java @@ -20,6 +20,7 @@ import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.database.connector.core.type.DatabaseType; import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation; +import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext; import org.apache.shardingsphere.infra.datanode.DataNode; @@ -73,15 +74,18 @@ public final class RouteSQLRewriteEngine { * @return SQL rewrite result */ public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final QueryContext queryContext) { - return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext))); + int maxUnionSizePerDataSource = Optional.ofNullable(queryContext.getMetaData().getProps()) + .map(props -> props.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE)) + .orElse(Integer.parseInt(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getDefaultValue())); + return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext, maxUnionSizePerDataSource))); } - private Map createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) { + private Map createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final int maxUnionSizePerDataSource) { Map result = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F); for (Entry> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) { List routeUnits = entry.getValue(); - if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) { - result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits)); + if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits, maxUnionSizePerDataSource)) { + createAggregatedRewriteUnits(sqlRewriteContext, routeContext, routeUnits, maxUnionSizePerDataSource, result); } else { for (RouteUnit each : routeUnits) { result.put(each, createSQLRewriteUnit(sqlRewriteContext, routeContext, each)); @@ -91,6 +95,25 @@ private Map createSQLRewriteUnits(final SQLRewriteCon return result; } + private void createAggregatedRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, + final List routeUnits, final int maxUnionSizePerDataSource, final Map result) { + if (routeUnits.size() <= maxUnionSizePerDataSource) { + result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits)); + } else { + for (List batch : partitionRouteUnits(routeUnits, maxUnionSizePerDataSource)) { + result.put(batch.get(ThreadLocalRandom.current().nextInt(batch.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, batch)); + } + } + } + + private List> partitionRouteUnits(final List routeUnits, final int batchSize) { + List> result = new ArrayList<>(); + for (int i = 0; i < routeUnits.size(); i += batchSize) { + result.add(routeUnits.subList(i, Math.min(i + batchSize, routeUnits.size()))); + } + return result; + } + private Map> aggregateRouteUnitGroups(final Collection routeUnits) { Map> result = new LinkedHashMap<>(routeUnits.size(), 1F); for (RouteUnit each : routeUnits) { @@ -99,8 +122,8 @@ private Map> aggregateRouteUnitGroups(final Collection routeUnits) { - if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size()) { + private boolean isNeedAggregateRewrite(final SQLStatementContext sqlStatementContext, final Collection routeUnits, final int maxUnionSizePerDataSource) { + if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size() || 1 == maxUnionSizePerDataSource) { return false; } SelectStatementContext statementContext = (SelectStatementContext) sqlStatementContext; diff --git a/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngineTest.java b/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngineTest.java index a7b263423d96b..55801a0849534 100644 --- a/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngineTest.java +++ b/infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngineTest.java @@ -21,8 +21,11 @@ import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext; import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; +import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.datanode.DataNode; import org.apache.shardingsphere.infra.hint.HintValueContext; +import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit; import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData; @@ -40,6 +43,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.Properties; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -70,11 +74,25 @@ void assertRewriteWithStandardParameterBuilder() { } private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql) { + return mockQueryContext(sqlStatementContext, sql, Integer.MAX_VALUE); + } + + private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql, final int maxUnionSizePerDataSource) { QueryContext result = mock(QueryContext.class, RETURNS_DEEP_STUBS); when(result.getSqlStatementContext()).thenReturn(sqlStatementContext); when(result.getSql()).thenReturn(sql); when(result.getParameters()).thenReturn(Collections.singletonList(1)); when(result.getHintValueContext()).thenReturn(new HintValueContext()); + ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class); + ConfigurationProperties props = new ConfigurationProperties(createPropsWithMaxUnionSizePerDataSource(maxUnionSizePerDataSource)); + when(metaData.getProps()).thenReturn(props); + when(result.getMetaData()).thenReturn(metaData); + return result; + } + + private Properties createPropsWithMaxUnionSizePerDataSource(final int maxUnionSizePerDataSource) { + Properties result = new Properties(); + result.setProperty(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), String.valueOf(maxUnionSizePerDataSource)); return result; } @@ -207,4 +225,64 @@ private Map mockStorageUnits(final DatabaseType databaseTyp when(result.getStorageType()).thenReturn(databaseType); return Collections.singletonMap("ds_0", result); } + + @Test + void assertRewriteWithBatchUnionWhenExceedsMaxUnionSizePerDataSource() { + SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS); + when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList()); + when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false); + DatabaseType databaseType = mock(DatabaseType.class); + when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType); + ShardingSphereDatabase database = mockDatabase(databaseType); + QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2); + SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext); + RouteContext routeContext = new RouteContext(); + for (int i = 0; i < 5; i++) { + routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_" + i)))); + } + RouteSQLRewriteResult actual = new RouteSQLRewriteEngine( + new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext); + assertThat(actual.getSqlRewriteUnits().size(), is(3)); + } + + @Test + void assertRewriteWithMaxUnionSizePerDataSourceEqualsRouteUnitsCount() { + SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS); + when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList()); + when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false); + DatabaseType databaseType = mock(DatabaseType.class); + when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType); + ShardingSphereDatabase database = mockDatabase(databaseType); + QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2); + SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext); + RouteContext routeContext = new RouteContext(); + RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0"))); + RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1"))); + routeContext.getRouteUnits().add(firstRouteUnit); + routeContext.getRouteUnits().add(secondRouteUnit); + RouteSQLRewriteResult actual = new RouteSQLRewriteEngine( + new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext); + assertThat(actual.getSqlRewriteUnits().size(), is(1)); + assertThat(actual.getSqlRewriteUnits().values().iterator().next().getSql(), is("SELECT ? UNION ALL SELECT ?")); + } + + @Test + void assertRewriteWithMaxUnionSizePerDataSourceOne() { + SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS); + when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList()); + when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false); + DatabaseType databaseType = mock(DatabaseType.class); + when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType); + ShardingSphereDatabase database = mockDatabase(databaseType); + QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 1); + SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext); + RouteContext routeContext = new RouteContext(); + RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0"))); + RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1"))); + routeContext.getRouteUnits().add(firstRouteUnit); + routeContext.getRouteUnits().add(secondRouteUnit); + RouteSQLRewriteResult actual = new RouteSQLRewriteEngine( + new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext); + assertThat(actual.getSqlRewriteUnits().size(), is(2)); + } } diff --git a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/variable/ShowDistVariablesExecutorTest.java b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/variable/ShowDistVariablesExecutorTest.java index 573875460abff..9d5f172451052 100644 --- a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/variable/ShowDistVariablesExecutorTest.java +++ b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/variable/ShowDistVariablesExecutorTest.java @@ -52,7 +52,7 @@ void assertExecute() { executor.setConnectionContext(new DistSQLConnectionContext(mock(QueryContext.class), 1, mock(DatabaseType.class), mock(DatabaseConnectionManager.class), mock(ExecutorStatementManager.class))); Collection actual = executor.getRows(mock(ShowDistVariablesStatement.class), contextManager); - assertThat(actual.size(), is(21)); + assertThat(actual.size(), is(22)); LocalDataQueryResultRow row = actual.iterator().next(); assertThat(row.getCell(1), is("agent_plugins_enabled")); assertThat(row.getCell(2), is("false")); diff --git a/proxy/frontend/dialect/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java b/proxy/frontend/dialect/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java index 8a6b218531a2a..31f10f0acc020 100644 --- a/proxy/frontend/dialect/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java +++ b/proxy/frontend/dialect/opengauss/src/test/java/org/apache/shardingsphere/proxy/frontend/opengauss/command/query/extended/bind/OpenGaussComBatchBindExecutorTest.java @@ -137,6 +137,7 @@ private ContextManager mockContextManager() { when(result.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE)).thenReturn(0); when(result.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY)).thenReturn(1); when(result.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.SQL_SHOW)).thenReturn(false); + when(result.getMetaDataContexts().getMetaData().getProps().getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE)).thenReturn(Integer.MAX_VALUE); when(result.getMetaDataContexts().getMetaData().getGlobalRuleMetaData()).thenReturn(new RuleMetaData(Collections.singleton( new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build())))); ShardingSphereDatabase database = mockDatabase(); diff --git a/test/e2e/sql/src/test/resources/cases/ral/dataset/empty_rules/show_dist_variables.xml b/test/e2e/sql/src/test/resources/cases/ral/dataset/empty_rules/show_dist_variables.xml index c92f9db947f20..d101798372674 100644 --- a/test/e2e/sql/src/test/resources/cases/ral/dataset/empty_rules/show_dist_variables.xml +++ b/test/e2e/sql/src/test/resources/cases/ral/dataset/empty_rules/show_dist_variables.xml @@ -28,6 +28,7 @@ +