From 57789a0df5d4eacf6d79df832873a84187b2acb4 Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Tue, 4 Nov 2025 11:44:39 +0100 Subject: [PATCH] MySQL cursor fetch uses wrong row descriptor after initial fetch (#1562) See #1525 MySQL sends column definitions in response to a prepare command. But when executing the statement or fetching the first page of a cursor, it sends column definitions again. The row decoder should not use the definitions given in response to prepare command because they may not be accurate. The previous cursor implementation used them when fetching the second page. Signed-off-by: Thomas Segismont --- .../impl/codec/ExtendedQueryCommandCodec.java | 12 ++++++++-- .../impl/codec/MySQLPreparedStatement.java | 9 ++++--- .../impl/codec/PrepareStatementCodec.java | 3 --- .../impl/codec/QueryCommandBaseCodec.java | 4 ++++ .../codec/ResetStatementCommandCodec.java | 1 + .../tests/mysqlclient/MySQLQueryTest.java | 24 +++++++++++++++++-- .../tests/mysqlclient/junit/MySQLRule.java | 4 ++++ .../src/test/resources/init.sql | 5 ++++ 8 files changed, 50 insertions(+), 12 deletions(-) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java index 2b5bd675b..895d4b7f3 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ExtendedQueryCommandCodec.java @@ -17,6 +17,7 @@ package io.vertx.mysqlclient.impl.codec; import io.netty.buffer.ByteBuf; +import io.vertx.mysqlclient.impl.MySQLRowDesc; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.internal.command.CommandResponse; @@ -31,7 +32,14 @@ class ExtendedQueryCommandCodec extends ExtendedQueryCommandBaseCodec 0 && statement.isCursorOpen) { // restore the state we need for decoding fetch response based on the prepared statement - columnDefinitions = statement.rowDesc.columnDefinitions(); + columnDefinitions = statement.cursorRowDescriptor.columnDefinitions(); + } + } + + @Override + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + if (cmd.fetch() > 0) { + statement.cursorRowDescriptor = mySQLRowDesc; } } @@ -42,7 +50,7 @@ void encode(MySQLEncoder encoder) { if (statement.isCursorOpen) { if (decoder == null) { // restore the state we need for decoding if column definitions are not included in the fetch response - decoder = new RowResultDecoder<>(cmd.collector(), statement.rowDesc); + decoder = new RowResultDecoder<>(cmd.collector(), statement.cursorRowDescriptor); } sendStatementFetchCommand(statement.statementId, cmd.fetch()); } else { diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java index ae5526856..140a0e605 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/MySQLPreparedStatement.java @@ -23,7 +23,7 @@ import io.vertx.mysqlclient.impl.datatype.DataType; import io.vertx.mysqlclient.impl.datatype.DataTypeCodec; import io.vertx.sqlclient.Tuple; -import io.vertx.sqlclient.impl.*; +import io.vertx.sqlclient.impl.ErrorMessageFactory; import io.vertx.sqlclient.internal.ParamDesc; import io.vertx.sqlclient.internal.PreparedStatement; import io.vertx.sqlclient.internal.RowDesc; @@ -36,18 +36,17 @@ class MySQLPreparedStatement implements PreparedStatement { final long statementId; final String sql; final MySQLParamDesc paramDesc; - final MySQLRowDesc rowDesc; final boolean closeAfterUsage; private boolean sendTypesToServer; private final DataType[] bindingTypes; boolean isCursorOpen; + MySQLRowDesc cursorRowDescriptor; - MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, MySQLRowDesc rowDesc, boolean closeAfterUsage) { + MySQLPreparedStatement(String sql, long statementId, MySQLParamDesc paramDesc, boolean closeAfterUsage) { this.statementId = statementId; this.paramDesc = paramDesc; - this.rowDesc = rowDesc; this.sql = sql; this.closeAfterUsage = closeAfterUsage; @@ -63,7 +62,7 @@ public ParamDesc paramDesc() { @Override public RowDesc rowDesc() { - return rowDesc; + throw new UnsupportedOperationException("The client should use the column definitions provided by execute or fetch response instead of prepare response"); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java index 24bb73ea3..6f98695be 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/PrepareStatementCodec.java @@ -18,8 +18,6 @@ import io.netty.buffer.ByteBuf; import io.vertx.mysqlclient.impl.MySQLParamDesc; -import io.vertx.mysqlclient.impl.MySQLRowDesc; -import io.vertx.mysqlclient.impl.datatype.DataFormat; import io.vertx.mysqlclient.impl.protocol.ColumnDefinition; import io.vertx.mysqlclient.impl.protocol.CommandType; import io.vertx.sqlclient.internal.PreparedStatement; @@ -135,7 +133,6 @@ private void handleReadyForQuery() { cmd.sql(), this.statementId, new MySQLParamDesc(paramDescs), - MySQLRowDesc.create(columnDescs, DataFormat.BINARY), !cmd.isManaged()))); } diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java index 37c6aa329..cc9bc1f73 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/QueryCommandBaseCodec.java @@ -94,9 +94,13 @@ protected void handleResultsetColumnDefinitions(ByteBuf payload) { protected void handleResultsetColumnDefinitionsDecodingCompleted() { commandHandlerState = CommandHandlerState.HANDLING_ROW_DATA_OR_END_PACKET; MySQLRowDesc mySQLRowDesc = MySQLRowDesc.create(columnDefinitions, format); // use the column definitions if provided by execute or fetch response instead of prepare response + handleRowDescriptorCreated(mySQLRowDesc); decoder = new RowResultDecoder<>(cmd.collector(), mySQLRowDesc); } + protected void handleRowDescriptorCreated(MySQLRowDesc mySQLRowDesc) { + } + protected void handleRows(ByteBuf payload, int payloadLength) { /* Resultset row can begin with 0xfe byte (when using text protocol with a field length > 0xffffff) diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java index 0ddc12aa0..5fbc98a90 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/codec/ResetStatementCommandCodec.java @@ -29,6 +29,7 @@ void encode(MySQLEncoder encoder) { statement.cleanBindings(); statement.isCursorOpen = false; + statement.cursorRowDescriptor = null; sendStatementResetCommand(statement.statementId); } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java index 5a34ed8e0..05ddda2aa 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/MySQLQueryTest.java @@ -14,6 +14,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.FileSystem; +import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.mysqlclient.MySQLClient; @@ -21,7 +22,6 @@ import io.vertx.mysqlclient.MySQLConnection; import io.vertx.sqlclient.*; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,6 +32,8 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.Assume.assumeFalse; + @RunWith(VertxUnitRunner.class) public class MySQLQueryTest extends MySQLTestBase { @@ -280,7 +282,7 @@ public void testDecodePacketSizeMoreThan16MB(TestContext ctx) { @Test public void testEncodePacketSizeMoreThan16MB(TestContext ctx) { - Assume.assumeFalse(rule.isUsingMySQL5_6()); + assumeFalse(rule.isUsingMySQL5_6()); int dataSize = 20 * 1024 * 1024; // 20MB payload byte[] data = new byte[dataSize]; ThreadLocalRandom.current().nextBytes(data); @@ -470,4 +472,22 @@ public void testLocalInfileRequestInPackets(TestContext ctx) { })); })); } + + @Test + public void testColumnDefinitionChangeWithCursor(TestContext ctx) { + assumeFalse("Query syntax not supported on MySQL 5", rule.isUsingMySQL5()); + Async rows = ctx.async(100); + Async completion = ctx.async(); + MySQLConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { + conn + .prepare("SELECT row_number() over () as \"Number\", case when 1 then 1 else 0 end as \"Case\" FROM mysql.help_relation limit 0,100") + .onComplete(ctx.asyncAssertSuccess(ps -> { + // Make sure to fetch from the database twice + RowStream stream = ps.createStream(50); + stream.exceptionHandler(ctx::fail); + stream.endHandler(v -> completion.complete()); + stream.handler(row -> rows.countDown()); + })); + })); + } } diff --git a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java index c56b477f1..7f583aa34 100644 --- a/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java +++ b/vertx-mysql-client/src/test/java/io/vertx/tests/mysqlclient/junit/MySQLRule.java @@ -143,6 +143,10 @@ public boolean isUsingMariaDB() { return databaseServerInfo.getDatabaseType() == DatabaseType.MariaDB; } + public boolean isUsingMySQL5() { + return databaseServerInfo.databaseType == DatabaseType.MySQL && databaseServerInfo.dockerImageTag.startsWith("5."); + } + public boolean isUsingMySQL5_6() { return databaseServerInfo == DatabaseServerInfo.MySQL_V5_6; } diff --git a/vertx-mysql-client/src/test/resources/init.sql b/vertx-mysql-client/src/test/resources/init.sql index ef68fdf9e..9181a68f8 100644 --- a/vertx-mysql-client/src/test/resources/init.sql +++ b/vertx-mysql-client/src/test/resources/init.sql @@ -1,3 +1,8 @@ +#allow reading mysql schema +GRANT +SELECT +ON mysql.* TO 'mysql'; + # testing change schema CREATE DATABASE emptyschema; GRANT ALL ON emptyschema.* TO 'mysql'@'%';