Skip to content

Commit e9a146d

Browse files
authored
[fix](connect) Align COM_RESET_CONNECTION behavior with MySQL (#63884)
### What problem does this PR solve? `COM_RESET_CONNECTION` was accepted by Doris, but its behavior was not compatible with MySQL. The previous implementation cleared the current catalog/database state and returned OK after only a partial reset. This could make pooled clients, such as C# MySqlConnector with `ConnectionReset=True`, fail later unqualified SQL with `Current database is not set`. Other session-scoped state, including user variables and prepared statements, also needed to be reset consistently. ### What is changed? - Preserve the current catalog/database state across `COM_RESET_CONNECTION` so pooled connections can continue using the selected database. - Reset session variables, user variables, prepared statements, running query state, insert result, command state, and returned row count. - Roll back transaction state during reset and return an error if rollback fails. - Drop temporary tables during reset and return an error if cleanup fails. - Return OK with the autocommit server status when reset succeeds. - Return the MySQL-compatible unknown prepared statement error when executing a statement cleared by reset. - Extend regression and FE unit coverage for reset behavior, error handling, and current database preservation.
1 parent 88185e8 commit e9a146d

6 files changed

Lines changed: 366 additions & 18 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java

Lines changed: 88 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.doris.mysql.MysqlHandshakePacket;
5757
import org.apache.doris.mysql.MysqlSslContext;
5858
import org.apache.doris.mysql.ProxyMysqlChannel;
59+
import org.apache.doris.mysql.privilege.Auth;
5960
import org.apache.doris.mysql.privilege.PrivPredicate;
6061
import org.apache.doris.nereids.StatementContext;
6162
import org.apache.doris.nereids.stats.StatsErrorEstimator;
@@ -113,6 +114,7 @@ public class ConnectContext {
113114
private static final Logger LOG = LogManager.getLogger(ConnectContext.class);
114115

115116
private static final String SSL_PROTOCOL = "TLS";
117+
private static final int INITIAL_PREPARED_STMT_ID = Integer.MIN_VALUE;
116118

117119
public enum ConnectType {
118120
MYSQL,
@@ -128,7 +130,7 @@ public enum ConnectType {
128130
protected volatile TUniqueId loadId;
129131
protected volatile long backendId;
130132
// range [Integer.MIN_VALUE, Integer.MAX_VALUE]
131-
protected int preparedStmtId = Integer.MIN_VALUE;
133+
protected int preparedStmtId = INITIAL_PREPARED_STMT_ID;
132134
protected volatile LoadTaskInfo streamLoadInfo;
133135

134136
protected volatile TUniqueId queryId = null;
@@ -369,6 +371,47 @@ public void clearLastDBOfCatalog() {
369371
lastDBOfCatalog.clear();
370372
}
371373

374+
public void resetConnection() throws UserException {
375+
closeTxnForConnectionReset();
376+
if (!dbToTempTableNamesMap.isEmpty()) {
377+
cleanupTemporaryTables(true);
378+
dbToTempTableNamesMap.clear();
379+
}
380+
resetSessionVariable();
381+
userVars = new HashMap<>();
382+
preparedQuerys.clear();
383+
preparedStatementContextMap.clear();
384+
runningQuery = null;
385+
queryId = null;
386+
lastQueryId = null;
387+
setTraceId(null);
388+
insertResult = null;
389+
command = MysqlCommand.COM_SLEEP;
390+
returnRows = 0;
391+
}
392+
393+
private void resetSessionVariable() {
394+
sessionVariable = VariableMgr.newSessionVariable();
395+
applyUserSessionVariableDefaults();
396+
if (Config.use_fuzzy_session_variable) {
397+
sessionVariable.initFuzzyModeVariables();
398+
}
399+
}
400+
401+
private void applyUserSessionVariableDefaults() {
402+
String qualifiedUser = getQualifiedUser();
403+
if (Strings.isNullOrEmpty(qualifiedUser)) {
404+
return;
405+
}
406+
Env currentEnv = env == null ? Env.getCurrentEnv() : env;
407+
Auth auth = currentEnv == null ? null : currentEnv.getAuth();
408+
if (auth == null) {
409+
return;
410+
}
411+
setUserQueryTimeout(auth.getQueryTimeout(qualifiedUser));
412+
setUserInsertTimeout(auth.getInsertTimeout(qualifiedUser));
413+
}
414+
372415
public void setNotEvalNondeterministicFunction(boolean notEvalNondeterministicFunction) {
373416
this.notEvalNondeterministicFunction = notEvalNondeterministicFunction;
374417
}
@@ -385,12 +428,9 @@ public void init() {
385428
state = new QueryState();
386429
returnRows = 0;
387430
isKilled = false;
388-
sessionVariable = VariableMgr.newSessionVariable();
431+
resetSessionVariable();
389432
userVars = new HashMap<>();
390433
command = MysqlCommand.COM_SLEEP;
391-
if (Config.use_fuzzy_session_variable) {
392-
sessionVariable.initFuzzyModeVariables();
393-
}
394434

395435
sessionId = UUID.randomUUID().toString();
396436
if (!FeConstants.runningUnitTest) {
@@ -490,6 +530,18 @@ public void closeTxn() {
490530
}
491531
}
492532

533+
private void closeTxnForConnectionReset() throws DdlException {
534+
if (isTxnModel()) {
535+
try {
536+
txnEntry.abortTransaction();
537+
} catch (Exception e) {
538+
throw new DdlException(String.format("rollback transaction failed, db: %s, txnId: %s",
539+
currentDb, txnEntry.getTransactionId()), e);
540+
}
541+
txnEntry = null;
542+
}
543+
}
544+
493545
public long getStmtId() {
494546
return stmtId;
495547
}
@@ -911,30 +963,50 @@ public void cleanup() {
911963
}
912964

913965
protected void deleteTempTable() {
966+
try {
967+
cleanupTemporaryTables(false);
968+
} catch (DdlException e) {
969+
LOG.error("drop temporary table error", e);
970+
}
971+
}
972+
973+
private void cleanupTemporaryTables(boolean reportFailure) throws DdlException {
914974
// only delete temporary table in its creating session, not proxy session in master fe
915975
if (isProxy) {
916976
return;
917977
}
918978

979+
Map<String, Set<String>> tempTables = new HashMap<>();
980+
for (Map.Entry<String, Set<String>> entry : dbToTempTableNamesMap.entrySet()) {
981+
tempTables.put(entry.getKey(), new HashSet<>(entry.getValue()));
982+
}
983+
919984
// if current fe is master, delete temporary table directly
920985
if (Env.getCurrentEnv().isMaster()) {
921-
for (String dbName : dbToTempTableNamesMap.keySet()) {
922-
Database db = Env.getCurrentEnv().getInternalCatalog().getDb(dbName).get();
923-
for (String tableName : dbToTempTableNamesMap.get(dbName)) {
986+
for (String dbName : tempTables.keySet()) {
987+
for (String tableName : tempTables.get(dbName)) {
924988
LOG.info("try to drop temporary table: {}.{}", dbName, tableName);
925989
try {
990+
Database db = Env.getCurrentEnv().getInternalCatalog().getDb(dbName).get();
926991
Env.getCurrentEnv().getInternalCatalog()
927992
.dropTableWithoutCheck(db, db.getTable(tableName).get(), false, true);
928-
} catch (DdlException e) {
993+
} catch (Exception e) {
994+
if (reportFailure) {
995+
if (e instanceof DdlException) {
996+
throw (DdlException) e;
997+
}
998+
throw new DdlException(String.format(
999+
"drop temporary table error: db: %s, table: %s", dbName, tableName), e);
1000+
}
9291001
LOG.error("drop temporary table error: {}.{}", dbName, tableName, e);
9301002
}
9311003
}
9321004
}
9331005
} else {
9341006
// forward to master fe to drop table
9351007
RedirectStatus redirectStatus = new RedirectStatus(true, false);
936-
for (String dbName : dbToTempTableNamesMap.keySet()) {
937-
for (String tableName : dbToTempTableNamesMap.get(dbName)) {
1008+
for (String dbName : tempTables.keySet()) {
1009+
for (String tableName : tempTables.get(dbName)) {
9381010
LOG.info("request to delete temporary table: {}.{}", dbName, tableName);
9391011
String dropTableSql = String.format("drop table `%s`", tableName);
9401012
OriginStatement originStmt = new OriginStatement(dropTableSql, 0);
@@ -945,6 +1017,11 @@ protected void deleteTempTable() {
9451017
try {
9461018
masterOpExecutor.execute();
9471019
} catch (Exception e) {
1020+
if (reportFailure) {
1021+
throw new DdlException(String.format(
1022+
"master FE drop temporary table error: db: %s, table: %s",
1023+
dbName, tableName), e);
1024+
}
9481025
LOG.error("master FE drop temporary table error: db: {}, table: {}", dbName, tableName, e);
9491026
}
9501027
}

fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.doris.common.util.SqlUtils;
5151
import org.apache.doris.common.util.Util;
5252
import org.apache.doris.datasource.CatalogIf;
53-
import org.apache.doris.datasource.InternalCatalog;
5453
import org.apache.doris.metric.MetricRepo;
5554
import org.apache.doris.mysql.MysqlChannel;
5655
import org.apache.doris.mysql.MysqlCommand;
@@ -155,12 +154,20 @@ protected void handleDebug() {
155154
}
156155

157156
protected void handleResetConnection() {
158-
ctx.changeDefaultCatalog(InternalCatalog.INTERNAL_CATALOG_NAME);
159-
ctx.clearLastDBOfCatalog();
160-
ctx.getState().setOk();
157+
try {
158+
ctx.resetConnection();
159+
ctx.getState().setOk();
160+
} catch (UserException e) {
161+
ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
162+
}
161163
}
162164

163-
protected void handleStmtReset() {
165+
protected void handleStmtResetById(int stmtId) {
166+
if (ctx.getPreparedStementContext(String.valueOf(stmtId)) == null) {
167+
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_STMT_HANDLER,
168+
String.format("Unknown prepared statement handler (%s) given to mysqld_stmt_reset", stmtId));
169+
return;
170+
}
164171
ctx.getState().setOk();
165172
}
166173

fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ private void handleStmtClose() {
8282
handleStmtClose(stmtId);
8383
}
8484

85+
private void handleStmtReset() {
86+
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
87+
int stmtId = packetBuf.getInt();
88+
handleStmtResetById(stmtId);
89+
}
90+
8591
private String getPacket() {
8692
byte[] bytes = packetBuf.array();
8793
StringBuilder printB = new StringBuilder();
@@ -214,8 +220,8 @@ private void handleExecute() {
214220
PreparedStatementContext preparedStatementContext = ctx.getPreparedStementContext(String.valueOf(stmtId));
215221
if (preparedStatementContext == null) {
216222
LOG.warn("No such statement in context, stmtId:{}", stmtId);
217-
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR,
218-
"msg: Not supported such prepared statement");
223+
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_STMT_HANDLER,
224+
String.format("Unknown prepared statement handler (%s) given to mysqld_stmt_execute", stmtId));
219225
return;
220226
}
221227
handleExecute(preparedStatementContext.command, stmtId, preparedStatementContext, packetBuf, null);

0 commit comments

Comments
 (0)