Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions src/aleph/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
ReadTimeoutException
RequestCancellationException
RequestTimeoutException)
(io.aleph.dirigiste Pools)
(io.aleph.dirigiste IPool Pools)
(io.netty.channel ConnectTimeoutException)
(io.netty.handler.codec Headers)
(io.netty.handler.codec.http HttpHeaders)
(java.net
InetSocketAddress
URI)
(java.util.concurrent TimeoutException)))
(java.util.concurrent Executor TimeoutException)))

(defn start-server
"Starts an HTTP server using the provided Ring `handler`. Returns a server
Expand Down Expand Up @@ -133,6 +133,25 @@
(def ^:no-doc default-response-executor
(flow/utilization-executor 0.9 256 {:onto? false}))

;; Wrapper record to associate a response-executor with a connection pool.
;; This enables the `request` function to use the pool's executor for response deferreds.
(defrecord ConnectionPool [^IPool pool ^Executor response-executor]
IPool
(acquire [_ k callback]
(.acquire pool k callback))
(release [_ k obj]
(.release pool k obj))
(dispose [_ k obj]
(.dispose pool k obj))
(shutdown [_]
(.shutdown pool))

java.io.Closeable
(close [_]
(.shutdown pool)))



(defn connection-pool
"Returns a connection pool which can be used as an argument in `request`.

Expand Down Expand Up @@ -251,12 +270,13 @@

true
(update :ssl-context #(client/ssl-context % http-versions insecure?)))
response-executor' (:response-executor connection-options)
p (promise)
create-pool-fn (or pool-builder-fn
flow/instrumented-pool)
create-pool-ctrl-fn (or pool-controller-builder-fn
#(Pools/utilizationController target-utilization connections-per-host total-connections))
pool (create-pool-fn
raw-pool (create-pool-fn
{:generate (fn [host]
(let [c (promise)
conn (create-connection
Expand All @@ -273,7 +293,10 @@
:control-period control-period
:max-queue-size max-queue-size
:controller (create-pool-ctrl-fn)
:stats-callback stats-callback})]
:stats-callback stats-callback})
pool (if response-executor'
(->ConnectionPool raw-pool response-executor')
raw-pool)]
@(deliver p pool)))

(def ^:no-doc default-connection-pool
Expand Down Expand Up @@ -384,13 +407,15 @@
request-timeout
read-timeout]
:or {pool default-connection-pool
response-executor default-response-executor
middleware identity
connection-timeout aleph.netty/default-connect-timeout}
:as req}]
(let [dispose-conn! (atom (fn []))
result (d/deferred response-executor)
response (executor/with-executor response-executor
(let [response-executor' (or response-executor
(:response-executor pool)
default-response-executor)
dispose-conn! (atom (fn []))
result (d/deferred response-executor')
response (executor/with-executor response-executor'
((middleware
(fn [req]
(let [k (client/req->domain req)
Expand Down
16 changes: 16 additions & 0 deletions test/aleph/http_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,22 @@
(let [rsp (http-get "/" {:connection-pool pool})]
(is (= http/default-response-executor (.executor rsp))))))))

(deftest test-connection-pool-response-executor
(let [pool-executor (java.util.concurrent.Executors/newFixedThreadPool 1)
pool (http/connection-pool
{:connection-options
{:response-executor pool-executor
:insecure? true}})]
(try
(with-both-handlers basic-handler
(let [rsp (http-get "/string" {:pool pool})]
;; Verify that the response deferred uses the custom pool executor
(is (= pool-executor (.executor rsp))
"Response deferred should use the connection pool's response-executor")))
(finally
(.shutdown pool-executor)
(.shutdown pool)))))

(deftest test-trace-request-omitted-body
(with-handler echo-handler
(is (= "" (-> @(http-trace "/" {:body "REQUEST"})
Expand Down