Skip to content

Commit f59e188

Browse files
committed
Add max-union-size-per-datasource Property
1 parent ffa1b59 commit f59e188

9 files changed

Lines changed: 122 additions & 7 deletions

File tree

docs/document/content/user-manual/common-config/props.cn.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Apache ShardingSphere 提供属性配置的方式配置系统级配置。
1616
| sql-simple (?) | boolean | 是否在日志中打印简单风格的 SQL | false |
1717
| kernel-executor-size (?) | int | 用于设置任务处理线程池的大小<br />每个 ShardingSphereDataSource 使用一个独立的线程池,同一个 JVM 的不同数据源不共享线程池 | infinite |
1818
| max-connections-size-per-query (?) | int | 一次查询请求在每个数据库实例中所能使用的最大连接数 | 1 |
19+
| max-union-size-per-datasource (?) | int | 每个数据源允许合并的最大 UNION ALL 数量。当路由到同一数据源的路由单元数量超过此值时,将分批合并以恢复并行执行能力 | Integer.MAX_VALUE |
1920
| check-table-metadata-enabled (?) | boolean | 在程序启动和更新时,是否检查分片元数据的结构一致性 | false |
2021
| load-table-metadata-batch-size (?) | int | 在程序启动或刷新元数据时,单个批次加载表元数据的数量 | 1000 |
2122

docs/document/content/user-manual/common-config/props.en.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Apache ShardingSphere provides the way of property configuration to configure sy
1616
| sql-simple (?) | boolean | Whether show SQL details in simple style | false |
1717
| kernel-executor-size (?) | int | The max thread size of worker group to execute SQL. One ShardingSphereDataSource will use a independent thread pool, it does not share thread pool even different data source in same JVM | infinite |
1818
| max-connections-size-per-query (?) | int | Max opened connection size for each query | 1 |
19+
| max-union-size-per-datasource (?) | int | Max union size per datasource for aggregate rewrite. When route units count for a datasource exceeds this value, they will be split into batches to restore parallel execution capability | Integer.MAX_VALUE |
1920
| check-table-metadata-enabled (?) | boolean | Whether validate table meta data consistency when application startup or updated | false |
2021
| load-table-metadata-batch-size (?) | int | The number of table metadata loaded at a time when application startup or refreshes table metadata | 1000 |
2122

infra/common/src/main/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertyKey.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public enum ConfigurationPropertyKey implements TypedPropertyKey {
5353
*/
5454
MAX_CONNECTIONS_SIZE_PER_QUERY("max-connections-size-per-query", String.valueOf(1), int.class, false),
5555

56+
/**
57+
* Max union size per datasource for aggregate rewrite.
58+
* When route units count for a datasource exceeds this value, they will be split into batches.
59+
*/
60+
MAX_UNION_SIZE_PER_DATASOURCE("max-union-size-per-datasource", String.valueOf(Integer.MAX_VALUE), int.class, false),
61+
5662
/**
5763
* Whether validate table metadata consistency when application startup or updated.
5864
*/

infra/common/src/test/java/org/apache/shardingsphere/infra/config/props/ConfigurationPropertiesTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ void assertGetValue() {
4040
assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE));
4141
assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(20));
4242
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(20));
43+
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(100));
4344
assertTrue((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED));
4445
assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(500));
4546
assertThat(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE), is(TypedSPILoader.getService(DatabaseType.class, "PostgreSQL")));
@@ -63,6 +64,7 @@ private Properties createProperties() {
6364
new Property(ConfigurationPropertyKey.SQL_SIMPLE.getKey(), Boolean.TRUE.toString()),
6465
new Property(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), "20"),
6566
new Property(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), "20"),
67+
new Property(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), "100"),
6668
new Property(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED.getKey(), Boolean.TRUE.toString()),
6769
new Property(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE.getKey(), "500"),
6870
new Property(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE.getKey(), "PostgreSQL"),
@@ -87,6 +89,7 @@ void assertGetDefaultValue() {
8789
assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.SQL_SIMPLE));
8890
assertThat(actual.getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE), is(0));
8991
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY), is(1));
92+
assertThat(actual.getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE), is(Integer.MAX_VALUE));
9093
assertFalse((Boolean) actual.getValue(ConfigurationPropertyKey.CHECK_TABLE_METADATA_ENABLED));
9194
assertThat(actual.getValue(ConfigurationPropertyKey.LOAD_TABLE_METADATA_BATCH_SIZE), is(1000));
9295
assertNull(actual.getValue(ConfigurationPropertyKey.PROXY_FRONTEND_DATABASE_PROTOCOL_TYPE));

