Skip to content

Commit 8e41d21

Browse files
Use common channel code from the Common module
1 parent 519064c commit 8e41d21

1 file changed

Lines changed: 9 additions & 32 deletions

File tree

lib/Streamly/Metrics/Channel.hs

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,17 @@ import Control.Concurrent.STM (atomically)
1515
import Control.Concurrent.STM.TBQueue
1616
(TBQueue, newTBQueue, readTBQueue, writeTBQueue)
1717
import Control.Monad.IO.Class (liftIO, MonadIO)
18-
import Data.Bifunctor (second)
1918
import Data.Function ((&))
20-
import Data.Maybe (fromJust, isJust)
2119
import Streamly.Data.Stream (Stream)
2220
import Streamly.Internal.Data.Time.Clock (getTime, Clock (Monotonic))
2321
import Streamly.Internal.Data.Time.Units (AbsTime)
22+
import Streamly.Metrics.Channel.Common (aggregateListBy, printKV)
2423
import Streamly.Metrics.Perf.Type (PerfMetrics(..))
2524
import Streamly.Metrics.Perf (benchWith)
26-
import Streamly.Metrics.Type (showList, Indexable)
25+
import Streamly.Metrics.Type (Indexable)
2726
import Streamly.Data.Stream.Prelude (MonadAsync)
2827

29-
import qualified Streamly.Data.Fold as Fold
30-
import qualified Streamly.Internal.Data.Fold as Fold
3128
import qualified Streamly.Data.Stream as Stream
32-
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
33-
34-
import Prelude hiding (showList)
3529

3630
-------------------------------------------------------------------------------
3731
-- Event processing
@@ -60,30 +54,6 @@ send (Channel chan) desc metrics = do
6054
fromChan :: MonadAsync m => TBQueue a -> Stream m a
6155
fromChan = Stream.repeatM . (liftIO . atomically . readTBQueue)
6256

63-
aggregateListBy :: (MonadAsync m, Ord k, Fractional a) =>
64-
Double -> Int -> Stream m (AbsTime, (k, [a])) -> Stream m (k, [a])
65-
aggregateListBy timeout batchsize stream =
66-
fmap (second fromJust)
67-
$ Stream.filter (isJust . snd)
68-
$ Stream.classifySessionsBy
69-
0.1 False (return . (> 1000)) timeout f stream
70-
71-
where
72-
73-
scale Nothing _ = Nothing
74-
scale (Just xs) count = Just $ map (/ count) xs
75-
76-
f =
77-
Fold.teeWithFst
78-
scale
79-
(Fold.take batchsize (Fold.foldl1' (zipWith (+))))
80-
(Fold.lmap (const 1) Fold.sum)
81-
82-
printKV :: (MonadIO m, Show k, Show a, Indexable a) => Stream m (k, [a]) -> m b
83-
printKV stream =
84-
let f (k, xs) = liftIO $ putStrLn $ show k ++ ":\n" ++ showList xs
85-
in Stream.fold (Fold.drainMapM f) stream >> error "printChannel: Metrics channel closed"
86-
8757
-- XXX Print actual batch size and also scale the results per event.
8858

8959
-- | Forever print the metrics on a channel to the console periodically after
@@ -95,6 +65,13 @@ printChannel (Channel chan) timeout batchSize =
9565
& aggregateListBy timeout batchSize
9666
& printKV
9767

68+
-- | Start an async thread to print the stats received on the supplied channel
69+
-- and print the stats on console.
70+
--
71+
-- Usage: @forkChannelPrinter channel timeout batch-size@.
72+
--
73+
-- Stats are printed when either as many stat samples as the batch size have
74+
-- been received or we have not received a stat in "timeout" seconds.
9875
forkChannelPrinter :: (MonadAsync m, Show a, Fractional a, Indexable a) =>
9976
Channel a -> Double -> Int -> m ThreadId
10077
forkChannelPrinter chan timeout = liftIO . forkIO . printChannel chan timeout

0 commit comments

Comments
 (0)