Skip to content

Commit a31e2b8

Browse files
committed
add discovery implementation and test
1 parent d8e2647 commit a31e2b8

10 files changed

Lines changed: 237 additions & 11 deletions

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ lazy val discoveryEureka = pekkoModule("discovery-eureka")
109109
.enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin)
110110
.settings(
111111
name := "pekko-discovery-eureka",
112-
libraryDependencies := Dependencies.discoveryMarathonApi,
112+
libraryDependencies := Dependencies.discoveryEureka,
113113
mimaPreviousArtifactsSet)
114114

115115
// gathers all enabled routes and serves them (HTTP or otherwise)

discovery-eureka/src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pekko.discovery {
88

99
eureka {
1010
class = org.apache.pekko.discovery.eureka.EurekaServiceDiscovery
11+
# default eureka schema
12+
eureka-schema = "http"
1113
# default eureka host
1214
eureka-host = "127.0.0.1"
1315
# default eureka port
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.pekko.discovery.eureka
21+
22+
object EurekaResponse {
23+
case class Application(name: String, instance: Seq[Instance])
24+
case class Instance(hostName: String, app: String, vipAddress: String, secureVipAddress: String, ipAddr: String, status: String, port: PortWrapper, securePort: PortWrapper, healthCheckUrl: String, statusPageUrl: String, homePageUrl: String, appGroupName: String, dataCenterInfo: DataCenterInfo, lastDirtyTimestamp: String)
25+
case class Status()
26+
case class PortWrapper(port: Int, enabled: Boolean)
27+
case class DataCenterInfo(name: String = "MyOwn", clz : String = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo")
28+
}
29+
30+
import EurekaResponse._
31+
32+
case class EurekaResponse(application: Application, errorCode: Option[String])

discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaServiceDiscovery.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,70 @@
2020
package org.apache.pekko.discovery.eureka
2121

2222
import org.apache.pekko.actor.ActorSystem
23-
import org.apache.pekko.discovery.{ Lookup, ServiceDiscovery }
23+
import org.apache.pekko.discovery.ServiceDiscovery.{Resolved, ResolvedTarget}
24+
import org.apache.pekko.discovery.eureka.JsonFormat._
25+
import org.apache.pekko.discovery.{Lookup, ServiceDiscovery}
26+
import org.apache.pekko.event.{LogSource, Logging}
27+
import org.apache.pekko.http.scaladsl.Http
28+
import org.apache.pekko.http.scaladsl.model.headers._
29+
import org.apache.pekko.http.scaladsl.model.{HttpRequest, MediaRange, MediaTypes, Uri}
30+
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
2431

32+
import java.net.InetAddress
2533
import scala.concurrent.Future
2634
import scala.concurrent.duration.FiniteDuration
35+
import scala.util.Try
2736

28-
class EurekaServiceDiscovery(system: ActorSystem) extends ServiceDiscovery {
37+
class EurekaServiceDiscovery(implicit system: ActorSystem) extends ServiceDiscovery {
38+
39+
import system.dispatcher
40+
41+
private val log = Logging(system, getClass)(LogSource.fromClass)
42+
private val settings = EurekaSettings(system)
43+
private val (schema, host, port, path, group) = (settings.schema, settings.host, settings.port, settings.path, settings.groupName)
44+
private val http = Http()
45+
46+
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = {
47+
48+
val uriPath = Uri.Path.Empty / path / "apps" / lookup.serviceName
49+
val uri = Uri.from(scheme = schema, host = host, port = port).withPath(uriPath)
50+
val request = HttpRequest(uri = uri, headers = Seq(`Accept-Encoding`(HttpEncodings.gzip), Accept(MediaRange(MediaTypes.`application/json`))))
51+
52+
log.info("Requesting seed nodes by: {}", request.uri)
53+
54+
for {
55+
response <- http.singleRequest(request)
56+
entity <- response.entity.toStrict(resolveTimeout)
57+
response <- {
58+
log.debug("Eureka response: [{}]", entity.data.utf8String)
59+
val unmarshalled = Unmarshal(entity).to[EurekaResponse]
60+
unmarshalled.failed.foreach { _ =>
61+
log.error(
62+
"Failed to unmarshal Eureka response status [{}], entity: [{}], uri: [{}]",
63+
response.status.value,
64+
entity.data.utf8String,
65+
uri)
66+
}
67+
unmarshalled
68+
}
69+
instances <- pick(response.application.instance)
70+
} yield Resolved(lookup.serviceName, targets(instances))
71+
72+
}
73+
74+
private[eureka] def pick(instances: Seq[EurekaResponse.Instance]): Future[Seq[EurekaResponse.Instance]] = {
75+
Future.successful(instances.collect({
76+
case instance if instance.status == "UP" && instance.appGroupName == group => instance
77+
}))
78+
}
79+
80+
private[eureka] def targets(instances: Seq[EurekaResponse.Instance]): Seq[ResolvedTarget] = {
81+
instances.map { instance =>
82+
ResolvedTarget(
83+
host = instance.ipAddr,
84+
port = Some(instance.port.port),
85+
address = Try(InetAddress.getByName(instance.ipAddr)).toOption)
86+
}
87+
}
2988

30-
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = ???
3189
}

discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaSettings.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@ import pekko.annotation.ApiMayChange
3434
final class EurekaSettings(system: ExtendedActorSystem) extends Extension {
3535
private val eurekaConfig = system.settings.config.getConfig("pekko.discovery.eureka")
3636

37-
val eurekaHost: String = eurekaConfig.getString("eureka-host")
38-
val eurekaPort: Int = eurekaConfig.getInt("eureka-port")
39-
val eurekaPath: String = eurekaConfig.getString("eureka-path")
37+
val schema: String = eurekaConfig.getString("eureka-schema")
38+
val host: String = eurekaConfig.getString("eureka-host")
39+
val port: Int = eurekaConfig.getInt("eureka-port")
40+
val path: String = eurekaConfig.getString("eureka-path")
4041
val groupName: String = eurekaConfig.getString("group-name")
4142
val statusPageUrl: String = eurekaConfig.getString("status-page-url")
4243
val healthCheckUrl: String = eurekaConfig.getString("health-page-url")
4344
val homePageUrl: String = eurekaConfig.getString("home-page-url")
4445
val servicePort: Int = eurekaConfig.getInt("service-port")
4546
val serviceName: String = system.name
46-
val managementPort: Int = system.settings.config.getInt("pekko.management.http.port");
4747
val renewInterval: Long = eurekaConfig.getLong("renew-interval")
4848
}
4949

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* license agreements; and to You under the Apache License, version 2.0:
4+
*
5+
* https://www.apache.org/licenses/LICENSE-2.0
6+
*
7+
* This file is part of the Apache Pekko project, which was derived from Akka.
8+
*/
9+
10+
/*
11+
* Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
12+
*/
13+
14+
package org.apache.pekko.discovery.eureka
15+
16+
import org.apache.pekko.discovery.eureka.EurekaResponse.{Application, DataCenterInfo, Instance, PortWrapper}
17+
import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
18+
import spray.json._
19+
20+
object JsonFormat extends SprayJsonSupport with DefaultJsonProtocol {
21+
implicit val portFormat: JsonFormat[PortWrapper] = new JsonFormat[PortWrapper] {
22+
23+
override def read(json: JsValue): PortWrapper = {
24+
json.asJsObject.getFields("$", "@enabled") match {
25+
case Seq(JsNumber(port), JsString(enabled)) => PortWrapper(port.toInt, enabled.toBoolean)
26+
case _ => throw DeserializationException("PortWrapper expected")
27+
}
28+
}
29+
30+
override def write(obj: PortWrapper): JsValue = JsObject(
31+
"$" -> JsNumber(obj.port),
32+
"@enabled" -> JsString(obj.enabled.toString))
33+
}
34+
implicit val dataCenterInfoFormat: JsonFormat[DataCenterInfo] = new JsonFormat[DataCenterInfo] {
35+
36+
override def read(json: JsValue): DataCenterInfo = {
37+
json.asJsObject.getFields("name","@class") match {
38+
case Seq(JsString(name), JsString(clz)) => DataCenterInfo(name, clz)
39+
case _ => throw DeserializationException("DataCenterInfo expected")
40+
}
41+
}
42+
43+
override def write(obj: DataCenterInfo): JsValue = JsObject(
44+
"name" -> JsString(obj.name),
45+
"@class" -> JsString(obj.clz))
46+
}
47+
implicit val instanceFormat: JsonFormat[Instance] = jsonFormat14(Instance.apply)
48+
implicit val applicationFormat: JsonFormat[Application] = jsonFormat2(Application.apply)
49+
implicit val rootFormat: RootJsonFormat[EurekaResponse] = jsonFormat2(EurekaResponse.apply)
50+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import org.apache.pekko.actor.ActorSystem
19+
import org.apache.pekko.discovery.ServiceDiscovery.ResolvedTarget
20+
import org.apache.pekko.discovery.eureka.EurekaServiceDiscovery
21+
import org.apache.pekko.testkit.TestKitBase
22+
import org.scalatest.BeforeAndAfterAll
23+
import org.scalatest.concurrent.ScalaFutures
24+
import org.scalatest.matchers.should.Matchers
25+
import org.scalatest.time.{Millis, Seconds, Span}
26+
import org.scalatest.wordspec.AnyWordSpecLike
27+
28+
import java.net.InetAddress
29+
import scala.concurrent.duration.DurationInt
30+
import scala.io.Source
31+
import scala.util.Try
32+
33+
/*
34+
* Licensed to the Apache Software Foundation (ASF) under one
35+
* or more contributor license agreements. See the NOTICE file
36+
* distributed with this work for additional information
37+
* regarding copyright ownership. The ASF licenses this file
38+
* to you under the Apache License, Version 2.0 (the
39+
* "License"); you may not use this file except in compliance
40+
* with the License. You may obtain a copy of the License at
41+
*
42+
* http://www.apache.org/licenses/LICENSE-2.0
43+
*
44+
* Unless required by applicable law or agreed to in writing,
45+
* software distributed under the License is distributed on an
46+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
47+
* KIND, either express or implied. See the License for the
48+
* specific language governing permissions and limitations
49+
* under the License.
50+
*/
51+
52+
53+
class EurekaServiceDiscoverySpec
54+
extends AnyWordSpecLike
55+
with Matchers
56+
with BeforeAndAfterAll
57+
with TestKitBase
58+
with ScalaFutures {
59+
"Eureka Discovery" should {
60+
"work for defaults" in {
61+
62+
val lookupService = new EurekaServiceDiscovery()
63+
val resolved = lookupService.lookup("BANK-ACCOUNT", 10.seconds).futureValue
64+
resolved.addresses should contain(
65+
ResolvedTarget(
66+
host = "127.0.0.1",
67+
port = Some(8558),
68+
address = Try(InetAddress.getByName("127.0.0.1")).toOption))
69+
70+
}
71+
}
72+
73+
private def resourceAsString(name: String): String =
74+
Source.fromInputStream(getClass.getClassLoader.getResourceAsStream(name)).mkString
75+
76+
override def afterAll(): Unit = {
77+
super.afterAll()
78+
print("clean up \n")
79+
}
80+
81+
override implicit lazy val system: ActorSystem = ActorSystem("test")
82+
83+
implicit override val patienceConfig: PatienceConfig =
84+
PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(50, Millis)))
85+
}

discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/AppList.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
package org.apache.pekko.discovery.marathon
1515

16-
import scala.collection.immutable.Seq
17-
1816
object AppList {
1917
case class App(container: Option[Container], portDefinitions: Option[Seq[PortDefinition]], tasks: Option[Seq[Task]])
2018
case class Container(portMappings: Option[Seq[PortMapping]], docker: Option[Docker])

discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/MarathonApiServiceDiscovery.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import pekko.http.scaladsl._
2020
import pekko.http.scaladsl.model._
2121
import pekko.http.scaladsl.unmarshalling.Unmarshal
2222

23-
import scala.collection.immutable.Seq
2423
import scala.concurrent.Future
2524
import scala.concurrent.duration.FiniteDuration
2625
import scala.util.Try

project/Dependencies.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ object Dependencies {
8181
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
8282
"org.apache.pekko" %% "pekko-http" % pekkoHttpVersion,
8383
"org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion,
84+
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test,
85+
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion % Test,
8486
"org.scalatest" %% "scalatest" % scalaTestVersion % Test)
8587

8688
val discoveryKubernetesApi = Seq(

0 commit comments

Comments
 (0)