-
Notifications
You must be signed in to change notification settings - Fork 333
Expand file tree
/
Copy pathPekkoHttpTestWebServer.scala
More file actions
293 lines (265 loc) · 10.3 KB
/
PekkoHttpTestWebServer.scala
File metadata and controls
293 lines (265 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import PekkoHttpTestWebServer.Binder
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.Http.ServerBinding
import org.apache.pekko.http.scaladsl.model.HttpMethods.GET
import org.apache.pekko.http.scaladsl.model._
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.http.scaladsl.server._
import org.apache.pekko.http.scaladsl.settings.ServerSettings
import org.apache.pekko.http.scaladsl.util.FastFuture.EnhancedFuture
import org.apache.pekko.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.Config
import datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint._
import datadog.trace.agent.test.base.{HttpServer, HttpServerTest}
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.bootstrap.instrumentation.api.AgentTracer
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan
import groovy.lang.Closure
import java.net.URI
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps
import scala.util.control.NonFatal
class PekkoHttpTestWebServer(binder: Binder) extends HttpServer {
implicit val system = {
val name = s"${binder.name}"
binder.config match {
case None => ActorSystem(name)
case Some(config) => ActorSystem(name, config)
}
}
implicit val materializer = ActorMaterializer()
private var port: Int = 0
private var portBinding: Future[ServerBinding] = null
override def start(): Unit = {
portBinding = Await.ready(binder.bind(0), 10 seconds)
port = portBinding.value.get.get.localAddress.getPort
}
override def stop(): Unit = {
import materializer.executionContext
portBinding
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
override def address(): URI = {
new URI("http://localhost:" + port + "/")
}
}
object PekkoHttpTestWebServer {
trait Binder {
def name: String
def config: Option[Config] = None
def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding]
}
val BindAndHandle: Binder = new Binder {
override def name: String = "bind-and-handle"
override def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding] = {
import materializer.executionContext
Http().bindAndHandle(route, "localhost", port)
}
}
val BindAndHandleAsyncWithRouteAsyncHandler: Binder = new Binder {
override def name: String = "bind-and-handle-async-with-route-async-handler"
override def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding] = {
import materializer.executionContext
Http().bindAndHandleAsync(Route.asyncHandler(route), "localhost", port)
}
}
val BindAndHandleSync: Binder = new Binder {
override def name: String = "bind-and-handle-sync"
override def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding] = {
Http().bindAndHandleSync(syncHandler, "localhost", port)
}
}
val BindAndHandleAsync: Binder = new Binder {
override def name: String = "bind-and-handle-async"
override def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding] = {
import materializer.executionContext
Http().bindAndHandleAsync(asyncHandler, "localhost", port)
}
}
val BindAndHandleAsyncHttp2: Binder = new Binder {
override def name: String = "bind-and-handle-async-http2"
override def bind(port: Int)(implicit
system: ActorSystem,
materializer: Materializer
): Future[ServerBinding] = {
import materializer.executionContext
val serverSettings = enableHttp2(ServerSettings(system))
Http().bindAndHandleAsync(
asyncHandler,
"localhost",
port,
settings = serverSettings
)
}
}
// This part defines the routes using the Scala routing DSL
// ---------------------------------------------------------------------- //
private val exceptionHandler = ExceptionHandler { case e: Exception =>
val span = activeSpan()
if (span != null) {
// The exception handler is bypassing the normal instrumentation flow, so we need to handle things here
TraceUtils.handleException(span, e)
span.finish()
}
complete(
HttpResponse(status = EXCEPTION.getStatus, entity = e.getMessage)
)
}
// Since the pekko-http route DSL produces a Route that is evaluated for every
// incoming request, we need to wrap the HttpServerTest.controller call and exception
// handling in a custom Directive
private def withController: Directive0 = Directive[Unit] { inner => ctx =>
def handleException: PartialFunction[Throwable, Future[RouteResult]] =
exceptionHandler andThen (_(ctx.withAcceptAll))
val uri = ctx.request.uri
val endpoint = HttpServerTest.ServerEndpoint.forPath(uri.path.toString())
HttpServerTest.controller(
endpoint,
new Closure[Future[RouteResult]](()) {
def doCall(): Future[RouteResult] = {
try
inner(())(ctx).fast
.recoverWith(handleException)(ctx.executionContext)
catch {
case NonFatal(e) =>
handleException
.applyOrElse[Throwable, Future[RouteResult]](e, throw _)
}
}
}
)
}
private val defaultHeader =
RawHeader(HttpServerTest.getIG_RESPONSE_HEADER, HttpServerTest.getIG_RESPONSE_HEADER_VALUE)
def route(implicit ec: ExecutionContext): Route = withController {
respondWithDefaultHeader(defaultHeader) {
get {
path(SUCCESS.relativePath) {
complete(
HttpResponse(status = SUCCESS.getStatus, entity = SUCCESS.getBody)
)
} ~ path(FORWARDED.relativePath) {
headerValueByName("x-forwarded-for") { address =>
complete(
HttpResponse(status = FORWARDED.getStatus, entity = address)
)
}
} ~ path(
QUERY_PARAM.relativePath | QUERY_ENCODED_BOTH.relativePath | QUERY_ENCODED_QUERY.relativePath
) {
parameter("some") { query =>
complete(
HttpResponse(
status = QUERY_PARAM.getStatus,
entity = s"some=$query"
)
)
}
} ~ path(REDIRECT.relativePath) {
redirect(Uri(REDIRECT.getBody), StatusCodes.Found)
} ~ path(ERROR.relativePath) {
complete(HttpResponse(status = ERROR.getStatus, entity = ERROR.getBody))
} ~ path(EXCEPTION.relativePath) {
throw new Exception(EXCEPTION.getBody)
} ~ pathPrefix("injected-id") {
path("ping" / IntNumber) { id =>
val traceId = AgentTracer.activeSpan().getTraceId
complete(s"pong $id -> $traceId")
} ~ path("fing" / IntNumber) { id =>
// force the response to happen on another thread or in another context
onSuccess(Future {
Thread.sleep(10);
id
}) { fid =>
val traceId = AgentTracer.activeSpan().getTraceId
complete(s"fong $fid -> $traceId")
}
}
}
}
}
}
// This part defines the sync and async handler functions
// ---------------------------------------------------------------------- //
val syncHandler: HttpRequest => HttpResponse = {
case HttpRequest(GET, uri: Uri, _, _, _) => {
val path = uri.path.toString()
val endpoint = HttpServerTest.ServerEndpoint.forPath(path)
HttpServerTest
.controller(
endpoint,
new Closure[HttpResponse](()) {
def doCall(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus)
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case FORWARDED => resp.withEntity(endpoint.getBody) // cheating
case QUERY_PARAM | QUERY_ENCODED_BOTH | QUERY_ENCODED_QUERY =>
resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
case ERROR => resp.withEntity(endpoint.getBody)
case EXCEPTION => throw new Exception(endpoint.getBody)
case _ =>
if (path.startsWith("/injected-id/")) {
val groups = path.split('/')
if (groups.size == 4) { // The path starts with a / and has 3 segments
val traceId = AgentTracer.activeSpan().getTraceId
val id = groups(3).toInt
groups(2) match {
case "ping" =>
return HttpResponse(entity = s"pong $id -> $traceId")
case "fing" =>
return HttpResponse(entity = s"fong $id -> $traceId")
case _ =>
}
}
}
HttpResponse(status = NOT_FOUND.getStatus)
.withEntity(NOT_FOUND.getBody)
}
}
}
)
.withDefaultHeaders(defaultHeader)
}
}
def asyncHandler(implicit
ec: ExecutionContext
): HttpRequest => Future[HttpResponse] = { request =>
Future {
syncHandler(request)
}.recover { case e: Exception =>
// Recover from exceptions to return a proper HTTP response instead of a
// failed Future. When the Future fails, the span completion depends on
// async continuation cleanup which can race with the test's trace assertion,
// causing flaky timeouts waiting for the trace to be written.
HttpResponse(status = EXCEPTION.getStatus, entity = e.getMessage)
.withDefaultHeaders(defaultHeader)
}
}
def enableHttp2(serverSettings: ServerSettings): ServerSettings = {
val previewServerSettings =
serverSettings.previewServerSettings.withEnableHttp2(true)
serverSettings.withPreviewServerSettings(previewServerSettings)
}
}