Skip to content

Commit f2fa4f9

Browse files
adithyaovAdithya Obilisetty
authored andcommitted
Add an executable to collect live statitics
1 parent c9b1357 commit f2fa4f9

3 files changed

Lines changed: 327 additions & 0 deletions

File tree

cabal.project.user

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,8 @@ source-repository-package
55
location: https://github.com/composewell/streamly.git
66
tag: master
77
subdir: core
8+
9+
source-repository-package
10+
type: git
11+
location: https://github.com/composewell/streamly.git
12+
tag: master

haskell-perf.cabal

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,18 @@ executable hperf
117117
, streamly-core == 0.2.0
118118
, format-numbers
119119
, text
120+
121+
executable stat-collector
122+
import: compile-options
123+
hs-source-dirs: stat-collector-src
124+
main-is: Main.hs
125+
ghc-options: -O2 -fmax-worker-args=16 -fspec-constr-recursive=16
126+
build-depends:
127+
base
128+
, streamly-core
129+
, streamly
130+
, containers
131+
, ansi-terminal
132+
, format-numbers
133+
, unix
134+
, text

stat-collector-src/Main.hs

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
{-# LANGUAGE QuasiQuotes #-}
2+
3+
module Main (main) where
4+
5+
--------------------------------------------------------------------------------
6+
-- Imports
7+
--------------------------------------------------------------------------------
8+
9+
-- import Control.Concurrent (threadDelay)
10+
import Data.Int (Int32, Int64)
11+
import System.Environment (getArgs)
12+
import Control.Monad.IO.Class (MonadIO(..))
13+
import Data.Function ((&))
14+
import Data.List (foldl', findIndex, sortBy, find)
15+
import Data.Map (Map)
16+
import Data.Maybe (fromMaybe, fromJust)
17+
import Data.Word (Word8)
18+
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
19+
import Foreign.Storable (Storable, peek)
20+
import Numeric (showFFloat)
21+
import Streamly.Data.Array (Array)
22+
import Streamly.Data.Fold (Fold)
23+
import Streamly.Data.Stream (Stream)
24+
import Streamly.Internal.Data.Fold (Fold(..), Step(..))
25+
import Streamly.Internal.Data.Ring (slidingWindow)
26+
import Streamly.Internal.Data.Tuple.Strict (Tuple3Fused' (Tuple3Fused'))
27+
import Streamly.Unicode.String (str)
28+
import System.IO (hFlush, stdout, stdin)
29+
import Text.Read (readMaybe)
30+
import System.Posix.Signals (installHandler, Handler(Catch), sigINT, sigTERM)
31+
import Data.Text.Format.Numbers (prettyI)
32+
33+
import qualified Data.Text as Text
34+
import qualified Data.Map as Map
35+
import qualified Streamly.Data.Fold as Fold
36+
import qualified Streamly.Data.Stream.Prelude as Stream
37+
import qualified Streamly.FileSystem.Handle as Handle
38+
import qualified Streamly.Internal.Data.Fold as Fold
39+
import qualified Streamly.Internal.Data.Ring as Ring
40+
import qualified Streamly.Unicode.Stream as Unicode
41+
import qualified System.Console.ANSI as ANSI
42+
43+
--------------------------------------------------------------------------------
44+
-- Utils
45+
--------------------------------------------------------------------------------
46+
47+
double :: Int -> Double
48+
double = fromIntegral
49+
50+
--------------------------------------------------------------------------------
51+
-- Types
52+
--------------------------------------------------------------------------------
53+
54+
data Boundary a b
55+
= Start a
56+
| End a (Maybe b)
57+
deriving (Read, Show, Ord, Eq)
58+
59+
getWindowId :: Boundary WindowId Label -> WindowId
60+
getWindowId (Start a) = a
61+
getWindowId (End a _) = a
62+
63+
data Counter
64+
= ThreadCpuTime
65+
| ProcessCpuTime
66+
| WallClockTime
67+
| Allocated
68+
| SchedOut
69+
deriving (Read, Show, Ord, Eq)
70+
71+
type WindowId = String
72+
type Label = String
73+
type ThreadId = Int32
74+
type Tag = String
75+
type Value = Int64
76+
77+
data EventId =
78+
EventId
79+
{ evTid :: ThreadId
80+
, evCounter :: Counter
81+
, evTag :: Tag
82+
}
83+
deriving (Eq, Ord, Show)
84+
85+
data UnboundedEvent
86+
= UEvent (Boundary WindowId Label) ThreadId Counter Value
87+
deriving (Show)
88+
89+
data Event
90+
= Event EventId Value
91+
deriving (Show)
92+
93+
getEventId :: Event -> EventId
94+
getEventId (Event evId _) = evId
95+
96+
getEventVal :: Event -> Value
97+
getEventVal (Event _ evVal) = evVal
98+
99+
--------------------------------------------------------------------------------
100+
-- Folds
101+
--------------------------------------------------------------------------------
102+
103+
statsLayout :: [String]
104+
statsLayout =
105+
[ "latest", "total", "count", "avg", "minimum", "maximum", "stddev"]
106+
107+
{-# INLINE stats #-}
108+
stats :: Fold IO Int64 [(String, Int)]
109+
stats =
110+
Fold.lmap (fromIntegral :: Int64 -> Int)
111+
$ Fold.distribute
112+
[ fmap (\x -> ("latest", fromJust x)) Fold.latest
113+
, fmap (\x -> ("total", x)) Fold.sum
114+
, fmap (\x -> ("count", x)) Fold.length
115+
, fmap (\x -> ("avg", round x)) (Fold.lmap double Fold.mean)
116+
, fmap (\x -> ("minimum", fromJust x)) Fold.minimum
117+
, fmap (\x -> ("maximum", fromJust x)) Fold.maximum
118+
, fmap (\x -> ("stddev", round x)) (Fold.lmap double Fold.stdDev)
119+
]
120+
121+
--------------------------------------------------------------------------------
122+
-- Parsing Input
123+
--------------------------------------------------------------------------------
124+
125+
-- Event format:
126+
-- Start/<window-id>/<label>/<tid>/<counterName>/<value>
127+
-- End/<window-id>/<label>/<tid>/<counterName/<value>
128+
129+
errorString :: String -> String -> String
130+
errorString line reason = [str|Error:
131+
Line: #{line}
132+
Reason: #{reason}
133+
|]
134+
135+
parseLineToEvent :: Monad m => String -> m (Either String UnboundedEvent)
136+
parseLineToEvent line = do
137+
res <-
138+
Stream.fromList line
139+
& Stream.foldMany (Fold.takeEndBy_ (== '/') Fold.toList)
140+
& Stream.toList
141+
case res of
142+
["Start", windowId, tid, counter, val] ->
143+
case withParsed (UEvent (Start windowId)) tid counter val of
144+
Just val -> pure $ Right val
145+
Nothing -> pure $ Left $ errorString line "Not valid"
146+
["End", windowId, tid, counter, val] ->
147+
case withParsed (UEvent (End windowId Nothing)) tid counter val of
148+
Just val -> pure $ Right val
149+
Nothing -> pure $ Left $ errorString line "Not valid"
150+
["End", windowId, label, tid, counter, val] ->
151+
case withParsed (UEvent (End windowId (Just label))) tid counter val of
152+
Just val -> pure $ Right val
153+
Nothing -> pure $ Left $ errorString line "Not valid"
154+
_ -> pure $ Left $ errorString line "Chunks /= 4"
155+
156+
where
157+
158+
withParsed
159+
:: (ThreadId -> Counter -> Value -> UnboundedEvent)
160+
-> String
161+
-> String
162+
-> String
163+
-> Maybe UnboundedEvent
164+
withParsed func tid counter val =
165+
func <$> readMaybe tid <*> readMaybe counter <*> readMaybe val
166+
167+
parseInputToEventStream
168+
:: MonadIO m => Stream m (Array Word8) -> Stream m UnboundedEvent
169+
parseInputToEventStream inp =
170+
Unicode.decodeUtf8Chunks inp
171+
& Stream.foldMany
172+
(Fold.takeEndBy_
173+
(== '\n')
174+
(Fold.rmapM parseLineToEvent Fold.toList))
175+
& Stream.catRights
176+
177+
--------------------------------------------------------------------------------
178+
-- Processing stats
179+
--------------------------------------------------------------------------------
180+
181+
boundEvents :: Monad m => Fold m UnboundedEvent (Maybe Event)
182+
boundEvents = Fold step initial extract extract
183+
where
184+
initial = pure $ Partial (Nothing, Map.empty)
185+
186+
alterFunc :: UnboundedEvent -> Maybe Value -> (Maybe Event, Maybe Value)
187+
alterFunc (UEvent (Start _) _ _ val) Nothing = (Nothing, Just val)
188+
alterFunc (UEvent (Start _) _ _ val) (Just _) = (Nothing, Just val)
189+
alterFunc (UEvent (End w Nothing) tid counter val) (Just prevVal) =
190+
( Just (Event (EventId tid counter w) (val - prevVal))
191+
, Nothing
192+
)
193+
alterFunc (UEvent (End w (Just tag)) tid counter val) (Just prevVal) =
194+
( Just (Event (EventId tid counter (w ++ ":" ++ tag)) (val - prevVal))
195+
, Just prevVal
196+
)
197+
alterFunc _ Nothing = (Nothing, Nothing)
198+
199+
step (_, mp) uev@(UEvent b tid counter _) =
200+
pure $ Partial
201+
$ Map.alterF (alterFunc uev) (getWindowId b, tid, counter) mp
202+
203+
extract (ev, _) = pure ev
204+
205+
statCollector :: Fold IO Event (Map EventId [(String, Int)])
206+
statCollector =
207+
Fold.demuxToMap getEventId deriveFold
208+
209+
where
210+
211+
deriveFold ev = pure (Fold.lmap getEventVal stats)
212+
213+
--------------------------------------------------------------------------------
214+
-- Printing stats
215+
--------------------------------------------------------------------------------
216+
217+
fill :: Int -> String -> String
218+
fill i x =
219+
let len = length x
220+
in replicate (i - len) ' ' ++ x
221+
222+
printTable :: [[String]] -> IO ()
223+
printTable rows = do
224+
case map (unwords . fillRow) rows of
225+
[] -> putStrLn "printTable: empty rows"
226+
(header:rest) -> putStrLn $ unlines $ header:unwords separatorRow:rest
227+
228+
where
229+
230+
rowLengths = map (map length) rows -- [[Int]]
231+
maxLengths = foldl' (zipWith max) (head rowLengths) rowLengths
232+
separatorRow = map (\n -> replicate n '-') maxLengths
233+
fillRow r = zipWith (\n x -> fill n x) maxLengths r
234+
235+
printStatsMap
236+
:: (Show a, Show b, Show c, Ord a, Ord b, Ord c)
237+
=> (EventId -> a)
238+
-> (EventId -> b)
239+
-> (EventId -> c)
240+
-> Map EventId [(String, Int)]
241+
-> IO ()
242+
printStatsMap index1 index2 index3 mp =
243+
mapM_ printOneTable $ Map.toList $ anchorOnTidAndCounter mp
244+
245+
where
246+
247+
alterFunction v Nothing = Just [v]
248+
alterFunction v (Just v0) = Just (v:v0)
249+
250+
foldingFunction mp ev v =
251+
Map.alter (alterFunction (index3 ev, v)) (index1 ev, index2 ev) mp
252+
253+
anchorOnTidAndCounter mp =
254+
Map.foldlWithKey' foldingFunction Map.empty mp
255+
256+
printOneTable ((i1, i2), rows) = do
257+
let i1Str = show i1
258+
i2Str = show i2
259+
headingL1 = [str|Index1: #{i1Str}|]
260+
headingL2 = [str|Index2: #{i2Str}|]
261+
divider = replicate (max (length headingL2) (length headingL1)) '-'
262+
putStrLn divider
263+
putStrLn headingL1
264+
putStrLn headingL2
265+
putStrLn divider
266+
putStrLn ""
267+
printTable
268+
$ (:) tableHeader
269+
$ map (\(i3, v) -> show i3 : map (pShowInt . snd) v) rows
270+
putStrLn ""
271+
272+
pShowInt = Text.unpack . prettyI (Just ',')
273+
274+
tableHeader = "Index3":statsLayout
275+
276+
--------------------------------------------------------------------------------
277+
-- Main
278+
--------------------------------------------------------------------------------
279+
280+
main :: IO ()
281+
main = do
282+
statsMap <-
283+
Stream.unfold Handle.chunkReader stdin
284+
& parseInputToEventStream
285+
& Stream.scan boundEvents
286+
& Stream.catMaybes
287+
& Stream.fold statCollector
288+
(arg:[]) <- getArgs
289+
case arg of
290+
"Tag" ->
291+
printStatsMap
292+
(evTid)
293+
(evCounter)
294+
(evTag)
295+
statsMap
296+
"Counter" ->
297+
printStatsMap
298+
(evTid)
299+
(evTag)
300+
(evCounter)
301+
statsMap
302+
"ThreadId" ->
303+
printStatsMap
304+
(evCounter)
305+
(evTag)
306+
(evTid)
307+
statsMap

0 commit comments

Comments
 (0)