Skip to content

Commit 472850a

Browse files
vladislavsheludchenkovhamnis
authored andcommitted
Makes requests uncancelable when running on Java lower than 16 where cancellation is not supported
1 parent 892c4a3 commit 472850a

7 files changed

Lines changed: 192 additions & 89 deletions

File tree

core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ object JdkHttpClient {
178178
): Resource[F, Response[F]] =
179179
Resource
180180
.makeFull { (poll: Poll[F]) =>
181-
(Deferred[F, Unit], poll(responseF)).tupled
181+
val maybeCancelableResponseF =
182+
if (JdkVersion.supportsCancellation) poll(responseF)
183+
else responseF
184+
(Deferred[F, Unit], maybeCancelableResponseF).tupled
182185
} { case (subscription, response) =>
183186
subscription.tryGet.flatMap {
184187
case None =>
@@ -274,7 +277,7 @@ object JdkHttpClient {
274277
F.delay {
275278
val builder = HttpClient.newBuilder()
276279
// workaround for https://github.com/http4s/http4s-jdk-http-client/issues/200
277-
if (Runtime.version().feature() == 11) {
280+
if (JdkVersion.tls13TriggersDeadlock) {
278281
val params = javax.net.ssl.SSLContext.getDefault().getDefaultSSLParameters()
279282
params.setProtocols(params.getProtocols().filter(_ != "TLSv1.3"))
280283
val _ = builder.sslParameters(params)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2019 http4s.org
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.http4s.jdkhttpclient
18+
19+
private object JdkVersion {
20+
val supportsCancellation: Boolean = Runtime.version().feature() >= 16
21+
22+
// see https://github.com/http4s/http4s-jdk-http-client/issues/200
23+
val tls13TriggersDeadlock: Boolean = Runtime.version().feature() == 11
24+
}

core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,12 @@ object JdkWSClient {
104104
handleReceive(error.asLeft); ()
105105
}
106106
}
107-
webSocket <- poll(
108-
F.fromCompletableFuture(
109-
F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener))
110-
)
107+
webSocketF = F.fromCompletableFuture(
108+
F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener))
111109
)
110+
webSocket <-
111+
if (JdkVersion.supportsCancellation) poll(webSocketF)
112+
else webSocketF
112113
sendSem <- Semaphore[F](1L)
113114
} yield (webSocket, queue, closedDef, sendSem)
114115
} { case (webSocket, queue, _, _) =>

core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala

Lines changed: 99 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit
3636
import scala.concurrent.duration._
3737

3838
final class CompletableFutureTerminationTest extends CatsEffectSuite {
39+
3940
import CompletableFutureTerminationTest._
4041

4142
private val duration: FiniteDuration =
@@ -65,81 +66,105 @@ final class CompletableFutureTerminationTest extends CatsEffectSuite {
6566
//
6667
// See: https://docs.oracle.com/en/java/javase/14/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscriber.html
6768
test("Terminating an effect generated from a CompletableFuture") {
68-
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
69-
.flatMap { case (stallServer, observation, gotRequest) =>
70-
// Acquire the `stallServer` semaphore so that the server will not
71-
// return _any_ bytes until we release a permit.
72-
stallServer.acquire *>
73-
// Acquire the `gotRequest` semaphore. The server will release this
74-
// once it gets our Request. We wait until this happens to start our
75-
// timeout logic.
76-
gotRequest.acquire *>
77-
// Start a Http4s Server, it will be terminated at the conclusion of
78-
// this test.
79-
stallingServerR[IO](stallServer, gotRequest).use { (server: Server) =>
80-
// Call the server, using the JDK client. We call directly with
81-
// the JDK client because we need to have low level control over
82-
// the result to observe whether or not the
83-
// java.util.concurrent.CompletableFuture is still executing (and
84-
// holding on to resources).
85-
callServer[IO](server).flatMap((cf: CompletableFuture[HttpResponse[String]]) =>
86-
// Attach a handler onto the result. This will populate our
87-
// `observation` Deferred value when the CompletableFuture
88-
// finishes for any reason.
89-
//
90-
// We start executing this in the background, so that we
91-
// asynchronously populate our Observation.
92-
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
93-
// Wait until we are sure the Http4s Server has received the
94-
// request.
95-
gotRequest.acquire *>
96-
// Lift the CompletableFuture to a IO value and attach a
97-
// (short) timeout to the termination.
98-
//
99-
// Important! The IO result _must_ be terminated via the
100-
// timeout _before any bytes_ have been received by the JDK
101-
// HttpClient in order to validate resource safety. Once we
102-
// start getting bytes back, the CompletableFuture _is
103-
// complete_ and we are in a different context.
104-
//
105-
// Notice that we release stallServer _after_ the
106-
// timeout. _This is the crux of this entire test_. Once
107-
// we release `stallServer`, the Http4s Server will
108-
// attempt to send back an Http Response to our JDK
109-
// client. If the CompletableFuture and associated
110-
// resources were properly cleaned up after the
111-
// timeoutTo terminated the running effect, then the JDK
112-
// client connection will either be closed, or the
113-
// attempt to invoke `complete` on the
114-
// `CompletableFuture` will fail, in both cases
115-
// releasing any resources being held. If not, then it
116-
// will still receive bytes, meaning there is a resource
117-
// leak.
118-
IO.fromCompletableFuture(IO(cf))
119-
.void
120-
.timeoutTo(duration, stallServer.release) *>
121-
// After the timeout has triggered, wait for the observation to complete.
122-
fiber.join *>
123-
// Check our observation. Whether or not there is an exception
124-
// is not actually relevant to the success case. What _is_
125-
// important is that there is no result. If there is a result,
126-
// then that means that _after_ `timeoutTo` released
127-
// `stallServer` the CompletableFuture for the Http response
128-
// body still processed data, which indicates a resource leak.
129-
observation.get.flatMap {
130-
case Observation(None, _) => IO.pure(true)
131-
case otherwise =>
132-
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
69+
assume(
70+
JdkVersion.supportsCancellation,
71+
"This test checks cancellation behavior, which was only introduced in JDK 16."
72+
)
73+
74+
JdkHttpClient.defaultHttpClientResource[IO].use { client =>
75+
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
76+
.flatMap { case (stallServer, observation, gotRequest) =>
77+
// Acquire the `stallServer` semaphore so that the server will not
78+
// return _any_ bytes until we release a permit.
79+
stallServer.acquire *>
80+
// Acquire the `gotRequest` semaphore. The server will release this
81+
// once it gets our Request. We wait until this happens to start our
82+
// timeout logic.
83+
gotRequest.acquire *>
84+
// Start a Http4s Server, it will be terminated at the conclusion of
85+
// this test.
86+
Async[IO].bracket(stallingServerR[IO](stallServer, gotRequest).allocated) {
87+
case (server: Server, _) =>
88+
// Call the server, using the JDK client. We call directly with
89+
// the JDK client because we need to have low level control over
90+
// the result to observe whether or not the
91+
// java.util.concurrent.CompletableFuture is still executing (and
92+
// holding on to resources).
93+
callServer[IO](client, server).flatMap(
94+
(cf: CompletableFuture[HttpResponse[String]]) =>
95+
// Attach a handler onto the result. This will populate our
96+
// `observation` Deferred value when the CompletableFuture
97+
// finishes for any reason.
98+
//
99+
// We start executing this in the background, so that we
100+
// asynchronously populate our Observation.
101+
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
102+
// Wait until we are sure the Http4s Server has received the
103+
// request.
104+
gotRequest.acquire *>
105+
// Lift the CompletableFuture to a IO value and attach a
106+
// (short) timeout to the termination.
107+
//
108+
// Important! The IO result _must_ be terminated via the
109+
// timeout _before any bytes_ have been received by the JDK
110+
// HttpClient in order to validate resource safety. Once we
111+
// start getting bytes back, the CompletableFuture _is
112+
// complete_ and we are in a different context.
113+
//
114+
// Notice that we release stallServer _after_ the
115+
// timeout. _This is the crux of this entire test_. Once
116+
// we release `stallServer`, the Http4s Server will
117+
// attempt to send back an Http Response to our JDK
118+
// client. If the CompletableFuture and associated
119+
// resources were properly cleaned up after the
120+
// timeoutTo terminated the running effect, then the JDK
121+
// client connection will either be closed, or the
122+
// attempt to invoke `complete` on the
123+
// `CompletableFuture` will fail, in both cases
124+
// releasing any resources being held. If not, then it
125+
// will still receive bytes, meaning there is a resource
126+
// leak.
127+
IO.fromCompletableFuture(IO(cf))
128+
.void
129+
.timeoutTo(duration, stallServer.release) *>
130+
// After the timeout has triggered, wait for the observation to complete.
131+
fiber.join *>
132+
// Check our observation. Whether or not there is an exception
133+
// is not actually relevant to the success case. What _is_
134+
// important is that there is no result. If there is a result,
135+
// then that means that _after_ `timeoutTo` released
136+
// `stallServer` the CompletableFuture for the Http response
137+
// body still processed data, which indicates a resource leak.
138+
observation.get.flatMap {
139+
case Observation(None, _) => IO.pure(true)
140+
case otherwise =>
141+
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
142+
}
143+
)
144+
)
145+
} { case (_, release) =>
146+
release.timed
147+
.flatMap { case (duration, _) =>
148+
IO {
149+
assert(
150+
clue(duration) < serverTimeout,
151+
"Finalization didn't complete until server shutdown timeout was reached, a connection is likely leaked by the client"
152+
)
133153
}
134-
)
135-
)
136-
}
137-
}
154+
}
155+
156+
}
157+
}
158+
}
138159
}
139160
}
140161

141162
object CompletableFutureTerminationTest {
142163

164+
val a: Boolean = () < 5.seconds
165+
166+
private val serverTimeout = 5.seconds
167+
143168
/** ADT to contain the result of an invocation to
144169
* [[java.util.concurrent.CompletionStage#handleAsync]]
145170
*
@@ -179,14 +204,12 @@ object CompletableFutureTerminationTest {
179204
EmberServerBuilder
180205
.default[F]
181206
.withHttpApp(
182-
Kleisli(
183-
Function.const(
184-
gotRequest.release *>
185-
semaphore.permit.use(_ => F.pure(Response[F]()))
186-
)
207+
Kleisli.liftF(
208+
gotRequest.release *>
209+
semaphore.permit.use(_ => F.pure(Response[F]()))
187210
)
188211
)
189-
.withShutdownTimeout(1.second)
212+
.withShutdownTimeout(serverTimeout)
190213
.withPort(port"0")
191214
.build
192215

@@ -218,11 +241,11 @@ object CompletableFutureTerminationTest {
218241
* in a [[java.util.concurrent.CompletableFuture]].
219242
*/
220243
private def callServer[F[_]](
244+
client: HttpClient,
221245
server: Server
222246
)(implicit F: Sync[F]): F[CompletableFuture[HttpResponse[String]]] =
223247
for {
224248
jURI <- F.catchNonFatal(new URI(server.baseUri.renderString))
225-
client <- F.delay(HttpClient.newHttpClient)
226249
result <- F.delay(
227250
client.sendAsync(HttpRequest.newBuilder(jURI).build(), HttpResponse.BodyHandlers.ofString)
228251
)

core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,16 @@ import javax.net.ssl.SSLHandshakeException
2727
class DeadlockWorkaround extends CatsEffectSuite {
2828

2929
test("fail to connect via TLSv1.3 on Java 11") {
30-
if (Runtime.version().feature() > 11) IO.pure(true)
31-
else
32-
(JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) =>
33-
def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException]
34-
testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *>
35-
testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit))
36-
}
30+
assume(
31+
JdkVersion.tls13TriggersDeadlock,
32+
"Test only applies to JDK 11, which has a deadlock issue that is triggered by using TLSv1.3"
33+
)
34+
35+
(JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) =>
36+
def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException]
37+
testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *>
38+
testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit))
39+
}
3740
}
3841

