forked from eclipse-vertx/vertx-sql-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPgConnectionFactory.java
More file actions
181 lines (167 loc) · 7.51 KB
/
PgConnectionFactory.java
File metadata and controls
181 lines (167 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
* Copyright (c) 2011-2026 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.pgclient.impl;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.*;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.SslMode;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.ConnectionFactoryBase;
import io.vertx.sqlclient.internal.Connection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
/**
 * Connection factory for the PostgreSQL client.
 *
 * <p>It opens the network socket, upgrades it to SSL when the configured {@link SslMode}
 * asks for it, and (for regular connections) sends the startup message. It is also used
 * to open the short-lived connection that carries a cancel request.
 *
 * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
 */
public class PgConnectionFactory extends ConnectionFactoryBase<PgConnectOptions> {

  /**
   * Creates a factory bound to the given Vert.x instance.
   *
   * @param vertx the Vert.x instance passed to the base factory
   */
  public PgConnectionFactory(VertxInternal vertx) {
    super(vertx);
  }

  /**
   * Creates a factory bound to the given Vert.x instance with explicit transport options.
   *
   * @param vertx the Vert.x instance passed to the base factory
   * @param transportOptions the transport options passed to the base factory
   */
  public PgConnectionFactory(VertxInternal vertx, NetClientOptions transportOptions) {
    super(vertx, transportOptions);
  }

  /**
   * Validates that the SSL options are sufficient for the configured SSL mode.
   *
   * <ul>
   *   <li>{@code VERIFY_FULL} requires a non-empty hostname verification algorithm
   *       <em>and</em> trust options;</li>
   *   <li>{@code VERIFY_CA} requires trust options;</li>
   *   <li>every other mode is accepted as is.</li>
   * </ul>
   *
   * <p>Missing SSL options are treated like missing individual settings, so the caller gets
   * the descriptive {@link IllegalArgumentException} rather than a bare
   * {@link NullPointerException}.
   *
   * @param options the connect options to validate
   * @throws IllegalArgumentException if a setting required by the SSL mode is missing
   */
  private void checkSslMode(PgConnectOptions options) {
    switch (options.getSslMode()) {
      case VERIFY_FULL: {
        ClientSSLOptions sslOptions = options.getSslOptions();
        String hostnameVerificationAlgorithm =
          sslOptions != null ? sslOptions.getHostnameVerificationAlgorithm() : null;
        if (hostnameVerificationAlgorithm == null || hostnameVerificationAlgorithm.isEmpty()) {
          throw new IllegalArgumentException("Host verification algorithm must be specified under verify-full sslmode");
        }
      }
      // fall through: verify-full must also satisfy the verify-ca requirement below
      case VERIFY_CA: {
        ClientSSLOptions sslOptions = options.getSslOptions();
        TrustOptions trustOptions = sslOptions != null ? sslOptions.getTrustOptions() : null;
        if (trustOptions == null) {
          throw new IllegalArgumentException("Trust options must be specified under verify-full or verify-ca sslmode");
        }
        break;
      }
      default:
        // No additional requirement for the remaining modes.
        break;
    }
  }

  /**
   * Validates the SSL configuration, then connects to the address given by the options and
   * sends the startup message.
   *
   * @param options the connect options
   * @param context the context the connection is created on
   * @return a future completed with the connection, or failed when the SSL configuration is
   *         rejected or the connection cannot be established
   */
  @Override
  protected Future<Connection> doConnectInternal(PgConnectOptions options, ContextInternal context) {
    try {
      checkSslMode(options);
    } catch (Exception e) {
      // Report configuration errors through the returned future instead of throwing.
      return context.failedFuture(e);
    }
    SocketAddress server = options.getSocketAddress();
    return connect(server, context, true, options);
  }

  /**
   * Sends a cancel request for the given backend.
   *
   * <p>A new connection is opened on a freshly created event-loop context, without sending
   * the startup message, and the cancel request message is written on it.
   *
   * @param options the connect options used to reach the server
   * @param processId the process id sent in the cancel request
   * @param secretKey the secret key sent in the cancel request
   * @return a future tracking the cancel request
   */
  public Future<Void> cancelRequest(PgConnectOptions options, int processId, int secretKey) {
    return connect(options.getSocketAddress(), vertx.createEventLoopContext(), false, options)
      .compose(conn -> {
        PgSocketConnection socket = (PgSocketConnection) conn;
        return socket.sendCancelRequestMessage(processId, secretKey);
      });
  }

  /**
   * Connects to {@code server}, choosing whether to use SSL from the effective SSL mode.
   *
   * <p>Domain sockets always use {@code DISABLE}. {@code ALLOW} tries a plain connection
   * first and retries with SSL on failure; {@code PREFER} tries SSL first and retries
   * without it on failure; {@code REQUIRE}, {@code VERIFY_CA} and {@code VERIFY_FULL} only
   * ever connect with SSL.
   *
   * @param server the address to connect to
   * @param context the context the connection is created on
   * @param sendStartupMessage whether the startup message is sent once connected
   * @param options the connect options
   * @return a future completed with the connection
   */
  private Future<Connection> connect(SocketAddress server, ContextInternal context, boolean sendStartupMessage, PgConnectOptions options) {
    SslMode sslMode = options.isUsingDomainSocket() ? SslMode.DISABLE : options.getSslMode();
    ConnectOptions connectOptions = new ConnectOptions()
      .setRemoteAddress(server);
    Future<Connection> connFuture;
    switch (sslMode) {
      case DISABLE:
        connFuture = connect(connectOptions, context, false, sendStartupMessage, options);
        break;
      case ALLOW:
        // Plain first, SSL as the fallback.
        connFuture = connect(connectOptions, context, false, sendStartupMessage, options)
          .recover(err -> connect(connectOptions, context, true, sendStartupMessage, options));
        break;
      case PREFER:
        // SSL first, plain as the fallback.
        connFuture = connect(connectOptions, context, true, sendStartupMessage, options)
          .recover(err -> connect(connectOptions, context, false, sendStartupMessage, options));
        break;
      case REQUIRE:
      case VERIFY_CA:
      case VERIFY_FULL:
        connFuture = connect(connectOptions, context, true, sendStartupMessage, options);
        break;
      default:
        return context.failedFuture(new IllegalArgumentException("Unsupported SSL mode"));
    }
    return connFuture;
  }

  /**
   * Opens a single connection (with or without SSL) and optionally performs the startup
   * exchange on it.
   *
   * @param connectOptions the transport-level connect options
   * @param context the context the connection is created on
   * @param ssl whether the connection is upgraded to SSL
   * @param sendStartupMessage whether the socket is initialised and the startup message sent
   * @param options the connect options providing user, password, database and properties
   * @return a future completed with the connection
   */
  private Future<Connection> connect(ConnectOptions connectOptions, ContextInternal context, boolean ssl, boolean sendStartupMessage, PgConnectOptions options) {
    Future<Connection> res = doConnect(connectOptions, context, ssl, options);
    if (sendStartupMessage) {
      return res.flatMap(conn -> {
        PgSocketConnection socket = (PgSocketConnection) conn;
        socket.init();
        String username = options.getUser();
        String password = options.getPassword();
        String database = options.getDatabase();
        // Hand the connection a read-only view of the properties (or null when none are set).
        Map<String, String> properties = options.getProperties() != null ? Collections.unmodifiableMap(options.getProperties()) : null;
        return socket.sendStartupMessage(username, password, database, properties);
      });
    } else {
      return res;
    }
  }

  /**
   * Opens the socket, wraps it in a {@link PgSocketConnection} and, when requested and the
   * remote address is not a domain socket, upgrades it to SSL.
   *
   * @param connectOptions the transport-level connect options
   * @param context the context the connection is created on
   * @param ssl whether an SSL upgrade is requested
   * @param options the connect options providing the SSL options and connection settings
   * @return a future completed with the (possibly SSL-upgraded) connection
   */
  private Future<Connection> doConnect(ConnectOptions connectOptions, ContextInternal context, boolean ssl, PgConnectOptions options) {
    Future<NetSocket> soFut;
    try {
      soFut = client.connect(connectOptions);
    } catch (Exception e) {
      // Client is closed
      return context.failedFuture(e);
    }
    Future<Connection> connFut = soFut.map(so -> newSocketConnection(context, (NetSocketInternal) so, options));
    if (ssl && !connectOptions.getRemoteAddress().isDomainSocket()) {
      // upgrade connection to SSL if needed
      connFut = connFut.flatMap(conn -> Future.future(p -> {
        PgSocketConnection socket = (PgSocketConnection) conn;
        // Work on a copy so the adjustment below does not leak into the caller's options.
        ClientSSLOptions o = options.getSslOptions().copy();
        if (o.getHostnameVerificationAlgorithm() == null) {
          // NOTE(review): presumably the empty string means "no hostname verification" - confirm.
          o.setHostnameVerificationAlgorithm("");
        }
        socket.upgradeToSSLConnection(o, ar2 -> {
          if (ar2.succeeded()) {
            p.complete(conn);
          } else {
            p.fail(ar2.cause());
          }
        });
      }));
    }
    return connFut;
  }

  /**
   * Connects and wraps the resulting connection in a {@link PgConnectionImpl}.
   *
   * <p>Fails immediately with {@code UDS_NOT_SUPPORTED} when the options target a domain
   * socket but the current transport does not support domain sockets.
   *
   * @param context the context of the returned connection
   * @param options the connect options
   * @return a future completed with the SQL connection
   */
  @Override
  public Future<SqlConnection> connect(Context context, PgConnectOptions options) {
    ContextInternal contextInternal = (ContextInternal) context;
    if (options.isUsingDomainSocket() && !vertx.transport().supportsDomainSockets()) {
      return contextInternal.failedFuture(UDS_NOT_SUPPORTED);
    }
    PromiseInternal<SqlConnection> promise = contextInternal.promise();
    connect(asEventLoopContext(contextInternal), options)
      .map(conn -> {
        PgConnectionImpl pgConn = new PgConnectionImpl(this, contextInternal, conn);
        conn.init(pgConn);
        return (SqlConnection)pgConn;
      })
      .onComplete(promise);
    return promise.future();
  }

  /**
   * Builds the {@link PgSocketConnection} for a freshly connected socket from the settings
   * carried by the connect options.
   *
   * @param context the context the connection is created on
   * @param socket the connected socket
   * @param options the connect options providing cache, pipelining, proxy and metrics settings
   * @return the new socket connection
   */
  private PgSocketConnection newSocketConnection(ContextInternal context, NetSocketInternal socket, PgConnectOptions options) {
    boolean cachePreparedStatements = options.getCachePreparedStatements();
    int preparedStatementCacheMaxSize = options.getPreparedStatementCacheMaxSize();
    Predicate<String> preparedStatementCacheSqlFilter = options.getPreparedStatementCacheSqlFilter();
    int pipeliningLimit = options.getPipeliningLimit();
    boolean useLayer7Proxy = options.getUseLayer7Proxy();
    // Client metrics are only created when Vert.x metrics are available.
    VertxMetrics vertxMetrics = vertx.metrics();
    ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null;
    return new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context);
  }
}