Skip to content

Commit 19a7ca2

Browse files
Add a testsuite for resource cleanup tests
1 parent f07de1f commit 19a7ca2

2 files changed

Lines changed: 141 additions & 0 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
module Streamly.Test.Data.Stream.Exception (main)
2+
3+
where
4+
5+
import Control.Concurrent (threadDelay)
6+
import Control.Exception (SomeException, throw, catch, finally, bracket_)
7+
import Control.Monad (when)
8+
import Data.Foldable (sequenceA_)
9+
import Data.Function ((&))
10+
import Data.IORef (IORef, newIORef, atomicModifyIORef, readIORef)
11+
import System.Mem (performMajorGC)
12+
13+
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
14+
import qualified Streamly.Internal.Data.Stream as Stream
15+
import qualified Streamly.Data.Fold as Fold
16+
17+
incr :: Num a => IORef a -> IO ()
18+
incr ref = do
19+
-- tid <- myThreadId
20+
-- putStrLn $ "Incrementing the counter: " ++ show tid
21+
atomicModifyIORef ref (\x -> (x + 1, ()))
22+
23+
decr :: Num a => IORef a -> IO ()
24+
decr ref = do
25+
atomicModifyIORef ref (\x -> (x - 1, ()))
26+
-- tid <- myThreadId
27+
-- putStrLn $ "Decremented the counter: " ++ show tid
28+
29+
handler :: SomeException -> IO b
30+
handler (e :: SomeException) = do
31+
-- tid <- myThreadId
32+
-- putStrLn $ "Child: " ++ show tid ++ " " ++ show e
33+
-- Rethrowing the exception is important, otherwise the thread will not
34+
-- exit.
35+
throw e
36+
37+
run :: Num a => IORef a -> IO c -> IO c
38+
run ref x = bracket_ (incr ref) (decr ref) (x `catch` handler)
39+
40+
timeout :: Int
41+
timeout = 1000000
42+
43+
takeCount :: Int
44+
takeCount = 1
45+
46+
stream :: Num a =>
47+
IORef a -> (Stream.Config -> Stream.Config) -> Stream.Stream IO ()
48+
stream ref modifier =
49+
Stream.enumerateFrom (1 :: Int)
50+
& Stream.parMapM modifier
51+
( \x ->
52+
-- somehow if all of them have same timeout then the chances of
53+
-- failure are more.
54+
run ref $ threadDelay (if x == 1 then 1000000 else timeout)
55+
)
56+
& Stream.take takeCount
57+
58+
finalAction :: (Show a, Eq a, Num a) => Bool -> IORef a -> Int -> IO ()
59+
finalAction gc ref t = do
60+
-- When cleanup happens via GC, ghc creates a thread for the finalizer to
61+
-- run, actual cleanup time depends on when that thread is scheduled. The
62+
-- thread may outlive one or more GCs. So we have to give it some time to
63+
-- finish. But it cannot be deterministic.
64+
-- threadDelay 1000000
65+
when gc $ do
66+
performMajorGC
67+
threadDelay t
68+
performMajorGC
69+
threadDelay t
70+
r <- readIORef ref
71+
putStrLn $ "Pending computations: " ++ show r
72+
-- Delay for letting any gc based cleanup threads drain and print output
73+
-- for debugging
74+
-- when gc $ threadDelay 1000000
75+
when (r /= 0) $ error "Failed"
76+
77+
cleanup :: Int -> (Stream.Config -> Stream.Config) -> IO ()
78+
cleanup t cfg = do
79+
ref <- newIORef (0 :: Int)
80+
(Stream.cleanupIO (\f -> stream ref (cfg . Stream.addCleanup f))
81+
& Stream.fold Fold.drain) `finally` finalAction False ref t
82+
83+
cleanupEffect :: Int -> (Stream.Config -> Stream.Config) -> IO ()
84+
cleanupEffect t cfg = do
85+
ref <- newIORef (0 :: Int)
86+
Stream.cleanupEffectIO (\f -> stream ref (cfg . Stream.addCleanup f)
87+
& Stream.fold Fold.drain) `finally` finalAction False ref t
88+
89+
finallyGC :: Int -> (Stream.Config -> Stream.Config) -> IO ()
90+
finallyGC t cfg = do
91+
ref <- newIORef (0 :: Int)
92+
Stream.finallyIO (finalAction True ref t) (stream ref cfg)
93+
& Stream.fold Fold.drain
94+
95+
-- XXX Include rate as well
96+
limits :: [(String, Stream.Config -> Stream.Config)]
97+
limits =
98+
[ ("default", id)
99+
, ("maxBuffer 10", Stream.maxBuffer 10)
100+
, ("maxThreads 10", Stream.maxThreads 10)
101+
]
102+
103+
sched :: [(String, Stream.Config -> Stream.Config)]
104+
sched =
105+
[ ("default", id)
106+
, ("eager", Stream.eager True)
107+
, ("ordered", Stream.ordered True)
108+
, ("interleaved", Stream.interleaved True)
109+
]
110+
111+
funcs :: [(String, Int -> (Stream.Config -> Stream.Config) -> IO ())]
112+
funcs =
113+
[ ("cleanup", cleanup)
114+
, ("cleanupEffect", cleanupEffect)
115+
, ("finallyGC", finallyGC)
116+
]
117+
118+
main :: IO ()
119+
main = do
120+
let cfg = id -- Stream.inspect True
121+
122+
-- TODO: Interrupt test
123+
-- Run the main test in a separate thread. Keep the thread-id in a global
124+
-- variable which will be used to interrupt the thread. Once one thread is
125+
-- over then the next test will keep it's threadId in the global var.
126+
-- Run another thread which sleeps for random intervals and sends
127+
-- UserInterrupt exception to the current test thread-id stored in the
128+
-- glbal variable in a loop.
129+
sequenceA_
130+
[ putStrLn ("Running: " ++ fst f ++ " " ++ fst x1 ++ " " ++ fst x2)
131+
>> snd f
132+
(if fst x1 == "default" then 500000 else 100000)
133+
(snd x1 . snd x2 . cfg)
134+
| f <- funcs, x1 <- limits, x2 <- sched
135+
]

test/streamly-tests.cabal

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,12 @@ test-suite Data.Stream.Concurrent
369369
if flag(use-streamly-core)
370370
buildable: False
371371

372+
test-suite Data.Stream.Exception
373+
import: test-options
374+
type: exitcode-stdio-1.0
375+
main-is: Streamly/Test/Data/Stream/Exception.hs
376+
ghc-options: -main-is Streamly.Test.Data.Stream.Exception.main
377+
372378
test-suite Data.Stream.Time
373379
import: test-options
374380
type: exitcode-stdio-1.0

0 commit comments

Comments
 (0)