Skip to content

Commit 0bf41c0

Browse files
pjfanningCopilot
andauthored
cluster-bootstrap support TLS requests in client calls (#426)
* cluster-bootstrap support TLS requests in client calls Update HttpContactPointBootstrap.scala Update HttpContactPointBootstrap.scala add cert Update BootstrapCoordinatorSpec.scala extra test make TLS version configurable cert unused Update BootstrapCoordinatorSpec.scala Update BootstrapCoordinatorSpec.scala Update HttpContactPointBootstrap.scala Revert "cert unused" This reverts commit 83b45b5. add tests * Update management-cluster-bootstrap/src/main/resources/reference.conf Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 9bf6504 commit 0bf41c0

8 files changed

Lines changed: 142 additions & 6 deletions

File tree

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap"
151151
libraryDependencies := Dependencies.managementClusterBootstrap,
152152
mimaPreviousArtifactsSet)
153153
.dependsOn(management)
154+
.dependsOn(managementPki)
154155

155156
lazy val leaseKubernetes = pekkoModule("lease-kubernetes")
156157
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)

management-cluster-bootstrap/src/main/resources/reference.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,13 @@ pekko.management {
134134

135135
# Max amount of jitter to be added on retries
136136
probe-interval-jitter = 0.2
137+
138+
http-client {
139+
# set this to your HTTPS certificate path if you want to setup a HTTPS trust store
140+
ca-path = ""
141+
# the TLS version to use when connecting to contact points
142+
tls-version = "TLSv1.2"
143+
}
137144
}
138145

139146
join-decider {

management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) {
135135
object contactPoint {
136136
private val contactPointConfig = bootConfig.getConfig("contact-point")
137137

138+
object httpClient {
139+
private val httpClientConfig = contactPointConfig.getConfig("http-client")
140+
141+
val caPath: String = httpClientConfig.getString("ca-path")
142+
val tlsVersion: String = httpClientConfig.getString("tls-version")
143+
}
144+
138145
val fallbackPort: Int =
139146
contactPointConfig
140147
.optDefinedValue("fallback-port")

management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@
1414
package org.apache.pekko.management.cluster.bootstrap.internal
1515

1616
import java.time.LocalDateTime
17+
import java.security.{ KeyStore, SecureRandom }
1718
import java.util.concurrent.ThreadLocalRandom
1819
import java.util.concurrent.TimeoutException
20+
import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager }
1921
import scala.concurrent.Future
2022
import scala.concurrent.duration._
23+
2124
import org.apache.pekko
2225
import pekko.actor.Actor
2326
import pekko.actor.ActorLogging
@@ -29,7 +32,9 @@ import pekko.actor.Timers
2932
import pekko.annotation.InternalApi
3033
import pekko.cluster.Cluster
3134
import pekko.discovery.ServiceDiscovery.ResolvedTarget
35+
import pekko.http.scaladsl.ConnectionContext
3236
import pekko.http.scaladsl.Http
37+
import pekko.http.scaladsl.HttpsConnectionContext
3338
import pekko.http.scaladsl.model.HttpResponse
3439
import pekko.http.scaladsl.model.StatusCodes
3540
import pekko.http.scaladsl.model.Uri
@@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol
4146
import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol }
4247
import pekko.pattern.after
4348
import pekko.pattern.pipe
49+
import pekko.pki.kubernetes.PemManagersProvider
4450

4551
@InternalApi
4652
private[bootstrap] object HttpContactPointBootstrap {
@@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap {
5662

5763
private case object ProbeTick extends DeadLetterSuppression
5864
private val ProbingTimerKey = "probing-key"
65+
66+
def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = {
67+
val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm)
68+
val keyStore = KeyStore.getInstance("PKCS12")
69+
keyStore.load(null)
70+
factory.init(keyStore, Array.empty)
71+
val km: Array[KeyManager] = factory.getKeyManagers
72+
val caPath = settings.contactPoint.httpClient.caPath.trim
73+
val tm: Array[TrustManager] = if (caPath.isEmpty) {
74+
Array.empty
75+
} else {
76+
val certificates = PemManagersProvider.loadCertificates(caPath)
77+
PemManagersProvider.buildTrustManagers(certificates)
78+
}
79+
val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim
80+
val random: SecureRandom = new SecureRandom
81+
val sslContext = SSLContext.getInstance(tlsVersion)
82+
sslContext.init(km, tm, random)
83+
sslContext
84+
}
5985
}
6086