infra/rewrite/core/src/main/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngine.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import lombok.RequiredArgsConstructor;
2121
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
2222
import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
23+
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
24+
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
2325
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
2426
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
2527
import org.apache.shardingsphere.infra.datanode.DataNode;
@@ -73,15 +75,18 @@ public final class RouteSQLRewriteEngine {
7375
* @return SQL rewrite result
7476
*/
7577
public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final QueryContext queryContext) {
76-
return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext)));
78+
int maxUnionSizePerDataSource = Optional.ofNullable(queryContext.getMetaData().getProps())
79+
.map(props -> props.<Integer>getValue(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE))
80+
.orElse(Integer.parseInt(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getDefaultValue()));
81+
return new RouteSQLRewriteResult(translate(queryContext, createSQLRewriteUnits(sqlRewriteContext, routeContext, maxUnionSizePerDataSource)));
7782
}
7883

79-
private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) {
84+
private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final int maxUnionSizePerDataSource) {
8085
Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F);
8186
for (Entry<String, List<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) {
8287
List<RouteUnit> routeUnits = entry.getValue();
83-
if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) {
84-
result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
88+
if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits, maxUnionSizePerDataSource)) {
89+
createAggregatedRewriteUnits(sqlRewriteContext, routeContext, routeUnits, maxUnionSizePerDataSource, result);
8590
} else {
8691
for (RouteUnit each : routeUnits) {
8792
result.put(each, createSQLRewriteUnit(sqlRewriteContext, routeContext, each));
@@ -91,6 +96,25 @@ private Map<RouteUnit, SQLRewriteUnit> createSQLRewriteUnits(final SQLRewriteCon
9196
return result;
9297
}
9398

99+
private void createAggregatedRewriteUnits(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext,
100+
final List<RouteUnit> routeUnits, final int maxUnionSizePerDataSource, final Map<RouteUnit, SQLRewriteUnit> result) {
101+
if (routeUnits.size() <= maxUnionSizePerDataSource) {
102+
result.put(routeUnits.get(ThreadLocalRandom.current().nextInt(routeUnits.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits));
103+
} else {
104+
for (List<RouteUnit> batch : partitionRouteUnits(routeUnits, maxUnionSizePerDataSource)) {
105+
result.put(batch.get(ThreadLocalRandom.current().nextInt(batch.size())), createSQLRewriteUnit(sqlRewriteContext, routeContext, batch));
106+
}
107+
}
108+
}
109+
110+
private List<List<RouteUnit>> partitionRouteUnits(final List<RouteUnit> routeUnits, final int batchSize) {
111+
List<List<RouteUnit>> result = new ArrayList<>();
112+
for (int i = 0; i < routeUnits.size(); i += batchSize) {
113+
result.add(routeUnits.subList(i, Math.min(i + batchSize, routeUnits.size())));
114+
}
115+
return result;
116+
}
117+
94118
private Map<String, List<RouteUnit>> aggregateRouteUnitGroups(final Collection<RouteUnit> routeUnits) {
95119
Map<String, List<RouteUnit>> result = new LinkedHashMap<>(routeUnits.size(), 1F);
96120
for (RouteUnit each : routeUnits) {
@@ -99,8 +123,8 @@ private Map<String, List<RouteUnit>> aggregateRouteUnitGroups(final Collection<R
99123
return result;
100124
}
101125

102-
private boolean isNeedAggregateRewrite(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits) {
103-
if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size()) {
126+
private boolean isNeedAggregateRewrite(final SQLStatementContext sqlStatementContext, final Collection<RouteUnit> routeUnits, final int maxUnionSizePerDataSource) {
127+
if (!(sqlStatementContext instanceof SelectStatementContext) || 1 == routeUnits.size() || 1 == maxUnionSizePerDataSource) {
104128
return false;
105129
}
106130
SelectStatementContext statementContext = (SelectStatementContext) sqlStatementContext;

infra/rewrite/core/src/test/java/org/apache/shardingsphere/infra/rewrite/engine/RouteSQLRewriteEngineTest.java

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@
2121
import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
2222
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.InsertStatementContext;
2323
import org.apache.shardingsphere.infra.binder.context.statement.type.dml.SelectStatementContext;
24+
import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
25+
import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
2426
import org.apache.shardingsphere.infra.datanode.DataNode;
2527
import org.apache.shardingsphere.infra.hint.HintValueContext;
28+
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
2629
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
2730
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
2831
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
@@ -40,6 +43,7 @@
4043
import java.util.Arrays;
4144
import java.util.Collections;
4245
import java.util.Map;
46+
import java.util.Properties;
4347

4448
import static org.hamcrest.CoreMatchers.is;
4549
import static org.hamcrest.MatcherAssert.assertThat;
@@ -70,11 +74,25 @@ void assertRewriteWithStandardParameterBuilder() {
7074
}
7175

7276
private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql) {
77+
return mockQueryContext(sqlStatementContext, sql, Integer.MAX_VALUE);
78+
}
79+
80+
private QueryContext mockQueryContext(final SQLStatementContext sqlStatementContext, final String sql, final int maxUnionSizePerDataSource) {
7381
QueryContext result = mock(QueryContext.class, RETURNS_DEEP_STUBS);
7482
when(result.getSqlStatementContext()).thenReturn(sqlStatementContext);
7583
when(result.getSql()).thenReturn(sql);
7684
when(result.getParameters()).thenReturn(Collections.singletonList(1));
7785
when(result.getHintValueContext()).thenReturn(new HintValueContext());
86+
ShardingSphereMetaData metaData = mock(ShardingSphereMetaData.class);
87+
ConfigurationProperties props = new ConfigurationProperties(createPropsWithMaxUnionSizePerDataSource(maxUnionSizePerDataSource));
88+
when(metaData.getProps()).thenReturn(props);
89+
when(result.getMetaData()).thenReturn(metaData);
90+
return result;
91+
}
92+
93+
private Properties createPropsWithMaxUnionSizePerDataSource(final int maxUnionSizePerDataSource) {
94+
Properties result = new Properties();
95+
result.setProperty(ConfigurationPropertyKey.MAX_UNION_SIZE_PER_DATASOURCE.getKey(), String.valueOf(maxUnionSizePerDataSource));
7896
return result;
7997
}
8098

@@ -207,4 +225,64 @@ private Map<String, StorageUnit> mockStorageUnits(final DatabaseType databaseTyp
207225
when(result.getStorageType()).thenReturn(databaseType);
208226
return Collections.singletonMap("ds_0", result);
209227
}
228+
229+
@Test
230+
void assertRewriteWithBatchUnionWhenExceedsMaxUnionSizePerDataSource() {
231+
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
232+
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
233+
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
234+
DatabaseType databaseType = mock(DatabaseType.class);
235+
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
236+
ShardingSphereDatabase database = mockDatabase(databaseType);
237+
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2);
238+
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
239+
RouteContext routeContext = new RouteContext();
240+
for (int i = 0; i < 5; i++) {
241+
routeContext.getRouteUnits().add(new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_" + i))));
242+
}
243+
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
244+
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
245+
assertThat(actual.getSqlRewriteUnits().size(), is(3));
246+
}
247+
248+
@Test
249+
void assertRewriteWithMaxUnionSizePerDataSourceEqualsRouteUnitsCount() {
250+
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
251+
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
252+
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
253+
DatabaseType databaseType = mock(DatabaseType.class);
254+
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
255+
ShardingSphereDatabase database = mockDatabase(databaseType);
256+
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 2);
257+
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
258+
RouteContext routeContext = new RouteContext();
259+
RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0")));
260+
RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1")));
261+
routeContext.getRouteUnits().add(firstRouteUnit);
262+
routeContext.getRouteUnits().add(secondRouteUnit);
263+
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
264+
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
265+
assertThat(actual.getSqlRewriteUnits().size(), is(1));
266+
assertThat(actual.getSqlRewriteUnits().values().iterator().next().getSql(), is("SELECT ? UNION ALL SELECT ?"));
267+
}
268+
269+
@Test
270+
void assertRewriteWithMaxUnionSizePerDataSourceOne() {
271+
SelectStatementContext statementContext = mock(SelectStatementContext.class, RETURNS_DEEP_STUBS);
272+
when(statementContext.getOrderByContext().getItems()).thenReturn(Collections.emptyList());
273+
when(statementContext.getPaginationContext().isHasPagination()).thenReturn(false);
274+
DatabaseType databaseType = mock(DatabaseType.class);
275+
when(statementContext.getSqlStatement().getDatabaseType()).thenReturn(databaseType);
276+
ShardingSphereDatabase database = mockDatabase(databaseType);
277+
QueryContext queryContext = mockQueryContext(statementContext, "SELECT ?", 1);
278+
SQLRewriteContext sqlRewriteContext = new SQLRewriteContext(database, queryContext);
279+
RouteContext routeContext = new RouteContext();
280+
RouteUnit firstRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_0")));
281+
RouteUnit secondRouteUnit = new RouteUnit(new RouteMapper("ds", "ds_0"), Collections.singletonList(new RouteMapper("tbl", "tbl_1")));
282+
routeContext.getRouteUnits().add(firstRouteUnit);
283+
routeContext.getRouteUnits().add(secondRouteUnit);
284+
RouteSQLRewriteResult actual = new RouteSQLRewriteEngine(
285+
new SQLTranslatorRule(new DefaultSQLTranslatorRuleConfigurationBuilder().build()), database, mock(RuleMetaData.class)).rewrite(sqlRewriteContext, routeContext, queryContext);
286+
assertThat(actual.getSqlRewriteUnits().size(), is(2));
287+
}
210288
}

proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/variable/ShowDistVariablesExecutorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void assertExecute() {
5252
executor.setConnectionContext(new DistSQLConnectionContext(mock(QueryContext.class), 1,
5353
mock(DatabaseType.class), mock(DatabaseConnectionManager.class), mock(ExecutorStatementManager.class)));
5454
Collection<LocalDataQueryResultRow> actual = executor.getRows(mock(ShowDistVariablesStatement.class), contextManager);
55-
assertThat(actual.size(), is(21));
55+
assertThat(actual.size(), is(22));
5656
LocalDataQueryResultRow row = actual.iterator().next();
5757
assertThat(row.getCell(1), is("agent_plugins_enabled"));
5858
assertThat(row.getCell(2), is("false"));

0 commit comments

Comments
 (0)