Skip to content

Commit 150d15d

Browse files
Refactor aggregateListBy and add some tracing
1 parent 8e41d21 commit 150d15d

1 file changed

Lines changed: 14 additions & 9 deletions

File tree

lib/Streamly/Metrics/Channel/Common.hs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,31 @@ import Prelude hiding (showList)
2323
-- Event processing
2424
-------------------------------------------------------------------------------
2525

26+
-- XXX We can use incremental window folds instead. We do not need to collect
27+
-- all stats and then calculate.
2628
aggregateListBy :: (MonadAsync m, Ord k, Fractional a) =>
2729
Double -> Int -> Stream m (AbsTime, (k, [a])) -> Stream m (k, [a])
2830
aggregateListBy timeout batchsize stream =
2931
fmap (second fromJust)
3032
$ Stream.filter (isJust . snd)
31-
$ Stream.classifySessionsBy
32-
0.1 False (return . (> 1000)) timeout f stream
33+
-- $ Stream.trace (\x -> liftIO $ putStrLn $ "after classify: " ++ show x)
34+
$ Stream.classifySessionsBy 0.1 False (return . (> 1000)) timeout f
35+
-- $ Stream.trace (liftIO . print)
36+
$ stream
3337

3438
where
3539

3640
scale Nothing _ = Nothing
37-
scale (Just xs) count = Just $ map (/ count) xs
41+
scale (Just xs) cnt = Just $ map (/ cnt) xs
3842

39-
f =
40-
Fold.teeWithFst
41-
scale
42-
(Fold.take batchsize (Fold.foldl1' (zipWith (+))))
43-
(Fold.lmap (const 1) Fold.sum)
43+
addMetrics = Fold.foldl1' (zipWith (+))
44+
collectBatch = Fold.take batchsize addMetrics
45+
count = fmap fromIntegral Fold.length
46+
47+
f = Fold.teeWithFst scale collectBatch count
4448

4549
printKV :: (MonadIO m, Show k, Show a, Indexable a) => Stream m (k, [a]) -> m b
4650
printKV stream =
4751
let f (k, xs) = liftIO $ putStrLn $ show k ++ ":\n" ++ showList xs
48-
in Stream.fold (Fold.drainMapM f) stream >> error "printChannel: Metrics channel closed"
52+
in Stream.fold (Fold.drainMapM f) stream
53+
>> error "printChannel: Metrics channel closed"

0 commit comments

Comments
 (0)