6187
/**
@@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap(
88114
}
89115

90116
private implicit val sys: ActorSystem = context.system
117+
118+
private lazy val clientSslContext: HttpsConnectionContext =
119+
ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings))
120+
91121
private val http = Http()
122+
92123
private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0)
93124
import context.dispatcher
94125

@@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap(
111142
override def receive = {
112143
case ProbeTick =>
113144
log.debug("Probing [{}] for seed nodes...", probeRequest.uri)
114-
val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse)
145+
val reply = if (probeRequest.uri.scheme == "https") {
146+
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries,
147+
connectionContext = clientSslContext)
148+
} else {
149+
http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries)
150+
}.flatMap(handleResponse)
151+
115152
val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout)
116153
Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self)
117154

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-----BEGIN CERTIFICATE-----
2+
MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p
3+
a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE
4+
AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk
5+
QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5
6+
h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x
7+
OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev
8+
3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ
9+
Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ
10+
eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW
11+
MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3
12+
DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf
13+
S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK
14+
+DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w
15+
cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs
16+
7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ
17+
R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi
18+
-----END CERTIFICATE-----

management-cluster-bootstrap/src/test/resources/reference.conf renamed to management-cluster-bootstrap/src/test/resources/application.conf

File renamed without changes.

management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,23 @@
1414
package org.apache.pekko.management.cluster.bootstrap.internal
1515

1616
import java.util.concurrent.atomic.AtomicReference
17+
1718
import org.apache.pekko
1819
import pekko.actor.{ ActorRef, ActorSystem, Props }
1920
import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
2021
import pekko.discovery.{ Lookup, MockDiscovery }
2122
import pekko.http.scaladsl.model.Uri
22-
import com.typesafe.config.ConfigFactory
2323
import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping
2424
import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider }
25-
import org.scalatest.concurrent.Eventually
25+
import com.typesafe.config.ConfigFactory
2626
import org.scalatest.BeforeAndAfterAll
27+
import org.scalatest.concurrent.Eventually
28+
import org.scalatest.matchers.should.Matchers
2729
import org.scalatest.time.{ Millis, Seconds, Span }
30+
import org.scalatest.wordspec.AnyWordSpec
2831

2932
import scala.concurrent.{ Await, Future }
3033
import scala.concurrent.duration._
31-
import org.scalatest.matchers.should.Matchers
32-
import org.scalatest.wordspec.AnyWordSpec
3334

3435
class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually {
3536
val serviceName = "bootstrap-coordinator-test-service"

management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,82 @@
1313

1414
package org.apache.pekko.management.cluster.bootstrap.internal
1515

16+
import java.nio.file.NoSuchFileException
17+
1618
import org.apache.pekko
17-
import pekko.actor.ActorPath
19+
import pekko.actor.{ ActorPath, ActorSystem }
20+
import pekko.event.Logging
21+
import pekko.management.cluster.bootstrap.ClusterBootstrapSettings
1822
import pekko.http.scaladsl.model.Uri.Host
23+
import com.typesafe.config.ConfigFactory
1924
import org.scalatest.matchers.should.Matchers
2025
import org.scalatest.wordspec.AnyWordSpec
2126

2227
class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers {
28+
2329
"HttpContactPointBootstrap" should {
2430
"use a safe name when connecting over IPv6" in {
2531
val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443)
2632
ActorPath.isValidPathElement(name) should be(true)
2733
}
34+
"generate SSLContext with default config" in {
35+
val sys = ActorSystem("HttpContactPointBootstrapSpec")
36+
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
37+
try {
38+
val settings = new ClusterBootstrapSettings(sys.settings.config, log)
39+
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
40+
} finally {
41+
sys.terminate()
42+
}
43+
}
44+
"generate SSLContext with cert" in {
45+
val sys = ActorSystem("HttpContactPointBootstrapSpec")
46+
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
47+
try {
48+
val cfg = ConfigFactory.parseString("""
49+
pekko.management.cluster.bootstrap.contact-point.http-client {
50+
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
51+
}""").withFallback(sys.settings.config)
52+
val settings = new ClusterBootstrapSettings(cfg, log)
53+
HttpContactPointBootstrap.generateSSLContext(settings) should not be null
54+
} finally {
55+
sys.terminate()
56+
}
57+
}
58+
"fail to generate SSLContext with missing cert" in {
59+
val sys = ActorSystem("HttpContactPointBootstrapSpec")
60+
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
61+
try {
62+
val cfg = ConfigFactory.parseString("""
63+
pekko.management.cluster.bootstrap.contact-point.http-client {
64+
ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt"
65+
}""").withFallback(sys.settings.config)
66+
val settings = new ClusterBootstrapSettings(cfg, log)
67+
intercept[NoSuchFileException] {
68+
HttpContactPointBootstrap.generateSSLContext(settings)
69+
}
70+
} finally {
71+
sys.terminate()
72+
}
73+
}
74+
"fail to generate SSLContext with bad tls-version" in {
75+
val sys = ActorSystem("HttpContactPointBootstrapSpec")
76+
val log = Logging(sys, classOf[HttpContactPointBootstrapSpec])
77+
try {
78+
val cfg = ConfigFactory.parseString("""
79+
pekko.management.cluster.bootstrap.contact-point.http-client {
80+
ca-path = "management-cluster-bootstrap/src/test/files/ca.crt"
81+
tls-version = "BAD_VERSION"
82+
}""").withFallback(sys.settings.config)
83+
val settings = new ClusterBootstrapSettings(cfg, log)
84+
val noSuchAlgorithmException = intercept[java.security.NoSuchAlgorithmException] {
85+
HttpContactPointBootstrap.generateSSLContext(settings)
86+
}
87+
noSuchAlgorithmException.getMessage.contains("BAD_VERSION") should be(true)
88+
} finally {
89+
sys.terminate()
90+
}
91+
}
92+
2893
}
2994
}

0 commit comments

Comments
 (0)