Skip to content

Commit 93df4d0

Browse files
foguspuredanger
authored andcommitted
AYNC-272: Restricted vthreads use to io-thread. go again uses IOC. Removed sysprops around vthreads. go block dispatch uses :mixed instead of :io workload profile. ExecutorService -> Executor and fixed flow to use this. Removed use of explicit ExecutorService to create vthreads and use the one-shot in a reify instance instead. Removed ability to call parking ops in io-thread. Modified docs.
1 parent 6b47312 commit 93df4d0

File tree

5 files changed

+61
-27
lines changed

5 files changed

+61
-27
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ go block threads - use Thread.setDefaultUncaughtExceptionHandler()
1717
to catch and handle.
1818
1919
Use the Java system property `clojure.core.async.executor-factory`
20-
to specify a function that will provide ExecutorServices for
20+
to specify a function that will provide an Executor instance for
2121
application-wide use by core.async in lieu of its defaults. The
2222
property value should name a fully qualified var. The function
2323
will be passed a keyword indicating the context of use of the
24-
executor, and should return either an ExecutorService, or nil to
24+
executor, and should return either an Executor, or nil to
2525
use the default. Results per keyword will be cached and used for
2626
the remainder of the application. Possible context arguments are:
2727
@@ -36,7 +36,7 @@ flow/process
3636
3737
:core-async-dispatch - used for completion fn handling (e.g. in put!
3838
and take!, as well as go block IOC thunk processing) throughout
39-
core.async. If not supplied the ExecutorService for :io will be
39+
core.async. If not supplied, the Executor for :io will be
4040
used instead.
4141
4242
The set of contexts may grow in the future so the function should
@@ -49,14 +49,23 @@ return nil for unexpected contexts."
4949
[clojure.core.async.impl.timers :as timers]
5050
[clojure.core.async.impl.dispatch :as dispatch]
5151
[clojure.core.async.impl.ioc-macros :as ioc]
52-
clojure.core.async.impl.go ;; TODO: make conditional
5352
[clojure.core.async.impl.mutex :as mutex]
5453
)
5554
(:import [java.util.concurrent.atomic AtomicLong]
5655
[java.util.concurrent.locks Lock]
5756
[java.util.concurrent ThreadLocalRandom]
5857
[java.util Arrays ArrayList]))
5958

