Skip to content

Commit 6b47312

Browse files
committed
Revert "ASYNC-262: Added Java system property clojure.core.async.vthreads to control how core.async uses JDK 21+ virtual threads. When the user wishes to target vthreads then core.async will use a vthreads ExecutorService for :io workloads, including the io-thread macro. Also, the go macro will use io-thread as its basis instead of the IOC machinery. Also added a runtime check for vthreads in the case where AOT compiles to vthreads. Finally, added requiring-resolve use in the go macro to load the IOC machinery in the go macro when needed and also when supported by the CLJ version. As a backup to this, added a check at the c.a top level to load the ioc support if the CLJ version doesn't allow nested compilation patterns."
This reverts commit 634d6cb.
1 parent b871f35 commit 6b47312

File tree

3 files changed

+34
-139
lines changed

3 files changed

+34
-139
lines changed

src/main/clojure/clojure/core/async.clj

Lines changed: 23 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -40,34 +40,7 @@ core.async. If not supplied the ExecutorService for :io will be
4040
used instead.
4141
4242
The set of contexts may grow in the future so the function should
43-
return nil for unexpected contexts.
44-
45-
Use the Java system property `clojure.core.async.vthreads` to control
46-
how core.async uses JDK 21+ virtual threads. The property can be one of
47-
the following values:
48-
49-
unset - core.async will opportunistically use vthreads when available
50-
(≥ Java 21) and will otherwise use the old IOC impl. io-thread and :io
51-
thread pool will run on platform threads if vthreads are not available.
52-
If AOT compiling, go blocks will always use IOC so that the resulting
53-
bytecode works on all JVMs (so no change in compiled output)
54-
55-
\"target\" - means that you are targeting virtual threads. At runtime
56-
from source, go blocks will throw if vthreads are not available.
57-
If AOT compiling, go blocks are always compiled as normal Clojure
58-
code to be run on vthreads and will throw at runtime if vthreads are
59-
not available (Java <21)
60-
61-
\"avoid\" - means that vthreads will not be used by core.async - you can
62-
use this to minimize impacts if you are not yet ready to utilize vthreads
63-
in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread
64-
and the :io thread pool use platform threads
65-
66-
Note: existing IOC compiled go blocks from older core.async versions continue
67-
to work (we retain and load the IOC state machine runtime - this does not
68-
require the analyzer), and you can interact with the same channels from both
69-
IOC and vthread code.
70-
"
43+
return nil for unexpected contexts."
7144
(:refer-clojure :exclude [reduce transduce into merge map take partition
7245
partition-by bounded-count])
7346
(:require [clojure.core.async.impl.protocols :as impl]
@@ -76,18 +49,14 @@ IOC and vthread code.
7649
[clojure.core.async.impl.timers :as timers]
7750
[clojure.core.async.impl.dispatch :as dispatch]
7851
[clojure.core.async.impl.ioc-macros :as ioc]
52+
clojure.core.async.impl.go ;; TODO: make conditional
7953
[clojure.core.async.impl.mutex :as mutex]
8054
)
8155
(:import [java.util.concurrent.atomic AtomicLong]
8256
[java.util.concurrent.locks Lock]
8357
[java.util.concurrent ThreadLocalRandom]
8458
[java.util Arrays ArrayList]))
8559

