Skip to content

Commit cfc7abb

Browse files
committed
Add Async Future status polling to PTX
1 parent d9c556d commit cfc7abb

5 files changed

Lines changed: 72 additions & 24 deletions

File tree

accelerate-llvm-ptx/accelerate-llvm-ptx.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ Library
101101
, deepseq >= 1.3
102102
, directory >= 1.0
103103
, dlist >= 0.6
104+
, exceptions >= 0.10
104105
, file-embed >= 0.0.8
105106
, filepath >= 1.0
106107
, formatting >= 7.0

accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Array/Prim.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ nonblocking !stream !action = do
350350
return (Nothing, future)
351351

352352
else do
353-
future <- Future <$> liftIO (newIORef (Pending event Nothing result))
353+
future <- Future <$> liftIO (newIORef (Pending event Nothing [] result))
354354
return (Just event, future)
355355

356356
{-# INLINE withLifetime #-}

accelerate-llvm-ptx/src/Data/Array/Accelerate/LLVM/PTX/Execute/Async.hs

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,12 @@ import Data.Array.Accelerate.LLVM.PTX.Link.Object ( FunctionTa
3636
import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Event as Event
3737
import qualified Data.Array.Accelerate.LLVM.PTX.Execute.Stream as Stream
3838

39+
import Control.Monad
40+
import Control.Monad.Catch
3941
import Control.Monad.Reader
4042
import Control.Monad.State
4143
import Data.IORef
44+
import Data.Maybe
4245

4346

4447
-- | Evaluate a parallel computation
@@ -67,19 +70,19 @@ data Future a = Future {-# UNPACK #-} !(IORef (IVar a))
6770

6871
data IVar a
6972
= Full !a
70-
| Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) !a
71-
| Empty
73+
| Pending {-# UNPACK #-} !Event !(Maybe (Lifetime FunctionTable)) ![Future ()] !a
74+
| Empty ![Future ()]
7275

7376

7477
instance Async PTX where
7578
type FutureR PTX = Future
7679

7780
newtype Par PTX a = Par { runPar :: ReaderT ParState (LLVM PTX) a }
78-
deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX )
81+
deriving ( Functor, Applicative, Monad, MonadIO, MonadReader ParState, MonadState PTX, MonadThrow, MonadCatch, MonadMask )
7982

8083
{-# INLINEABLE new #-}
8184
{-# INLINEABLE newFull #-}
82-
new = Future <$> liftIO (newIORef Empty)
85+
new = Future <$> liftIO (newIORef (Empty []))
8386
newFull v = Future <$> liftIO (newIORef (Full v))
8487

8588
{-# INLINEABLE spawn #-}
@@ -104,11 +107,17 @@ instance Async PTX where
104107
stream <- asks ptxStream
105108
kernel <- asks ptxKernel
106109
event <- liftPar (Event.waypoint stream)
107-
ready <- liftIO (Event.query event)
108-
liftIO . modifyIORef' ref $ \case
109-
Empty -> if ready then Full v
110-
else Pending event kernel v
111-
_ -> internalError "multiple put"
110+
liftIO $ do
111+
ready <- Event.query event
112+
ivar <- readIORef ref
113+
case ivar of
114+
Empty statusHandles ->
115+
if ready then do
116+
writeIORef ref $ Full v
117+
signalCompletion statusHandles
118+
else
119+
writeIORef ref $ Pending event kernel statusHandles v
120+
_ -> internalError "multiple put"
112121

113122
-- Get the value of Future. Since the actual cross-stream synchronisation
114123
-- happens on the device, we should never have to block/reschedule the main
@@ -122,25 +131,49 @@ instance Async PTX where
122131
ivar <- readIORef ref
123132
case ivar of
124133
Full v -> return v
125-
Pending event k v -> do
134+
Pending event k statusHandles v -> do
126135
ready <- Event.query event
127-
if ready
128-
then do
129-
writeIORef ref (Full v)
130-
case k of
131-
Just f -> touchLifetime f
132-
Nothing -> return ()
133-
else
134-
Event.after event stream
136+
if ready then do
137+
writeIORef ref (Full v)
138+
signalCompletion statusHandles
139+
maybe (pure ()) touchLifetime k
140+
else
141+
Event.after event stream
135142
return v
136-
Empty -> internalError "blocked on an IVar"
143+
Empty _ -> internalError "blocked on an IVar"
137144

138145
{-# INLINEABLE block #-}
139146
block = liftIO . wait
140147

141148
{-# INLINE liftPar #-}
142149
liftPar = Par . lift
143150

151+
{-# INLINE statusHandle #-}
152+
153+
statusHandle (Future ref) = do
154+
emptyFut <- new
155+
fullFut <- newFull ()
156+
liftIO $ atomicModifyIORef' ref $ \case
157+
Full v -> (Full v, fullFut)
158+
Empty statusHandles -> (Empty (emptyFut:statusHandles), emptyFut)
159+
Pending e k statusHandles v -> (Pending e k (emptyFut:statusHandles) v, emptyFut)
160+
161+
{-# INLINE poll #-}
162+
163+
poll (Future ref) = liftIO $ do
164+
ivar <- readIORef ref
165+
case ivar of
166+
Full v -> return (Just v)
167+
Pending event k statusHandles v -> do
168+
ready <- Event.query event
169+
if ready then do
170+
writeIORef ref (Full v)
171+
signalCompletion statusHandles
172+
maybe (pure ()) touchLifetime k
173+
pure (Just v)
174+
else
175+
pure Nothing
176+
_ -> return Nothing
144177

145178
-- | Block the calling _host_ thread until the value offered by the future is
146179
-- available.
@@ -150,13 +183,17 @@ wait :: Future a -> IO a
150183
wait (Future ref) = do
151184
ivar <- readIORef ref
152185
case ivar of
153-
Full v -> return v
154-
Pending event k v -> do
186+
Full v -> return v
187+
Pending event k statusHandles v -> do
155188
Event.block event
156189
writeIORef ref (Full v)
157190
case k of
158191
Just f -> touchLifetime f
159192
Nothing -> return ()
193+
signalCompletion statusHandles
160194
return v
161-
Empty -> internalError "blocked on an IVar"
195+
Empty _ -> internalError "blocked on an IVar"
196+
197+
signalCompletion :: [Future ()] -> IO ()
198+
signalCompletion = mapM_ $ \(Future ref) -> writeIORef ref $ Full ()
162199

accelerate-llvm-ptx/src/Language/Haskell/TH/Extra.hs

Lines changed: 0 additions & 1 deletion
This file was deleted.

accelerate-llvm/src/Data/Array/Accelerate/LLVM/Execute/Async.hs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ class (Monad (Par arch), MonadIO (Par arch)) => Async arch where
6262
--
6363
liftPar :: HasCallStack => LLVM arch a -> Par arch a
6464

65+
-- | Produce a future which is linked to another future where the completion
66+
-- status of this original future is reflected in the completion status of
67+
-- this status-only future.
68+
--
69+
statusHandle :: HasCallStack => FutureR arch a -> Par arch (FutureR arch ())
70+
71+
-- | Check the completion of a Future without blocking and if it is Full
72+
-- yield its contents, else return Nothing.
73+
--
74+
poll :: HasCallStack => FutureR arch a -> Par arch (Maybe a)
75+
6576
-- | Read a value stored in a future, once it is available. This is blocking
6677
-- with respect to both the host and remote device.
6778
--

0 commit comments

Comments
 (0)