59+
(defn- at-least-clojure-version?
60+
[[maj min incr]]
61+
(let [{:keys [major minor incremental]} *clojure-version*]
62+
(not (neg? (compare [major minor incremental] [maj min incr])))))
63+
64+
(def ^:private lazy-loading-supported? (at-least-clojure-version? [1 12 3]))
65+
66+
(when-not lazy-loading-supported?
67+
(require 'clojure.core.async.impl.go))
68+
6069
(alias 'core 'clojure.core)
6170

6271
(set! *warn-on-reflection* false)
@@ -485,9 +494,13 @@ return nil for unexpected contexts."
485494
core.async blocking ops (those ending in !!) and other blocking IO.
486495
487496
Returns a channel which will receive the result of the body when
488-
completed"
497+
completed.
498+
499+
The resources associated with a go block may be reclaimed, and the block
500+
never resumed, when the channels with which it interacts are no longer
501+
referenced (outside of the go block)."
489502
[& body]
490-
(#'clojure.core.async.impl.go/go-impl &env body))
503+
((requiring-resolve 'clojure.core.async.impl.go/go-impl) &env body))
491504

492505
(defonce ^:private thread-macro-executor nil)
493506

@@ -524,7 +537,12 @@ return nil for unexpected contexts."
524537
"Executes the body in a thread, returning immediately to the calling
525538
thread. The body may do blocking I/O but must not do extended computation.
526539
Returns a channel which will receive the result of the body when completed,
527-
then close."
540+
then close.
541+
542+
Will dispatch to a virtual thread if available in the runtime JVM, and an
543+
ordinary thread if not. Like 'thread's (and unlike 'go' blocks) 'io-thread's
544+
must terminate ordinarily, and will keep referenced resources alive
545+
until they do."
528546
[& body]
529547
`(thread-call (^:once fn* [] ~@body) :io))
530548

src/main/clojure/clojure/core/async/flow.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@
9898
async/sliding-buffer of size 100, thus signals not handled in a
9999
timely manner will be dropped in favor of later arriving signals.
100100
101-
:mixed-exec/:io-exec/:compute-exec -> ExecutorService
102-
These can be used to specify the ExecutorService to use for the
101+
:mixed-exec/:io-exec/:compute-exec -> Executor
102+
These can be used to specify the Executor to use for the
103103
corresonding workload, in lieu of the lib defaults.
104104
105105
N.B. The flow is not started. See 'start'"
@@ -334,7 +334,7 @@
334334
335335
futurize accepts kwarg options:
336336
:exec - one of the workloads :mixed, :io, :compute
337-
or a j.u.c.ExecutorService object,
337+
or a j.u.c.Executor object,
338338
default :mixed"
339339
[f & {:keys [exec]
340340
:or {exec :mixed} :as opts}]

src/main/clojure/clojure/core/async/flow/impl.clj

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,24 @@
1414
[clojure.core.async.impl.dispatch :as disp]
1515
[clojure.walk :as walk]
1616
[clojure.datafy :as datafy])
17-
(:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
17+
(:import [java.util.concurrent Future Executors Executor TimeUnit]
1818
[java.util.concurrent.locks ReentrantLock]))
1919

2020
(set! *warn-on-reflection* true)
2121

2222
(defn datafy [x]
2323
(condp instance? x
2424
clojure.lang.Fn (-> x str symbol)
25-
ExecutorService (str x)
25+
Executor (str x)
2626
clojure.lang.Var (symbol x)
2727
(datafy/datafy x)))
2828

2929
(defn futurize [f {:keys [exec]}]
3030
(fn [& args]
31-
(let [^ExecutorService e (if (instance? ExecutorService exec)
32-
exec
33-
(disp/executor-for exec))]
34-
(.submit e ^Callable #(apply f args)))))
31+
(let [^Executor e (if (instance? Executor exec)
32+
exec
33+
(disp/executor-for exec))]
34+
(.execute e ^Runnable #(apply f args)))))
3535

3636
(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
3737
(let [{:keys [ins outs signal-select]} (spi/describe proc)
@@ -51,8 +51,8 @@
5151
(let [lock (ReentrantLock.)
5252
chans (atom nil)
5353
execs {:mixed mixed-exec :io io-exec :compute compute-exec}
54-
_ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs))
55-
"mixed-exe, io-exec and compute-exec must be ExecutorServices")
54+
_ (assert (every? #(or (nil? %) (instance? Executor %)) (vals execs))
55+
"mixed-exe, io-exec and compute-exec must be Executors")
5656
pdescs (reduce-kv prep-proc {} procs)
5757
allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs))))
5858
inopts (allopts :ins)

src/main/clojure/clojure/core/async/flow/spi.clj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
N.B. outputs may be nil if not connected
8383
:resolver - an impl of spi/Resolver, which can be used to find
8484
channels given their logical [pid cid] coordinates, as well as to
85-
obtain ExecutorServices corresponding to the
85+
obtain Executors corresponding to the
8686
logical :mixed/:io/:compute contexts"))
8787

8888
(defprotocol Resolver
@@ -91,5 +91,5 @@
9191
write to or nil (in which case the output should be dropped,
9292
e.g. nothing is connected).")
9393
(get-exec [_ context]
94-
"returns the ExecutorService for the given context, one
94+
"returns the Executor for the given context, one
9595
of :mixed, :io, :compute"))

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
(ns ^{:skip-wiki true}
1010
clojure.core.async.impl.dispatch
11-
(:import [java.util.concurrent Executors ExecutorService ThreadFactory]))
11+
(:import [java.util.concurrent Executors Executor ThreadFactory]))
1212

1313
(set! *warn-on-reflection* true)
1414

@@ -72,31 +72,47 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75+
(def virtual-threads-available?
76+
(try
77+
(Class/forName "java.lang.Thread$Builder$OfVirtual")
78+
true
79+
(catch ClassNotFoundException _
80+
false)))
81+
82+
(defn- make-io-executor
83+
[]
84+
(if virtual-threads-available?
85+
(let [svt (.getDeclaredMethod Thread "startVirtualThread" (into-array Class [Runnable]))]
86+
(reify Executor
87+
(execute [_ r]
88+
(.invoke svt nil (object-array [r])))))
89+
(make-ctp-named :io)))
90+
7591
(defn ^:private create-default-executor
7692
[workload]
7793
(case workload
7894
:compute (make-ctp-named :compute)
79-
:io (make-ctp-named :io)
95+
:io (make-io-executor)
8096
:mixed (make-ctp-named :mixed)))
8197

8298
(def executor-for
83-
"Given a workload tag, returns an ExecutorService instance and memoizes the result. By
99+
"Given a workload tag, returns an Executor instance and memoizes the result. By
84100
default, core.async will defer to a user factory (if provided via sys prop) or construct
85-
a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When
101+
a specialized Executor instance for each tag :io, :compute, and :mixed. When
86102
given the tag :core-async-dispatch it will default to the executor service for :io."
87103
(memoize
88-
(fn ^ExecutorService [workload]
104+
(fn ^Executor [workload]
89105
(let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")]
90106
(requiring-resolve (symbol esf)))
91107
sp-exec (and sysprop-factory (sysprop-factory workload))]
92108
(or sp-exec
93109
(if (= workload :core-async-dispatch)
94-
(executor-for :io)
110+
(executor-for :mixed)
95111
(create-default-executor workload)))))))
96112

97113
(defn exec
98114
[^Runnable r workload]
99-
(let [^ExecutorService e (executor-for workload)]
115+
(let [^Executor e (executor-for workload)]
100116
(.execute e r)))
101117

102118
(defn run

0 commit comments

Comments
 (0)