66{-# LANGUAGE TypeFamilies #-}
77{-# LANGUAGE TypeSynonymInstances #-}
88{-# OPTIONS_GHC -fno-warn-orphans #-}
9+ {-# LANGUAGE TupleSections #-}
910-- |
1011-- Module : Data.Array.Accelerate.LLVM.PTX.Execute.Async
1112-- Copyright : [2014..2020] The Accelerate Team
@@ -36,9 +37,10 @@ import Data.Array.Accelerate.LLVM.PTX.Link.Object ( FunctionTa
3637import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Event as Event
3738import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Stream as Stream
3839
40+ import Control.Concurrent.MVar
41+ import Control.Monad.Catch
3942import Control.Monad.Reader
4043import Control.Monad.State
41- import Data.IORef
4244
4345
4446-- | Evaluate a parallel computation
@@ -63,24 +65,24 @@ ptxKernel = snd
6365-- Implementation
6466-- --------------
6567
-- | Handle to a value that may not be available yet. The current state of the
-- value (see 'IVar') is kept in an 'MVar'.
--
data Future a
  = Future {-# UNPACK #-} !(MVar (IVar a))
6769
-- | The state held inside a 'Future'.
--
data IVar a
  -- | The value is available.
  = Full !a

  -- | A value has been supplied together with the event that guards it. The
  -- fields are: the event, an optional function table lifetime that is touched
  -- once the state is completed, the status handles to signal on completion,
  -- and the value itself.
  | Pending
      {-# UNPACK #-} !Event
      !(Maybe (Lifetime FunctionTable))
      ![Future ()]
      !a

  -- | No value has been supplied yet; carries the status handles that have
  -- been registered so far.
  | Empty ![Future ()]
7274
7375
7476instance Async PTX where
7577 type FutureR PTX = Future
7678
7779 newtype Par PTX a = Par { runPar :: ReaderT ParState (LLVM PTX ) a }
78- deriving ( Functor , Applicative , Monad , MonadIO , MonadReader ParState , MonadState PTX )
80+ deriving ( Functor , Applicative , Monad , MonadIO , MonadReader ParState , MonadState PTX , MonadThrow , MonadCatch , MonadMask )
7981
8082 {-# INLINEABLE new #-}
8183 {-# INLINEABLE newFull #-}
82- new = Future <$> liftIO (newIORef Empty )
83- newFull v = Future <$> liftIO (newIORef (Full v))
84+ new = Future <$> liftIO (newMVar ( Empty [] ) )
85+ newFull v = Future <$> liftIO (newMVar (Full v))
8486
8587 {-# INLINEABLE spawn #-}
8688 spawn m = do
@@ -91,7 +93,7 @@ instance Async PTX where
9193
9294 {-# INLINEABLE fork #-}
9395 fork m = do
94- s' <- liftPar ( Stream. create)
96+ s' <- liftPar Stream. create
9597 () <- local (const (s', Nothing )) m
9698 liftIO (Stream. destroy s')
9799
@@ -104,59 +106,98 @@ instance Async PTX where
104106 stream <- asks ptxStream
105107 kernel <- asks ptxKernel
106108 event <- liftPar (Event. waypoint stream)
107- ready <- liftIO (Event. query event)
108- liftIO . modifyIORef' ref $ \ case
109- Empty -> if ready then Full v
110- else Pending event kernel v
111- _ -> internalError " multiple put"
109+ liftIO $ do
110+ ready <- Event. query event
111+ ivar <- readMVar ref
112+ case ivar of
113+ Empty statusHandles ->
114+ if ready then do
115+ modifyMVar_ ref $ const $ pure $ Full v
116+ signalCompletion statusHandles
117+ else
118+ modifyMVar_ ref $ const $ pure $ Pending event kernel statusHandles v
119+ _ -> internalError " multiple put"
112120
113121 -- Get the value of Future. Since the actual cross-stream synchronisation
114122 -- happens on the device, we should never have to block/reschedule the main
115123 -- thread waiting on a value; if we get an empty IVar at this point, something
116124 -- has gone wrong.
117125 --
118126 {-# INLINEABLE get #-}
119- get (Future ref) = do
127+ get fut @ (Future ref) = do
120128 stream <- asks ptxStream
121129 liftIO $ do
122- ivar <- readIORef ref
130+ ivar <- readMVar ref
123131 case ivar of
124132 Full v -> return v
125- Pending event k v -> do
133+ Pending event _ _ v -> do
126134 ready <- Event. query event
127- if ready
128- then do
129- writeIORef ref (Full v)
130- case k of
131- Just f -> touchLifetime f
132- Nothing -> return ()
133- else
134- Event. after event stream
135- return v
136- Empty -> internalError " blocked on an IVar"
135+ if ready then
136+ completePending fut
137+ else do
138+ Event. after event stream
139+ return v
140+ Empty _ -> internalError " blocked on an IVar"
137141
138142 {-# INLINEABLE block #-}
139143 block = liftIO . wait
140144
141145 {-# INLINE liftPar #-}
142146 liftPar = Par . lift
143147
148+ {-# INLINE statusHandle #-}
149+
150+ statusHandle (Future ref) =
151+ liftIO $ modifyMVar ref $ \ case
152+ Full v -> (Full v,) . Future <$> newMVar (Full () )
153+ Empty statusHandles -> do
154+ emptyFut <- Future <$> newMVar (Empty [] )
155+ pure (Empty (emptyFut: statusHandles), emptyFut)
156+ Pending e k statusHandles v -> do
157+ pendingFut <- Future <$> newMVar (Pending e k [] () )
158+ pure (Pending e k (pendingFut: statusHandles) v, pendingFut)
159+
160+ {-# INLINE poll #-}
161+
162+ poll fut@ (Future ref) = liftIO $ do
163+ ivar <- readMVar ref
164+ case ivar of
165+ Full v ->
166+ return (Just v)
167+ Pending event _ _ _ -> do
168+ ready <- Event. query event
169+ if ready then
170+ Just <$> completePending fut
171+ else
172+ pure Nothing
173+ _ ->
174+ return Nothing
144175
-- | Suspend the calling _host_ thread until the future's value is available,
-- then return it. A 'Pending' future is waited on via its event and then
-- promoted to 'Full'; an 'Empty' future is an internal error.
--
{-# INLINEABLE wait #-}
wait :: Future a -> IO a
wait fut@(Future ref) =
  readMVar ref >>= \case
    Empty _          -> internalError " blocked on an IVar"
    Full x           -> return x
    Pending ev _ _ _ -> Event.block ev >> completePending fut
191+
-- | Mark each of the given status handles as complete by setting it to
-- @Full ()@.
--
-- A status handle is itself a 'Future', so further handles may have been
-- registered on it. Those are collected while the handle is swapped to
-- 'Full' and are signalled in turn; simply overwriting the state would drop
-- them and leave them unsignalled.
--
signalCompletion :: [Future ()] -> IO ()
signalCompletion = mapM_ $ \(Future ref) -> do
  -- Replace the state and retrieve the previously registered handles in one
  -- atomic step, so that none registered concurrently can be lost.
  nested <- modifyMVar ref $ \old -> pure (Full (), registered old)
  -- Signal the nested handles only after this handle's MVar has been released.
  signalCompletion nested
  where
    registered :: IVar () -> [Future ()]
    registered = \case
      Full _           -> []
      Pending _ _ hs _ -> hs
      Empty hs         -> hs
194+
-- | Promote a 'Pending' future to 'Full' and return its value. On promotion
-- the registered status handles are signalled and the function table
-- lifetime, if one is attached, is touched.
--
-- Callers inspect the future with 'readMVar' and only afterwards call this
-- function, so the state may have changed in between: the future can already
-- have been completed (for instance by 'signalCompletion' running on behalf
-- of another future). A 'Full' future is therefore accepted and its value
-- returned unchanged rather than treated as an internal error. An 'Empty'
-- future remains an internal error.
--
completePending :: Future a -> IO a
completePending (Future ref) =
  modifyMVar ref $ \case
    Pending _ k statusHandles v -> do
      signalCompletion statusHandles
      mapM_ touchLifetime k
      pure (Full v, v)
    -- Already completed between the caller's read and this update; there is
    -- nothing left to signal.
    Full v ->
      pure (Full v, v)
    Empty _ ->
      internalError " Expected (Pending ...)"
0 commit comments