Skip to content

Commit fc8046e

Browse files
committed
savepoints
Signed-off-by: doxlik <doxlikx@gmail.com>
1 parent 5869727 commit fc8046e

15 files changed

Lines changed: 885 additions & 30 deletions

File tree

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@
189189
<module>vertx-db2-client</module>
190190
<module>vertx-sql-client-templates</module>
191191
<module>vertx-oracle-client</module>
192+
<module>vertx-pg-savepoints-example</module>
192193
</modules>
193194

194195
</project>

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.vertx.pgclient.PgNotice;
2727
import io.vertx.pgclient.PgNotification;
2828
import io.vertx.pgclient.impl.codec.NoticeResponse;
29-
import io.vertx.pgclient.impl.codec.TxFailedEvent;
29+
import io.vertx.pgclient.impl.codec.TxStatusEvent;
3030
import io.vertx.pgclient.spi.PgDriver;
3131
import io.vertx.sqlclient.codec.SocketConnectionBase;
3232
import io.vertx.sqlclient.internal.SqlConnectionBase;
@@ -99,9 +99,9 @@ public void handleEvent(Object event) {
9999
} else {
100100
notice.log(SocketConnectionBase.logger);
101101
}
102-
} else if (event instanceof TxFailedEvent) {
102+
} else if (event instanceof TxStatusEvent) {
103103
if (tx != null) {
104-
tx.fail();
104+
tx.status(((TxStatusEvent) event).status());
105105
}
106106
}
107107
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgSocketConnection.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import io.vertx.pgclient.impl.codec.NoticeResponse;
3232
import io.vertx.pgclient.impl.codec.PgCodec;
3333
import io.vertx.pgclient.impl.codec.PgCommandMessage;
34-
import io.vertx.pgclient.impl.codec.TxFailedEvent;
34+
import io.vertx.pgclient.impl.codec.TxStatusEvent;
3535
import io.vertx.sqlclient.codec.CommandMessage;
3636
import io.vertx.sqlclient.codec.SocketConnectionBase;
3737
import io.vertx.sqlclient.spi.connection.Connection;
@@ -41,6 +41,7 @@
4141
import io.vertx.sqlclient.spi.protocol.CommandBase;
4242
import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand;
4343
import io.vertx.sqlclient.spi.protocol.InitCommand;
44+
import io.vertx.sqlclient.spi.protocol.SavepointCommand;
4445
import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand;
4546
import io.vertx.sqlclient.spi.protocol.TxCommand;
4647

@@ -116,7 +117,7 @@ Future<Void> sendCancelRequestMessage(int processId, int secretKey) {
116117
@Override
117118
protected void handleMessage(Object msg) {
118119
super.handleMessage(msg);
119-
if (msg instanceof Notification || msg instanceof TxFailedEvent || msg instanceof NoticeResponse) {
120+
if (msg instanceof Notification || msg instanceof TxStatusEvent || msg instanceof NoticeResponse) {
120121
handleEvent(msg);
121122
}
122123
}
@@ -172,6 +173,15 @@ protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
172173
SocketConnectionBase.NULL_COLLECTOR,
173174
QueryResultHandler.NOOP_HANDLER);
174175
super.doSchedule(cmd2, (res, err) -> handler.complete(tx.result(), err));
176+
} else if (cmd instanceof SavepointCommand) {
177+
SavepointCommand<R> savepoint = (SavepointCommand<R>) cmd;
178+
SimpleQueryCommand<Void> cmd2 = new SimpleQueryCommand<>(
179+
savepoint.sql(),
180+
false,
181+
false,
182+
SocketConnectionBase.NULL_COLLECTOR,
183+
QueryResultHandler.NOOP_HANDLER);
184+
super.doSchedule(cmd2, (res, err) -> handler.complete(savepoint.result(), err));
175185
} else {
176186
super.doSchedule(cmd, handler);
177187
}

vertx-pg-client/src/main/java/io/vertx/pgclient/impl/codec/PgDecoder.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,11 @@ private void decodeRowDescription(ByteBuf in) {
236236
private void decodeReadyForQuery(ChannelHandlerContext ctx, ByteBuf in) {
237237
byte id = in.readByte();
238238
if (id == I) {
239-
// IDLE
239+
ctx.fireChannelRead(TxStatusEvent.IDLE);
240240
} else if (id == T) {
241-
// ACTIVE
241+
ctx.fireChannelRead(TxStatusEvent.ACTIVE);
242242
} else {
243-
// FAILED
244-
ctx.fireChannelRead(TxFailedEvent.INSTANCE);
243+
ctx.fireChannelRead(TxStatusEvent.FAILED);
245244
}
246245
codec.peek().handleReadyForQuery();
247246
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright (c) 2011-2025 Contributors to the Eclipse Foundation
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7+
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10+
*/
11+
package io.vertx.pgclient.impl.codec;
12+
13+
import io.vertx.sqlclient.impl.TransactionState;
14+
15+
public class TxStatusEvent {
16+
17+
public static final TxStatusEvent IDLE = new TxStatusEvent(TransactionState.IDLE);
18+
public static final TxStatusEvent ACTIVE = new TxStatusEvent(TransactionState.ACTIVE);
19+
public static final TxStatusEvent FAILED = new TxStatusEvent(TransactionState.FAILED);
20+
21+
private final TransactionState status;
22+
23+
private TxStatusEvent(TransactionState status) {
24+
this.status = status;
25+
}
26+
27+
public TransactionState status() {
28+
return status;
29+
}
30+
}

vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ public int appendQueryPlaceholder(StringBuilder queryBuilder, int index, int cur
7373
return index;
7474
}
7575

76+
@Override
77+
public boolean supportsSavepoints() {
78+
return true;
79+
}
80+
7681
@Override
7782
public SqlConnectionInternal wrapConnection(ContextInternal context, ConnectionFactory<PgConnectOptions> factory, Connection connection) {
7883
return new PgConnectionImpl((PgConnectionFactory) factory, context, connection);

0 commit comments

Comments
 (0)