Skip to content

Commit 0dac2ee

Browse files
committed
AYNC-272: 2: Restricted vthreads use to io-thread. go again uses IOC. Removed sysprops around vthreads. go block dispatch uses :mixed instead of :io workload profile. ExecutorService -> Executor and fixed flow to use this. Removed use of explicit ExecutorService to create vthreads and use the one-shot in a reify instance instead. Modified docs.
1 parent 85366ba commit 0dac2ee

5 files changed

Lines changed: 48 additions & 23 deletions

File tree

src/main/clojure/clojure/core/async.clj

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ go block threads - use Thread.setDefaultUncaughtExceptionHandler()
1717
to catch and handle.
1818
1919
Use the Java system property `clojure.core.async.executor-factory`
20-
to specify a function that will provide ExecutorServices for
20+
to specify a function that will provide an Executor instance for
2121
application-wide use by core.async in lieu of its defaults. The
2222
property value should name a fully qualified var. The function
2323
will be passed a keyword indicating the context of use of the
24-
executor, and should return either an ExecutorService, or nil to
24+
executor, and should return either an Executor, or nil to
2525
use the default. Results per keyword will be cached and used for
2626
the remainder of the application. Possible context arguments are:
2727
@@ -36,7 +36,7 @@ flow/process
3636
3737
:core-async-dispatch - used for completion fn handling (e.g. in put!
3838
and take!, as well as go block IOC thunk processing) throughout
39-
core.async. If not supplied the ExecutorService for :io will be
39+
core.async. If not supplied, the Executor for :io will be
4040
used instead.
4141
4242
The set of contexts may grow in the future so the function should
@@ -487,7 +487,7 @@ return nil for unexpected contexts."
487487
Returns a channel which will receive the result of the body when
488488
completed"
489489
[& body]
490-
(#'clojure.core.async.impl.go/go-impl &env body))
490+
((requiring-resolve 'clojure.core.async.impl.go/go-impl) &env body))
491491

492492
(defonce ^:private thread-macro-executor nil)
493493

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@
9898
async/sliding-buffer of size 100, thus signals not handled in a
9999
timely manner will be dropped in favor of later arriving signals.
100100
101-
:mixed-exec/:io-exec/:compute-exec -> ExecutorService
102-
These can be used to specify the ExecutorService to use for the
101+
:mixed-exec/:io-exec/:compute-exec -> Executor
102+
These can be used to specify the Executor to use for the
103103
corresonding workload, in lieu of the lib defaults.
104104
105105
N.B. The flow is not started. See 'start'"
@@ -334,7 +334,7 @@
334334
335335
futurize accepts kwarg options:
336336
:exec - one of the workloads :mixed, :io, :compute
337-
or a j.u.c.ExecutorService object,
337+
or a j.u.c.Executor object,
338338
default :mixed"
339339
[f & {:keys [exec]
340340
:or {exec :mixed} :as opts}]

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
[clojure.core.async.impl.dispatch :as disp]
1515
[clojure.walk :as walk]
1616
[clojure.datafy :as datafy])
17-
(:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
17+
(:import [java.util.concurrent Future Executors Executor TimeUnit]
1818
[java.util.concurrent.locks ReentrantLock]))
1919

2020
(set! *warn-on-reflection* true)
2121

2222
(defn datafy [x]
2323
(condp instance? x
2424
clojure.lang.Fn (-> x str symbol)
25-
ExecutorService (str x)
25+
Executor (str x)
2626
clojure.lang.Var (symbol x)
2727
(datafy/datafy x)))
2828

2929
(defn futurize [f {:keys [exec]}]
3030
(fn [& args]
31-
(let [^ExecutorService e (if (instance? ExecutorService exec)
32-
exec
33-
(disp/executor-for exec))]
34-
(.submit e ^Callable #(apply f args)))))
31+
(let [^Executor e (if (instance? Executor exec)
32+
exec
33+
(disp/executor-for exec))]
34+
(.execute e ^Runnable #(apply f args)))))
3535

3636
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
3737
(let [{:keys [ins outs signal-select]} (spi/describe proc)
@@ -51,8 +51,8 @@
5151
(let [lock (ReentrantLock.)
5252
chans (atom nil)
5353
execs {:mixed mixed-exec :io io-exec :compute compute-exec}
54-
_ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs))
55-
"mixed-exe, io-exec and compute-exec must be ExecutorServices")
54+
_ (assert (every? #(or (nil? %) (instance? Executor %)) (vals execs))
55+
"mixed-exe, io-exec and compute-exec must be Executors")
5656
pdescs (reduce-kv prep-proc {} procs)
5757
allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs))))
5858
inopts (allopts :ins)

src/main/clojure/clojure/core/async/flow/spi.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
N.B. outputs may be nil if not connected
8383
:resolver - an impl of spi/Resolver, which can be used to find
8484
channels given their logical [pid cid] coordinates, as well as to
85-
obtain ExecutorServices corresponding to the
85+
obtain Executors corresponding to the
8686
logical :mixed/:io/:compute contexts"))
8787

8888
(defprotocol Resolver
@@ -91,5 +91,5 @@
9191
write to or nil (in which case the output should be dropped,
9292
e.g. nothing is connected).")
9393
(get-exec [_ context]
94-
"returns the ExecutorService for the given context, one
94+
"returns the Executor for the given context, one
9595
of :mixed, :io, :compute"))

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
(ns ^{:skip-wiki true}
1010
clojure.core.async.impl.dispatch
11-
(:import [java.util.concurrent Executors ExecutorService ThreadFactory]))
11+
(:import [java.util.concurrent Executors Executor ThreadFactory]))
1212

1313
(set! *warn-on-reflection* true)
1414

@@ -72,6 +72,31 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75+
(def virtual-threads-available?
76+
(try
77+
(Class/forName "java.lang.Thread$Builder$OfVirtual")
78+
true
79+
(catch ClassNotFoundException _
80+
false)))
81+
82+
(def ^:private virtual-thread?
83+
(if virtual-threads-available?
84+
(eval `(fn [^Thread t#] (~'.isVirtual t#)))
85+
(constantly false)))
86+
87+
(defn in-vthread? []
88+
(and virtual-threads-available?
89+
(virtual-thread? (Thread/currentThread))))
90+
91+
(defn- make-io-executor
92+
[]
93+
(if virtual-threads-available?
94+
(let [svt (.getDeclaredMethod Thread "startVirtualThread" (into-array Class [Runnable]))]
95+
(reify Executor
96+
(execute [_ r]
97+
(.invoke svt nil (object-array [r])))))
98+
(make-ctp-named :io)))
99+
75100
(defn ^:private create-default-executor
76101
[workload]
77102
(case workload
@@ -80,23 +105,23 @@
80105
:mixed (make-ctp-named :mixed)))
81106

82107
(def executor-for
83-
"Given a workload tag, returns an ExecutorService instance and memoizes the result. By
108+
"Given a workload tag, returns an Executor instance and memoizes the result. By
84109
default, core.async will defer to a user factory (if provided via sys prop) or construct
85-
a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When
110+
a specialized Executor instance for each tag :io, :compute, and :mixed. When
86111
given the tag :core-async-dispatch it will default to the executor service for :io."
87112
(memoize
88-
(fn ^ExecutorService [workload]
113+
(fn ^Executor [workload]
89114
(let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")]
90115
(requiring-resolve (symbol esf)))
91116
sp-exec (and sysprop-factory (sysprop-factory workload))]
92117
(or sp-exec
93118
(if (= workload :core-async-dispatch)
94-
(executor-for :io)
119+
(executor-for :mixed)
95120
(create-default-executor workload)))))))
96121

97122
(defn exec
98123
[^Runnable r workload]
99-
(let [^ExecutorService e (executor-for workload)]
124+
(let [^Executor e (executor-for workload)]
100125
(.execute e r)))
101126

102127
(defn run

0 commit comments

Comments
 (0)