From d72844197b2301243021edd60d10a4a875cbe564 Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Mon, 28 Apr 2025 00:19:32 +0200 Subject: [PATCH] Improve the implementation of prepared statement. Motivation: The implementation of prepared statement tries to accomodate both a lazy mode and a non lazy one, leading to the utilization of a future to unify the obtention of the prepared statement. This is clearly suboptimal for the common case, since lazy prepared statement is only necessary when meeting indeterminate prepared statement (ambiguous) that requires a tuple for an actual preparation. In addition the execution is done using a context comparison which fails when on a duplicated context. Changes: Use the context in thread comparison instead of context equality. Make a prepared statement base which has two implementations lazy/direct which are both optimized for their specific cases. --- .../io/vertx/sqlclient/impl/CursorImpl.java | 30 +-- .../sqlclient/impl/PreparedStatementBase.java | 245 +++++++++++++++++ .../sqlclient/impl/PreparedStatementImpl.java | 253 ------------------ .../vertx/sqlclient/impl/RowStreamImpl.java | 4 +- .../sqlclient/internal/SqlConnectionBase.java | 6 +- 5 files changed, 261 insertions(+), 277 deletions(-) create mode 100644 vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java delete mode 100644 vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java index 1c02524a9..c525e6dba 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/CursorImpl.java @@ -36,16 +36,16 @@ public class CursorImpl implements Cursor { private final Connection conn; - private final PreparedStatementImpl ps; + private final PreparedStatementBase ps; private final ContextInternal context; private final boolean autoCommit; private final TupleInternal params; private String id; private boolean closed; - private QueryResultBuilder, RowSetImpl, RowSet> result; + QueryResultBuilder, RowSetImpl, RowSet> result; - CursorImpl(PreparedStatementImpl ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) { + CursorImpl(PreparedStatementBase ps, Connection conn, ContextInternal context, boolean autoCommit, TupleInternal params) { this.ps = ps; this.conn = conn; this.context = context; @@ -64,22 +64,14 @@ public synchronized boolean hasMore() { @Override public synchronized Future> read(int count) { PromiseInternal> promise = context.promise(); - ps.withPreparedStatement(ps.options(), params, ar -> { - if (ar.succeeded()) { - PreparedStatement preparedStatement = ar.result(); - QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); - if (id == null) { - id = UUID.randomUUID().toString(); - this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, false, promise); - } else if (this.result.isSuspended()) { - this.result = builder.executeExtendedQuery(conn, preparedStatement, ps.options(), autoCommit, params, count, id, true, promise); - } else { - throw new IllegalStateException(); - } - } else { - promise.fail(ar.cause()); - } - }); + boolean suspended; + if (id == null) { + id = UUID.randomUUID().toString(); + suspended = false; + } else { + suspended = true; + } + ps.readCursor(this, id, suspended, params, count, promise); return promise.future(); } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java new file mode 100644 index 000000000..0686a2b6e --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementBase.java @@ -0,0 +1,245 @@ +/* + * Copyright (C) 2017 Julien Viet + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.vertx.sqlclient.impl; + +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.PromiseInternal; +import io.vertx.sqlclient.PrepareOptions; +import io.vertx.sqlclient.PreparedQuery; +import io.vertx.sqlclient.internal.ArrayTuple; +import io.vertx.sqlclient.internal.Connection; +import io.vertx.sqlclient.internal.command.CloseCursorCommand; +import io.vertx.sqlclient.internal.command.CloseStatementCommand; +import io.vertx.sqlclient.Cursor; +import io.vertx.sqlclient.PreparedStatement; +import io.vertx.sqlclient.SqlResult; +import io.vertx.sqlclient.RowSet; +import io.vertx.sqlclient.RowStream; +import io.vertx.sqlclient.Row; +import io.vertx.sqlclient.Tuple; +import io.vertx.core.*; +import io.vertx.sqlclient.internal.command.PrepareStatementCommand; +import io.vertx.sqlclient.internal.TupleInternal; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collector; + +/** + * @author Julien Viet + */ +public abstract class PreparedStatementBase implements PreparedStatement { + + public static PreparedStatement create(Connection conn, + ContextInternal context, + io.vertx.sqlclient.internal.PreparedStatement preparedStatement, + boolean autoCommit) { + return new PreparedStatementBase(conn, context, autoCommit) { + @Override + protected > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p) { + builder.executeBatchQuery(conn, null, preparedStatement, autoCommit, argsList, p); + } + @Override + protected > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p) { + builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, args, fetch, cursorId, suspended, p); + } + @Override + protected void close(Promise promise) { + conn.schedule(new CloseStatementCommand(preparedStatement), promise); + } + @Override + protected void closeCursor(String cursorId, Promise promise) { + CloseCursorCommand cmd = new CloseCursorCommand(cursorId, preparedStatement); + conn.schedule(cmd, promise); + } + @Override + protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise) { + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + cursor.result = builder.executeExtendedQuery(conn, preparedStatement, null, autoCommit, params, count, id, suspended, promise); + } + }; + } + + public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) { + return new PreparedStatementBase(conn, context, autoCommit) { + Future future; + void withPreparedStatement(PrepareOptions options, Tuple args, Handler> handler) { + if (context.inThread()) { + if (future == null) { + Promise promise = context.promise(); + PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types()); + conn.schedule(prepare, promise); + future = promise.future(); + } + future.onComplete(handler); + } else { + context.runOnContext(v -> withPreparedStatement(options, args, handler)); + } + } + @Override + protected > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p) { + withPreparedStatement(options, argsList.get(0), ar -> { + if (ar.succeeded()) { + builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p); + } else { + p.fail(ar.cause()); + } + }); + } + @Override + protected > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p) { + withPreparedStatement(options, args, ar -> { + if (ar.succeeded()) { + builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, args, fetch, cursorId, suspended, p); + } else { + p.fail(ar.cause()); + } + }); + } + @Override + protected void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise) { + withPreparedStatement(options, params, ar -> { + if (ar.succeeded()) { + QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); + cursor.result = builder.executeExtendedQuery(conn, ar.result(), options, autoCommit, params, count, id, suspended, promise); + } else { + promise.fail(ar.cause()); + } + }); + } + @Override + protected void close(Promise promise) { + if (future != null) { + future.onComplete(ar -> { + if (ar.succeeded()) { + CloseStatementCommand cmd = new CloseStatementCommand(ar.result()); + conn.schedule(cmd, promise); + } else { + promise.fail(ar.cause()); + } + }); + } + } + @Override + protected void closeCursor(String cursorId, Promise promise) { + if (future != null) { + future.onComplete(ar -> { + if (ar.succeeded()) { + CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result()); + conn.schedule(cmd, promise); + } else { + promise.fail(ar.cause()); + } + }); + } else { + promise.fail("Invalid"); + } + } + }; + } + + private final Connection conn; + private final ContextInternal context; + private final boolean autoCommit; + private final AtomicBoolean closed; + + private PreparedStatementBase(Connection conn, ContextInternal context, boolean autoCommit) { + this.conn = conn; + this.context = context; + this.autoCommit = autoCommit; + this.closed = new AtomicBoolean(); + } + + protected abstract > void execute(Tuple args, int fetch, String cursorId, boolean suspended, QueryExecutor builder, PromiseInternal p); + protected abstract > void executeBatch(List argsList, QueryExecutor builder, PromiseInternal p); + protected abstract void close(Promise promise); + protected abstract void closeCursor(String cursorId, Promise promise); + protected abstract void readCursor(CursorImpl cursor, String id, boolean suspended, TupleInternal params, int count, PromiseInternal> promise); + + @Override + public final PreparedQuery> query() { + return new PreparedStatementQuery<>(new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR)); + } + + @Override + public final Cursor cursor(Tuple args) { + return new CursorImpl(this, conn, context, autoCommit, (TupleInternal) args); + } + + @Override + public final Future close() { + if (closed.compareAndSet(false, true)) { + Promise promise = context.promise(); + close(promise); + return promise.future(); + } else { + return context.failedFuture("Already closed"); + } + } + + @Override + public final RowStream createStream(int fetch, Tuple args) { + return new RowStreamImpl(this, context, fetch, args); + } + + private class PreparedStatementQuery> extends QueryBase implements PreparedQuery { + + public PreparedStatementQuery(QueryExecutor builder) { + super(builder); + } + + @Override + protected > QueryBase copy(QueryExecutor builder) { + return new PreparedStatementQuery<>(builder); + } + + @Override + public PreparedQuery> collecting(Collector collector) { + return (PreparedQuery>) super.collecting(collector); + } + + @Override + public PreparedQuery> mapping(Function mapper) { + return (PreparedQuery>) super.mapping(mapper); + } + + @Override + public Future execute() { + return execute(ArrayTuple.EMPTY); + } + + @Override + public Future execute(Tuple args) { + PromiseInternal promise = context.promise(); + PreparedStatementBase.this.execute(args, 0, null, false, builder, promise); + return promise.future(); + } + + @Override + public Future executeBatch(List argsList) { + if (argsList.isEmpty()) { + context.failedFuture("Empty batch"); + } else { + PromiseInternal promise = context.promise(); + PreparedStatementBase.this.executeBatch(argsList, builder, promise); + return promise.future(); + } + } + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java deleted file mode 100644 index e9e2223b0..000000000 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PreparedStatementImpl.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Copyright (C) 2017 Julien Viet - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package io.vertx.sqlclient.impl; - -import io.vertx.core.internal.ContextInternal; -import io.vertx.core.internal.PromiseInternal; -import io.vertx.sqlclient.PrepareOptions; -import io.vertx.sqlclient.PreparedQuery; -import io.vertx.sqlclient.internal.ArrayTuple; -import io.vertx.sqlclient.internal.Connection; -import io.vertx.sqlclient.internal.command.CloseCursorCommand; -import io.vertx.sqlclient.internal.command.CloseStatementCommand; -import io.vertx.sqlclient.Cursor; -import io.vertx.sqlclient.PreparedStatement; -import io.vertx.sqlclient.SqlResult; -import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.RowStream; -import io.vertx.sqlclient.Row; -import io.vertx.sqlclient.Tuple; -import io.vertx.core.*; -import io.vertx.sqlclient.internal.command.PrepareStatementCommand; -import io.vertx.sqlclient.internal.TupleInternal; - -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; -import java.util.stream.Collector; - -/** - * @author Julien Viet - */ -public class PreparedStatementImpl implements PreparedStatement { - - public static PreparedStatement create(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) { - return new PreparedStatementImpl(conn, context, ps, autoCommit); - } - - public static PreparedStatement create(Connection conn, ContextInternal context, PrepareOptions options, String sql, boolean autoCommit) { - return new PreparedStatementImpl(conn, context, sql, options, autoCommit); - } - - private final Connection conn; - private final ContextInternal context; - private final String sql; - private final PrepareOptions options; - private Promise promise; - private Future future; - private final boolean autoCommit; - private final AtomicBoolean closed = new AtomicBoolean(); - - private PreparedStatementImpl(Connection conn, ContextInternal context, io.vertx.sqlclient.internal.PreparedStatement ps, boolean autoCommit) { - this.conn = conn; - this.context = context; - this.sql = null; - this.options = null; - this.promise = null; - this.future = Future.succeededFuture(ps); - this.autoCommit = autoCommit; - } - - private PreparedStatementImpl(Connection conn, - ContextInternal context, - String sql, - PrepareOptions options, - boolean autoCommit) { - this.conn = conn; - this.context = context; - this.sql = sql; - this.options = options; - this.promise = Promise.promise(); - this.autoCommit = autoCommit; - } - - PrepareOptions options() { - return options; - } - - @Override - public PreparedQuery> query() { - QueryExecutor, RowSetImpl, RowSet> builder = new QueryExecutor<>(RowSetImpl.FACTORY, RowSetImpl.COLLECTOR); - return new PreparedStatementQuery<>(builder); - } - - void withPreparedStatement(PrepareOptions options, Tuple args, Handler> handler) { - if (context == Vertx.currentContext()) { - if (future == null) { - // Lazy statement; - PrepareStatementCommand prepare = new PrepareStatementCommand(sql, options, true, args.types()); - conn.schedule(prepare, promise); - future = promise.future(); - } - future.onComplete(handler); - } else { - context.runOnContext(v -> withPreparedStatement(options, args, handler)); - } - } - - > void execute(Tuple args, - int fetch, - String cursorId, - boolean suspended, - QueryExecutor builder, - PromiseInternal p) { - withPreparedStatement(options, args, ar -> { - if (ar.succeeded()) { - builder.executeExtendedQuery( - conn, - ar.result(), - options, - autoCommit, - args, - fetch, - cursorId, - suspended, - p); - } else { - p.fail(ar.cause()); - } - }); - } - - > void executeBatch(List argsList, - QueryExecutor builder, - PromiseInternal p) { - withPreparedStatement(options, argsList.get(0), ar -> { - if (ar.succeeded()) { - builder.executeBatchQuery(conn, options, ar.result(), autoCommit, argsList, p); - } else { - p.fail(ar.cause()); - } - }); - } - - @Override - public Cursor cursor(Tuple args) { - return cursor((TupleInternal) args); - } - - private Cursor cursor(TupleInternal args) { - return new CursorImpl(this, conn, context, autoCommit, args); - } - - @Override - public Future close() { - if (closed.compareAndSet(false, true)) { - Promise promise = context.promise(); - if (this.promise == null) { - CloseStatementCommand cmd = new CloseStatementCommand(future.result()); - conn.schedule(cmd, promise); - } else { - if (future == null) { - future = this.promise.future(); - this.promise.fail("Closed"); - } - future.onComplete(ar -> { - if (ar.succeeded()) { - CloseStatementCommand cmd = new CloseStatementCommand(ar.result()); - conn.schedule(cmd, promise); - } else { - promise.complete(); - } - }); - } - return promise.future(); - } else { - return context.failedFuture("Already closed"); - } - } - - @Override - public RowStream createStream(int fetch, Tuple args) { - return new RowStreamImpl(this, context, fetch, args); - } - - void closeCursor(String cursorId, Promise promise) { - future.onComplete(ar -> { - if (ar.succeeded()) { - CloseCursorCommand cmd = new CloseCursorCommand(cursorId, ar.result()); - conn.schedule(cmd, promise); - } else { - promise.fail(ar.cause()); - } - }); - } - - private class PreparedStatementQuery> extends QueryBase implements PreparedQuery { - - public PreparedStatementQuery(QueryExecutor builder) { - super(builder); - } - - @Override - protected > QueryBase copy(QueryExecutor builder) { - return new PreparedStatementQuery<>(builder); - } - - @Override - public PreparedQuery> collecting(Collector collector) { - return (PreparedQuery>) super.collecting(collector); - } - - @Override - public PreparedQuery> mapping(Function mapper) { - return (PreparedQuery>) super.mapping(mapper); - } - - @Override - public Future execute() { - return execute(ArrayTuple.EMPTY); - } - - @Override - public Future execute(Tuple args) { - PromiseInternal promise = context.promise(); - execute(args, promise); - return promise.future(); - } - - private void execute(Tuple args, PromiseInternal promise) { - PreparedStatementImpl.this.execute(args, 0, null, false, builder, promise); - } - - @Override - public Future executeBatch(List argsList) { - PromiseInternal promise = context.promise(); - executeBatch(argsList, promise); - return promise.future(); - } - - private void executeBatch(List argsList, PromiseInternal promise) { - if (argsList.isEmpty()) { - promise.fail("Empty batch"); - } else { - PreparedStatementImpl.this.executeBatch(argsList, builder, promise); - } - } - } -} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java index 301999110..1333a186a 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/RowStreamImpl.java @@ -32,7 +32,7 @@ public class RowStreamImpl implements RowStreamInternal, Handler>> { - private final PreparedStatementImpl ps; + private final PreparedStatementBase ps; private final ContextInternal context; private final int fetch; private final Tuple params; @@ -46,7 +46,7 @@ public class RowStreamImpl implements RowStreamInternal, Handler result; - RowStreamImpl(PreparedStatementImpl ps, ContextInternal context, int fetch, Tuple params) { + RowStreamImpl(PreparedStatementBase ps, ContextInternal context, int fetch, Tuple params) { this.ps = ps; this.context = context; this.fetch = fetch; diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java index 569b5231e..f8aca2b28 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/internal/SqlConnectionBase.java @@ -26,7 +26,7 @@ import io.vertx.sqlclient.PreparedStatement; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.Transaction; -import io.vertx.sqlclient.impl.PreparedStatementImpl; +import io.vertx.sqlclient.impl.PreparedStatementBase; import io.vertx.sqlclient.impl.TransactionImpl; import io.vertx.sqlclient.internal.command.CommandBase; import io.vertx.sqlclient.internal.command.PrepareStatementCommand; @@ -78,10 +78,10 @@ public Future prepare(String sql, PrepareOptions options) { schedule(new PrepareStatementCommand(sql, options, true), promise); return promise.future() .compose( - cr -> Future.succeededFuture(PreparedStatementImpl.create(conn, context, cr, autoCommit())), + cr -> Future.succeededFuture(PreparedStatementBase.create(conn, context, cr, autoCommit())), err -> { if (conn.isIndeterminatePreparedStatementError(err)) { - return Future.succeededFuture(PreparedStatementImpl.create(conn, context, options, sql, autoCommit())); + return Future.succeededFuture(PreparedStatementBase.create(conn, context, options, sql, autoCommit())); } else { return Future.failedFuture(err); }