Skip to content

Commit aa87e43

Browse files
committed
ASYNC-275: Changed futurize return Future from CompletableFuture to FutureTask to match the same type returned from ExecutorService.submit. Added a regression test to catch this case.
1 parent 76c2b26 commit aa87e43

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
[clojure.core.async.impl.dispatch :as disp]
1515
[clojure.walk :as walk]
1616
[clojure.datafy :as datafy])
17-
(:import [java.util.concurrent Future Executor TimeUnit CompletableFuture]
17+
(:import [java.util.concurrent Future Executor TimeUnit FutureTask]
1818
[java.util.concurrent.locks ReentrantLock]))
1919

2020
(set! *warn-on-reflection* true)
@@ -31,8 +31,8 @@
3131
(let [^Executor e (if (instance? Executor exec)
3232
exec
3333
(disp/executor-for exec))
34-
fut (CompletableFuture.)]
35-
(.execute e #(.complete fut (apply f args)))
34+
fut (FutureTask. #(apply f args))]
35+
(.execute e fut)
3636
fut)))
3737

3838
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]

src/test/clojure/clojure/core/async/flow_test.clj

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,28 @@
55
;; By using this software in any fashion, you are agreeing to be bound by
66
;; the terms of this license.
77
;; You must not remove this notice, or any other, from this software.
8-
98
(ns clojure.core.async.flow-test
109
(:require [clojure.test :refer :all]
11-
[clojure.core.async.flow :as flow]))
10+
[clojure.core.async.flow :as flow])
11+
(:import [java.util.concurrent TimeUnit Future ExecutionException]))
12+
13+
(set! *warn-on-reflection* true)
1214

1315
(deftest test-futurize
14-
(testing ""
16+
(testing "Happy path use of futurize"
1517
(let [in-es? (atom false)
1618
es (reify java.util.concurrent.Executor
17-
(^void execute [_ ^Runnable f]
18-
(reset! in-es? true)
19-
(future-call f)))]
19+
(^void execute [_ ^Runnable r]
20+
(reset! in-es? true)
21+
(future (.run r))))]
2022
(is (= 16 @((flow/futurize #(* % %) {:exec :mixed}) 4)))
2123
(is (= 16 @((flow/futurize #(* % %)) 4)))
2224
(is (= 16 @((flow/futurize #(* % %) {:exec es}) 4)))
23-
(is @in-es?))))
24-
25+
(is @in-es?)))
26+
(testing "ASYNC-275 regression: futurize Future instance propagates exceptions"
27+
(let [cause (ex-info "boom" {})
28+
fut ((flow/futurize (fn [] (throw cause)) {:exec :mixed}))
29+
ex (try
30+
(.get ^Future fut 1000 TimeUnit/MILLISECONDS)
31+
(catch ExecutionException e e))]
32+
(is (= cause (.getCause ^ExecutionException ex))))))

0 commit comments

Comments
 (0)