@@ -67,19 +67,19 @@ data Future a = Future {-# UNPACK #-} !(IORef (IVar a))
6767
6868data IVar a
6969 = Full ! a
70- | Pending {- # UNPACK #-} !Event ! (Maybe (Lifetime FunctionTable )) ! a
71- | Empty
70+ | Pending {- # UNPACK #-} !Event ! (Maybe (Lifetime FunctionTable )) ! [ Future () ] ! a
71+ | Empty ! [ Future () ]
7272
7373
7474instance Async PTX where
7575 type FutureR PTX = Future
7676
7777 newtype Par PTX a = Par { runPar :: ReaderT ParState (LLVM PTX ) a }
78- deriving ( Functor , Applicative , Monad , MonadIO , MonadReader ParState , MonadState PTX )
78+ deriving ( Functor , Applicative , Monad , MonadIO , MonadReader ParState , MonadState PTX , MonadThrow , MonadCatch , MonadMask )
7979
8080 {-# INLINEABLE new #-}
8181 {-# INLINEABLE newFull #-}
82- new = Future <$> liftIO (newIORef Empty )
82+ new = Future <$> liftIO (newIORef ( Empty [] ) )
8383 newFull v = Future <$> liftIO (newIORef (Full v))
8484
8585 {-# INLINEABLE spawn #-}
@@ -104,11 +104,17 @@ instance Async PTX where
104104 stream <- asks ptxStream
105105 kernel <- asks ptxKernel
106106 event <- liftPar (Event. waypoint stream)
107- ready <- liftIO (Event. query event)
108- liftIO . modifyIORef' ref $ \ case
109- Empty -> if ready then Full v
110- else Pending event kernel v
111- _ -> internalError " multiple put"
107+ liftIO $ do
108+ ready <- Event. query event
109+ ivar <- readIORef ref
110+ case ivar of
111+ Empty statusHandles ->
112+ if ready then do
113+ writeIORef ref $ Full v
114+ signalCompletion statusHandles
115+ else
116+ writeIORef ref $ Pending event kernel statusHandles v
117+ _ -> internalError " multiple put"
112118
113119 -- Get the value of Future. Since the actual cross-stream synchronisation
114120 -- happens on the device, we should never have to block/reschedule the main
@@ -122,7 +128,7 @@ instance Async PTX where
122128 ivar <- readIORef ref
123129 case ivar of
124130 Full v -> return v
125- Pending event k v -> do
131+ Pending event k statusHandles v -> do
126132 ready <- Event. query event
127133 if ready
128134 then do
@@ -132,15 +138,33 @@ instance Async PTX where
132138 Nothing -> return ()
133139 else
134140 Event. after event stream
141+ signalCompletion statusHandles
135142 return v
136- Empty -> internalError " blocked on an IVar"
143+ Empty _ -> internalError " blocked on an IVar"
137144
138145 {-# INLINEABLE block #-}
139146 block = liftIO . wait
140147
141148 {-# INLINE liftPar #-}
142149 liftPar = Par . lift
143150
151+ {-# INLINE statusHandle #-}
152+
153+ statusHandle (Future ref) = do
154+ emptyFut <- new
155+ fullFut <- newFull ()
156+ liftIO $ atomicModifyIORef' ref $ \ case
157+ Full v -> (Full v, fullFut)
158+ Empty statusHandles -> (Empty (emptyFut: statusHandles), emptyFut)
159+ Pending e k statusHandles v -> (Pending e k (emptyFut: statusHandles) v, emptyFut)
160+
161+ {-# INLINE poll #-}
162+
163+ poll (Future ref) = do
164+ ivar <- liftIO $ readIORef ref
165+ case ivar of
166+ Full v -> return (Just v)
167+ _ -> return Nothing
144168
145169-- | Block the calling _host_ thread until the value offered by the future is
146170-- available.
@@ -150,13 +174,17 @@ wait :: Future a -> IO a
150174wait (Future ref) = do
151175 ivar <- readIORef ref
152176 case ivar of
153- Full v -> return v
154- Pending event k v -> do
177+ Full v -> return v
178+ Pending event k statusHandles v -> do
155179 Event. block event
156180 writeIORef ref (Full v)
157181 case k of
158182 Just f -> touchLifetime f
159183 Nothing -> return ()
184+ signalCompletion statusHandles
160185 return v
161- Empty -> internalError " blocked on an IVar"
186+ Empty _ -> internalError " blocked on an IVar"
187+
188+ signalCompletion :: [Future () ] -> IO ()
189+ signalCompletion = mapM_ $ \ (Future ref) -> writeIORef ref $ Full ()
162190
0 commit comments