Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_dialect():
# test get_schema_names
schema_names = insp.get_schema_names()
if not operator.ge(
schema_names, ["root.__system", "root.cursor", "root.cursor_s1"]
schema_names, ["root.__audit", "root.cursor", "root.cursor_s1"]
):
test_fail()
print_message("Actual result " + str(schema_names))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
Expand Down Expand Up @@ -89,16 +90,19 @@ static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
private final RaftProperties raftProperties;
private final RaftClientRpc clientRpc;
private final RatisConfig.Client config;
private final Parameters parameters;

public Factory(
ClientManager<RaftGroup, RatisClient> clientManager,
RaftProperties raftProperties,
RaftClientRpc clientRpc,
RatisConfig.Client config) {
RatisConfig.Client config,
Parameters parameters) {
super(clientManager);
this.raftProperties = raftProperties;
this.clientRpc = clientRpc;
this.config = config;
this.parameters = parameters;
}

@Override
Expand All @@ -116,6 +120,7 @@ public PooledObject<RatisClient> makeObject(RaftGroup group) {
.setRaftGroup(group)
.setRetryPolicy(new RatisRetryPolicy(config))
.setClientRpc(clientRpc)
.setParameters(parameters)
.build(),
clientManager));
}
Expand All @@ -131,16 +136,19 @@ static class EndlessRetryFactory extends BaseClientFactory<RaftGroup, RatisClien
private final RaftProperties raftProperties;
private final RaftClientRpc clientRpc;
private final RatisConfig.Client config;
private final Parameters parameters;

public EndlessRetryFactory(
ClientManager<RaftGroup, RatisClient> clientManager,
RaftProperties raftProperties,
RaftClientRpc clientRpc,
RatisConfig.Client config) {
RatisConfig.Client config,
Parameters parameters) {
super(clientManager);
this.raftProperties = raftProperties;
this.clientRpc = clientRpc;
this.config = config;
this.parameters = parameters;
}

@Override
Expand All @@ -157,6 +165,7 @@ public PooledObject<RatisClient> makeObject(RaftGroup group) {
.setProperties(raftProperties)
.setRaftGroup(group)
.setRetryPolicy(new RatisEndlessRetryPolicy(config))
.setParameters(parameters)
.setClientRpc(clientRpc)
.build(),
clientManager));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class RatisConsensus implements IConsensus {

private final RaftProperties properties = new RaftProperties();
private final RaftClientRpc clientRpc;
private final Parameters parameters;

private final IClientManager<RaftGroup, RatisClient> clientManager;
private final IClientManager<RaftGroup, RatisClient> reconfigurationClientManager;
Expand Down Expand Up @@ -158,7 +159,7 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) {
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(storageDir));
GrpcConfigKeys.Server.setPort(properties, config.getThisNodeEndPoint().getPort());

Parameters parameters = Utils.initRatisConfig(properties, config.getRatisConfig());
this.parameters = Utils.initRatisConfig(properties, config.getRatisConfig());
this.config = config.getRatisConfig();
this.readOption = this.config.getRead().getReadOption();
this.canServeStaleRead =
Expand Down Expand Up @@ -223,6 +224,7 @@ public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry) {
.setServerId(myself.getId())
.setProperties(properties)
.setOption(RaftStorage.StartupOption.RECOVER)
.setParameters(parameters)
.setStateMachineRegistry(
raftGroupId ->
new ApplicationStateMachineProxy(
Expand Down Expand Up @@ -1034,8 +1036,9 @@ public GenericKeyedObjectPool<RaftGroup, RatisClient> createClientPool(
new GenericKeyedObjectPool<>(
isReconfiguration
? new RatisClient.EndlessRetryFactory(
manager, properties, clientRpc, config.getClient())
: new RatisClient.Factory(manager, properties, clientRpc, config.getClient()),
manager, properties, clientRpc, config.getClient(), parameters)
: new RatisClient.Factory(
manager, properties, clientRpc, config.getClient(), parameters),
new ClientPoolProperty.Builder<RatisClient>()
.setMaxClientNumForEachNode(config.getClient().getMaxClientNumForEachNode())
.build()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.consensus.ratis.utils;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedTrustManager;
import javax.net.ssl.X509TrustManager;

import java.net.Socket;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class NoHostnameVerificationTrustManager extends X509ExtendedTrustManager {

private final X509TrustManager delegate;

public NoHostnameVerificationTrustManager(X509TrustManager delegate) {
this.delegate = delegate;
}

@Override
public X509Certificate[] getAcceptedIssuers() {
return delegate.getAcceptedIssuers();
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
delegate.checkClientTrusted(chain, authType);
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
delegate.checkServerTrusted(chain, authType);
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket)
throws CertificateException {
if (delegate instanceof X509ExtendedTrustManager) {
((X509ExtendedTrustManager) delegate).checkClientTrusted(chain, authType, socket);
} else {
delegate.checkClientTrusted(chain, authType);
}
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket)
throws CertificateException {
// Skip hostname check by calling base method
delegate.checkServerTrusted(chain, authType);
}

@Override
public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
throws CertificateException {
if (delegate instanceof X509ExtendedTrustManager) {
((X509ExtendedTrustManager) delegate).checkClientTrusted(chain, authType, engine);
} else {
delegate.checkClientTrusted(chain, authType);
}
}

@Override
public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine)
throws CertificateException {
// Skip hostname check by calling base method
delegate.checkServerTrusted(chain, authType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;

import java.io.File;
import java.io.InputStream;
Expand Down Expand Up @@ -385,7 +386,13 @@ public static Parameters initRatisConfig(RaftProperties properties, RatisConfig
TrustManagerFactory tmf =
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
TrustManager trustManager = tmf.getTrustManagers()[0];
TrustManager originalTrustManager = tmf.getTrustManagers()[0];

// The self-signed certification may not set Subject Alternative Name (SAN)
// Thrift with ssl didn't check it, but Grpc did.
// Wrap to disable the verification
TrustManager trustManager =
new NoHostnameVerificationTrustManager((X509TrustManager) originalTrustManager);
GrpcConfigKeys.TLS.setConf(parameters, new GrpcTlsConfig(keyManager, trustManager, true));
} catch (Exception e) {
LOGGER.error("Failed to read key store or trust store.", e);
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
<sonar.coverage.jacoco.xmlReportPaths>target/jacoco-merged-reports/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
<!-- Exclude all generated code -->
<sonar.exclusions>**/generated-sources</sonar.exclusions>
<sonar.test.exclusions>**/test/**</sonar.test.exclusions>
<!-- URL of the ASF SonarQube server -->
<sonar.host.url>https://sonarcloud.io</sonar.host.url>
<sonar.java.checkstyle.reportPaths>target/checkstyle-report.xml</sonar.java.checkstyle.reportPaths>
Expand Down
Loading