Skip to content

Commit

Permalink
wip, task impl.
Browse files Browse the repository at this point in the history
  • Loading branch information
fogus committed Nov 25, 2024
1 parent bf03523 commit 5ecb7c3
Showing 1 changed file with 56 additions and 32 deletions.
88 changes: 56 additions & 32 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ to catch and handle."
[^long msecs]
(timers/timeout msecs))

;; task

(defn- check-not-in-vthread [op]
(when (not (Thread/.isVirtual (Thread/currentThread)))
(assert nil (str op " used not in virtual thread"))))

(defmacro defvthreadcheckingop
[op doc arglist & body]
(let [as (mapv #(list 'quote %) arglist)]
`(def ~(with-meta op {:arglists `(list ~as) :doc doc})
(if (Boolean/getBoolean "clojure.core.async.vthread-checking")
(fn ~arglist
(check-not-in-vthread ~op)
~@body)
(fn ~arglist
~@body)))))

(defmacro defblockingop
[op doc arglist & body]
(let [as (mapv #(list 'quote %) arglist)]
Expand All @@ -137,11 +154,11 @@ to catch and handle."
@ret
(deref p))))

(defn <!
"takes a val from port. Must be called inside a (go ...) block. Will
(defvthreadcheckingop <!
"takes a val from port. Must be called inside a (task ...) block. Will
return nil if closed. Will park if nothing is available."
[port]
(assert nil "<! used not in (go ...) block"))
(<!! port))

(defn take!
"Asynchronously takes a val from port, passing to fn1. Will pass nil
Expand Down Expand Up @@ -176,12 +193,12 @@ to catch and handle."
@ret
(deref p))))

(defn >!
(defvthreadcheckingop >!
"puts a val into port. nil values are not allowed. Must be called
inside a (go ...) block. Will park if no buffer space is available.
inside a (task ...) block. Will park if no buffer space is available.
Returns true unless port is already closed."
[port val]
(assert nil ">! used not in (go ...) block"))
(>!! port val))

(defn- nop [_])
(def ^:private fhnop (fn-handler nop))
Expand Down Expand Up @@ -313,7 +330,7 @@ to catch and handle."
@ret
(deref p))))

(defn alts!
(defvthreadcheckingop alts!
"Completes at most one of several channel operations. Must be called
inside a (go ...) block. ports is a vector of channel endpoints,
which can be either a channel to take from or a vector of
Expand All @@ -337,7 +354,7 @@ to catch and handle."
depended upon for side effects."

[ports & {:as opts}]
(assert nil "alts! used not in (go ...) block"))
(alts!! ports opts))

(defn do-alt [alts clauses]
(assert (even? (count clauses)) "unbalanced clauses")
Expand Down Expand Up @@ -440,34 +457,41 @@ to catch and handle."
(let [ret (impl/take! port (fn-handler nop false))]
(when ret @ret)))

(defmacro go
"Asynchronously executes the body, returning immediately to the
calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
channel operations within the body will block (if necessary) by
'parking' the calling thread rather than tying up an OS thread (or
the only JS thread when in ClojureScript). Upon completion of the
operation, the body will be resumed.
go blocks should not (either directly or indirectly) perform operations
that may block indefinitely. Doing so risks depleting the fixed pool of
go block threads, causing all go block processing to stop. This includes
core.async blocking ops (those ending in !!) and other blocking IO.
;; task

(def ^:private task-factory
(-> (Thread/ofVirtual)
(Thread$Builder/.name "task-" 0)
.factory))

(defmacro task
"Asynchronously executes the body in a virtual thread, returning immediately
to the calling thread.
task blocks should not (either directly or indirectly) perform operations
that may block indefinitely. Doing so risks pinning the virtual thread
to its carrier thread.
Returns a channel which will receive the result of the body when
completed"
[& body]
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
`(let [c# (chan 1)
captured-bindings# (Var/getThreadBindingFrame)]
(dispatch/run
(^:once fn* []
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators)
state# (-> (f#)
(ioc/aset-all! ioc/USER-START-IDX c#
ioc/BINDINGS-IDX captured-bindings#))]
(ioc/run-state-machine-wrapped state#))))
c#)))
`(let [c# (chan 1)
captured-bindings# (Var/getThreadBindingFrame)]
(.execute
(Executors/newThreadPerTaskExecutor task-factory)
(^:once fn* []
(Var/resetThreadBindingFrame captured-bindings#)
(try
(let [result# (do ~@body)]
(>!! c# result#))
(finally
(close! c#)))))
c#))

(defmacro go
"Dispatches to task macro."
[& body]
`(task ~body))

(defonce ^:private ^Executor thread-macro-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true)))
Expand Down

0 comments on commit 5ecb7c3

Please sign in to comment.