@@ -6,27 +6,63 @@ import Streamly.Metrics.Perf.Type (PerfMetrics)
66
77import qualified Streamly.Data.Fold as Fold
88import qualified Streamly.Data.Stream as Stream
9- import Prelude hiding (sum )
109
11- noop :: Channel PerfMetrics -> IO ()
12- noop chan = do
13- benchOnWith chan " noop" (const (return () )) (1000000 :: Int )
14-
15- sum :: Channel PerfMetrics -> IO ()
16- sum chan = do
17- _ <- benchOnWith
18- chan " sum" (Stream. fold Fold. sum . Stream. enumerateFromTo (1 :: Int )) 1000000
10+ runWithStats :: Channel PerfMetrics -> String -> (a -> IO b ) -> a -> IO ()
11+ runWithStats chan label f arg = do
12+ _ <- benchOnWith chan label f arg
1913 return ()
2014
15+ -- A simple operation that does nothing. When we measure this operation the cpu
16+ -- time that is spent is just the overhead of the measuring code.
17+ noOp :: b -> IO ()
18+ noOp = (const (return () ))
19+
20+ sumOp :: Int -> IO Int
21+ sumOp =
22+ Stream. fold Fold. sum
23+ . Stream. enumerateFromTo (1 :: Int )
24+
25+ {-
26+ -- Pure code example
27+ listSum :: Int -> Int
28+ listSum =
29+ sum
30+ . enumFromTo (1::Int)
31+ -}
32+
33+ timeout :: Int
34+ timeout = 1
35+
36+ initStats :: IO (Channel PerfMetrics )
37+ initStats = do
38+ chan <- newChannel
39+ -- The channel will collect 100 samples per label, as soon as it receives
40+ -- 100 it will print the stats and start collecting the next batch.
41+ -- If no sample comes in "timeout" seconds then print the batch anyway.
42+ -- @forkChannelPrinter channel timeout batch-size@.
43+ _ <- forkChannelPrinter chan (fromIntegral timeout) 100
44+ return chan
45+
2146main :: IO ()
2247main = do
23- chan <- newChannel
24- _ <- forkChannelPrinter chan 10 100
25- Stream. fold Fold. drain (Stream. replicateM 1000 (noop chan))
26- Stream. fold Fold. drain (Stream. replicateM 1000 (sum chan))
27- threadDelay 1000000
28- {-
29- Stream.drain
30- ((Stream.replicateM 1000 (noop chan) <> Stream.replicateM 1000 (sum chan))
31- `Stream.parallelFst` Stream.fromEffect (printChannel chan 1 10))
32- -}
48+ -- Initialize a channel to send the stats to
49+ chan <- initStats
50+
51+ let withStats = runWithStats chan
52+
53+ -- One shot measurement, just one call
54+ withStats " noOpOne" noOp (1000000 :: Int )
55+ withStats " sumOpOne" sumOp (1000000 :: Int )
56+
57+ -- Run many iterations and print the stats for batches of 100
58+ let iterations n = Stream. fold Fold. drain . Stream. replicateM n
59+ withStatsMany label f arg = iterations 1000 $ runWithStats chan label f arg
60+
61+ -- Run the "noOp" and "sumOp" 1000 times, passing 1000000 as argument and
62+ -- sending the stats to "chan". The stats collected for noOp and sumOp will
63+ -- be sent to the channel and printed by it on console.
64+ withStatsMany " noOpMany" noOp (1000000 :: Int )
65+ withStatsMany " sumOpMany" sumOp (1000000 :: Int )
66+
67+ -- Wait for the channel to drain
68+ threadDelay ((timeout + 2 ) * 1000000 )
0 commit comments