86-
(def ^:private lazy-loading-supported? (dispatch/at-least-clojure-version? [1 12 3]))
87-
88-
(when-not lazy-loading-supported?
89-
(require 'clojure.core.async.impl.go))
90-
9160
(alias 'core 'clojure.core)
9261

9362
(set! *warn-on-reflection* false)
@@ -169,21 +138,6 @@ IOC and vthread code.
169138
[^long msecs]
170139
(timers/timeout msecs))
171140

172-
(defn- defparkingop* [op doc arglist]
173-
(let [as (mapv #(list 'quote %) arglist)
174-
blockingop (-> op name (str "!") symbol)]
175-
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
176-
(fn [~'& ~'args]
177-
(if (dispatch/in-vthread?)
178-
~(list* apply blockingop '[args])
179-
(assert nil ~(str op " used not in (go ...) block")))))))
180-
181-
(defmacro defparkingop
182-
"Emits a Var with a function that checks if it's running in a virtual thread. If so then
183-
the related blocking op will be called, otherwise the function throws."
184-
[op doc arglist]
185-
(defparkingop* op doc arglist))
186-
187141
(defmacro defblockingop
188142
[op doc arglist & body]
189143
(let [as (mapv #(list 'quote %) arglist)]
@@ -208,11 +162,11 @@ IOC and vthread code.
208162
@ret
209163
(deref p))))
210164

211-
(defparkingop <!
212-
"takes a val from port. Must be called inside a (go ...) block, or on
213-
a virtual thread (no matter how it was started). Will return nil if
214-
closed. Will park if nothing is available."
215-
[port])
165+
(defn <!
166+
"takes a val from port. Must be called inside a (go ...) block. Will
167+
return nil if closed. Will park if nothing is available."
168+
[port]
169+
(assert nil "<! used not in (go ...) block"))
216170

217171
(defn take!
218172
"Asynchronously takes a val from port, passing to fn1. Will pass nil
@@ -247,12 +201,12 @@ IOC and vthread code.
247201
@ret
248202
(deref p))))
249203

250-
(defparkingop >!
204+
(defn >!
251205
"puts a val into port. nil values are not allowed. Must be called
252-
inside a (go ...) block, or on a virtual thread (no matter how it
253-
was started). Will park if no buffer space is available.
206+
inside a (go ...) block. Will park if no buffer space is available.
254207
Returns true unless port is already closed."
255-
[port val])
208+
[port val]
209+
(assert nil ">! used not in (go ...) block"))
256210

257211
(def ^:private nop (on-caller (fn [_])))
258212
(def ^:private fhnop (fn-handler nop))
@@ -390,16 +344,16 @@ IOC and vthread code.
390344
@ret
391345
(deref p))))
392346

393-
(defparkingop alts!
347+
(defn alts!
394348
"Completes at most one of several channel operations. Must be called
395-
inside a (go ...) block, or on a virtual thread (no matter how it was
396-
started). ports is a vector of channel endpoints, which can be either
397-
a channel to take from or a vector of [channel-to-put-to val-to-put],
398-
in any combination. Takes will be made as if by <!, and puts will be
399-
made as if by >!. Unless the :priority option is true, if more than one
400-
port operation is ready a non-deterministic choice will be made. If no
401-
operation is ready and a :default value is supplied, [default-val :default]
402-
will be returned, otherwise alts! will park until the first operation to
349+
inside a (go ...) block. ports is a vector of channel endpoints,
350+
which can be either a channel to take from or a vector of
351+
[channel-to-put-to val-to-put], in any combination. Takes will be
352+
made as if by <!, and puts will be made as if by >!. Unless
353+
the :priority option is true, if more than one port operation is
354+
ready a non-deterministic choice will be made. If no operation is
355+
ready and a :default value is supplied, [default-val :default] will
356+
be returned, otherwise alts! will park until the first operation to
403357
become ready completes. Returns [val port] of the completed
404358
operation, where val is the value taken for takes, and a
405359
boolean (true unless already closed, as per put!) for puts.
@@ -413,7 +367,8 @@ IOC and vthread code.
413367
used, nor in what order should they be, so they should not be
414368
depended upon for side effects."
415369

416-
[ports & {:as opts}])
370+
[ports & {:as opts}]
371+
(assert nil "alts! used not in (go ...) block"))
417372

418373
(defn do-alt [alts clauses]
419374
(assert (even? (count clauses)) "unbalanced clauses")
@@ -516,22 +471,6 @@ IOC and vthread code.
516471
(let [ret (impl/take! port (fn-handler nop false))]
517472
(when ret @ret)))
518473