3942
}

core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") {
4242
}
4343

4444
test("timeout request") {
45+
assume(
46+
JdkVersion.supportsCancellation,
47+
"This test checks cancellation behavior, which was only introduced in JDK 16."
48+
)
49+
4550
val address = server().addresses.head
4651
val path = GetRoutes.DelayedPath // 1s delay before response
4752
val uri = Uri.fromString(s"http://$address$path").toOption.get
@@ -54,4 +59,23 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") {
5459
}
5560
}
5661
}
62+
63+
test("uncancelable request") {
64+
assume(
65+
!JdkVersion.supportsCancellation,
66+
"This test checks behavior when cancellation is not available, which is pre JDK 16."
67+
)
68+
69+
val address = server().addresses.head
70+
val path = GetRoutes.DelayedPath // 1s delay before response
71+
val uri = Uri.fromString(s"http://$address$path").toOption.get
72+
val req = Request[IO](uri = uri)
73+
val res = client().expect[String](req)
74+
res.as(false).timeoutTo(100.millis, IO.pure(true)).timed.flatMap { case (duration, result) =>
75+
IO {
76+
assert(clue(duration) >= 1.second)
77+
assert(result)
78+
}
79+
}
80+
}
5781
}

core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ class JdkWSClientSpec extends CatsEffectSuite {
203203
}
204204

205205
test("connect timeout") {
206+
assume(
207+
JdkVersion.supportsCancellation,
208+
"This test checks cancellation behavior, which was only introduced in JDK 16."
209+
)
210+
206211
webSocket()
207212
.connectHighLevel(WSRequest(echoServerUri() / "delayed"))
208213
.use_
@@ -217,6 +222,26 @@ class JdkWSClientSpec extends CatsEffectSuite {
217222
}
218223
}
219224

225+
test("uncancelable connect") {
226+
assume(
227+
!JdkVersion.supportsCancellation,
228+
"This test checks behavior when cancellation is not available, which is pre JDK 16."
229+
)
230+
231+
webSocket()
232+
.connectHighLevel(WSRequest(echoServerUri() / "delayed"))
233+
.use_
234+
.as(false)
235+
.timeoutTo(100.millis, IO.pure(true))
236+
.timed
237+
.flatMap { case (duration, result) =>
238+
IO {
239+
assert(clue(duration) >= 1.second)
240+
assert(result)
241+
}
242+
}
243+
}
244+
220245
def httpToWsUri(uri: Uri): Uri = uri.copy(scheme = scheme"ws".some)
221246

222247
}

0 commit comments

Comments
 (0)