forked from eclipse-vertx/vertx-sql-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDB2SocketConnection.java
More file actions
153 lines (136 loc) · 5.52 KB
/
DB2SocketConnection.java
File metadata and controls
153 lines (136 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
* Copyright (c) 2011-2026 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.db2client.impl;
import io.netty.channel.ChannelPipeline;
import io.vertx.core.Completable;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Exception;
import io.vertx.db2client.impl.codec.*;
import io.vertx.db2client.impl.command.InitialHandshakeCommand;
import io.vertx.db2client.impl.drda.ConnectionMetaData;
import io.vertx.db2client.impl.drda.SQLState;
import io.vertx.db2client.impl.drda.SqlCode;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.codec.CommandMessage;
import io.vertx.sqlclient.codec.SocketConnectionBase;
import io.vertx.sqlclient.internal.PreparedStatement;
import io.vertx.sqlclient.internal.QueryResultHandler;
import io.vertx.sqlclient.spi.DatabaseMetadata;
import io.vertx.sqlclient.spi.connection.Connection;
import io.vertx.sqlclient.spi.protocol.CommandBase;
import io.vertx.sqlclient.spi.protocol.ExtendedQueryCommand;
import io.vertx.sqlclient.spi.protocol.SimpleQueryCommand;
import io.vertx.sqlclient.spi.protocol.TxCommand;
import java.util.Map;
import java.util.function.Predicate;
/**
 * DB2 flavour of {@link SocketConnectionBase}.
 *
 * <p>The class installs a {@link DB2Codec} on the socket's channel pipeline, converts
 * generic commands into DB2 command messages, rewrites transaction commands into the
 * form DB2 accepts, and forwards close events to an optional close handler.
 */
public class DB2SocketConnection extends SocketConnectionBase {

  /** Options this connection was created with; exposed through {@link #connectOptions()}. */
  private final DB2ConnectOptions connectOptions;

  /** Codec added to the channel pipeline by {@link #init()}. */
  private DB2Codec codec;

  /** Optional callback run from {@link #handleClose(Throwable)}; may be {@code null}. */
  private Handler<Void> closeHandler;

  /** Connection metadata; also the source of {@link #databaseMetadata()}. */
  public final ConnectionMetaData connMetadata = new ConnectionMetaData();

  /** Current connection state; a new connection starts in {@code CONNECTING}. */
  public ConnectionState status = ConnectionState.CONNECTING;

  /**
   * Creates the connection.
   *
   * <p>Every argument except {@code connectOptions} is forwarded unchanged to the
   * {@link SocketConnectionBase} constructor.
   *
   * @param socket the underlying socket
   * @param clientMetrics the client metrics
   * @param connectOptions the DB2 connect options kept by this connection
   * @param cachePreparedStatements prepared statement caching flag
   * @param preparedStatementCacheSize prepared statement cache size
   * @param preparedStatementCacheSqlFilter prepared statement cache SQL filter
   * @param pipeliningLimit the pipelining limit
   * @param context the context, also used when scheduling the startup message
   */
  public DB2SocketConnection(NetSocketInternal socket,
                             ClientMetrics clientMetrics,
                             DB2ConnectOptions connectOptions,
                             boolean cachePreparedStatements,
                             int preparedStatementCacheSize,
                             Predicate<String> preparedStatementCacheSqlFilter,
                             int pipeliningLimit,
                             ContextInternal context) {
    super(socket,
      clientMetrics,
      cachePreparedStatements,
      preparedStatementCacheSize,
      preparedStatementCacheSqlFilter,
      pipeliningLimit,
      context);
    this.connectOptions = connectOptions;
  }

  /**
   * Fails {@code command}, substituting a connection-refused {@link DB2Exception} for
   * {@code err} when the failed command is the initial handshake and the connection is
   * still in the {@code CONNECTING} state.
   *
   * @param command the command being failed
   * @param handler the completion callback of that command
   * @param err the failure as originally reported
   */
  @Override
  protected <R> void fail(CommandBase<R> command, Completable<R> handler, Throwable err) {
    boolean failedDuringHandshake =
      status == ConnectionState.CONNECTING && command instanceof InitialHandshakeCommand;
    // DB2 sometimes just drops the socket when it is sent an invalid database name.
    //   -4499 = a fatal error occurred that resulted in a disconnect from the data source
    //   08001 = "The connection was unable to be established"
    Throwable reported = failedDuringHandshake
      ? new DB2Exception("The connection was closed by the database server.", SqlCode.CONNECTION_REFUSED,
          SQLState.AUTH_DATABASE_CONNECTION_REFUSED)
      : err;
    super.fail(command, handler, reported);
  }

  /**
   * Schedules an {@link InitialHandshakeCommand} built from the given credentials and
   * completes {@code completionHandler} with its outcome.
   *
   * @param username the user name
   * @param password the password
   * @param database the database name
   * @param properties the connection properties
   * @param completionHandler the promise completed with the result of the handshake
   */
  // TODO: consider returning a Future instead of taking a Promise.
  void sendStartupMessage(String username,
                          String password,
                          String database,
                          Map<String, String> properties,
                          Promise<Connection> completionHandler) {
    InitialHandshakeCommand handshake =
      new InitialHandshakeCommand(this, username, password, database, properties);
    schedule(context, handshake).onComplete(completionHandler);
  }

  /**
   * @return the DB2 connect options supplied at construction time
   */
  @Override
  protected SqlConnectOptions connectOptions() {
    return connectOptions;
  }

  /**
   * Creates the DB2 codec, registers it in the channel pipeline under the name
   * {@code "codec"} immediately before the {@code "handler"} entry, and then runs the
   * superclass initialisation.
   */
  @Override
  public void init() {
    codec = new DB2Codec(this);
    socket.channelHandlerContext().pipeline().addBefore("handler", "codec", codec);
    super.init();
  }

  /**
   * Wraps an extended query command in the matching DB2 message: the batch variant when
   * {@code command.isBatch()} is true, the single-execution variant otherwise.
   *
   * @param command the extended query command
   * @param preparedStatement the prepared statement; cast to {@link DB2PreparedStatement}
   * @return the DB2 command message for {@code command}
   */
  @Override
  protected CommandMessage<?, ?> toMessage(ExtendedQueryCommand<?> command, PreparedStatement preparedStatement) {
    if (!command.isBatch()) {
      return new ExtendedQueryDB2CommandMessage(command, (DB2PreparedStatement) preparedStatement);
    }
    return new ExtendedBatchQueryDB2CommandMessage<>(command, (DB2PreparedStatement) preparedStatement);
  }

  /**
   * @param command the command to wrap
   * @return the result of {@link DB2CommandMessage#wrap} for {@code command}
   */
  @Override
  protected CommandMessage<?, ?> toMessage(CommandBase<?> command) {
    return DB2CommandMessage.wrap(command);
  }

  /**
   * Schedules {@code cmd}, with special handling for {@link TxCommand}s: a
   * {@code BEGIN} is completed immediately without contacting the server, and any other
   * kind is sent as a simple query using the SQL of that kind.
   *
   * @param cmd the command to schedule
   * @param handler the completion callback for {@code cmd}
   */
  @Override
  protected <R> void doSchedule(CommandBase<R> cmd, Completable<R> handler) {
    if (!(cmd instanceof TxCommand)) {
      super.doSchedule(cmd, handler);
      return;
    }

    TxCommand<R> tx = (TxCommand<R>) cmd;
    if (tx.kind() == TxCommand.Kind.BEGIN) {
      // DB2 implicitly opens a transaction with every query and has no 'BEGIN'
      // keyword, so a BEGIN command is treated as a no-op.
      handler.succeed(tx.result());
      return;
    }

    SimpleQueryCommand<Void> txQuery = new SimpleQueryCommand<>(tx.kind().sql(), false, false,
      SocketConnectionBase.NULL_COLLECTOR, QueryResultHandler.NOOP_HANDLER);
    // The query yields nothing useful; report the transaction command's own result
    // together with whatever failure the query produced.
    super.doSchedule(txQuery, (ignored, failure) -> handler.complete(tx.result(), failure));
  }

  /**
   * Runs the superclass close handling and then, when a close handler is registered,
   * submits it through {@code context().runOnContext}.
   *
   * @param t the throwable passed on to the superclass close handling
   */
  @Override
  public void handleClose(Throwable t) {
    super.handleClose(t);
    if (closeHandler == null) {
      return;
    }
    context().runOnContext(closeHandler);
  }

  /**
   * @return the constant system identifier {@code "db2"}
   */
  @Override
  public String system() {
    return "db2";
  }

  /**
   * @return the database metadata held by {@link #connMetadata}
   */
  @Override
  public DatabaseMetadata databaseMetadata() {
    return connMetadata.getDbMetadata();
  }

  /**
   * Registers the handler run when this connection is closed, replacing any handler set
   * earlier.
   *
   * @param handler the close handler, or {@code null} for none
   * @return this connection, for call chaining
   */
  public DB2SocketConnection closeHandler(Handler<Void> handler) {
    this.closeHandler = handler;
    return this;
  }
}