Skip to content

Commit 0f04f33

Browse files
authored
Support ConnectionContextAware for DistSQLQueryExecutor (#37469)
* Add QueryContext param in DistSQLUpdateProxyBackendHandler constructor * Refactor DistSQLUpdateExecuteEngine constructor, replace ConnectionContext to DistSQLConnectionContext * Refactor DistSQLExecutorAwareSetter set method, remove ConnectionContext param * Update unit test
1 parent f4e34c3 commit 0f04f33

8 files changed

Lines changed: 43 additions & 38 deletions

File tree

infra/distsql-handler/src/main/java/org/apache/shardingsphere/distsql/handler/aware/DistSQLExecutorAwareSetter.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
2929
import org.apache.shardingsphere.infra.rewrite.sql.token.common.generator.aware.ConnectionContextAware;
3030
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
31-
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
3231
import org.apache.shardingsphere.mode.manager.ContextManager;
3332

3433
import java.util.Optional;
@@ -46,13 +45,11 @@ public final class DistSQLExecutorAwareSetter {
4645
*
4746
* @param contextManager context manager
4847
* @param database database
49-
* @param distSQLConnectionContext DistSQL connection context
50-
* @param connectionContext connection context
48+
* @param distsqlConnectionContext DistSQL connection context
5149
* @param sqlStatement DistSQL statement
5250
*/
5351
@SuppressWarnings("rawtypes")
54-
public void set(final ContextManager contextManager, final ShardingSphereDatabase database, final DistSQLConnectionContext distSQLConnectionContext,
55-
final ConnectionContext connectionContext, final DistSQLStatement sqlStatement) {
52+
public void set(final ContextManager contextManager, final ShardingSphereDatabase database, final DistSQLConnectionContext distsqlConnectionContext, final DistSQLStatement sqlStatement) {
5653
if (executor instanceof DistSQLExecutorDatabaseAware) {
5754
ShardingSpherePreconditions.checkNotNull(database, NoDatabaseSelectedException::new);
5855
((DistSQLExecutorDatabaseAware) executor).setDatabase(database);
@@ -64,10 +61,10 @@ public void set(final ContextManager contextManager, final ShardingSphereDatabas
6461
setRule((DistSQLExecutorRuleAware) executor, contextManager, database);
6562
}
6663
if (executor instanceof DistSQLExecutorConnectionContextAware) {
67-
((DistSQLExecutorConnectionContextAware) executor).setConnectionContext(distSQLConnectionContext);
64+
((DistSQLExecutorConnectionContextAware) executor).setConnectionContext(distsqlConnectionContext);
6865
}
6966
if (executor instanceof ConnectionContextAware) {
70-
((ConnectionContextAware) executor).setConnectionContext(connectionContext);
67+
((ConnectionContextAware) executor).setConnectionContext(distsqlConnectionContext.getQueryContext().getConnectionContext());
7168
}
7269
}
7370

infra/distsql-handler/src/main/java/org/apache/shardingsphere/distsql/handler/engine/query/DistSQLQueryExecuteEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public DistSQLQueryExecuteEngine(final DistSQLStatement sqlStatement, final Stri
6767
public void executeQuery() throws SQLException {
6868
DistSQLQueryExecutor<DistSQLStatement> executor = TypedSPILoader.getService(DistSQLQueryExecutor.class, sqlStatement.getClass());
6969
try {
70-
new DistSQLExecutorAwareSetter(executor).set(contextManager, null == databaseName ? null : contextManager.getDatabase(databaseName), distsqlConnectionContext, null, sqlStatement);
70+
new DistSQLExecutorAwareSetter(executor).set(contextManager, null == databaseName ? null : contextManager.getDatabase(databaseName), distsqlConnectionContext, sqlStatement);
7171
} catch (final UnsupportedSQLOperationException ignored) {
7272
columnNames = executor.getColumnNames(sqlStatement);
7373
rows = Collections.emptyList();

infra/distsql-handler/src/main/java/org/apache/shardingsphere/distsql/handler/engine/update/DistSQLUpdateExecuteEngine.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.shardingsphere.distsql.handler.engine.update;
1919

2020
import org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorAwareSetter;
21+
import org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
2122
import org.apache.shardingsphere.distsql.handler.engine.update.rdl.rule.engine.database.DatabaseRuleDefinitionExecuteEngine;
2223
import org.apache.shardingsphere.distsql.handler.engine.update.rdl.rule.engine.global.GlobalRuleDefinitionExecuteEngine;
2324
import org.apache.shardingsphere.distsql.handler.engine.update.rdl.rule.spi.database.DatabaseRuleDefinitionExecutor;
@@ -31,7 +32,6 @@
3132
import org.apache.shardingsphere.distsql.statement.type.rdl.rule.database.DatabaseRuleDefinitionStatement;
3233
import org.apache.shardingsphere.distsql.statement.type.rdl.rule.global.GlobalRuleDefinitionStatement;
3334
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
34-
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
3535
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
3636
import org.apache.shardingsphere.mode.manager.ContextManager;
3737

@@ -49,13 +49,14 @@ public final class DistSQLUpdateExecuteEngine {
4949

5050
private final String databaseName;
5151

52-
private final ConnectionContext connectionContext;
52+
private final DistSQLConnectionContext distsqlConnectionContext;
5353

54-
public DistSQLUpdateExecuteEngine(final DistSQLStatement sqlStatement, final String currentDatabaseName, final ContextManager contextManager, final ConnectionContext connectionContext) {
54+
public DistSQLUpdateExecuteEngine(final DistSQLStatement sqlStatement, final String currentDatabaseName,
55+
final ContextManager contextManager, final DistSQLConnectionContext distsqlConnectionContext) {
5556
this.sqlStatement = sqlStatement;
5657
this.contextManager = contextManager;
5758
databaseName = DatabaseNameUtils.getDatabaseName(sqlStatement, currentDatabaseName);
58-
this.connectionContext = connectionContext;
59+
this.distsqlConnectionContext = distsqlConnectionContext;
5960
}
6061

6162
/**
@@ -90,7 +91,7 @@ private void executeNormalUpdate() throws SQLException {
9091
Optional<AdvancedDistSQLUpdateExecutor> advancedExecutor = TypedSPILoader.findService(AdvancedDistSQLUpdateExecutor.class, sqlStatement.getClass());
9192
DistSQLUpdateExecutor executor = advancedExecutor.isPresent() ? advancedExecutor.get() : TypedSPILoader.getService(DistSQLUpdateExecutor.class, sqlStatement.getClass());
9293
ShardingSphereDatabase database = null == databaseName ? null : contextManager.getDatabase(databaseName);
93-
new DistSQLExecutorAwareSetter(executor).set(contextManager, database, null, connectionContext, sqlStatement);
94+
new DistSQLExecutorAwareSetter(executor).set(contextManager, database, distsqlConnectionContext, sqlStatement);
9495
new DistSQLExecutorRequiredChecker(executor).check(sqlStatement, contextManager, database);
9596
executor.executeUpdate(sqlStatement, contextManager);
9697
}

proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLProxyBackendHandlerFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ public static ProxyBackendHandler newInstance(final DistSQLStatement sqlStatemen
5353
return new DistSQLQueryProxyBackendHandler(sqlStatement, queryContext, connectionSession, contextManager);
5454
}
5555
if (sqlStatement instanceof RDLStatement) {
56-
return new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager);
56+
return new DistSQLUpdateProxyBackendHandler(sqlStatement, queryContext, connectionSession, contextManager);
5757
}
5858
if (sqlStatement instanceof RALStatement) {
5959
return sqlStatement instanceof QueryableRALStatement
6060
? new DistSQLQueryProxyBackendHandler(sqlStatement, queryContext, connectionSession, contextManager)
61-
: new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager);
61+
: new DistSQLUpdateProxyBackendHandler(sqlStatement, queryContext, connectionSession, contextManager);
6262
}
6363
throw new UnsupportedSQLOperationException(sqlStatement.getClass().getName());
6464
}

proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLUpdateProxyBackendHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.shardingsphere.proxy.backend.handler.distsql;
1919

20+
import org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
2021
import org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExecuteEngine;
2122
import org.apache.shardingsphere.distsql.statement.DistSQLStatement;
23+
import org.apache.shardingsphere.infra.session.query.QueryContext;
2224
import org.apache.shardingsphere.mode.manager.ContextManager;
2325
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
2426
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
@@ -35,9 +37,12 @@ public final class DistSQLUpdateProxyBackendHandler implements DistSQLProxyBacke
3537

3638
private final DistSQLUpdateExecuteEngine engine;
3739

38-
public DistSQLUpdateProxyBackendHandler(final DistSQLStatement sqlStatement, final ConnectionSession connectionSession, final ContextManager contextManager) {
40+
public DistSQLUpdateProxyBackendHandler(final DistSQLStatement sqlStatement, final QueryContext queryContext, final ConnectionSession connectionSession, final ContextManager contextManager) {
3941
this.sqlStatement = sqlStatement;
40-
engine = new DistSQLUpdateExecuteEngine(sqlStatement, connectionSession.getUsedDatabaseName(), contextManager, connectionSession.getConnectionContext());
42+
DistSQLConnectionContext distsqlConnectionContext = new DistSQLConnectionContext(queryContext,
43+
connectionSession.getDatabaseConnectionManager().getConnectionSize(), connectionSession.getProtocolType(),
44+
connectionSession.getDatabaseConnectionManager(), connectionSession.getStatementManager());
45+
engine = new DistSQLUpdateExecuteEngine(sqlStatement, connectionSession.getUsedDatabaseName(), contextManager, distsqlConnectionContext);
4146
}
4247

4348
@Override

proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLProxyBackendHandlerFactoryTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -114,29 +114,29 @@ private MetaDataContexts mockMetaDataContexts(final ShardingSphereDatabase datab
114114
void assertExecuteDataSourcesContext() throws SQLException {
115115
RegisterStorageUnitStatement sqlStatement = mock(RegisterStorageUnitStatement.class);
116116
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
117-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
117+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
118118
}
119119

120120
@Test
121121
void assertExecuteShardingTableRuleContext() throws SQLException {
122122
when(contextManager.getDatabase("foo_db").getRuleMetaData()).thenReturn(new RuleMetaData(Collections.emptyList()));
123123
CreateShardingTableRuleStatement sqlStatement = mock(CreateShardingTableRuleStatement.class);
124124
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
125-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
125+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
126126
}
127127

128128
@Test
129129
void assertExecuteAddResourceContext() throws SQLException {
130130
RegisterStorageUnitStatement sqlStatement = mock(RegisterStorageUnitStatement.class);
131131
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
132-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
132+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
133133
}
134134

135135
@Test
136136
void assertExecuteAlterResourceContext() throws SQLException {
137137
AlterStorageUnitStatement sqlStatement = mock(AlterStorageUnitStatement.class);
138138
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
139-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
139+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
140140
}
141141

142142
@Test
@@ -145,7 +145,7 @@ void assertExecuteAlterShadowRuleContext() throws SQLException {
145145
when(contextManager.getDatabase("foo_db")).thenReturn(database);
146146
AlterShadowRuleStatement sqlStatement = mock(AlterShadowRuleStatement.class);
147147
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
148-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
148+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
149149
}
150150

151151
@Test
@@ -154,7 +154,7 @@ void assertExecuteCreateShadowRuleContext() throws SQLException {
154154
when(contextManager.getDatabase("foo_db")).thenReturn(database);
155155
CreateShadowRuleStatement sqlStatement = mock(CreateShadowRuleStatement.class);
156156
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
157-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
157+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
158158
}
159159

160160
@Test
@@ -163,7 +163,7 @@ void assertExecuteDropShadowRuleContext() throws SQLException {
163163
when(contextManager.getDatabase("foo_db")).thenReturn(database);
164164
DropShadowRuleStatement sqlStatement = mock(DropShadowRuleStatement.class);
165165
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
166-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
166+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
167167
}
168168

169169
@Test
@@ -172,7 +172,7 @@ void assertExecuteAlterDefaultShadowAlgorithm() throws SQLException {
172172
when(contextManager.getDatabase("foo_db")).thenReturn(database);
173173
AlterDefaultShadowAlgorithmStatement statement = new AlterDefaultShadowAlgorithmStatement(
174174
new ShadowAlgorithmSegment("foo", new AlgorithmSegment("SQL_HINT", PropertiesBuilder.build(new Property("type", "value")))));
175-
assertThat(new DistSQLUpdateProxyBackendHandler(statement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
175+
assertThat(new DistSQLUpdateProxyBackendHandler(statement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
176176
}
177177

178178
@Test
@@ -205,33 +205,33 @@ void assertExecuteDropShadowAlgorithmContext() throws SQLException {
205205
when(contextManager.getDatabase("foo_db")).thenReturn(database);
206206
DropShadowAlgorithmStatement sqlStatement = mock(DropShadowAlgorithmStatement.class);
207207
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
208-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
208+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
209209
}
210210

211211
@Test
212212
void assertExecuteDropResourceContext() throws SQLException {
213213
UnregisterStorageUnitStatement sqlStatement = mock(UnregisterStorageUnitStatement.class);
214214
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
215-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
215+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
216216
}
217217

218218
@Test
219219
void assertExecuteDropReadwriteSplittingRuleContext() {
220220
assertThrows(MissingRequiredRuleException.class,
221-
() -> new DistSQLUpdateProxyBackendHandler(mock(DropReadwriteSplittingRuleStatement.class, RETURNS_DEEP_STUBS), connectionSession, contextManager).execute());
221+
() -> new DistSQLUpdateProxyBackendHandler(mock(DropReadwriteSplittingRuleStatement.class, RETURNS_DEEP_STUBS), mock(), connectionSession, contextManager).execute());
222222
}
223223

224224
@Test
225225
void assertExecuteCreateReadwriteSplittingRuleContext() throws SQLException {
226226
CreateReadwriteSplittingRuleStatement sqlStatement = mock(CreateReadwriteSplittingRuleStatement.class);
227227
when(sqlStatement.getAttributes()).thenReturn(new SQLStatementAttributes());
228-
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
228+
assertThat(new DistSQLUpdateProxyBackendHandler(sqlStatement, mock(), connectionSession, contextManager).execute(), isA(UpdateResponseHeader.class));
229229
}
230230

231231
@Test
232232
void assertExecuteAlterReadwriteSplittingRuleContext() {
233233
assertThrows(MissingRequiredRuleException.class,
234-
() -> new DistSQLUpdateProxyBackendHandler(mock(AlterReadwriteSplittingRuleStatement.class, RETURNS_DEEP_STUBS), connectionSession, contextManager).execute());
234+
() -> new DistSQLUpdateProxyBackendHandler(mock(AlterReadwriteSplittingRuleStatement.class, RETURNS_DEEP_STUBS), mock(), connectionSession, contextManager).execute());
235235
}
236236

237237
private ShardingSphereDatabase mockDatabaseWithRule() {

0 commit comments

Comments
 (0)