diff --git a/src/aleph/http.clj b/src/aleph/http.clj index 02f8e764..0c7f8658 100644 --- a/src/aleph/http.clj +++ b/src/aleph/http.clj @@ -23,14 +23,14 @@ ReadTimeoutException RequestCancellationException RequestTimeoutException) - (io.aleph.dirigiste Pools) + (io.aleph.dirigiste IPool Pools) (io.netty.channel ConnectTimeoutException) (io.netty.handler.codec Headers) (io.netty.handler.codec.http HttpHeaders) (java.net InetSocketAddress URI) - (java.util.concurrent TimeoutException))) + (java.util.concurrent Executor TimeoutException))) (defn start-server "Starts an HTTP server using the provided Ring `handler`. Returns a server @@ -133,6 +133,25 @@ (def ^:no-doc default-response-executor (flow/utilization-executor 0.9 256 {:onto? false})) +;; Wrapper record to associate a response-executor with a connection pool. +;; This enables the `request` function to use the pool's executor for response deferreds. +(defrecord ConnectionPool [^IPool pool ^Executor response-executor] + IPool + (acquire [_ k callback] + (.acquire pool k callback)) + (release [_ k obj] + (.release pool k obj)) + (dispose [_ k obj] + (.dispose pool k obj)) + (shutdown [_] + (.shutdown pool)) + + java.io.Closeable + (close [_] + (.shutdown pool))) + + + (defn connection-pool "Returns a connection pool which can be used as an argument in `request`. @@ -251,12 +270,13 @@ true (update :ssl-context #(client/ssl-context % http-versions insecure?))) + response-executor' (:response-executor connection-options) p (promise) create-pool-fn (or pool-builder-fn flow/instrumented-pool) create-pool-ctrl-fn (or pool-controller-builder-fn #(Pools/utilizationController target-utilization connections-per-host total-connections)) - pool (create-pool-fn + raw-pool (create-pool-fn {:generate (fn [host] (let [c (promise) conn (create-connection @@ -273,7 +293,10 @@ :control-period control-period :max-queue-size max-queue-size :controller (create-pool-ctrl-fn) - :stats-callback stats-callback})] + :stats-callback stats-callback}) + pool (if response-executor' + (->ConnectionPool raw-pool response-executor') + raw-pool)] @(deliver p pool))) (def ^:no-doc default-connection-pool @@ -384,13 +407,15 @@ request-timeout read-timeout] :or {pool default-connection-pool - response-executor default-response-executor middleware identity connection-timeout aleph.netty/default-connect-timeout} :as req}] - (let [dispose-conn! (atom (fn [])) - result (d/deferred response-executor) - response (executor/with-executor response-executor + (let [response-executor' (or response-executor + (:response-executor pool) + default-response-executor) + dispose-conn! (atom (fn [])) + result (d/deferred response-executor') + response (executor/with-executor response-executor' ((middleware (fn [req] (let [k (client/req->domain req) diff --git a/test/aleph/http_test.clj b/test/aleph/http_test.clj index 9cf29308..e72ee635 100644 --- a/test/aleph/http_test.clj +++ b/test/aleph/http_test.clj @@ -757,6 +757,22 @@ (let [rsp (http-get "/" {:connection-pool pool})] (is (= http/default-response-executor (.executor rsp)))))))) +(deftest test-connection-pool-response-executor + (let [pool-executor (java.util.concurrent.Executors/newFixedThreadPool 1) + pool (http/connection-pool + {:connection-options + {:response-executor pool-executor + :insecure? true}})] + (try + (with-both-handlers basic-handler + (let [rsp (http-get "/string" {:pool pool})] + ;; Verify that the response deferred uses the custom pool executor + (is (= pool-executor (.executor rsp)) + "Response deferred should use the connection pool's response-executor"))) + (finally + (.shutdown pool-executor) + (.shutdown pool))))) + (deftest test-trace-request-omitted-body (with-handler echo-handler (is (= "" (-> @(http-trace "/" {:body "REQUEST"})