Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Apache ShardingSphere 提供属性配置的方式配置系统级配置。
| sql-simple (?) | boolean | 是否在日志中打印简单风格的 SQL | false |
| kernel-executor-size (?) | int | 用于设置任务处理线程池的大小<br />每个 ShardingSphereDataSource 使用一个独立的线程池,同一个 JVM 的不同数据源不共享线程池 | infinite |
| max-connections-size-per-query (?) | int | 一次查询请求在每个数据库实例中所能使用的最大连接数 | 1 |
| max-union-size-per-datasource (?) | int | 每个数据源允许合并的最大 UNION ALL 数量。当路由到同一数据源的路由单元数量超过此值时,将分批合并以恢复并行执行能力 | Integer.MAX_VALUE |
| check-table-metadata-enabled (?) | boolean | 在程序启动和更新时,是否检查分片元数据的结构一致性 | false |
| load-table-metadata-batch-size (?) | int | 在程序启动或刷新元数据时,单个批次加载表元数据的数量 | 1000 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Apache ShardingSphere provides the way of property configuration to configure sy
| sql-simple (?) | boolean | Whether show SQL details in simple style | false |
| kernel-executor-size (?) | int | The max thread size of worker group to execute SQL. One ShardingSphereDataSource will use a independent thread pool, it does not share thread pool even different data source in same JVM | infinite |
| max-connections-size-per-query (?) | int | Max opened connection size for each query | 1 |
| max-union-size-per-datasource (?) | int | Max union size per datasource for aggregate rewrite. When route units count for a datasource exceeds this value, they will be split into batches to restore parallel execution capability | Integer.MAX_VALUE |
| check-table-metadata-enabled (?) | boolean | Whether validate table meta data consistency when application startup or updated | false |
| load-table-metadata-batch-size (?) | int | The number of table metadata loaded at a time when application startup or refreshes table metadata | 1000 |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public enum ConfigurationPropertyKey implements TypedPropertyKey {
*/
MAX_CONNECTIONS_SIZE_PER_QUERY("max-connections-size-per-query", String.valueOf(1), int.class, false),

/**
* Max union size per datasource for aggregate rewrite.
* When route units count for a datasource exceeds this value, they will be split into batches.
*/
MAX_UNION_SIZE_PER_DATASOURCE("max-union-size-per-datasource", String.valueOf(Integer.MAX_VALUE), int.class, false),

/**
* Whether validate table metadata consistency when application startup or updated.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void assertGetValue() {
assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE));
assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(20));
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(20));
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(100));
assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED));
assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(500));
assertThat(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE), is(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL")));
Expand All @@ -63,6 +64,7 @@ private Properties createProperties() {
new Property(ConfigurationPropertyKey.SQL_SIMPLE.getKey(), Boolean.TRUE.toString()),
new Property(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), "20"),
new Property(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), "20"),
new Property(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), "100"),
new Property(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED.getKey(), Boolean.TRUE.toString()),
new Property(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE.getKey(), "500"),
new Property(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE.getKey(), "PostgreSQL"),
Expand All @@ -87,6 +89,7 @@ void assertGetDefaultValue() {
assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE));
assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(0));
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(1));
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(Integer.MAX_VALUE));
assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED));
assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(1000));
assertNull(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.datanode.DataNode;
Expand Down Expand Up @@ -73,15 +74,18 @@ public final class RouteSQLRewriteEngine {
* @return SQL rewrite result
*/
public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final QueryContext queryContext) {
return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext)));
int maxUnionSizePerDataSource = Optional.ofNullable(queryContext.getMetaData().getProps())
.map(props -> props.<Integer>getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE))
.orElse(Integer.parseInt(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getDefaultValue()));
return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext, maxUnionSizePerDataSource)));
}

