Skip to content

Commit be39d9f

Browse files
Restore the previously deleted streamly-metrics modules
1 parent dd85bc3 commit be39d9f

14 files changed

Lines changed: 1329 additions & 27 deletions

File tree

haskell-perf.cabal

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ extra-doc-files:
3232

3333
extra-source-files:
3434
examples/*.hs
35+
lib/Streamly/Metrics/File.hs
36+
lib/Streamly/Metrics/Console.hs
3537

3638
source-repository head
3739
type: git
@@ -52,39 +54,34 @@ flag dev
5254
-------------------------------------------------------------------------------
5355

5456
common default-extensions
57+
-- In GHC 2024
5558
default-extensions:
5659
BangPatterns
57-
CApiFFI
5860
ConstraintKinds
5961
DeriveDataTypeable
6062
DeriveGeneric
6163
DeriveTraversable
62-
DoAndIfThenElse
6364
ExistentialQuantification
6465
FlexibleContexts
6566
FlexibleInstances
6667
GeneralizedNewtypeDeriving
6768
InstanceSigs
6869
KindSignatures
6970
LambdaCase
70-
MagicHash
7171
MultiParamTypeClasses
72-
PatternSynonyms
7372
RankNTypes
74-
RecordWildCards
7573
ScopedTypeVariables
7674
TupleSections
7775
TypeApplications
78-
TypeFamilies
79-
ViewPatterns
76+
TypeOperators
8077

81-
-- MonoLocalBinds, enabled by TypeFamilies, causes performance
82-
-- regressions. Disable it. This must come after TypeFamilies,
83-
-- otherwise TypeFamilies will enable it again.
84-
NoMonoLocalBinds
85-
86-
-- UndecidableInstances -- Does not show any perf impact
87-
-- UnboxedTuples -- interferes with (#.)
78+
-- Not in GHC2024
79+
default-extensions:
80+
CPP
81+
CApiFFI
82+
MagicHash
83+
PatternSynonyms
84+
RecordWildCards
8885

8986
common compile-options
9087
import: default-extensions
@@ -96,6 +93,7 @@ common compile-options
9693
-Wno-missing-exported-signatures
9794
-Wno-missing-import-lists
9895
-Wno-missing-local-signatures
96+
-Wno-missing-role-annotations
9997
-Wno-missing-safe-haskell-mode
10098
-Wno-missed-specialisations
10199
-Wno-all-missed-specialisations
@@ -113,6 +111,71 @@ common compile-options
113111
-Wall-missed-specialisations
114112
-fno-ignore-asserts
115113

114+
common optimization-options
115+
ghc-options: -O2
116+
-fdicts-strict
117+
-fspec-constr-recursive=16
118+
-fmax-worker-args=16
119+
120+
common lib-options
121+
import: compile-options, optimization-options
122+
123+
common test-options
124+
import: compile-options, optimization-options
125+
ghc-options: -with-rtsopts -T
126+
127+
-------------------------------------------------------------------------------
128+
-- Library
129+
-------------------------------------------------------------------------------
130+
131+
library
132+
import: lib-options
133+
134+
hs-source-dirs: lib
135+
exposed-modules:
136+
Streamly.KeyValue.Type
137+
, Streamly.Metrics.Type
138+
, Streamly.Metrics.Perf
139+
, Streamly.Metrics.Perf.Type
140+
, Streamly.Metrics.Perf.RUsage
141+
, Streamly.Metrics.Channel
142+
, Streamly.Metrics.Channel.Unbounded
143+
, Streamly.Metrics.Channel.Common
144+
, Streamly.Metrics.Measure
145+
-- , Streamly.Metrics.File
146+
-- , Streamly.Metrics.Console
147+
148+
build-depends:
149+
base >= 4.9 && < 5
150+
, pretty-show >= 1.10 && < 1.11
151+
, stm >= 2.5.3 && < 2.6
152+
, streamly >= 0.11.0 && < 0.12
153+
, streamly-core >= 0.3.0 && < 0.4
154+
155+
-------------------------------------------------------------------------------
156+
-- Executables
157+
-------------------------------------------------------------------------------
158+
159+
executable hperf
160+
import: compile-options, optimization-options
161+
hs-source-dirs: src
162+
main-is: Main.hs
163+
other-modules: Aggregator, EventParser
164+
build-depends:
165+
base >= 4.9 && < 5
166+
, containers < 0.9
167+
, format-numbers < 0.2
168+
, optparse-applicative < 0.20
169+
, streamly-core >= 0.3.0 && < 0.4
170+
, streamly-statistics < 0.3
171+
, text < 2.2
172+
173+
-------------------------------------------------------------------------------
174+
-- Examples
175+
-------------------------------------------------------------------------------
176+
177+
-- XXX Use examples flag to compile these
178+
116179
executable console-loop-multi-thread
117180
import: compile-options
118181
hs-source-dirs: examples
@@ -124,17 +187,17 @@ executable console-loop-multi-thread
124187
if os(windows)
125188
buildable: False
126189

127-
executable hperf
128-
import: compile-options
129-
hs-source-dirs: src
190+
-------------------------------------------------------------------------------
191+
-- Tests
192+
-------------------------------------------------------------------------------
193+
194+
test-suite basic
195+
import: test-options
196+
type: exitcode-stdio-1.0
197+
hs-source-dirs: test
130198
main-is: Main.hs
131-
other-modules: Aggregator, EventParser
132199
ghc-options: -O2 -fmax-worker-args=16 -fspec-constr-recursive=16
133200
build-depends:
134-
base >= 4.9 && < 5
135-
, containers < 0.9
136-
, optparse-applicative < 0.20
137-
, streamly-core >= 0.3.0 && < 0.4
138-
, streamly-statistics < 0.3
139-
, format-numbers < 0.2
140-
, text < 2.2
201+
base
202+
, haskell-perf
203+
, streamly-core

lib/Streamly/KeyValue/Type.hs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
-- |
2+
-- Module : Streamly.KeyValue.Type
3+
-- Copyright : (c) 2021 Composewell Technologies
4+
-- License : Apache-2.0
5+
-- Maintainer : streamly@composewell.com
6+
-- Stability : experimental
7+
-- Portability : GHC
8+
--
9+
module Streamly.KeyValue.Type
10+
(
11+
-- * KeyValue
12+
KeyValue (..)
13+
, Zip (..)
14+
)
15+
where
16+
17+
import Control.Exception (assert)
18+
import Prelude hiding (zip, zipWith)
19+
20+
-- XXX Use the approach like in https://hackage.haskell.org/package/keys? Or we
21+
-- could use a lens based approach? However, the more we abstract the less
22+
-- comprehensible it becomes.
23+
--
24+
-- XXX We could also use a columnar storage (arrays) to store the keys and
25+
-- values. Further, keys could be stored in boxed arrays whereas the values
26+
-- could be stored in unboxed arrays.
27+
28+
-- | A key value represents a sample of some value labeled with a unique key of
29+
-- type k and having a value of type v.
30+
--
31+
data KeyValue k v = KeyValue !k !v
32+
33+
-------------------------------------------------------------------------------
34+
-- Instances
35+
-------------------------------------------------------------------------------
36+
37+
instance Functor (KeyValue k) where
38+
fmap f (KeyValue k v) = KeyValue k (f v)
39+
40+
-- | Append two key values using the Semigroup instance of the underlying
41+
-- value.
42+
--
43+
instance (Eq k, Semigroup v) => Semigroup (KeyValue k v) where
44+
{-# INLINE (<>) #-}
45+
-- XXX Only KeyValue having the same key make sense to be combined.
46+
-- However, matching the keys at run time would incur some cost depending
47+
-- on the comparison function.
48+
KeyValue k1 v1 <> KeyValue k2 v2 =
49+
assert (k1 == k2) (KeyValue k1 (v1 <> v2))
50+
51+
-- Functors that can be Zipped.
52+
class Functor f => Zip f where
53+
zipWith :: (a -> b -> c) -> f a -> f b -> f c
54+
zipWith f a b = uncurry f <$> zip a b
55+
56+
zip :: f a -> f b -> f (a, b)
57+
zip = zipWith (,)
58+
59+
-- | Zip applicative
60+
zap :: f (a -> b) -> f a -> f b
61+
zap = zipWith id
62+
63+
instance Eq k => Zip (KeyValue k) where
64+
zipWith f (KeyValue k1 v1) (KeyValue k2 v2) =
65+
assert (k1 == k2) $ KeyValue k1 (f v1 v2)

lib/Streamly/Metrics/Channel.hs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
module Streamly.Metrics.Channel
2+
(
3+
Channel
4+
, newChannel
5+
, send
6+
, printChannel
7+
, forkChannelPrinter
8+
, benchOn
9+
, benchOnWith
10+
)
11+
where
12+
13+
import Control.Concurrent (forkIO, ThreadId)
14+
import Control.Concurrent.STM (atomically)
15+
import Control.Concurrent.STM.TBQueue
16+
(TBQueue, newTBQueue, readTBQueue, writeTBQueue)
17+
import Control.Monad.IO.Class (liftIO, MonadIO)
18+
import Data.Bifunctor (second)
19+
import Data.Function ((&))
20+
import Data.Maybe (fromJust, isJust)
21+
import Streamly.Data.Stream (Stream)
22+
import Streamly.Internal.Data.Time.Clock (getTime, Clock (Monotonic))
23+
import Streamly.Internal.Data.Time.Units (AbsTime)
24+
import Streamly.Metrics.Perf.Type (PerfMetrics(..))
25+
import Streamly.Metrics.Perf (benchWith)
26+
import Streamly.Metrics.Type (showList, Indexable)
27+
import Streamly.Data.Stream.Prelude (MonadAsync)
28+
29+
import qualified Streamly.Data.Fold as Fold
30+
import qualified Streamly.Internal.Data.Fold as Fold
31+
import qualified Streamly.Data.Stream as Stream
32+
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
33+
34+
import Prelude hiding (showList)
35+
36+
-------------------------------------------------------------------------------
37+
-- Event processing
38+
-------------------------------------------------------------------------------
39+
40+
-- XXX Use streamly SVar instead so that we do not need STM and we can use just
41+
-- one channel type.
42+
43+
-- | A metrics channel.
44+
newtype Channel a = Channel (TBQueue (AbsTime, ([Char], [a])))
45+
46+
-- | Create a new metrics channel.
47+
newChannel :: IO (Channel a)
48+
newChannel = atomically $ do
49+
tbq <- newTBQueue 1
50+
return $ Channel tbq
51+
52+
-- | Send a list of metrics to a metrics channel.
53+
-- @send channel description metrics@
54+
send :: MonadIO m => Channel a -> String -> [a] -> m ()
55+
send (Channel chan) desc metrics = do
56+
-- XXX should use asyncClock
57+
now <- liftIO $ getTime Monotonic
58+
liftIO $ atomically $ writeTBQueue chan (now, (desc, metrics))
59+
60+
fromChan :: MonadAsync m => TBQueue a -> Stream m a
61+
fromChan = Stream.repeatM . (liftIO . atomically . readTBQueue)
62+
63+
aggregateListBy :: (MonadAsync m, Ord k, Fractional a) =>
64+
Double -> Int -> Stream m (AbsTime, (k, [a])) -> Stream m (k, [a])
65+
aggregateListBy timeout batchsize stream =
66+
fmap (second fromJust)
67+
$ Stream.filter (isJust . snd)
68+
$ Stream.classifySessionsBy
69+
0.1 False (return . (> 1000)) timeout f stream
70+
71+
where
72+
73+
scale Nothing _ = Nothing
74+
scale (Just xs) count = Just $ map (/ count) xs
75+
76+
f =
77+
Fold.teeWithFst
78+
scale
79+
(Fold.take batchsize (Fold.foldl1' (zipWith (+))))
80+
(Fold.lmap (const 1) Fold.sum)
81+
82+
printKV :: (MonadIO m, Show k, Show a, Indexable a) => Stream m (k, [a]) -> m b
83+
printKV stream =
84+
let f (k, xs) = liftIO $ putStrLn $ show k ++ ":\n" ++ showList xs
85+
in Stream.fold (Fold.drainMapM f) stream >> error "printChannel: Metrics channel closed"
86+
87+
-- XXX Print actual batch size and also scale the results per event.
88+
89+
-- | Forever print the metrics on a channel to the console periodically after
90+
-- aggregating the metrics collected till now.
91+
printChannel :: (MonadAsync m, Show a, Fractional a, Indexable a) =>
92+
Channel a -> Double -> Int -> m b
93+
printChannel (Channel chan) timeout batchSize =
94+
fromChan chan
95+
& aggregateListBy timeout batchSize
96+
& printKV
97+
98+
forkChannelPrinter :: (MonadAsync m, Show a, Fractional a, Indexable a) =>
99+
Channel a -> Double -> Int -> m ThreadId
100+
forkChannelPrinter chan timeout = liftIO . forkIO . printChannel chan timeout
101+
102+
-- | Benchmark a function application and send the results to the specified
103+
-- metrics channel.
104+
benchOnWith :: Channel PerfMetrics -> String -> (a -> IO b) -> a -> IO b
105+
benchOnWith chan desc f arg = do
106+
(r, xs) <- benchWith f arg
107+
send chan desc (Count 1 : xs)
108+
return r
109+
110+
-- | Like 'benchOnWith' but benchmark an action instead of function
111+
-- application.
112+
benchOn :: Channel PerfMetrics -> String -> IO b -> IO b
113+
benchOn chan desc f = benchOnWith chan desc (const f) ()
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
module Streamly.Metrics.Channel.Common
2+
(
3+
aggregateListBy
4+
, printKV
5+
)
6+
where
7+
8+
import Control.Monad.IO.Class (liftIO, MonadIO)
9+
import Data.Bifunctor (second)
10+
import Data.Maybe (fromJust, isJust)
11+
import Streamly.Internal.Data.Time.Units (AbsTime)
12+
import Streamly.Metrics.Type (showList, Indexable)
13+
import Streamly.Data.Stream (Stream)
14+
import Streamly.Data.Stream.Prelude (MonadAsync)
15+
16+
import qualified Streamly.Internal.Data.Fold as Fold
17+
import qualified Streamly.Data.Stream as Stream
18+
import qualified Streamly.Internal.Data.Stream.Prelude as Stream
19+
20+
import Prelude hiding (showList)
21+
22+
-------------------------------------------------------------------------------
23+
-- Event processing
24+
-------------------------------------------------------------------------------
25+
26+
aggregateListBy :: (MonadAsync m, Ord k, Fractional a) =>
27+
Double -> Int -> Stream m (AbsTime, (k, [a])) -> Stream m (k, [a])
28+
aggregateListBy timeout batchsize stream =
29+
fmap (second fromJust)
30+
$ Stream.filter (isJust . snd)
31+
$ Stream.classifySessionsBy
32+
0.1 False (return . (> 1000)) timeout f stream
33+
34+
where
35+
36+
scale Nothing _ = Nothing
37+
scale (Just xs) count = Just $ map (/ count) xs
38+
39+
f =
40+
Fold.teeWithFst
41+
scale
42+
(Fold.take batchsize (Fold.foldl1' (zipWith (+))))
43+
(Fold.lmap (const 1) Fold.sum)
44+
45+
printKV :: (MonadIO m, Show k, Show a, Indexable a) => Stream m (k, [a]) -> m b
46+
printKV stream =
47+
let f (k, xs) = liftIO $ putStrLn $ show k ++ ":\n" ++ showList xs
48+
in Stream.fold (Fold.drainMapM f) stream >> error "printChannel: Metrics channel closed"

0 commit comments

Comments
 (0)