11{-# LANGUAGE NamedFieldPuns #-}
22{-# LANGUAGE OverloadedStrings #-}
33{-# LANGUAGE TupleSections #-}
4+ {-# LANGUAGE ViewPatterns #-}
45
56{-# OPTIONS_GHC -Wno-redundant-constraints #-}
67
@@ -9,29 +10,38 @@ module Cardano.Tracer.Acceptors.Utils
910 , prepareMetricsStores
1011 , removeDisconnectedNode
1112 , notifyAboutNodeDisconnected
13+ , store
1214 ) where
1315
1416#if RTVIEW
1517import Cardano.Logging (SeverityS (.. ))
18+ #endif
19+ import qualified Cardano.Timeseries.Component as Timeseries
20+ import Cardano.Timeseries.Domain.Types (MetricIdentifier )
21+ import Cardano.Tracer.Environment
22+ #if RTVIEW
1623import Cardano.Tracer.Handlers.Notifications.Types
1724import Cardano.Tracer.Handlers.Notifications.Utils
1825#endif
19- import Cardano.Tracer.Environment
26+ import Cardano.Tracer.Time ( getTimeMs )
2027import Cardano.Tracer.Types
2128import Cardano.Tracer.Utils
2229import Ouroboros.Network.Socket (ConnectionId (.. ))
2330
2431import Control.Concurrent.STM (atomically )
2532import Control.Concurrent.STM.TVar (TVar , modifyTVar' , newTVarIO )
2633import qualified Data.Bimap as BM
34+ import Data.Foldable
2735import qualified Data.Map.Strict as M
36+ import Data.Maybe (mapMaybe )
2837import qualified Data.Set as S
29- import Data.Time.Clock.POSIX (getPOSIXTime )
3038#if RTVIEW
3139import Data.Time.Clock.System (getSystemTime , systemToUTCTime )
3240#endif
3341import qualified System.Metrics as EKG
34- import System.Metrics.Store.Acceptor (MetricsLocalStore , emptyMetricsLocalStore )
42+ import System.Metrics.ReqResp
43+ import System.Metrics.Store.Acceptor (MetricsLocalStore , emptyMetricsLocalStore ,
44+ storeMetrics )
3545
3646import Trace.Forward.Utils.DataPoint (DataPointRequestor , initDataPointRequestor )
3747
@@ -54,26 +64,17 @@ prepareMetricsStores
5464 -> IO (EKG. Store , TVar MetricsLocalStore )
5565prepareMetricsStores TracerEnv {teConnectedNodes, teAcceptedMetrics} connId = do
5666 addConnectedNode teConnectedNodes connId
57- store <- EKG. newStore
67+ st <- EKG. newStore
5868
59- EKG. registerCounter " ekg.server_timestamp_ms" getTimeMs store
60- storesForNewNode <- (store ,) <$> newTVarIO emptyMetricsLocalStore
69+ EKG. registerCounter " ekg.server_timestamp_ms" getTimeMs st
70+ storesForNewNode <- (st ,) <$> newTVarIO emptyMetricsLocalStore
6171
6272 atomically do
6373 modifyTVar' teAcceptedMetrics do
6474 M. insert (connIdToNodeId connId) storesForNewNode
6575
6676 return storesForNewNode
6777
68- where
69- -- forkServer definition of `getTimeMs'. The ekg frontend relies
70- -- on the "ekg.server_timestamp_ms" metric being in every
71- -- store. While forkServer adds that that automatically we must
72- -- manually add it.
73- -- url
74- -- + https://github.com/tvh/ekg-wai/blob/master/System/Remote/Monitoring/Wai.hs#L237-L238
75- getTimeMs = (round . (* 1000 )) `fmap` getPOSIXTime
76-
7778addConnectedNode
7879 :: Show addr
7980 => ConnectedNodes
@@ -115,3 +116,21 @@ notifyAboutNodeDisconnected TracerEnvRTView{teEventsQueues} connId = do
115116#else
116117notifyAboutNodeDisconnected _ _ = pure ()
117118#endif
119+
120+ store :: TracerEnv -> NodeId -> (EKG. Store , TVar MetricsLocalStore ) -> Response -> IO ()
121+ store tracerEnv (NodeId nodeId) (ekgStore, localStore) resp@ (ResponseMetrics ms) = do
122+ storeMetrics resp ekgStore localStore
123+ ts <- getTimeMs
124+ for_ (teTimeseriesHandle tracerEnv) $ \ h ->
125+ Timeseries. insert h " node_id" nodeId (fromIntegral ts) (mapMaybe parseMetric ms)
126+
127+ where
128+ numeralOnly :: MetricValue -> Maybe Double
129+ numeralOnly (GaugeValue x) = Just (fromIntegral x)
130+ numeralOnly (CounterValue x) = Just (fromIntegral x)
131+ numeralOnly (LabelValue _) = Nothing
132+
133+ parseMetric :: (MetricName , MetricValue ) -> Maybe (MetricIdentifier , Double )
134+ parseMetric (k, numeralOnly -> Just v) = Just (k, v)
135+ parseMetric _ = Nothing
136+
0 commit comments