-
Notifications
You must be signed in to change notification settings - Fork 62
Expand file tree
/
Copy pathAsync.hs
More file actions
177 lines (149 loc) · 5.3 KB
/
Copy pathAsync.hs
File metadata and controls
177 lines (149 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- |
-- Module : Data.Array.Accelerate.LLVM.Native.Execute.Async
-- Copyright : [2014..2020] The Accelerate Team
-- License : BSD3
--
-- Maintainer : Trevor L. McDonell <trevor.mcdonell@gmail.com>
-- Stability : experimental
-- Portability : non-portable (GHC extensions)
--
module Data.Array.Accelerate.LLVM.Native.Execute.Async (
Async(..), Future(..), IVar(..), getArrays,
evalPar, putIO,
) where
-- accelerate
import Data.Array.Accelerate.Error
import Data.Array.Accelerate.LLVM.Execute.Async
import Data.Array.Accelerate.LLVM.Native.Execute.Scheduler
import Data.Array.Accelerate.LLVM.Native.Target
import Data.Array.Accelerate.LLVM.State
-- standard library
import Control.Concurrent
import Control.Monad.Cont
import Control.Monad.State
import Data.IORef
import Data.Sequence ( Seq )
import qualified Data.Sequence as Seq
-- | Evaluate a parallel computation
--
-- The worker threads execute the computation, while the calling thread
-- effectively sleeps waiting for the result.
--
{-# INLINEABLE evalPar #-}
evalPar :: Par Native a -> LLVM Native a
evalPar work = do
result <- liftIO newEmptyMVar
runContT (runPar work) (liftIO . putMVar result)
liftIO $ takeMVar result
-- XXX: Running the initial computation on the worker threads can lead to the
-- workers becoming blocked, possibly waiting for the result MVars to be
-- filled from previous (lazily evaluated) computations (speculation). This
-- happened for example with the code from Issue255, when extracting the
-- result at index > number of worker threads.
--
-- liftIO $ do
-- schedule (workers native)
-- Job { jobTasks = Seq.singleton $ evalLLVM native (runContT (runPar work) (liftIO . putMVar result))
-- , jobDone = Nothing
-- }
-- takeMVar result
-- Implementation
-- --------------
data Future a = Future {-# UNPACK #-} !(IORef (IVar a))
data IVar a
= Full !a
| Blocked !(Seq (a -> IO ()))
instance Async Native where
type FutureR Native = Future
newtype Par Native a = Par { runPar :: ContT () (LLVM Native) a }
deriving ( Functor, Applicative, Monad, MonadIO, MonadCont, MonadState Native )
{-# INLINE new #-}
{-# INLINE newFull #-}
new = Future <$> liftIO (newIORef (Blocked Seq.empty))
newFull v = Future <$> liftIO (newIORef (Full v))
{-# INLINE fork #-}
{-# INLINE spawn #-}
fork = id
spawn = id
{-# INLINE get #-}
get (Future ref) =
callCC $ \k -> do
native <- gets llvmTarget
next <- liftIO . atomicModifyIORef' ref $ \case
Blocked ks -> (Blocked (ks Seq.|> evalParIO native . k), reschedule)
Full a -> (Full a, return a)
next
{-# INLINE put #-}
put future ref = do
Native{..} <- gets llvmTarget
liftIO (putIO workers future ref)
{-# INLINE liftPar #-}
liftPar = Par . lift
{-# INLINE statusHandle #-}
statusHandle (Future ref) = do
emptyFut@(Future emptyIVar) <- new
fullFut <- newFull ()
liftIO $ atomicModifyIORef' ref $ \case
Blocked ks -> (Blocked (ks Seq.|> const (writeIORef emptyIVar (Full ()))), emptyFut)
Full v -> (Full v, fullFut)
{-# INLINE poll #-}
poll (Future ref) = do
ivar <- liftIO $ readIORef ref
case ivar of
Full v -> return (Just v)
_ -> return Nothing
-- | Evaluate a continuation
--
{-# INLINE evalParIO #-}
evalParIO :: Native -> Par Native () -> IO ()
evalParIO native@Native{} work =
evalLLVM native (runContT (runPar work) return)
-- | The value represented by a future is now available. Push any blocked
-- continuations to the worker threads.
--
{-# INLINEABLE putIO #-}
putIO :: HasCallStack => Workers -> Future a -> a -> IO ()
putIO workers (Future ref) v = do
ks <- atomicModifyIORef' ref $ \case
Blocked ks -> (Full v, ks)
_ -> internalError "multiple put"
--
schedule workers Job { jobTasks = fmap ($ v) ks
, jobDone = Nothing
}
-- | The worker threads should search for other work to execute
--
{-# INLINE reschedule #-}
reschedule :: Par Native a
reschedule = Par $ ContT (\_ -> return ())
-- reschedule :: Par Native a
-- reschedule = Par $ ContT (const loop)
-- where
-- loop :: ReaderT Schedule (LLVM Native) ()
-- loop = do
-- queue <- ask
-- mwork <- liftIO $ tryPopR queue
-- case mwork of
-- Just work -> runContT (runPar work) (const loop)
-- Nothing -> liftIO yield >> loop
-- pushL :: MVar (Seq a) -> a -> IO ()
-- pushL ref a =
-- mask_ $ do
-- ma <- tryTakeMVar ref
-- case ma of
-- Nothing -> putMVar ref (Seq.singleton a)
-- Just as -> putMVar ref (a Seq.<| as)
-- popR :: MVar (Seq a) -> IO a
-- popR ref = do
-- q <- takeMVar ref
-- case Seq.viewr q of
-- Seq.EmptyR -> popR ref -- should be impossible
-- as Seq.:> a -> putMVar ref as >> return a