@@ -7,16 +7,39 @@ Description : This module provides an API for managing worker threads in the IDE
77see Note [Serializing runs in separate thread]
88-}
99module Development.IDE.Core.WorkerThread
10- (withWorkerQueue , awaitRunInThread )
11- where
10+ ( LogWorkerThread (.. ),
11+ withWorkerQueue ,
12+ awaitRunInThread ,
13+ TaskQueue ,
14+ isEmptyTaskQueue ,
15+ writeTaskQueue ,
16+ withWorkerQueueSimple
17+ )
18+ where
1219
1320import Control.Concurrent.Async (withAsync )
1421import Control.Concurrent.STM
1522import Control.Concurrent.Strict (newBarrier , signalBarrier ,
1623 waitBarrier )
17- import Control.Exception.Safe (SomeException , throwIO , try )
18- import Control.Monad ( forever )
24+ import Control.Exception.Safe (SomeException , finally , throwIO ,
25+ try )
1926import Control.Monad.Cont (ContT (ContT ))
27+ import qualified Data.Text as T
28+ import Ide.Logger
29+
30+ data LogWorkerThread
31+ = LogThreadEnding ! T. Text
32+ | LogThreadEnded ! T. Text
33+ | LogSingleWorkStarting ! T. Text
34+ | LogSingleWorkEnded ! T. Text
35+ deriving (Show )
36+
37+ instance Pretty LogWorkerThread where
38+ pretty = \ case
39+ LogThreadEnding t -> " Worker thread ending:" <+> pretty t
40+ LogThreadEnded t -> " Worker thread ended:" <+> pretty t
41+ LogSingleWorkStarting t -> " Worker starting a unit of work: " <+> pretty t
42+ LogSingleWorkEnded t -> " Worker ended a unit of work: " <+> pretty t
2043
2144{-
2245Note [Serializing runs in separate thread]
@@ -28,30 +51,78 @@ Like the db writes, session loading in session loader, shake session restarts.
2851Originally we used various ways to implement this, but it was hard to maintain and error prone.
2952Moreover, we can not stop these threads uniformly when we are shutting down the server.
3053-}
54+ data TaskQueue a = TaskQueue (TQueue a )
55+
56+ data ExitOrTask t = Exit | Task t
57+
58+ newTaskQueueIO :: IO (TaskQueue a )
59+ newTaskQueueIO = TaskQueue <$> newTQueueIO
60+
61+ -- | 'withWorkerQueueSimple' is a simplified version of 'withWorkerQueue'
62+ -- for the common case where the worker function is just 'id'.
63+ withWorkerQueueSimple :: Recorder (WithPriority LogWorkerThread ) -> T. Text -> ContT () IO (TaskQueue (IO () ))
64+ withWorkerQueueSimple recorder title = withWorkerQueue recorder title id
3165
3266-- | 'withWorkerQueue' creates a new 'TQueue', and launches a worker
3367-- thread which polls the queue for requests and runs the given worker
3468-- function on them.
35- withWorkerQueue :: (t -> IO a ) -> ContT () IO (TQueue t )
36- withWorkerQueue workerAction = ContT $ \ mainAction -> do
37- q <- newTQueueIO
38- withAsync (writerThread q) $ \ _ -> mainAction q
39- where
40- writerThread q =
41- forever $ do
42- l <- atomically $ readTQueue q
43- workerAction l
69+ withWorkerQueue :: Recorder (WithPriority LogWorkerThread ) -> T. Text -> (t -> IO () ) -> ContT () IO (TaskQueue t )
70+ withWorkerQueue recorder title workerAction = ContT $ \ mainAction -> do
71+ q <- newTaskQueueIO
72+ -- Use a TMVar as a stop flag to coordinate graceful shutdown.
73+ -- The worker thread checks this flag before dequeuing each job; if set, it exits immediately,
74+ -- ensuring that no new work is started after shutdown is requested.
75+ -- This mechanism is necessary because some downstream code may swallow async exceptions,
76+ -- making 'cancel' unreliable for stopping the thread in all cases.
77+ -- If 'cancel' does interrupt the thread (e.g., while blocked in STM or in a cooperative job),
78+ -- the thread exits immediately and never checks the TMVar; in such cases, the stop flag is redundant.
79+ b <- newEmptyTMVarIO
80+ withAsync (writerThread q b) $ \ _ -> do
81+ mainAction q
82+ -- if we want to debug the exact location the worker swallows an async exception, we can
83+ -- temporarily comment out the `finally` clause.
84+ `finally` atomically (putTMVar b () )
85+ logWith recorder Debug (LogThreadEnding title)
86+ logWith recorder Debug (LogThreadEnded title)
87+ where
88+ writerThread q b =
89+ -- See above: check stop flag before dequeuing, exit if set, otherwise run next job.
90+ do
91+ task <- atomically $ do
92+ task <- tryReadTaskQueue q
93+ isEm <- isEmptyTMVar b
94+ case (isEm, task) of
95+ (False , _) -> return Exit -- stop flag set, exit
96+ (_, Just t) -> return $ Task t -- got a task, run it
97+ (_, Nothing ) -> retry -- no task, wait
98+ case task of
99+ Exit -> return ()
100+ Task t -> do
101+ logWith recorder Debug $ LogSingleWorkStarting title
102+ workerAction t
103+ logWith recorder Debug $ LogSingleWorkEnded title
104+ writerThread q b
105+
44106
45107-- | 'awaitRunInThread' queues up an 'IO' action to be run by a worker thread,
46108-- and then blocks until the result is computed. If the action throws an
47109-- non-async exception, it is rethrown in the calling thread.
48- awaitRunInThread :: TQueue (IO () ) -> IO result -> IO result
49- awaitRunInThread q act = do
50- -- Take an action from TQueue, run it and
51- -- use barrier to wait for the result
52- barrier <- newBarrier
53- atomically $ writeTQueue q $ try act >>= signalBarrier barrier
54- resultOrException <- waitBarrier barrier
55- case resultOrException of
56- Left e -> throwIO (e :: SomeException )
57- Right r -> return r
110+ awaitRunInThread :: TaskQueue (IO () ) -> IO result -> IO result
111+ awaitRunInThread (TaskQueue q) act = do
112+ -- Take an action from TQueue, run it and
113+ -- use barrier to wait for the result
114+ barrier <- newBarrier
115+ atomically $ writeTQueue q (try act >>= signalBarrier barrier)
116+ resultOrException <- waitBarrier barrier
117+ case resultOrException of
118+ Left e -> throwIO (e :: SomeException )
119+ Right r -> return r
120+
121+ writeTaskQueue :: TaskQueue a -> a -> STM ()
122+ writeTaskQueue (TaskQueue q) = writeTQueue q
123+
124+ isEmptyTaskQueue :: TaskQueue a -> STM Bool
125+ isEmptyTaskQueue (TaskQueue q) = isEmptyTQueue q
126+
127+ tryReadTaskQueue :: TaskQueue a -> STM (Maybe a )
128+ tryReadTaskQueue (TaskQueue q) = tryReadTQueue q
0 commit comments