private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final int maxUnionSizePerDataSource) {
Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);
for (Entry<String, List<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
List<RouteUnit> routeUnits = entry.getValue();
if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits, maxUnionSizePerDataSource)) {
createAggregatedRewriteUnits(sqlRewriteContext, routeContext, routeUnits, maxUnionSizePerDataSource, result);
} else {
for (RouteUnit each : routeUnits) {
result.put(each, createSQLRewriteUnit(sqlRewriteContext, routeContext, each));
Expand All @@ -91,6 +95,25 @@ private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteCon
return result;
}

private void createAggregatedRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext,
final List<RouteUnit> routeUnits, final int maxUnionSizePerDataSource, final Map<RouteUnit, SQLRewriteUnit> result) {
if (routeUnits.size() <= maxUnionSizePerDataSource) {
result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
} else {
for (List<RouteUnit> batch : partitionRouteUnits(routeUnits, maxUnionSizePerDataSource)) {
result.put(batch.get(ThreadLocalRandom.current().nextInt(batch.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, batch));
}
}
}

private List<List<RouteUnit>> partitionRouteUnits(final List<RouteUnit> routeUnits, final int batchSize) {
List<List<RouteUnit>> result = new ArrayList<>();
for (int i = 0; i < routeUnits.size(); i += batchSize) {
result.add(routeUnits.subList(i, Math.min(i + batchSize, routeUnits.size())));
}
return result;
}

private Map<String, List<RouteUnit>> aggregateRouteUnitGroups(final Collection<RouteUnit> routeUnits) {
Map<String, List<RouteUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1F);
for (RouteUnit each : routeUnits) {
Expand All @@ -99,8 +122,8 @@ private Map<String, List<RouteUnit>> aggregateRouteUnitGroups(final Collection<R
return result;
}

private boolean isNeedAggregateRewrite(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits) {
if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size()) {
private boolean isNeedAggregateRewrite(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits, final int maxUnionSizePerDataSource) {
if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size() || 1 == maxUnionSizePerDataSource) {
return false;
}
SelectStatementContext statementContext = (SelectStatementContext) sqlStatementContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
Expand All @@ -40,6 +43,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -70,11 +74,25 @@ void assertRewriteWithStandardParameterBuilder() {
}

private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql) {
return mockQueryContext(sqlStatementContext, sql, Integer.MAX_VALUE);
}

private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql, final int maxUnionSizePerDataSource) {
QueryContext result = mock(QueryContext.class, RETURNS_DEEP_STUBS);
when(result.getSqlStatementContext()).thenReturn(sqlStatementContext);
when(result.getSql()).thenReturn(sql);
when(result.getParameters()).thenReturn(Collections.singletonList(1));
when(result.getHintValueContext()).thenReturn(new HintValueContext());
ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
ConfigurationProperties props = new ConfigurationProperties(createPropsWithMaxUnionSizePerDataSource(maxUnionSizePerDataSource));
when(metaData.getProps()).thenReturn(props);
when(result.getMetaData()).thenReturn(metaData);
return result;
}

private Properties createPropsWithMaxUnionSizePerDataSource(final int maxUnionSizePerDataSource) {
Properties result = new Properties();
result.setProperty(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), String.valueOf(maxUnionSizePerDataSource));
return result;
}

Expand Down Expand Up @@ -207,4 +225,64 @@ private Map<String, StorageUnit> mockStorageUnits(final DatabaseType databaseTyp
when(result.getStorageType()).thenReturn(databaseType);
return Collections.singletonMap("ds_0", result);
}

@Test
void assertRewriteWithBatchUnionWhenExceedsMaxUnionSizePerDataSource() {
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
DatabaseType databaseType = mock(DatabaseType.class);
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
ShardingSphereDatabase database = mockDatabase(databaseType);
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2);
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
RouteContext routeContext = new RouteContext();
for (int i = 0; i < 5; i++) {
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_" + i))));
}
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
assertThat(actual.getSqlRewriteUnits().size(), is(3));
}

@Test
void assertRewriteWithMaxUnionSizePerDataSourceEqualsRouteUnitsCount() {
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
DatabaseType databaseType = mock(DatabaseType.class);
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
ShardingSphereDatabase database = mockDatabase(databaseType);
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2);
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
RouteContext routeContext = new RouteContext();
RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0")));
RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1")));
routeContext.getRouteUnits().add(firstRouteUnit);
routeContext.getRouteUnits().add(secondRouteUnit);
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
assertThat(actual.getSqlRewriteUnits().size(), is(1));
assertThat(actual.getSqlRewriteUnits().values().iterator().next().getSql(), is("SELECT ? UNION ALL SELECT ?"));
}

@Test
void assertRewriteWithMaxUnionSizePerDataSourceOne() {
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
DatabaseType databaseType = mock(DatabaseType.class);
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
ShardingSphereDatabase database = mockDatabase(databaseType);
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 1);
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
RouteContext routeContext = new RouteContext();
RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0")));
RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1")));
routeContext.getRouteUnits().add(firstRouteUnit);
routeContext.getRouteUnits().add(secondRouteUnit);
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
assertThat(actual.getSqlRewriteUnits().size(), is(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void assertExecute() {
executor.setConnectionContext(new DistSQLConnectionContext(mock(QueryContext.class), 1,
mock(DatabaseType.class), mock(DatabaseConnectionManager.class), mock(ExecutorStatementManager.class)));
Collection<LocalDataQueryResultRow> actual = executor.getRows(mock(ShowDistVariablesStatement.class), contextManager);
assertThat(actual.size(), is(21));
assertThat(actual.size(), is(22));
LocalDataQueryResultRow row = actual.iterator().next();
assertThat(row.getCell(1), is("agent_plugins_enabled"));
assertThat(row.getCell(2), is("false"));
Expand Down
Loading