From fbf30a182d377c499510b77ef603876c0aebf2bf Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 2 May 2025 18:49:50 +0100 Subject: [PATCH 1/3] cluster-bootstrap support TLS requests in client calls Update HttpContactPointBootstrap.scala Update HttpContactPointBootstrap.scala add cert Update BootstrapCoordinatorSpec.scala extra test make TLS version configurable cert unused Update BootstrapCoordinatorSpec.scala Update BootstrapCoordinatorSpec.scala Update HttpContactPointBootstrap.scala Revert "cert unused" This reverts commit 83b45b537de0cef771f6e029f2663c07bbd5ec26. add tests --- build.sbt | 1 + .../src/main/resources/reference.conf | 7 ++ .../bootstrap/ClusterBootstrapSettings.scala | 7 ++ .../internal/HttpContactPointBootstrap.scala | 39 ++++++++++- .../src/test/files/ca.crt | 18 +++++ .../{reference.conf => application.conf} | 0 .../internal/BootstrapCoordinatorSpec.scala | 9 +-- .../HttpContactPointBootstrapSpec.scala | 67 ++++++++++++++++++- 8 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 management-cluster-bootstrap/src/test/files/ca.crt rename management-cluster-bootstrap/src/test/resources/{reference.conf => application.conf} (100%) diff --git a/build.sbt b/build.sbt index f61a1575..d4628aac 100644 --- a/build.sbt +++ b/build.sbt @@ -151,6 +151,7 @@ lazy val managementClusterBootstrap = pekkoModule("management-cluster-bootstrap" libraryDependencies := Dependencies.managementClusterBootstrap, mimaPreviousArtifactsSet) .dependsOn(management) + .dependsOn(managementPki) lazy val leaseKubernetes = pekkoModule("lease-kubernetes") .enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin) diff --git a/management-cluster-bootstrap/src/main/resources/reference.conf b/management-cluster-bootstrap/src/main/resources/reference.conf index 5332ae2e..4f7cf06a 100644 --- a/management-cluster-bootstrap/src/main/resources/reference.conf +++ b/management-cluster-bootstrap/src/main/resources/reference.conf @@ -134,6 +134,13 @@ pekko.management { # Max amount of jitter to be added on retries probe-interval-jitter = 0.2 + + http-client { + # set this to your HTTPS certificate path if you want to setup a HTTPS trust store + ca-path = "" + # the TLS version to use when connecting to the API server + tls-version = "TLSv1.2" + } } join-decider { diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala index 3990b323..1ffc49c8 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/ClusterBootstrapSettings.scala @@ -135,6 +135,13 @@ final class ClusterBootstrapSettings(config: Config, log: LoggingAdapter) { object contactPoint { private val contactPointConfig = bootConfig.getConfig("contact-point") + object httpClient { + private val httpClientConfig = contactPointConfig.getConfig("http-client") + + val caPath: String = httpClientConfig.getString("ca-path") + val tlsVersion: String = httpClientConfig.getString("tls-version") + } + val fallbackPort: Int = contactPointConfig .optDefinedValue("fallback-port") diff --git a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala index cea30b9b..f9a2c62a 100644 --- a/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala +++ b/management-cluster-bootstrap/src/main/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrap.scala @@ -14,10 +14,13 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.time.LocalDateTime +import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException +import javax.net.ssl.{ KeyManager, KeyManagerFactory, SSLContext, TrustManager } import scala.concurrent.Future import scala.concurrent.duration._ + import org.apache.pekko import pekko.actor.Actor import pekko.actor.ActorLogging @@ -29,7 +32,9 @@ import pekko.actor.Timers import pekko.annotation.InternalApi import pekko.cluster.Cluster import pekko.discovery.ServiceDiscovery.ResolvedTarget +import pekko.http.scaladsl.ConnectionContext import pekko.http.scaladsl.Http +import pekko.http.scaladsl.HttpsConnectionContext import pekko.http.scaladsl.model.HttpResponse import pekko.http.scaladsl.model.StatusCodes import pekko.http.scaladsl.model.Uri @@ -41,6 +46,7 @@ import pekko.management.cluster.bootstrap.contactpoint.HttpBootstrapJsonProtocol import pekko.management.cluster.bootstrap.contactpoint.{ ClusterBootstrapRequests, HttpBootstrapJsonProtocol } import pekko.pattern.after import pekko.pattern.pipe +import pekko.pki.kubernetes.PemManagersProvider @InternalApi private[bootstrap] object HttpContactPointBootstrap { @@ -56,6 +62,26 @@ private[bootstrap] object HttpContactPointBootstrap { private case object ProbeTick extends DeadLetterSuppression private val ProbingTimerKey = "probing-key" + + def generateSSLContext(settings: ClusterBootstrapSettings): SSLContext = { + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val caPath = settings.contactPoint.httpClient.caPath.trim + val tm: Array[TrustManager] = if (caPath.isEmpty) { + Array.empty + } else { + val certificates = PemManagersProvider.loadCertificates(caPath) + PemManagersProvider.buildTrustManagers(certificates) + } + val tlsVersion = settings.contactPoint.httpClient.tlsVersion.trim + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance(tlsVersion) + sslContext.init(km, tm, random) + sslContext + } } /** @@ -88,7 +114,12 @@ private[bootstrap] class HttpContactPointBootstrap( } private implicit val sys: ActorSystem = context.system + + private lazy val clientSslContext: HttpsConnectionContext = + ConnectionContext.httpsClient(HttpContactPointBootstrap.generateSSLContext(settings)) + private val http = Http() + private val connectionPoolWithoutRetries = ConnectionPoolSettings(context.system).withMaxRetries(0) import context.dispatcher @@ -111,7 +142,13 @@ private[bootstrap] class HttpContactPointBootstrap( override def receive = { case ProbeTick => log.debug("Probing [{}] for seed nodes...", probeRequest.uri) - val reply = http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries).flatMap(handleResponse) + val reply = if (probeRequest.uri.scheme == "https") { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries, + connectionContext = clientSslContext) + } else { + http.singleRequest(probeRequest, settings = connectionPoolWithoutRetries) + }.flatMap(handleResponse) + val afterTimeout = after(settings.contactPoint.probingFailureTimeout, context.system.scheduler)(replyTimeout) Future.firstCompletedOf(List(reply, afterTimeout)).pipeTo(self) diff --git a/management-cluster-bootstrap/src/test/files/ca.crt b/management-cluster-bootstrap/src/test/files/ca.crt new file mode 100644 index 00000000..7fc98192 --- /dev/null +++ b/management-cluster-bootstrap/src/test/files/ca.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC5zCCAc+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAVMRMwEQYDVQQDEwptaW5p +a3ViZUNBMB4XDTE3MTIxMjEzMzY1MVoXDTI3MTIxMDEzMzY1MVowFTETMBEGA1UE +AxMKbWluaWt1YmVDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMrk +QcE8e2L3Rnm8K51y1Y4CHWwx4XwD0SqPwGq9nnygFaBsibIIrex89Im4f73QaqR5 +h87ypi0dyqlaTZdleZN7Q4hNSpWF1t/zSGanm7QOSl76FlTAFNm/eVNamfuGRf1x +OYWGWRwdct3Six5K+R/qHh6oJ9XDli9LuV4vxHTDB/mr/2Xgyz1MDrIdRDYpiqev +3HNJqnfXFT3eGWXk4ENZsc+I/R5LbSXA+cSQd9xrkrBhbreHLk99pif7eAKwVKNZ +Rcsp9QBgMOUAoFgk+sU6YeVrasXIF1R4BB7g+LpqpM3F6jqmD79j2mREMIU3kjEQ +eXMqi1W31i9ug1VxwTUCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMB0GA1UdJQQW +MBQGCCsGAQUFBwMCBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3 +DQEBCwUAA4IBAQAyRchLY4Jhu1EBFlhYebGLrEO/twZCu2NQyM0by5XUoXApJeqf +S00Q7A67CRcQlbtRAH5vqhpCxutlKc26dF5Y1MmJmkGT7WIjujV0UIF/jJDnmwKK ++DRQl1UgA1e4WS6XOwUaSo9ltgJQ+GJfgkg3Xs3pzjjIpX94eF4V9ArJ8npRVO+w +cCxE01P+Nm9U5H24QnlY+1IxNeszitm34SGiRy6SqoKSfYQoNyQadG9KVybs4FAs +7aeYAB10I7FLFt4+Ji93zZjnWcKXjv59vz7NBDPtCsaXhJ82983GsfV2z+WQ3kRZ +R2XVTsdz8yu0rgmyewxVKH7Roo5Ts+qpZFbi +-----END CERTIFICATE----- diff --git a/management-cluster-bootstrap/src/test/resources/reference.conf b/management-cluster-bootstrap/src/test/resources/application.conf similarity index 100% rename from management-cluster-bootstrap/src/test/resources/reference.conf rename to management-cluster-bootstrap/src/test/resources/application.conf diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala index 32c72aae..9995694c 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/BootstrapCoordinatorSpec.scala @@ -14,22 +14,23 @@ package org.apache.pekko.management.cluster.bootstrap.internal import java.util.concurrent.atomic.AtomicReference + import org.apache.pekko import pekko.actor.{ ActorRef, ActorSystem, Props } import pekko.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } import pekko.discovery.{ Lookup, MockDiscovery } import pekko.http.scaladsl.model.Uri -import com.typesafe.config.ConfigFactory import pekko.management.cluster.bootstrap.internal.BootstrapCoordinator.Protocol.InitiateBootstrapping import pekko.management.cluster.bootstrap.{ ClusterBootstrapSettings, LowestAddressJoinDecider } -import org.scalatest.concurrent.Eventually +import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers import org.scalatest.time.{ Millis, Seconds, Span } +import org.scalatest.wordspec.AnyWordSpec import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec class BootstrapCoordinatorSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with Eventually { val serviceName = "bootstrap-coordinator-test-service" diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala index ef80e5be..c593dc79 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala @@ -13,17 +13,82 @@ package org.apache.pekko.management.cluster.bootstrap.internal +import java.nio.file.NoSuchFileException + import org.apache.pekko -import pekko.actor.ActorPath +import pekko.actor.{ ActorPath, ActorSystem } +import pekko.event.Logging +import pekko.management.cluster.bootstrap.ClusterBootstrapSettings import pekko.http.scaladsl.model.Uri.Host +import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers { + "HttpContactPointBootstrap" should { "use a safe name when connecting over IPv6" in { val name = HttpContactPointBootstrap.name(Host("[fe80::1013:2070:258a:c662]"), 443) ActorPath.isValidPathElement(name) should be(true) } + "generate SSLContext with default config" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val settings = new ClusterBootstrapSettings(sys.settings.config, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "generate SSLContext with cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + HttpContactPointBootstrap.generateSSLContext(settings) should not be null + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with missing cert" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/non-existent.crt" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + intercept[NoSuchFileException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + } finally { + sys.terminate() + } + } + "fail to generate SSLContext with bad tls-version" in { + val sys = ActorSystem("HttpContactPointBootstrapSpec") + val log = Logging(sys, classOf[HttpContactPointBootstrapSpec]) + try { + val cfg = ConfigFactory.parseString(""" + pekko.management.cluster.bootstrap.contact-point.http-client { + ca-path = "management-cluster-bootstrap/src/test/files/ca.crt" + tls-version = "BAD_VERSION" + }""").withFallback(sys.settings.config) + val settings = new ClusterBootstrapSettings(cfg, log) + val nsae = intercept[java.security.NoSuchAlgorithmException] { + HttpContactPointBootstrap.generateSSLContext(settings) + } + nsae.getMessage.contains("BAD_VERSION") should be(true) + } finally { + sys.terminate() + } + } + } } From 473659e86030a86d02a9ac37ca376126a82b5a08 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 18 Nov 2025 18:55:00 +0100 Subject: [PATCH 2/3] Update management-cluster-bootstrap/src/main/resources/reference.conf Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- management-cluster-bootstrap/src/main/resources/reference.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/management-cluster-bootstrap/src/main/resources/reference.conf b/management-cluster-bootstrap/src/main/resources/reference.conf index 4f7cf06a..0f4d7844 100644 --- a/management-cluster-bootstrap/src/main/resources/reference.conf +++ b/management-cluster-bootstrap/src/main/resources/reference.conf @@ -138,7 +138,7 @@ pekko.management { http-client { # set this to your HTTPS certificate path if you want to setup a HTTPS trust store ca-path = "" - # the TLS version to use when connecting to the API server + # the TLS version to use when connecting to contact points tls-version = "TLSv1.2" } } From cdf855322e97d990932e823afe6f3e2ca67576e9 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 18 Nov 2025 18:56:53 +0100 Subject: [PATCH 3/3] Update management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../bootstrap/internal/HttpContactPointBootstrapSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala index c593dc79..b3f8892d 100644 --- a/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala +++ b/management-cluster-bootstrap/src/test/scala/org/apache/pekko/management/cluster/bootstrap/internal/HttpContactPointBootstrapSpec.scala @@ -81,10 +81,10 @@ class HttpContactPointBootstrapSpec extends AnyWordSpec with Matchers { tls-version = "BAD_VERSION" }""").withFallback(sys.settings.config) val settings = new ClusterBootstrapSettings(cfg, log) - val nsae = intercept[java.security.NoSuchAlgorithmException] { + val noSuchAlgorithmException = intercept[java.security.NoSuchAlgorithmException] { HttpContactPointBootstrap.generateSSLContext(settings) } - nsae.getMessage.contains("BAD_VERSION") should be(true) + noSuchAlgorithmException.getMessage.contains("BAD_VERSION") should be(true) } finally { sys.terminate() }