519-
(defn- go* [body env]
520-
(cond (and (not dispatch/virtual-threads-available?)
521-
dispatch/target-vthreads?
522-
(not clojure.core/*compile-files*))
523-
(dispatch/report-vthreads-not-available-error!)
524-
525-
(or dispatch/target-vthreads?
526-
(and dispatch/unset-vthreads?
527-
dispatch/virtual-threads-available?
528-
(not clojure.core/*compile-files*)))
529-
`(do (dispatch/ensure-runtime-vthreads!)
530-
(thread-call (^:once fn* [] ~@body) :io))
531-
532-
:else
533-
((requiring-resolve 'clojure.core.async.impl.go/go-impl) env body)))
534-
535474
(defmacro go
536475
"Asynchronously executes the body, returning immediately to the
537476
calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
@@ -548,7 +487,7 @@ IOC and vthread code.
548487
Returns a channel which will receive the result of the body when
549488
completed"
550489
[& body]
551-
(go* body &env))
490+
(#'clojure.core.async.impl.go/go-impl &env body))
552491

553492
(defonce ^:private thread-macro-executor nil)
554493

src/main/clojure/clojure/core/async/impl/dispatch.clj

Lines changed: 1 addition & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -72,63 +72,11 @@
7272
[workload]
7373
(Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true)))
7474

75-
(defn at-least-clojure-version?
76-
[[maj min incr]]
77-
(let [{:keys [major minor incremental]} *clojure-version*]
78-
(not (neg? (compare [major minor incremental] [maj min incr])))))
79-
80-
(def virtual-threads-available?
81-
(try
82-
(Class/forName "java.lang.Thread$Builder$OfVirtual")
83-
true
84-
(catch ClassNotFoundException _
85-
false)))
86-
87-
(defn- vthreads-directive
88-
"Retrieves the value of the sysprop clojure.core.async.vthreads."
89-
[]
90-
(System/getProperty "clojure.core.async.vthreads"))
91-
92-
(def target-vthreads?
93-
(= (vthreads-directive) "target"))
94-
95-
(def unset-vthreads?
96-
(nil? (vthreads-directive)))
97-
98-
(def vthreads-available-and-allowed?
99-
(and virtual-threads-available?
100-
(not= (vthreads-directive) "avoid")))
101-
102-
(def ^:private virtual-thread?
103-
(if virtual-threads-available?
104-
(eval `(fn [^Thread t#] (~'.isVirtual t#)))
105-
(constantly false)))
106-
107-
(defn in-vthread? []
108-
(and virtual-threads-available?
109-
(virtual-thread? (Thread/currentThread))))
110-
111-
(defn report-vthreads-not-available-error! []
112-
(throw (ex-info "Code compiled to target virtual threads, but is running without vthread support."
113-
{:runtime-jvm-version (System/getProperty "java.version")
114-
:vthreads-directive (vthreads-directive)})))
115-
116-
(defn ensure-runtime-vthreads! []
117-
(when (not vthreads-available-and-allowed?)
118-
(report-vthreads-not-available-error!)))
119-
120-
(defn- make-io-executor
121-
[]
122-
(if vthreads-available-and-allowed?
123-
(-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0))
124-
(.invoke nil (make-array Class 0)))
125-
(make-ctp-named :io)))
126-
12775
(defn ^:private create-default-executor
12876
[workload]
12977
(case workload
13078
:compute (make-ctp-named :compute)
131-
:io (make-io-executor)
79+
:io (make-ctp-named :io)
13280
:mixed (make-ctp-named :mixed)))
13381

13482
(def executor-for

src/test/clojure/clojure/core/async_test.clj

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,15 @@
193193
(io-thread (>!! c2 (clojure.string/upper-case (<!! c1))))
194194
(io-thread (>!! c3 (clojure.string/reverse (<!! c2))))
195195
(>!! c1 "loop")
196-
(is (= "POOL" (<!! c3))))))
196+
(is (= "POOL" (<!! c3)))))
197+
(testing "io-thread parking op should fail"
198+
(let [c1 (chan)]
199+
(io-thread
200+
(try
201+
(>! c1 :no)
202+
(catch AssertionError _
203+
(>!! c1 :yes))))
204+
(is (= :yes (<!! c1))))))
197205

198206
(deftest ops-tests
199207
(testing "map<"
@@ -480,4 +488,4 @@
480488
(thrown? AssertionError
481489
(let [c1 (a/chan)
482490
c2 (a/chan)]
483-
(a/alts!! [c1 [c2 nil]])))))
491+
(a/alts!! [c1 [c2 nil]])))))

0 commit comments

Comments
 (0)