Skip to content

Commit e32ad71

Browse files
Wire up response-executor option in connection-pool (fixes #773)
- Add PoolWithExecutor record to associate response-executor with pool - Update connection-pool to return wrapped pool when response-executor is provided - Update request function to extract executor from pool wrapper - Add test to verify executor affinity correctly
1 parent 40b3fc4 commit e32ad71

3 files changed

Lines changed: 58 additions & 14 deletions

File tree

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
:description "A framework for asynchronous communication"
88
:url "https://github.com/clj-commons/aleph"
99
:license {:name "MIT License"}
10-
:dependencies [[org.clojure/clojure "1.12.2"]
10+
:dependencies [[org.clojure/clojure "1.12.0"]
1111
[org.clojure/tools.logging "1.3.0" :exclusions [org.clojure/clojure]]
1212
[manifold "0.4.3" :exclusions [org.clojure/tools.logging]]
1313
[org.clj-commons/byte-streams "0.3.4"]

src/aleph/http.clj

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
ReadTimeoutException
2424
RequestCancellationException
2525
RequestTimeoutException)
26-
(io.aleph.dirigiste Pools)
26+
(io.aleph.dirigiste IPool Pools)
2727
(io.netty.channel ConnectTimeoutException)
2828
(io.netty.handler.codec Headers)
2929
(io.netty.handler.codec.http HttpHeaders)
3030
(java.net
3131
InetSocketAddress
3232
URI)
33-
(java.util.concurrent TimeoutException)))
33+
(java.util.concurrent Executor TimeoutException)))
3434

3535
(defn start-server
3636
"Starts an HTTP server using the provided Ring `handler`. Returns a server
@@ -133,6 +133,29 @@
133133
(def ^:no-doc default-response-executor
134134
(flow/utilization-executor 0.9 256 {:onto? false}))
135135

136+
;; Wrapper record to associate a response-executor with a connection pool.
137+
;; This enables the `request` function to use the pool's executor for response deferreds.
138+
(defrecord PoolWithExecutor [^IPool pool ^Executor response-executor]
139+
java.io.Closeable
140+
(close [_] (.shutdown pool))
141+
142+
IPool
143+
(shutdown [_] (.shutdown pool)))
144+
145+
(defn- get-pool
146+
"Returns the underlying IPool from a pool or PoolWithExecutor."
147+
^IPool [pool]
148+
(if (instance? PoolWithExecutor pool)
149+
(:pool pool)
150+
pool))
151+
152+
(defn- get-response-executor
153+
"Returns the response-executor from a PoolWithExecutor, or the default executor."
154+
[pool]
155+
(if (instance? PoolWithExecutor pool)
156+
(:response-executor pool)
157+
default-response-executor))
158+
136159
(defn connection-pool
137160
"Returns a connection pool which can be used as an argument in `request`.
138161
@@ -251,19 +274,20 @@
251274

252275
true
253276
(update :ssl-context #(client/ssl-context % http-versions insecure?)))
277+
response-executor' (:response-executor connection-options)
254278
p (promise)
255279
create-pool-fn (or pool-builder-fn
256280
flow/instrumented-pool)
257281
create-pool-ctrl-fn (or pool-controller-builder-fn
258282
#(Pools/utilizationController target-utilization connections-per-host total-connections))
259-
pool (create-pool-fn
283+
raw-pool (create-pool-fn
260284
{:generate (fn [host]
261285
(let [c (promise)
262286
conn (create-connection
263287
host
264288
conn-options'
265289
middleware
266-
#(flow/dispose @p host [@c]))]
290+
#(flow/dispose (get-pool @p) host [@c]))]
267291
(deliver c conn)
268292
[conn]))
269293
:destroy (fn [_ c]
@@ -273,7 +297,10 @@
273297
:control-period control-period
274298
:max-queue-size max-queue-size
275299
:controller (create-pool-ctrl-fn)
276-
:stats-callback stats-callback})]
300+
:stats-callback stats-callback})
301+
pool (if response-executor'
302+
(->PoolWithExecutor raw-pool response-executor')
303+
raw-pool)]
277304
@(deliver p pool)))
278305

279306
(def ^:no-doc default-connection-pool
@@ -384,20 +411,21 @@
384411
request-timeout
385412
read-timeout]
386413
:or {pool default-connection-pool
387-
response-executor default-response-executor
388414
middleware identity
389415
connection-timeout aleph.netty/default-connect-timeout}
390416
:as req}]
391-
(let [dispose-conn! (atom (fn []))
392-
result (d/deferred response-executor)
393-
response (executor/with-executor response-executor
417+
(let [response-executor' (or response-executor (get-response-executor pool))
418+
raw-pool (get-pool pool)
419+
dispose-conn! (atom (fn []))
420+
result (d/deferred response-executor')
421+
response (executor/with-executor response-executor'
394422
((middleware
395423
(fn [req]
396424
(let [k (client/req->domain req)
397425
start (System/currentTimeMillis)]
398426

399427
;; acquire a connection
400-
(-> (flow/acquire pool k)
428+
(-> (flow/acquire raw-pool k)
401429
(maybe-timeout! pool-timeout)
402430

403431
;; pool timeout triggered
@@ -410,7 +438,7 @@
410438
;; NOTE: All error handlers below delegate disposal of the
411439
;; connection to the error handler on `result` which uses this
412440
;; function.
413-
(reset! dispose-conn! (fn [] (flow/dispose pool k conn)))
441+
(reset! dispose-conn! (fn [] (flow/dispose raw-pool k conn)))
414442

415443
(if (realized? result)
416444
;; to account for race condition between setting `dispose-conn!`
@@ -478,8 +506,8 @@
478506
(<= 400 (:status rsp)))
479507
(do
480508
(log/trace "Connection finished. Disposing...")
481-
(flow/dispose pool k conn))
482-
(flow/release pool k conn)))))
509+
(flow/dispose raw-pool k conn))
510+
(flow/release raw-pool k conn)))))
483511
(-> rsp
484512
(dissoc :aleph/destroy-conn?)
485513
(assoc :connection-time (- end start)))))))))

test/aleph/http_test.clj

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,22 @@
757757
(let [rsp (http-get "/" {:connection-pool pool})]
758758
(is (= http/default-response-executor (.executor rsp))))))))
759759

760+
(deftest test-connection-pool-response-executor
761+
(let [pool-executor (java.util.concurrent.Executors/newFixedThreadPool 1)
762+
pool (http/connection-pool
763+
{:connection-options
764+
{:response-executor pool-executor
765+
:insecure? true}})]
766+
(try
767+
(with-both-handlers basic-handler
768+
(let [rsp (http-get "/string" {:pool pool})]
769+
;; Verify that the response deferred uses the custom pool executor
770+
(is (= pool-executor (.executor rsp))
771+
"Response deferred should use the connection pool's response-executor")))
772+
(finally
773+
(.shutdown pool-executor)
774+
(.shutdown pool)))))
775+
760776
(deftest test-trace-request-omitted-body
761777
(with-handler echo-handler
762778
(is (= "" (-> @(http-trace "/" {:body "REQUEST"})

0 commit comments

Comments
 (0)